# REX: On-Battery Project using RustiqIOT

Designing and working on a battery-powered project raises several concerns and needs:

- Maintaining and sustaining the power **only under operating conditions**
- Power / battery supervision and measurement
- Managing communication and recovery from transmission loss and intermittent communications

In this use case, power management is key. There is a major risk of device loss if:

- The power stays on because of a software issue -> this drains the battery

## Block diagram of a typical battery-powered project

In a high-level view of the device software organization, several services are involved:

**Supervisor**: the main logic of the device. This service communicates with the PowerManagement service to periodically ask to stay powered while the regular logic is running (watchdog).

**PowerManagement**: this service communicates with the hardware and reports back measurements and time credits for deep sleep and autonomous wake-up.

**ConnexionManager**: sets up the configuration and the communication strategy with external systems (Wi-Fi, radio, LoRa, ...).

**TransferService**: manages the commands and file transfers, with recovery.

**Scheduler**: this service maintains one-off or periodic tasks.

**Device Display Service**: service used for display.

![](battery_powered_device.drawio.png)

## Hardware concerns

To achieve this and save power, the hardware is organized as follows:

- a dedicated power unit, using a small microcontroller with deep-sleep capabilities (microamp consumption), is used to wake up the main unit.
- the main unit loop does its tasks and asks for deep sleep when done

![](battery_powered_device_hardwareview.drawio.png)

## Main unit implementation view

In a simple implementation, the supervisor may look like this:

- Check enrollment, and set up enrollment if not already done
- Start the supervisor loop, with the following steps:
  - set up a watchdog on the power service to keep the device powered
  - ask for administrative commands, and process them
  - look for scheduled operations, and execute them
  - send a status report
  - trigger the next wake-up, using the scheduler

## Supervisor code example

This example implementation is written in Python. It was used during prototyping cycles to validate some concerns and device behaviours.

