Source code for on_mqtt_msg.check_for_file_content_update

"""Handle remote file content updates received via MQTT.

This module processes ThingsBoard shared attribute updates of the form
``FILE_CONTENT_<file_key>`` and applies the corresponding file changes on the
Edge Gateway filesystem.

It implements the *apply* side of the Remote File Management workflow:
- Decode incoming file content according to its declared encoding.
- Write updated file contents to disk.
- Recalculate and publish file hashes (``FILE_HASHES``).
- Mirror updated file contents back via ``FILE_READ_<file_key>`` attributes.
- Optionally trigger a controller restart if required by the file definition.

Notes
-----
- File metadata (paths, encodings, flags) is defined in the ``FILES`` shared attribute.
- Hash synchronization logic is coordinated with
  ``on_mqtt_msg.check_for_file_hashes_update``.
"""

import json

from modules.docker_client import GatewayDockerClient
from modules.file_writer import GatewayFileWriter, write_file_content_to_client_attribute
from modules.logging import info, error
from typing import Any

from modules.mqtt import GatewayMqttClient
from on_mqtt_msg.check_for_file_hashes_update import FILE_HASHES_TB_KEY
from utils.misc import file_exists, get_maybe

"""Prefix used to identify file content updates in shared attributes."""
FILE_CONTENT_PREFIX = "FILE_CONTENT_"

[docs] def on_msg_check_for_file_content_update(msg_payload: Any) -> bool: """Process an incoming file content update message. Args: msg_payload: MQTT message payload containing shared attributes. Returns: ``True`` if the message was handled (even if no update was applied), ``False`` if the message is not related to file content updates. """ payload = get_maybe(msg_payload, "shared") or msg_payload file_id = None # Identify the first FILE_CONTENT_<file_key> entry in the payload for key in payload: if key.startswith(FILE_CONTENT_PREFIX): file_id = key.replace(FILE_CONTENT_PREFIX, "") break if file_id is None: return False files_definitions = GatewayFileWriter().get_files() if file_id not in files_definitions: error(f"File definition for {file_id} not found") return False file_definition = files_definitions[file_id] input_file_content = get_maybe(payload, FILE_CONTENT_PREFIX + file_id) if input_file_content is None: error("Invalid file content update received") return False # Decode incoming file content according to the declared encoding file_encoding = get_maybe(file_definition, "encoding") or "text" if file_encoding == "json": if isinstance(input_file_content, dict): file_content_bytes = json.dumps(input_file_content).encode("utf-8") else: error(f"Invalid file content for {file_id}, expected JSON object") return False elif file_encoding == "base64": import base64 if isinstance(input_file_content, str): try: file_content_bytes = base64.b64decode(input_file_content) except Exception as e: error(f"Failed to decode base64 content for {file_id}: {e}") return False else: error(f"Invalid file content for {file_id}, expected base64 string") return False elif file_encoding == "text": if isinstance(input_file_content, str): # encode as utf-8 bytes file_content_bytes = input_file_content.encode("utf-8") else: error(f"Invalid file content for {file_id}, expected text string") return False else: error(f"Unknown content encoding for {file_id}: {file_encoding}") return False file_write_version = get_maybe(file_definition, "write_version") file_path = GatewayFileWriter().expand_file_path(get_maybe(file_definition, "path")) create_if_not_exist = get_maybe(file_definition, "create_if_not_exist") in [None, True, "True"] restart_controller_on_change = get_maybe(file_definition, "restart_controller_on_change") in [True, "True"] if file_path is None: error(f"File definition for {file_id} has no path, unable to write file content update") return True # check if file already exists, if not, check if it should be created if file_exists(file_path) or create_if_not_exist: info(f"Writing file {file_id} at path: {file_path}") try: # write content to file with open(file_path, "wb") as f: f.write(file_content_bytes) # calculate new file hash and update it to ThingsBoard file_content_hash = GatewayFileWriter().calc_file_hash(file_path) file_hashes = GatewayFileWriter().get_tb_file_hashes() if file_hashes is None: error(f"File hashes are not available, cannot update hash for {file_id}") else: old_hash = get_maybe(file_hashes, file_id, "hash") file_hashes[file_id] = {"hash": file_content_hash} if file_write_version not in [None, ""]: file_hashes[file_id]["write_version"] = file_write_version GatewayMqttClient().publish_message_raw("v1/devices/me/attributes", json.dumps({ FILE_HASHES_TB_KEY: file_hashes })) GatewayFileWriter().set_tb_hashes(file_hashes) # update file content READ attribute if old_hash != file_content_hash: info(f"File {file_id} content updated, updating attribute") write_file_content_to_client_attribute(file_id, input_file_content) GatewayFileWriter().did_file_change(file_path) # update internal state if restart_controller_on_change: info(f"Restarting controller due to file content change") GatewayDockerClient().stop_controller() else: info(f"File {file_id} content unchanged, not updating attribute") # request file definitions again to verify everything is correct GatewayMqttClient().request_attributes({"sharedKeys": f"FILES"}) except Exception as e: error(f"Failed to create file {file_id} at path {file_path}: {e}") return True else: error(f"File {file_id} does not exist at path: {file_path} and 'create_if_not_exist' is set to false") return True return True