# Source code for main

"""Main entry point for the TEG Gateway.

This module implements the primary runtime loop of the Edge Gateway. It is
responsible for initializing all subsystems, establishing communication with
ThingsBoard, and coordinating message flow between the controller, local
persistence layers, and remote services.

Responsibilities
----------------
- Parse command-line arguments and perform self-provisioning if required.
- Initialize local SQLite databases used for buffering and archiving.
- Start and supervise the MQTT client connection to ThingsBoard.
- Dispatch incoming MQTT messages to RPC, OTA, and remote file management handlers.
- Persist and forward controller telemetry and log messages.
- Supervise the controller container and trigger restarts if required.
- Publish auxiliary health and timing telemetry.

Notes
-----
- The main loop is intentionally single-threaded for deterministic behavior.
- Background threads are used only for MQTT I/O and file change detection; of
  these, only the file change detection thread is started as a daemon thread.
- Fatal errors result in a graceful shutdown followed by forced termination if
  necessary.
"""

import json
import os
import signal
import sys
import threading
from logging import error
from time import sleep, time_ns
from typing import Any, Optional

from db_schemas.controller_archive_table import *
from db_schemas.controller_messages_table import *
from db_schemas.pending_messages_table import *
from modules.file_writer import GatewayFileWriter
from modules.logging import info, warn, debug
import utils.paths
import utils.misc
from args import parse_args
from modules import sqlite
from modules.docker_client import GatewayDockerClient
from modules.git_client import GatewayGitClient
from modules.mqtt import GatewayMqttClient
from on_mqtt_msg.check_for_file_content_update import on_msg_check_for_file_content_update
from on_mqtt_msg.check_for_file_hashes_update import on_msg_check_for_file_hashes_update, FILE_HASHES_TB_KEY
from on_mqtt_msg.check_for_files_definition_update import on_msg_check_for_files_definition_update
from on_mqtt_msg.check_for_ota_updates import on_msg_check_for_ota_update
from on_mqtt_msg.on_rpc_request import on_rpc_request
from self_provisioning import self_provisioning_get_access_token
from utils.controller_restart import restart_controller_if_needed
from utils.misc import get_maybe

# --- Main loop control -------------------------------------------------------
# Checked by the main loop on every iteration; the shutdown handler sets it to
# True so the loop stops.
STOP_MAINLOOP = False

# --- Auxiliary telemetry timing ----------------------------------------------
# Minimum number of milliseconds between two auxiliary telemetry publications.
AUX_DATA_PUBLISH_INTERVAL_MS = 20_000 # every 20 seconds
# Millisecond timestamp of the most recent auxiliary publication; None until
# the first one has been sent.
aux_data_publish_ts = None

# --- Shared handles ----------------------------------------------------------
# All start out as None and are assigned during startup. The shutdown handler
# reads them to decide what has to be closed.
global_mqtt_client: Optional[GatewayMqttClient] = None
archive_sqlite_db: Optional[sqlite.SqliteConnection] = None
communication_sqlite_db: Optional[sqlite.SqliteConnection] = None
gateway_logs_buffer_db: Optional[sqlite.SqliteConnection] = None


# Signal handler used for the safe (graceful) shutdown path
def shutdown_handler(sig: Any, _frame: Any) -> None:
    """Shut the Edge Gateway down cleanly when a termination signal arrives.

    The handler first arms a 20 second ``ITIMER_REAL`` timer (which raises
    ``SIGALRM`` on expiry) as a watchdog, then flags the main loop to stop,
    asks the MQTT client to exit gracefully, closes every SQLite connection
    that has been opened, flushes stdout and exits the interpreter.

    Args:
        sig: Number of the received signal; it is also handed to ``sys.exit``.
        _frame: Current stack frame (unused).
    """
    global STOP_MAINLOOP

    # Arm the watchdog before doing anything that might block.
    signal.setitimer(signal.ITIMER_REAL, 20)

    info("GRACEFUL SHUTDOWN")
    STOP_MAINLOOP = True

    if global_mqtt_client is not None:
        global_mqtt_client.graceful_exit()

    # Close the databases in a fixed order, skipping any that were never opened.
    for connection in (archive_sqlite_db, communication_sqlite_db, gateway_logs_buffer_db):
        if connection is not None:
            connection.close()

    sys.stdout.flush()
    sys.exit(sig)
