MQTT Communication
This section covers all communication with ThingsBoard via MQTT, including telemetry, attributes, and inbound message routing.
MQTT Client
MQTT client for ThingsBoard communication.
This module implements GatewayMqttClient, a thin wrapper
around the Paho MQTT client used by the Edge Gateway to communicate with
ThingsBoard.
The client is responsible for:
- Establishing a TLS-secured MQTT connection using a ThingsBoard access token.
- Subscribing to RPC, attribute, and OTA update topics.
- Publishing telemetry and attributes (including OTA software state sw_state).
- Providing a thread-safe inbound message queue for the gateway main loop.
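As a rough illustration of the attribute traffic listed above, the sketch below builds an attribute request using the topic names from ThingsBoard's MQTT device API. Whether the gateway uses exactly these topics is an assumption, and `build_attribute_request` and `_request_ids` are hypothetical names that only mimic the role of `attribute_request_id`.

```python
import itertools
import json

# Topic names as documented in ThingsBoard's MQTT device API.
TELEMETRY_TOPIC = "v1/devices/me/telemetry"
ATTRIBUTES_TOPIC = "v1/devices/me/attributes"
ATTRIBUTES_RESPONSE_TOPIC = "v1/devices/me/attributes/response/+"
ATTRIBUTES_REQUEST_TOPIC = "v1/devices/me/attributes/request/{request_id}"
RPC_REQUEST_TOPIC = "v1/devices/me/rpc/request/+"

# Hypothetical stand-in for the client's attribute_request_id counter.
_request_ids = itertools.count(1)


def build_attribute_request(client_keys=(), shared_keys=()):
    """Return (topic, JSON payload) for a ThingsBoard attribute request."""
    body = {}
    if client_keys:
        body["clientKeys"] = ",".join(client_keys)
    if shared_keys:
        body["sharedKeys"] = ",".join(shared_keys)
    topic = ATTRIBUTES_REQUEST_TOPIC.format(request_id=next(_request_ids))
    return topic, json.dumps(body)


# Example: ask for the FILE_HASHES client attribute.
topic, payload = build_attribute_request(client_keys=["FILE_HASHES"])
```

The response to such a request arrives on the matching `attributes/response/<request_id>` topic, which is why the request id has to be unique per request.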
Notes
- The client follows a singleton pattern to avoid multiple concurrent MQTT clients in the gateway process.
- The init() method configures callbacks and resets internal state; it is not the same as object construction.
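The two notes above can be illustrated with a minimal sketch. `SingletonClient` is a hypothetical stand-in, not the gateway's class; it only shows how construction (which always returns the same object) can be kept separate from an `init()` call that resets state.

```python
class SingletonClient:
    """Hypothetical stand-in for a singleton MQTT client wrapper."""

    _instance = None

    def __new__(cls):
        # Construction always hands back the same object.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance.initialized = False
        return cls._instance

    def init(self, access_token):
        # Configuration step: store credentials and reset internal state.
        self.access_token = access_token
        self.connected = False
        self.initialized = True
        return self


first = SingletonClient().init("TOKEN_A")
second = SingletonClient()  # same object, already initialized
```

This sketch does not guard instance creation with a lock, so it assumes the first construction happens before any worker threads start.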
- class modules.mqtt.GatewayMqttClient
Bases: Client
MQTT client used by the Edge Gateway to communicate with ThingsBoard.
The client subscribes to RPC requests, shared/client attributes, and OTA update topics and exposes helper methods to publish telemetry, attributes, and software update state.
Incoming MQTT messages are placed into message_queue as dictionaries with topic and parsed JSON payload fields.
- attribute_request_id: int = 0
- connected: bool = False
- init(access_token)
Configure the MQTT client for ThingsBoard access.
- Parameters:
access_token (str) – ThingsBoard device access token.
- Returns:
The configured client instance (self).
- initialized: bool = False
- message_queue: Queue = <queue.Queue object>
- publish_log(log_level, log_message, timestamp_ms=None)
Publish a log record as telemetry.
- Parameters:
log_level – Severity string.
log_message – Log message text. The gateway prefixes the message with "GATEWAY -" followed by a space before publishing.
timestamp_ms – Optional Unix timestamp in milliseconds. If not provided, a timestamp is generated locally.
- Return type:
bool
- Returns:
True if the publish succeeded, otherwise False.
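A log record published as telemetry might be assembled roughly as follows. The `ts`/`values` envelope is ThingsBoard's format for timestamped telemetry; the key names `LOG_LEVEL` and `LOG_MESSAGE` and the helper `build_log_payload` are assumptions made for this sketch, since the keys the gateway actually uses are not documented here.

```python
import json
import time


def build_log_payload(log_level, log_message, timestamp_ms=None):
    """Build a timestamped telemetry payload for one log record."""
    if timestamp_ms is None:
        # Generate the timestamp locally, in Unix milliseconds.
        timestamp_ms = int(time.time() * 1000)
    return json.dumps({
        "ts": timestamp_ms,
        "values": {
            "LOG_LEVEL": log_level,  # hypothetical key name
            "LOG_MESSAGE": "GATEWAY - " + log_message,  # hypothetical key name
        },
    })
```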
- publish_message_raw(topic, message)
Publish a raw MQTT message to a topic.
- Parameters:
topic (str) – MQTT topic.
message (str) – JSON-encoded payload string.
- Return type:
bool
- Returns:
True if the publish succeeded, otherwise False.
- publish_sw_state(version, state, msg=None)
Publish ThingsBoard OTA software state telemetry.
- Parameters:
version (str) – Controller version tag/commit hash reported as current software.
state (str) – OTA state string (e.g. DOWNLOADING, UPDATED).
msg (Optional[str]) – Optional error message (published as sw_error).
- Return type:
None
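The software state telemetry could look like the sketch below. `sw_state` and `sw_error` come from the description above; `current_sw_version` is the key ThingsBoard's OTA documentation uses for the running version, and its use by the gateway is an assumption. `build_sw_state_payload` is a hypothetical helper.

```python
import json


def build_sw_state_payload(version, state, msg=None):
    """Build OTA software state telemetry as a JSON string."""
    payload = {
        "current_sw_version": version,  # assumed key for the running version
        "sw_state": state,
    }
    if msg is not None:
        # Only attach an error message when one is given.
        payload["sw_error"] = msg
    return json.dumps(payload)
```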
- publish_telemetry(message)
Publish a telemetry payload to ThingsBoard.
- Parameters:
message (str) – JSON-encoded telemetry payload.
- Return type:
bool
- Returns:
True if the publish succeeded, otherwise False.
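The inbound queue described for GatewayMqttClient can be sketched with the standard library alone. The callback below follows the Paho `on_message(client, userdata, msg)` signature; silently dropping non-JSON payloads is an assumption of this sketch, and the fake message object only imitates the `topic` and `payload` attributes of a Paho message.

```python
import json
import queue
from types import SimpleNamespace

message_queue = queue.Queue()  # thread-safe FIFO shared with the main loop


def on_message(client, userdata, msg):
    """Parse the payload as JSON and enqueue it with its topic."""
    try:
        payload = json.loads(msg.payload.decode("utf-8"))
    except (UnicodeDecodeError, json.JSONDecodeError):
        return  # assumed behavior: ignore payloads that are not valid JSON
    message_queue.put({"topic": msg.topic, "payload": payload})


# Simulate one inbound message without a broker.
fake_msg = SimpleNamespace(
    topic="v1/devices/me/attributes",
    payload=b'{"sw_version": "1.2.3"}',
)
on_message(None, None, fake_msg)
item = message_queue.get_nowait()
```

Because `queue.Queue` does its own locking, the MQTT network thread can call `put` while the gateway main loop calls `get` without extra synchronization.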
Message Handlers
The following modules process inbound MQTT messages and dispatch them to the appropriate subsystems.
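Since each handler documented below returns a bool indicating whether it handled the payload, one plausible way to route messages is a simple handler chain. This is a sketch only: the handlers here are hypothetical placeholders, and whether the gateway stops at the first match or offers the payload to every handler is not stated in this documentation.

```python
def dispatch(msg_payload, handlers):
    """Offer the payload to each handler; return the name of the first that handles it."""
    for handler in handlers:
        if handler(msg_payload):
            return handler.__name__
    return None


# Hypothetical placeholder handlers with the same bool-returning contract.
def handle_ota(payload):
    return isinstance(payload, dict) and "sw_version" in payload


def handle_file_hashes(payload):
    return isinstance(payload, dict) and "FILE_HASHES" in payload


HANDLERS = [handle_ota, handle_file_hashes]
handled_by = dispatch({"FILE_HASHES": {}}, HANDLERS)
```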
File Definition Update
Handle remote file definition updates received via MQTT.
This module processes ThingsBoard shared attribute updates of the FILES object
and validates the remote file definitions used by the Edge Gateway for Remote
File Management.
Responsibilities
- Parse and validate incoming FILES definitions.
- Verify required properties such as file paths and encodings.
- Reject invalid or unsupported file definitions early.
- Update the active file definitions in GatewayFileWriter.
- Trigger a file hash synchronization request (FILE_HASHES).
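The validation step above might look roughly like this. The schema is an assumption made for illustration: the property names `path` and `encoding`, the set of supported encodings, and the function `validate_files_definition` are all hypothetical, since the actual FILES format is not specified here.

```python
SUPPORTED_ENCODINGS = {"text", "base64"}  # hypothetical set of encodings


def validate_files_definition(files):
    """Split a FILES object into valid definitions and error messages."""
    valid, errors = {}, []
    if not isinstance(files, dict):
        return valid, ["FILES must be a JSON object"]
    for key, definition in files.items():
        if not isinstance(definition, dict):
            errors.append(f"{key}: definition must be an object")
            continue
        path = definition.get("path")  # hypothetical property name
        encoding = definition.get("encoding")  # hypothetical property name
        if not isinstance(path, str) or not path:
            errors.append(f"{key}: missing path")
        elif encoding not in SUPPORTED_ENCODINGS:
            errors.append(f"{key}: unsupported encoding {encoding!r}")
        else:
            valid[key] = definition
    return valid, errors
```

Returning the errors alongside the valid entries lets a caller reject bad definitions early while still logging why each one was refused.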
Notes
- File content updates and hash reconciliation are handled by check_for_file_content_update and check_for_file_hashes_update respectively.
- This module performs validation only; it does not apply file contents.
- on_mqtt_msg.check_for_files_definition_update.on_msg_check_for_files_definition_update(msg_payload)
Process an incoming file definition update message.
- Parameters:
msg_payload (Any) – MQTT message payload containing shared attributes.
- Return type:
bool
- Returns:
True if the message was handled as a file definition update, False otherwise.
File Hashes Update
Handle remote file hash synchronization updates received via MQTT.
This module processes ThingsBoard client attribute updates for FILE_HASHES and
implements the synchronization side of the Remote File Management workflow.
Responsibilities
- Compare local file hashes with hashes reported by ThingsBoard.
- Detect missing, modified, or outdated files on the Edge Gateway.
- Request file content updates via FILE_CONTENT_<file_key> when required.
- Mirror local file state back to ThingsBoard using FILE_READ_<file_key>.
- Publish updated FILE_HASHES client attributes after reconciliation.
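The comparison step can be sketched as a diff between two hash maps. This assumes SHA-256 hex digests and treats the hashes reported by ThingsBoard as the reference; both are assumptions, as are the helper names `sha256_hex` and `plan_sync`. Mirroring via FILE_READ_<file_key> is not covered.

```python
import hashlib


def sha256_hex(data: bytes) -> str:
    """Hex digest of a file's bytes (hash algorithm assumed for this sketch)."""
    return hashlib.sha256(data).hexdigest()


def plan_sync(local_hashes, remote_hashes):
    """Return the FILE_CONTENT_<file_key> attributes to request.

    A file needs an update when it is missing locally or its local
    hash differs from the one reported remotely.
    """
    requests = []
    for key, remote_hash in sorted(remote_hashes.items()):
        if local_hashes.get(key) != remote_hash:
            requests.append(f"FILE_CONTENT_{key}")
    return requests
```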
Notes
- File metadata (paths, encodings, write versions) is defined via the FILES shared attribute.
- This module complements on_mqtt_msg.check_for_file_content_update, which applies incoming file content updates.
- on_mqtt_msg.check_for_file_hashes_update.on_msg_check_for_file_hashes_update(msg_payload)
Process an incoming file hash synchronization message.
- Parameters:
msg_payload (Any) – MQTT message payload containing client attributes.
- Return type:
bool
- Returns:
True if the message was handled as a file hash update, False otherwise.
File Content Update
Handle remote file content updates received via MQTT.
This module processes ThingsBoard shared attribute updates of the form
FILE_CONTENT_<file_key> and applies the corresponding file changes on the
Edge Gateway filesystem.
It implements the apply side of the Remote File Management workflow:
- Decode incoming file content according to its declared encoding.
- Write updated file contents to disk.
- Recalculate and publish file hashes (FILE_HASHES).
- Mirror updated file contents back via FILE_READ_<file_key> attributes.
- Optionally trigger a controller restart if required by the file definition.
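The decode, write, and re-hash steps above can be sketched as follows. The encoding names `text` and `base64`, the SHA-256 hash, and the write-to-temp-then-rename strategy are assumptions of this sketch rather than documented gateway behavior; `decode_content` and `apply_file_content` are hypothetical helpers. Publishing and controller restarts are left out.

```python
import base64
import hashlib
import os
import tempfile


def decode_content(content, encoding):
    """Turn the attribute value into raw bytes according to its encoding."""
    if encoding == "base64":
        return base64.b64decode(content)
    if encoding == "text":
        return content.encode("utf-8")
    raise ValueError(f"unsupported encoding: {encoding!r}")


def apply_file_content(path, content, encoding):
    """Write decoded content to path and return its SHA-256 hex digest."""
    data = decode_content(content, encoding)
    directory = os.path.dirname(path) or "."
    # Write to a temporary file first, then rename over the target so a
    # reader never observes a half-written file.
    fd, tmp_path = tempfile.mkstemp(dir=directory)
    with os.fdopen(fd, "wb") as handle:
        handle.write(data)
    os.replace(tmp_path, path)
    return hashlib.sha256(data).hexdigest()
```

The returned digest is what a caller would place into the refreshed FILE_HASHES attribute for that file key.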
Notes
- File metadata (paths, encodings, flags) is defined in the FILES shared attribute.
- Hash synchronization logic is coordinated with on_mqtt_msg.check_for_file_hashes_update.
- on_mqtt_msg.check_for_file_content_update.on_msg_check_for_file_content_update(msg_payload)
Process an incoming file content update message.
- Parameters:
msg_payload (Any) – MQTT message payload containing shared attributes.
- Return type:
bool
- Returns:
True if the message was handled (even if no update was applied), False if the message is not related to file content updates.
OTA Update Check
Handle OTA software update notifications received via MQTT.
This module processes ThingsBoard OTA-related attribute updates (sw_title,
sw_version and legacy sf_* keys) and triggers controller software updates
on the Edge Gateway when a new version is available.
It represents the decision and orchestration stage of the OTA workflow:
- Detect whether a software update is available.
- Compare the requested version against the currently running controller.
- Trigger a safe controller restart/update via the Docker client.
- Record the last successfully launched controller version.
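The detection and comparison steps might be sketched as below. The sketch reads only `sw_version`, accepts either a flat attribute update or a response that nests shared attributes under `shared`, and treats any differing version as an update; all three choices are assumptions, the legacy keys are not handled, and both function names are hypothetical.

```python
def extract_requested_version(msg_payload):
    """Return the requested sw_version from a payload, or None if absent."""
    if not isinstance(msg_payload, dict):
        return None
    # Attribute responses nest shared attributes under "shared";
    # attribute update notifications are flat.
    attributes = msg_payload.get("shared", msg_payload)
    if not isinstance(attributes, dict):
        return None
    version = attributes.get("sw_version")
    return version if isinstance(version, str) and version else None


def should_update(msg_payload, current_version):
    """Decide whether the payload asks for a version other than the running one."""
    requested = extract_requested_version(msg_payload)
    return requested is not None and requested != current_version
```

Comparing for inequality rather than ordering means a rollback to an older version is also treated as an update, which suits version tags or commit hashes that have no natural ordering.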
Notes
- The actual Docker image build and container lifecycle are handled by modules.docker_client.GatewayDockerClient.
- OTA state reporting (sw_state) is published by the Docker client during the update lifecycle.
- on_mqtt_msg.check_for_ota_updates.on_msg_check_for_ota_update(msg_payload)
Process an incoming OTA update notification.
- Parameters:
msg_payload (Optional[Any]) – MQTT message payload containing OTA-related attributes.
- Return type:
bool
- Returns:
True if the message was handled as an OTA update notification, False otherwise.