Source code for modules.docker_client

"""Docker runtime management for the Edge Gateway controller.

This module provides the :class:`~modules.docker_client.GatewayDockerClient`, a thin
wrapper around the Docker Engine API used by the Edge Gateway to manage the
*controller* container.

Responsibilities
----------------
- Detect whether the controller container is running and determine its version.
- Start/stop the controller container in a controlled way.
- Build controller images from a Git tag or commit hash if the image is not available.
- Publish ThingsBoard OTA-related software state updates via MQTT (``sw_state``).

The controller is deployed as a Docker container (``teg_controller``) with images
tagged as ``teg-controller-<version>:latest``. Versions may be Git tags (e.g. ``v1.2.3``)
or full commit hashes.

Notes
-----
- This client is implemented as a process-level singleton to avoid repeated Docker
  client initialization.
- The implementation assumes host networking and a privileged container, as required
  for the deployed Raspberry Pi environment.

"""

import datetime
import os
from time import sleep
from typing import Any, Optional

import docker
from docker import DockerClient
from docker.types import LogConfig

from modules.git_client import GatewayGitClient
from modules.logging import debug, info, warn, error
from modules.mqtt import GatewayMqttClient
from utils.paths import CONTROLLER_GIT_PATH, GATEWAY_DATA_PATH, CONTROLLER_LOGS_PATH, CONTROLLER_DATA_PATH, \
    CONTROLLER_DOCKERCONTEXT_PATH, CONTROLLER_DOCKERFILE_PATH

CONTROLLER_CONTAINER_NAME: str = "teg_controller"
CONTROLLER_IMAGE_PREFIX: str = "teg-controller-"

singleton_instance: Optional["GatewayDockerClient"] = None

