Source code for self_provisioning
"""Self-provisioning logic for the Edge Gateway.
This module implements MQTT-based self-provisioning against ThingsBoard to obtain
an access token for the Edge Gateway at first startup.
If an access token already exists locally, self-provisioning is skipped. Otherwise,
the gateway connects using the ThingsBoard provisioning credentials and requests
a new device access token, which is persisted locally for future use.
Notes
-----
- Provisioning uses the ThingsBoard MQTT provisioning API.
- TLS is always enabled; a custom CA certificate can be supplied via environment
variables.
- Provisioning is a blocking operation and is expected to run only during startup.
"""
import json
import os
import socket
import ssl
from random import randrange
from time import sleep
import argparse
from typing import Tuple
from paho.mqtt.client import Client
from modules.logging import debug
# Global variable used to store the provisioning response payload
provision_reply = None
[docs]
def self_provisioning_get_access_token(args: argparse.Namespace) -> Tuple[bool, str]:
"""Obtain a ThingsBoard access token via self-provisioning if required.
The function first checks whether an access token already exists locally. If so,
it returns the existing token. Otherwise, it performs MQTT-based self-provisioning
to request a new access token from ThingsBoard.
Args:
args: Parsed command-line arguments containing ThingsBoard connection details.
Returns:
Tuple ``(created, access_token)``, where ``created`` indicates whether a new
token was generated during this call.
"""
global provision_reply
# check if access token exists
access_token_path_env_var = os.environ.get("THINGSBOARD_ACCESS_TOKEN") or "./tb_access_token"
if os.path.exists(access_token_path_env_var):
with open(access_token_path_env_var, "r") as f:
access_token = f.read()
if access_token is not None and len(access_token) > 3:
debug(f"Access token found in file {access_token_path_env_var}")
return False, access_token
# else, perform self-provisioning
debug("No access token found, performing self-provisioning...")
mqtt_client = Client()
mqtt_client.username_pw_set("provision", None)
if os.environ.get("THINGSBOARD_CA_CERT") is not None:
print("Using custom CA cert")
mqtt_client.tls_set(cert_reqs=ssl.CERT_REQUIRED, ca_certs=os.environ.get("THINGSBOARD_CA_CERT"))
else:
mqtt_client.tls_set(cert_reqs=ssl.CERT_REQUIRED)
mqtt_client.on_connect = (lambda client, userdata, flags, rc: debug(f"Connected to ThingsBoard for self-provisioning with result code {rc}"))
mqtt_client.connect(args.tb_host, args.tb_port)
# set up the callback to receive the reply from the self-provisioning request
def on_message_callback(client, userdata, message):
global provision_reply
provision_reply = message.payload
mqtt_client.on_message = on_message_callback
mqtt_client.subscribe("/provision/response", qos=1)
mqtt_client.loop_start()
sleep(0.1)
mqtt_client.publish("/provision/request",
payload=json.dumps({
"deviceName": get_device_name(args),
"provisionDeviceKey": os.environ.get("THINGSBOARD_PROVISION_DEVICE_KEY"),
"provisionDeviceSecret": os.environ.get("THINGSBOARD_PROVISION_DEVICE_SECRET")
}), qos=1 )
for _ in range(100):
if provision_reply is not None:
break
sleep(0.1)
mqtt_client.loop_stop()
mqtt_client.disconnect()
if provision_reply is not None:
# parse string to json
provision_reply = json.loads(provision_reply)
# check for error
status = provision_reply.get("status")
if status is not None and status == "FAILURE":
debug(f"Self-provisioning failed with error: {provision_reply.get("errorMsg")}")
exit(1)
credentials_type = provision_reply.get("credentialsType")
if credentials_type is not None and credentials_type == "ACCESS_TOKEN":
credentials_value = provision_reply.get("credentialsValue")
if credentials_value is not None:
debug("Self-provisioning successful.")
debug(f"Writing access token to file: {access_token_path_env_var}")
with open(access_token_path_env_var, "w") as f:
f.write(credentials_value)
return True, credentials_value
debug("Self-provisioning failed")
debug(f"Reply: {provision_reply}")
exit(1)
[docs]
def get_device_name(args: argparse.Namespace) -> str:
"""Determine the ThingsBoard device name for this gateway.
The device name is resolved in the following order:
1. Explicit ``--device-name`` CLI argument (if present)
2. ``THINGSBOARD_DEVICE_NAME`` environment variable
3. Local hostname
4. Randomly generated fallback name
Args:
args: Parsed command-line arguments.
Returns:
Resolved device name string.
"""
return (
getattr(args, "device_name", None)
or os.environ.get("THINGSBOARD_DEVICE_NAME")
or socket.gethostname()
or ("teg-" + str(randrange(1000000, 9999999, 1)))
)