```python
import logging
from datetime import datetime, timezone, timedelta
import threading
from typing import Optional
from .config import SupervisorConfig
from .power_manager import PowerManager
from .network_manager import NetworkManager
from .display_manager import DisplayManager
from .enrollment_manager import EnrollmentManager
import traceback
import json
import time
import xmlrpc
import xmlrpc.client
import xmlrpc.server
from xmlrpc.client import Transport
from http.client import HTTPConnection

logger = logging.getLogger(__name__)

# Endpoint of the local administration XML-RPC service.
ADMIN_SERVICE_URL = "http://localhost:8003/"

# Wake-up delay used when the scheduler reports no upcoming job.
DEFAULT_WAKE_DELAY = timedelta(hours=24)

# Upper bound on how far ahead a wake-up may be scheduled.
MAX_WAKE_DELAY = timedelta(days=2)

# Pause at the end of every cycle, in seconds.
END_OF_CYCLE_PAUSE_SECONDS = 10

# Keys every scheduler job entry must carry before it is processed.
REQUIRED_JOB_FIELDS = ("job_id", "next_execution_time", "name", "executed_expression")


def _parse_job_time(value: str) -> datetime:
    """Parse an ISO-8601 timestamp and return a timezone-aware datetime.

    Comparing a naive datetime with an aware one raises ``TypeError``; in the
    supervisor cycle that would abort the cycle before the power-down is
    scheduled. Naive values are therefore tagged as UTC.
    """
    parsed = datetime.fromisoformat(value)
    if parsed.tzinfo is None:
        # NOTE(review): assumes the scheduler emits UTC when no offset is
        # present -- confirm against the scheduler service.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


class ProxiedTransport(xmlrpc.client.Transport):
    """XML-RPC transport that opens plain HTTP connections with a timeout."""

    def __init__(self, timeout=1000):
        super().__init__()
        # NOTE(review): this value is passed unchanged to HTTPConnection,
        # which interprets it in seconds. The values used in this file
        # (1000 and 40000) look like milliseconds -- confirm the unit.
        self.timeout = timeout

    def make_connection(self, host):
        """Open a new HTTP connection to ``host`` using the configured timeout."""
        connection = HTTPConnection(host, timeout=self.timeout)
        logger.debug("making connection to %s", host)
        # Keep the base class's (host, connection) cache in sync.
        self._connection = (host, connection)
        return connection


class Supervisor:
    """Main device logic: enrollment check, then one supervisor cycle per run.

    A cycle processes remote commands, runs the scheduled jobs that are due,
    uploads the monitoring status and, unless ``force_powered_mode`` is set,
    schedules the power-down together with the next wake-up time.
    """

    def __init__(self):
        self.config = SupervisorConfig.from_env()
        self.admin_client = self._setup_admin_client()
        self.admin_client_fast = self._setup_admin_client_fast()
        self.power_manager = PowerManager(self.admin_client, self.admin_client_fast)
        self.network_manager = NetworkManager(self.admin_client)
        self.display_manager = DisplayManager(self.admin_client)
        self.enrollment_manager = EnrollmentManager(
            self.config,
            self.admin_client,
            self.display_manager,
            self.network_manager,
            self.admin_client_fast,
        )

    @staticmethod
    def _build_admin_client(timeout):
        """Create an XML-RPC proxy to the admin service with the given timeout."""
        transport = ProxiedTransport(timeout=timeout)
        return xmlrpc.client.ServerProxy(ADMIN_SERVICE_URL, transport=transport)

    def _setup_admin_client(self):
        """Return the admin client used for regular (long-timeout) calls."""
        return self._build_admin_client(40000)

    def _setup_admin_client_fast(self):
        """Return the admin client configured with the short timeout."""
        return self._build_admin_client(1000)

    def _get_machine_id(self):
        """Fetch and JSON-decode the machine id; log and re-raise on failure."""
        try:
            return json.loads(self.admin_client.get_machine_id())
        except Exception as e:
            logger.error("SUPERVISOR : error in getting machine id: %s", e)
            raise

    def run(self):
        """Enroll the device if needed, otherwise run one supervisor cycle.

        Raises:
            Exception: any failure is logged with its traceback and re-raised.
        """
        try:
            machine_id = self._get_machine_id()
            logger.info("SUPERVISOR : machine id: %s", machine_id)

            if not self.enrollment_manager.is_enrolled():
                logger.info("Device not enrolled, starting enrollment")
                self.enrollment_manager.enroll_device(machine_id)
                return

            logger.info("Device enrolled, starting supervisor cycle")
            self._run_supervisor_cycle()
        except Exception as e:
            logger.exception("Supervisor failed: %s", e)
            raise

    def _run_supervisor_cycle(self):
        """Run one cycle and, in battery mode, schedule power-down and wake-up."""
        if not self.power_manager.is_power_module_available():
            logger.info("SUPERVISOR : power module not available, skipping supervisor cycle")
            return

        battery_mode = not self.config.force_powered_mode
        if battery_mode:
            logger.info("SUPERVISOR : ensuring battery mode")
            self.power_manager.ensure_battery_mode()

        # NOTE(review): the flattened original does not show whether the
        # watchdog start was nested under the battery-mode check; it is
        # started in both modes here -- confirm.
        logger.info("SUPERVISOR : starting watchdog")
        self.power_manager.start_watchdog()

        try:
            self._process_remote_commands()
            self._process_scheduled_tasks()
            self._update_monitoring_status()

            next_wake_time = self._calculate_next_wake_time()
            if next_wake_time is None:
                next_wake_time = datetime.now(timezone.utc) + DEFAULT_WAKE_DELAY
            logger.info("SUPERVISOR : next wake time: %s", next_wake_time)

            if battery_mode:
                # Never sleep longer than MAX_WAKE_DELAY.
                latest_allowed = datetime.now(timezone.utc) + MAX_WAKE_DELAY
                if next_wake_time > latest_allowed:
                    next_wake_time = latest_allowed
                logger.info(
                    "SUPERVISOR : scheduling powerdown and recovery at %s",
                    next_wake_time,
                )
                self.power_manager.schedule_powerdown(next_wake_time)
        finally:
            if battery_mode:
                self.power_manager.stop_watchdog()
            # We wait a bit in powered mode so as not to overload the server;
            # in battery mode the powerdown command will kill all processes.
            # NOTE(review): the pause is applied in both modes -- confirm.
            logger.info("SUPERVISOR : wait %s seconds", END_OF_CYCLE_PAUSE_SECONDS)
            time.sleep(END_OF_CYCLE_PAUSE_SECONDS)

    def _process_remote_commands(self):
        """Ask the monitoring service to process administrative commands.

        Best effort: failures are logged and do not interrupt the cycle.
        """
        try:
            result = self.admin_client.call_service(
                "monitoring", "process_administrative_commands"
            )
            logger.info(
                "SUPERVISOR : Monitoring process administrative commands result: %s",
                result,
            )
        except Exception as e:
            logger.exception("SUPERVISOR : error in processing remote commands: %s", e)

    def _fetch_scheduled_jobs(self):
        """Return the scheduler's decoded job list (empty when it reports none)."""
        raw = self.admin_client.call_service(
            "scheduler", "get_next_execution_time_for_all_jobs"
        )
        return json.loads(raw) or []

    def _process_scheduled_tasks(self):
        """Execute every scheduled job whose execution time has passed.

        Best effort: a failing job is logged and the remaining jobs still run.
        """
        try:
            jobs = self._fetch_scheduled_jobs()
        except Exception as e:
            logger.exception("SUPERVISOR : error in processing scheduled tasks: %s", e)
            return

        current_time = datetime.now(timezone.utc)
        for job in jobs:
            try:
                self._run_job_if_due(job, current_time)
            except Exception:
                logger.exception("SUPERVISOR : error in job: %s", job)

    def _run_job_if_due(self, job, current_time):
        """Run ``job`` when it is due and commit the outcome to the scheduler.

        Raises:
            ValueError: if the job entry lacks one of the required fields.
        """
        logger.info("SUPERVISOR : job to do : %s", job)

        # Explicit validation: ``assert`` statements are stripped under -O.
        missing = [field for field in REQUIRED_JOB_FIELDS if field not in job]
        if missing:
            raise ValueError(f"scheduled job is missing fields: {missing}")

        job_time = _parse_job_time(job["next_execution_time"])
        job_command = job["executed_expression"]
        logger.info("SUPERVISOR : current time: %s", current_time)

        if current_time <= job_time:
            logger.info("SUPERVISOR : job %s is not DUE, skipped", job["name"])
            return

        logger.info("SUPERVISOR : job %s is due", job["name"])
        logger.info("SUPERVISOR EXECUTING JOB %s - %s", job["name"], job_command)

        iserror = 0
        try:
            result = self.admin_client.execute_command(job_command)
        except Exception as e:
            iserror = 1
            result = "ERROR:" + str(e)
            logger.exception("SUPERVISOR : error in executing job: %s", job_command)

        # Acknowledge the job execution, whether it succeeded or failed.
        self.admin_client.call_service(
            "scheduler",
            "commit_job_execution",
            job["job_id"],
            datetime.now(timezone.utc).isoformat(),
            iserror,
            result,
        )

    def _update_monitoring_status(self):
        """Ask the monitoring service to upload the monitoring data.

        Best effort: failures are logged and do not interrupt the cycle.
        """
        try:
            logger.info("SUPERVISOR : uploading monitoring")
            self.admin_client.call_service("monitoring", "upload_monitoring")
            logger.info("SUPERVISOR : uploading monitoring done")
        except Exception as e:
            logger.exception("SUPERVISOR : error in uploading monitoring: %s", e)

    def _calculate_next_wake_time(self) -> Optional[datetime]:
        """Return the earliest upcoming job time, or ``None`` when unknown.

        All jobs are considered rather than only the first entry, so an
        unsorted job list cannot make the device sleep past an earlier job.
        ``None`` is returned when there is no job or the scheduler call fails.
        """
        try:
            wake_times = []
            for job in self._fetch_scheduled_jobs():
                try:
                    wake_times.append(_parse_job_time(job["next_execution_time"]))
                except (KeyError, TypeError, ValueError):
                    # One malformed entry must not discard the other jobs.
                    logger.warning(
                        "SUPERVISOR : ignoring job without usable execution time: %s",
                        job,
                    )
            return min(wake_times, default=None)
        except Exception as e:
            logger.exception("SUPERVISOR : error in calculating next wake time: %s", e)
            return None
```