[docs] class GatewayDockerClient: """Manage the controller Docker container for an Edge Gateway device. The gateway and controller are intentionally separated: the Edge Gateway remains stable while controller versions can be updated independently (e.g. via OTA packages in ThingsBoard). This class wraps Docker operations (list, stop, run, build) and integrates with the gateway’s Git and MQTT clients to support automated controller updates. Attributes ---------- last_launched_version: Cached version string of the last successfully launched controller. docker_client: Docker SDK client instance created via :func:`docker.from_env`, or ``None`` if Docker is unavailable. """ last_launched_version: Optional[str] = None docker_client: Optional[DockerClient] = None def __init__(self) -> None: global singleton_instance if singleton_instance is None: debug("[DOCKER-CLIENT] Initializing GatewayDockerClient") super().__init__() singleton_instance = self try: self.docker_client = docker.from_env() except Exception as e: error("[DOCKER-CLIENT] Failed to initialize GatewayDockerClient: {}".format(e)) self.docker_client = None # Singleton pattern def __new__(cls: Any) -> Any: global singleton_instance if singleton_instance is not None: return singleton_instance return super(GatewayDockerClient, cls).__new__(cls)
[docs] def get_last_launched_controller_version(self) -> Optional[str]: """Return the last launched controller version. The value is cached in-memory and persisted in ``$GATEWAY_DATA_PATH/last_launched_controller_version.txt``. If the file is not available, the environment variable ``TEG_DEFAULT_CONTROLLER_VERSION`` is used as a fallback. Returns: The version tag/commit hash, or ``None`` if unknown. """ if self.last_launched_version is not None: return self.last_launched_version # read controller version from file try: with open(os.path.join(GATEWAY_DATA_PATH, "last_launched_controller_version.txt"), "r") as file: last_launched_controller_version = file.read() self.last_launched_version = last_launched_controller_version return last_launched_controller_version except: default_version = os.environ.get("TEG_DEFAULT_CONTROLLER_VERSION") if default_version is not None: return default_version return None
[docs] def set_last_launched_controller_version(self, last_launched_controller_version: str): """Persist the last launched controller version. Args: last_launched_controller_version: Git tag or commit hash to store. """ self.last_launched_version = last_launched_controller_version # write to file try: with open(os.path.join(GATEWAY_DATA_PATH, "last_launched_controller_version.txt"), "w") as file: file.write(last_launched_controller_version) except Exception as e: error("[DOCKER-CLIENT] Failed to write last launched controller version: {}".format(e))
[docs] def is_controller_running(self) -> bool: """Check whether the controller container is running. Returns: ``True`` if the container exists and is running, otherwise ``False``. """ if self.docker_client is None: error("[DOCKER-CLIENT] is_controller_running: Docker client not initialized") return False containers = self.docker_client.containers.list() for container in containers: if container.name == CONTROLLER_CONTAINER_NAME: return container.attrs["State"]["Running"] return False
[docs] def is_image_available(self, image_tag: str) -> bool: """Check whether a Docker image tag exists locally. Args: image_tag: Full Docker image tag to look up (e.g. ``teg-controller-v1.0.0:latest``). Returns: ``True`` if the image tag exists locally, otherwise ``False``. """ if self.docker_client is None: error("[DOCKER-CLIENT] is_image_available: Docker client not initialized") return False for image in self.docker_client.images.list(): if image_tag in image.tags: return True return False
[docs] def get_controller_version(self) -> Optional[str]: """Return the controller version inferred from the running container image. Returns: The Git tag/commit hash parsed from the image name, or ``None`` if the controller is not running or the version cannot be determined. """ if self.docker_client is None: error("[DOCKER-CLIENT] get_controller_version: Docker client not initialized") return None if self.is_controller_running(): containers = self.docker_client.containers.list() for container in containers: if container.name == CONTROLLER_CONTAINER_NAME: version = container.attrs["Config"]["Image"].split("-")[-1] if version.__len__() > 0 and (version[0] == "v" or version.__len__() == 40): if version.endswith(":latest"): version = version[:-7] return version return None
[docs] def get_edge_startup_timestamp_ms(self) -> Optional[int]: """Return the controller container start time as Unix milliseconds. Returns: Start timestamp in milliseconds (UTC) if the controller is running, else ``None``. """ if self.docker_client is None: error("[DOCKER-CLIENT] get_edge_startup_timestamp_ms: Docker client not initialized") return None if self.is_controller_running(): containers = self.docker_client.containers.list() for container in containers: if container.name == CONTROLLER_CONTAINER_NAME: return int( datetime.datetime.strptime(container.attrs["State"]["StartedAt"][:-4], "%Y-%m-%dT%H:%M:%S.%f" ) .replace(tzinfo=datetime.timezone.utc) .timestamp() * 1000 ) return None
[docs] def stop_controller(self) -> None: """Stop the running controller container (if any). This stores the currently running controller version as the last-launched version before stopping the container. Returns: None """ if self.docker_client is None: error("[DOCKER-CLIENT] stop_controller: Docker client not initialized") return None if self.is_controller_running(): containers = self.docker_client.containers.list() for container in containers: if container.name == CONTROLLER_CONTAINER_NAME: running_controller_version = self.get_controller_version() if running_controller_version is not None: self.set_last_launched_controller_version(running_controller_version) info("[DOCKER-CLIENT] Stopping controller container...") container.stop(timeout=60) info("[DOCKER-CLIENT] Stopped controller container") else: info("[DOCKER-CLIENT] Controller container is not running")
[docs] def prune_containers(self) -> None: """Remove stopped containers to keep the Docker environment clean.""" if self.docker_client is None: error("[DOCKER-CLIENT] prune_containers: Docker client not initialized") return None self.docker_client.containers.prune() info("[DOCKER-CLIENT] Pruned containers")
[docs] def start_controller_safely(self, version_to_launch: str): """Start the controller and suppress unexpected exceptions. Args: version_to_launch: Git tag or commit hash to run. """ try: self.start_controller(version_to_launch) except Exception as e: warn("[DOCKER-CLIENT] Failed to start controller: {}".format(e))
[docs] def start_controller(self, version_to_launch: str) -> None: """Start the controller container for a given version. If the requested version is already running, the method is a no-op. If the required image is not available locally, the controller repository is fetched, reset to the referenced commit, and a Docker image is built from the local Docker context. During the update lifecycle, the method publishes OTA-related states via MQTT (e.g. ``DOWNLOADING``, ``DOWNLOADED``, ``UPDATING``, ``UPDATED``). Args: version_to_launch: Git tag (e.g. ``v1.0.0``) or full commit hash. Returns: None """ if self.docker_client is None: error("[DOCKER-CLIENT] start_controller: Docker client not initialized") return None if self.is_controller_running(): running_controller_version = self.get_controller_version() if running_controller_version != version_to_launch: self.stop_controller() self.start_controller(version_to_launch) else: info("[DOCKER-CLIENT] Software already running with version " + version_to_launch) self.set_last_launched_controller_version(running_controller_version) return image_tag : str = CONTROLLER_IMAGE_PREFIX + version_to_launch + ":latest" # edge container is not running # check if the image is available already, if not build it if not self.is_image_available(image_tag): error("[DOCKER-CLIENT] Image '" + image_tag + "' not available") info("[DOCKER-CLIENT] Building image for version '" + version_to_launch + "'") GatewayMqttClient().publish_sw_state(version_to_launch, "DOWNLOADING") GatewayGitClient().execute_fetch() commit_hash = GatewayGitClient().get_commit_from_hash_or_tag(version_to_launch) if commit_hash is None: error("[DOCKER-CLIENT] Unable to get commit hash for version '" + version_to_launch + "'") return info("[DOCKER-CLIENT] Building image for commit " + commit_hash) if GatewayGitClient().execute_reset_to_commit(commit_hash) \ and GatewayGitClient().get_current_commit() == commit_hash: info("[DOCKER-CLIENT] Successfully reset to commit " + commit_hash) else: error("[DOCKER-CLIENT] Unable to reset to commit " + commit_hash) return GatewayMqttClient().publish_sw_state(version_to_launch, "DOWNLOADED") self.docker_client.images.build( path=CONTROLLER_DOCKERCONTEXT_PATH, dockerfile=CONTROLLER_DOCKERFILE_PATH, tag=CONTROLLER_IMAGE_PREFIX + version_to_launch + ":latest" ) info("[DOCKER-CLIENT] Built image for commit " + commit_hash + " with tag " + CONTROLLER_IMAGE_PREFIX + version_to_launch) GatewayMqttClient().publish_sw_state(version_to_launch, "UPDATING") # remove old containers and start the new one self.prune_containers() self.docker_client.containers.run( image_tag, detach=True, name=CONTROLLER_CONTAINER_NAME, restart_policy={ "MaximumRetryCount": 3, "Name": "on-failure" }, log_config=LogConfig(type=LogConfig.types.JSON, config={ "max-size": "10m", "max-file": "5" }), privileged=True, network_mode="host", volumes={ "/bin/vcgencmd": { "bind": "/bin/vcgencmd", "mode": "ro" }, "/bin/uptime": { "bind": "/bin/uptime", "mode": "ro" }, "/bin/pigs": { "bind": "/bin/pigs", "mode": "ro" }, CONTROLLER_DATA_PATH: { "bind": "/root/data", "mode": "rw" }, CONTROLLER_LOGS_PATH: { "bind": "/root/logs", "mode": "rw" }, } ) self.set_last_launched_controller_version(version_to_launch) GatewayMqttClient().publish_sw_state(version_to_launch, "UPDATED") info("[DOCKER-CLIENT] Started container with version '" + version_to_launch + "'")