# Signal handler used for the forced shutdown path (graceful shutdown failed)
def forced_shutdown_handler(_sig: Any, _frame: Any) -> None:
    """Terminate the process immediately.

    Logs a warning, flushes stdout and then leaves via ``os._exit(1)``, which
    ends the process without running the interpreter's normal exit handling.

    Args:
        _sig: Received signal number (unused).
        _frame: Current stack frame (unused).
    """
    warn("FORCEFUL SHUTDOWN")
    sys.stdout.flush()
    os._exit(1)
def get_last_controller_health_check_ts() -> int:
    """Look up when the controller last reported a health check.

    Returns:
        The ``timestamp_ms`` value stored in the ``health_check`` row with
        ``id = 1``. ``0`` is returned when the communication database has not
        been opened, when the health check table holds no values, or when the
        query yields no row.
    """
    db = communication_sqlite_db
    if db is None or not db.do_table_values_exist(sqlite.SqliteTables.HEALTH_CHECK.value):
        return 0

    rows = db.execute("SELECT timestamp_ms FROM health_check WHERE id = 1")
    return rows[0][0] if len(rows) > 0 else 0
# Register the signal handlers: SIGALRM drives the forced shutdown (it is raised
# by the timer armed in shutdown_handler), SIGINT/SIGTERM start the graceful one.
signal.signal(signal.SIGALRM, forced_shutdown_handler)
signal.signal(signal.SIGINT, shutdown_handler)
signal.signal(signal.SIGTERM, shutdown_handler)

