REX : On Battery Project using RustiqIOT¶
Designing and working on battery powered designed project, broad several concerns, and needs:
Maintaining and sustaining the power only on operative conditions
Power / Battery Supervision measurement
Managing communication and recovery for transmission loss, intermittent communications
In this use case, power management is key. There are major risks of device loss if :
The power stay, because of a software issue -> this drain the battery
Block diagram of typical battery powered project¶
On a large view of the device software organization, several service are involved :
Supervisor : the main logic of the device. this service communicate with the powermanagement to periodically ask for staying in power while the regular logic is up (watchdog).
PowerManagement : this service communicate with the hardware, and bring back : measure, and time credits for deep sleep and autonomous wake up.
ConnexionManager: setup up the configuration and external system communication strategy. (wifi, radio, lora, … )
TransferService: manage the commands and file transfers, with recovery
Scheduler: this service maintain some unique or periodic tasks
Device Display Service : service used for display

Hardware concerns¶
To achieve the process, and save power, the hardware organization is as follow :
a dedicated power unit, using a small microcontroller, with deep sleep capabilities (MicroAmps consumption), is used to wake up the main unit.
the mainunit loop do its tasks and ask for deep sleep when down

Main unit Implementation view¶
In a simple implementation, the supervisor may look like :
Check enrollment, and setup enrollment if not already done
Start the supervisor loop, with the following steps :
setting up a watchdog on power service to stay it live
ask for administrative commands, and process them
look for scheduled operations, and execute them
send status report
and trigger the next wakeup, using the scheduler
Supervisor code example¶
This example implementation is done in python, this has been used on prototyping cycles, to validate some concerns and device behaviours.
import logging
from datetime import datetime, timezone, timedelta
import threading
from typing import Optional
from .config import SupervisorConfig
from .power_manager import PowerManager
from .network_manager import NetworkManager
from .display_manager import DisplayManager
from .enrollment_manager import EnrollmentManager
import traceback
import json
import time
import xmlrpc
import xmlrpc.client
import xmlrpc.server
from xmlrpc.client import Transport
from http.client import HTTPConnection
logger = logging.getLogger(__name__)
class ProxiedTransport(xmlrpc.client.Transport):
def __init__(self, timeout = 1000):
super().__init__()
self.timeout = timeout
def make_connection(self, host):
connection = HTTPConnection(host, timeout=self.timeout)
logger.debug(f"making connection to {host}")
self._connection = (host, connection)
return connection
class Supervisor:
def __init__(self):
self.config = SupervisorConfig.from_env()
self.admin_client = self._setup_admin_client()
self.admin_client_fast = self._setup_admin_client_fast()
self.power_manager = PowerManager(self.admin_client, self.admin_client_fast)
self.network_manager = NetworkManager(self.admin_client)
self.display_manager = DisplayManager(self.admin_client)
self.enrollment_manager = EnrollmentManager(
self.config,
self.admin_client,
self.display_manager,
self.network_manager,
self.admin_client_fast
)
def _setup_admin_client(self):
transport = ProxiedTransport(timeout=40000)
admin_client = xmlrpc.client.ServerProxy("http://localhost:8003/", transport=transport)
return admin_client
def _setup_admin_client_fast(self):
transport = ProxiedTransport(timeout=1000)
admin_client = xmlrpc.client.ServerProxy("http://localhost:8003/", transport=transport)
return admin_client
def _get_machine_id(self):
try:
machine_id = json.loads(self.admin_client.get_machine_id())
return machine_id
except Exception as e:
logger.error(f"SUPERVISOR : error in getting machine id: {e}")
raise
def run(self):
try:
machine_id = self._get_machine_id()
logger.info(f"SUPERVISOR : machine id: {machine_id}")
if not self.enrollment_manager.is_enrolled():
logger.info("Device not enrolled, starting enrollment")
self.enrollment_manager.enroll_device(machine_id)
return
else:
logger.info("Device enrolled, starting supervisor cycle")
self._run_supervisor_cycle()
except Exception as e:
logger.error(f"Supervisor failed: {e}")
traceback.print_exc()
raise
def _run_supervisor_cycle(self):
if not self.power_manager.is_power_module_available():
logger.info("SUPERVISOR : power module not available, skipping supervisor cycle")
return
if not self.config.force_powered_mode:
logger.info("SUPERVISOR : ensuring battery mode")
self.power_manager.ensure_battery_mode()
logger.info("SUPERVISOR : starting watchdog")
self.power_manager.start_watchdog()
try:
self._process_remote_commands()
self._process_scheduled_tasks()
self._update_monitoring_status()
next_wake_time = self._calculate_next_wake_time()
if next_wake_time is None:
next_wake_time = datetime.now(timezone.utc) + timedelta(hours=24)
logger.info(f"SUPERVISOR : next wake time: {next_wake_time}")
if not self.config.force_powered_mode:
if next_wake_time is None:
# define a default next wake time
next_wake_time = datetime.now(timezone.utc) + timedelta(hours=24)
# if next wake time is more than 2 days ahead, set it to 2 days
if next_wake_time > datetime.now(timezone.utc) + timedelta(days=2):
next_wake_time = datetime.now(timezone.utc) + timedelta(days=2)
logger.info(f"SUPERVISOR : scheduling powerdown and recovery at {next_wake_time}")
self.power_manager.schedule_powerdown(next_wake_time)
finally:
if not self.config.force_powered_mode:
self.power_manager.stop_watchdog()
# we wait a bit for power mode to not overload the server
# on battery mode, the powerdown command will kill all processes
print("SUPERVISOR : wait 60 seconds")
time.sleep(10)
def _process_remote_commands(self):
try:
result = self.admin_client.call_service("monitoring", "process_administrative_commands")
print(f"SUPERVISOR : Monitoring process administrative commands result: {result}")
except Exception as e:
print(f"SUPERVISOR : error in processing remote commands: {e}")
traceback.print_exc()
def _process_scheduled_tasks(self):
try:
next_job_time_json = self.admin_client.call_service("scheduler", "get_next_execution_time_for_all_jobs")
next_job_time = json.loads(next_job_time_json)
current_time = datetime.now(timezone.utc)
for job in next_job_time:
try:
# see if we are in the time slots to do the job
print(f"SUPERVISOR : job to do : {job}")
assert "job_id" in job
assert "next_execution_time" in job
assert "name" in job
assert "executed_expression" in job
# get the job time
job_time = datetime.fromisoformat(job["next_execution_time"])
job_command = job["executed_expression"]
print(f"SUPERVISOR : current time: {current_time}")
if current_time > job_time:
print(f"SUPERVISOR : job {job['name']} is due")
print(f"SUPERVISOR EXECUTING JOB {job['name']} - {job_command}")
result = ""
iserror = 0
try:
result = self.admin_client.execute_command(job_command)
except Exception as e:
iserror = 1
result = "ERROR:" + str(e)
print(f"SUPERVISOR : error in executing job: {job_command}")
traceback.print_exc()
# ack the job realization
self.admin_client.call_service('scheduler',
'commit_job_execution',
job["job_id"],
datetime.now(timezone.utc).isoformat(),
iserror,
result)
else:
print(f"SUPERVISOR : job {job['name']} is not DUE, skipped")
except Exception as e:
print(f"SUPERVISOR : error in job: {job}")
traceback.print_exc()
except Exception as e:
print(f"SUPERVISOR : error in processing scheduled tasks: {e}")
traceback.print_exc()
def _update_monitoring_status(self):
try:
print(f"SUPERVISOR : uploading monitoring")
result = self.admin_client.call_service("monitoring", "upload_monitoring")
print(f"SUPERVISOR : uploading monitoring done")
except Exception as e:
print(f"SUPERVISOR : error in uploading monitoring: {e}")
traceback.print_exc()
def _calculate_next_wake_time(self) -> datetime:
# Calculate next wake time implementation...
try:
next_job_time_json = self.admin_client.call_service("scheduler", "get_next_execution_time_for_all_jobs")
next_job_time = json.loads(next_job_time_json)
if next_job_time is not None:
if len(next_job_time) > 0:
job_time = datetime.fromisoformat(next_job_time[0]["next_execution_time"])
return job_time
return None
except Exception as e:
print(f"SUPERVISOR : error in calculating next wake time: {e}")
traceback.print_exc()
return None