try:
    if __name__ == '__main__':
        # --- Startup and initialization ---
        # setup
        docker_client: GatewayDockerClient = GatewayDockerClient()
        git_client: GatewayGitClient = GatewayGitClient()
        args = parse_args()
        debug(f"Args: {args}")
        provisioned, access_token = self_provisioning_get_access_token(args)

        # initialize sqlite database connections
        archive_sqlite_db = sqlite.SqliteConnection(utils.paths.GATEWAY_ARCHIVE_DB_PATH)
        communication_sqlite_db = sqlite.SqliteConnection(utils.paths.COMMUNICATION_QUEUE_DB_PATH)
        gateway_logs_buffer_db = sqlite.SqliteConnection(utils.paths.GATEWAY_LOGS_BUFFER_DB_PATH)
        archive_sqlite_db.execute(CREATE_CONTROLLER_ARCHIVE_TABLE_QUERY)
        archive_sqlite_db.execute(CREATE_CONTROLLER_ARCHIVE_INDEX_QUERY)
        communication_sqlite_db.execute(CREATE_CONTROLLER_MESSAGES_TABLE_QUERY)
        communication_sqlite_db.execute(CREATE_PENDING_MESSAGES_TABLE_QUERY)

        # --- MQTT client startup ---
        # create and run the mqtt client in a separate thread
        mqtt_client = GatewayMqttClient().init(access_token)
        try:
            mqtt_client.connect(args.tb_host, args.tb_port)
        except Exception as e:
            # A failed connect is only logged here; the connection check in the
            # main loop below handles a client that stays disconnected.
            error(f"Failed to connect to ThingsBoard: {e}")
        mqtt_client_thread: threading.Thread = threading.Thread(
            target=lambda: mqtt_client.loop_forever())
        mqtt_client_thread.start()
        # Expose the client to shutdown_handler.
        global_mqtt_client = mqtt_client
        sleep(5)
        info("Gateway started successfully")
        sleep(10)

        if provisioned:
            info("Gateway is provisioned for first time, initializing attributes...")
            GatewayMqttClient().publish_message_raw(
                "v1/devices/me/attributes",
                json.dumps({FILE_HASHES_TB_KEY: {}}))

        # --- Background file change detection thread ---
        # daemon thread for updating file content client attributes every 30 seconds
        def file_update_check_daemon():
            """Daemon thread to check for file updates every 30 seconds."""
            while True:
                sleep(30)
                try:
                    debug("Checking for file changes...")
                    file_definitions = GatewayFileWriter().get_files()
                    for file_id in file_definitions:
                        if GatewayFileWriter().did_file_change(get_maybe(file_definitions, file_id, "path")):
                            info(f"File {file_definitions[file_id]} changed on disk - requesting update")
                            GatewayMqttClient().request_attributes({"clientKeys": FILE_HASHES_TB_KEY})
                except Exception as ex:
                    # Best effort: a failed check is logged and retried on the next round.
                    warn(f"Error checking for file changes: {ex}")

        file_update_thread = threading.Thread(target=file_update_check_daemon, daemon=True)
        file_update_thread.start()

        # --- Main event loop ---
        info("Entering main loop...")

        # *** main loop ***
        # Each iteration handles at most one unit of work and then starts over,
        # so the stages below are effectively ordered by priority.
        while not STOP_MAINLOOP:
            # check if there are any new incoming mqtt messages in the queue, process them
            if not mqtt_client.message_queue.empty():
                msg = mqtt_client.message_queue.get()
                topic = get_maybe(msg, "topic") or "unknown"
                msg_payload = utils.misc.get_maybe(msg, "payload")

                # check for incoming RPC requests
                if "v1/devices/me/rpc/request" in topic:
                    rpc_method = get_maybe(msg_payload, "method")
                    rpc_params = get_maybe(msg_payload, "params")
                    # the request id is the last path segment of the topic
                    rpc_msg_id = topic.split("/")[-1]
                    on_rpc_request(rpc_msg_id, rpc_method, rpc_params)
                # check for attribute updates
                elif "v1/devices/me/attributes" in topic:
                    # A list (not a generator) is passed to any() on purpose:
                    # every handler is called, even after one returned True.
                    if not any([
                        on_msg_check_for_ota_update(msg_payload),
                        on_msg_check_for_files_definition_update(msg_payload),
                        on_msg_check_for_file_hashes_update(msg_payload),
                        on_msg_check_for_file_content_update(msg_payload),
                    ]):
                        warn("[MAIN] Got invalid message: " + str(msg))
                        warn("[MAIN] Skipping invalid message...")
                continue  # process the next message

            # automatically restart the controller's docker container if it is not running
            if restart_controller_if_needed():
                continue

            if not mqtt_client_thread.is_alive() or not mqtt_client.is_connected():
                if not mqtt_client.is_connected():
                    warn("MQTT client not connected, exiting in 30 seconds...")
                else:
                    warn("MQTT client thread died, exiting in 30 seconds...")
                sleep(30)
                utils.misc.fatal_error("MQTT client thread died")

            # check if there is any buffered outgoing log message in the buffer sqlite db
            if gateway_logs_buffer_db.do_table_values_exist("log_buffer"):
                # fetch the next message (lowest `id`) from the queue and send it
                # (table name written inline: quoting it inside an f-string
                # replacement field is a syntax error before Python 3.12)
                message = gateway_logs_buffer_db.execute(
                    "SELECT id, log_level, message, timestamp_ms FROM log_buffer ORDER BY id LIMIT 1")
                if len(message) > 0:
                    debug('Sending buffered log message: ' + str(message[0]))
                    if not mqtt_client.publish_log(message[0][1], message[0][2], message[0][3]):
                        # keep the row so that sending is retried
                        continue
                    gateway_logs_buffer_db.execute(
                        "DELETE FROM log_buffer WHERE id = ?", (message[0][0],))
                    continue

            if communication_sqlite_db.do_table_values_exist(sqlite.SqliteTables.CONTROLLER_MESSAGES.value):
                # fetch the next message (lowest `id`) from the queue and process it
                message = communication_sqlite_db.execute(
                    f"SELECT id, type, message FROM {sqlite.SqliteTables.CONTROLLER_MESSAGES.value} ORDER BY id LIMIT 1"
                )
                if len(message) > 0:
                    message_type = message[0][1]
                    message_obj = json.loads(message[0][2])
                    message_timestamp_ms = message_obj["ts"]
                    message_values = message_obj["values"]

                    # archive controller messages in the archive sqlite db, except for log messages
                    if "log" not in message_type:
                        archive_sqlite_db.execute(
                            "INSERT INTO controller_archive (timestamp_ms, message) VALUES (?, ?)",
                            (message_timestamp_ms, json.dumps(message_values)))

                    # add message to sqlite table containing pending outgoing mqtt messages
                    communication_sqlite_db.execute(
                        "INSERT INTO " + sqlite.SqliteTables.PENDING_MQTT_MESSAGES.value +
                        " (type, message) VALUES (?, ?)",
                        (message[0][1], message[0][2]))

                    # remove the processed message from the queue
                    communication_sqlite_db.execute(
                        f"DELETE FROM {sqlite.SqliteTables.CONTROLLER_MESSAGES.value} WHERE id = ?",
                        (message[0][0],))
                    continue

            # check if there are any new outgoing mqtt messages in the sqlite db
            if communication_sqlite_db.do_table_values_exist(sqlite.SqliteTables.PENDING_MQTT_MESSAGES.value):
                # fetch the next message (lowest `id`) from the queue and send it
                message = communication_sqlite_db.execute(
                    f"SELECT id, type, message FROM {sqlite.SqliteTables.PENDING_MQTT_MESSAGES.value} ORDER BY id LIMIT 1"
                )
                if len(message) > 0:
                    # NOTE(review): the parsed object is not used afterwards; the
                    # call is kept because a malformed payload raises here - confirm
                    # whether that validation is intended.
                    message_obj = json.loads(message[0][2])
                    debug('Sending controller message: ' + str(message[0]))
                    if not mqtt_client.publish_telemetry(message[0][2]):
                        # keep the row so that sending is retried
                        continue

                    # remove the published message from the queue
                    communication_sqlite_db.execute(
                        f"DELETE FROM {sqlite.SqliteTables.PENDING_MQTT_MESSAGES.value} WHERE id = ?",
                        (message[0][0],))
                    continue

            # --- Controller supervision ---
            controller_running_since_ts = docker_client.get_edge_startup_timestamp_ms() or 0
            last_controller_health_check_ts = get_last_controller_health_check_ts()

            # --- Auxiliary health telemetry ---
            # publish controller startup time and health check time to mqtt
            if aux_data_publish_ts is None or int(time_ns() / 1_000_000) - aux_data_publish_ts > AUX_DATA_PUBLISH_INTERVAL_MS:
                aux_data_publish_ts = int(time_ns() / 1_000_000)
                mqtt_client.publish_telemetry(json.dumps({
                    "ts": aux_data_publish_ts,
                    "values": {
                        "ms_since_controller_startup": aux_data_publish_ts - controller_running_since_ts,
                        "ms_since_last_controller_health_check": aux_data_publish_ts - last_controller_health_check_ts
                    }
                }))

            # The more recent of "last health check" and "container start" must
            # be younger than 6 hours, otherwise the running container is stopped.
            if (max(last_controller_health_check_ts, controller_running_since_ts)
                    < int(time_ns() / 1_000_000) - (6 * 3600_000)
                    and docker_client.is_controller_running()):
                warn("Controller did not send health check in the last 6 hours, stopping container...")
                docker_client.stop_controller()
                continue

            # if nothing happened this iteration, sleep for a while
            sleep(5)
except Exception as e:
    utils.misc.fatal_error(f"An error occurred in gateway main loop: {e}")