REX : On Battery Project using RustiqIOT

Designing and operating a battery-powered project raises several concerns and needs:

  • Keeping the device powered only while it has work to do

  • Supervising and measuring the power supply and battery level

  • Managing communication, including recovery from transmission loss and intermittent connectivity

In this use case, power management is key. There is a major risk of losing the device if:

  • The power stays on because of a software issue, which drains the battery

Block diagram of a typical battery-powered project

At a high level, the device software is organized into several services:

Supervisor: the main logic of the device. This service communicates with PowerManagement to periodically ask to stay powered while the regular logic is running (watchdog).

PowerManagement: this service communicates with the hardware and returns measurements, as well as time credits for deep sleep and autonomous wake-up.

ConnexionManager: sets up the configuration and the communication strategy with external systems (Wi-Fi, radio, LoRa, …).

TransferService: manages commands and file transfers, with recovery.

Scheduler: this service maintains one-off or periodic tasks.

Device Display Service: service used for the display.
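
The watchdog exchange between Supervisor and PowerManagement can be sketched as a time credit that the supervisor must keep renewing. This is a minimal illustration, not the project's implementation: the names PowerWatchdog, feed, must_cut_power and FakeClock are hypothetical, and the 30-second credit is an arbitrary value.

```python
import time
from typing import Callable


class PowerWatchdog:
    """Hypothetical power-side watchdog: power stays on only while credit remains."""

    def __init__(self, credit_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._credit_seconds = credit_seconds
        self._clock = clock
        # The first credit starts when the watchdog is armed.
        self._deadline = clock() + credit_seconds

    def feed(self) -> None:
        """Called by the supervisor while its regular logic is alive."""
        self._deadline = self._clock() + self._credit_seconds

    def must_cut_power(self) -> bool:
        """True once the supervisor has not fed the watchdog within the credit."""
        return self._clock() >= self._deadline


class FakeClock:
    """Manually advanced clock, so the sketch can run without real delays."""

    def __init__(self) -> None:
        self.now = 0.0

    def __call__(self) -> float:
        return self.now


clock = FakeClock()
watchdog = PowerWatchdog(credit_seconds=30, clock=clock)

clock.now = 20.0
watchdog.feed()                      # supervisor is alive: credit renewed until t=50
clock.now = 45.0
print(watchdog.must_cut_power())     # still within the credit
clock.now = 51.0
print(watchdog.must_cut_power())     # supervisor stopped feeding: credit exhausted
```

Injecting the clock keeps the sketch testable; on a device the default time.monotonic would be used. If the supervisor hangs and stops calling feed, the credit runs out and the power side can cut power instead of letting the battery drain.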

Hardware concerns

To achieve this and save power, the hardware is organized as follows:

  • a dedicated power unit, built around a small microcontroller with deep sleep capabilities (microamp-level consumption), is used to wake up the main unit.

  • the main unit loop does its tasks and asks for deep sleep when done
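
To illustrate why the main unit should only be powered in short bursts, the following sketch estimates the average current over one wake/sleep period. All figures (200 mA active, 20 µA asleep, 60 s awake per hour, 2000 mAh battery) are hypothetical values chosen for the illustration, not measurements from this project, and the estimate ignores battery self-discharge and converter losses.

```python
def average_current_ma(active_ma: float, active_s: float,
                       sleep_ma: float, period_s: float) -> float:
    """Time-weighted average current over one wake/sleep period."""
    if period_s <= 0 or not 0 <= active_s <= period_s:
        raise ValueError("active_s must lie within a positive period_s")
    sleep_s = period_s - active_s
    return (active_ma * active_s + sleep_ma * sleep_s) / period_s


def battery_life_hours(capacity_mah: float, average_ma: float) -> float:
    """Idealised autonomy: capacity divided by average current."""
    if average_ma <= 0:
        raise ValueError("average_ma must be positive")
    return capacity_mah / average_ma


# Hypothetical figures, for illustration only.
duty_cycled = average_current_ma(active_ma=200.0, active_s=60.0,
                                 sleep_ma=0.02, period_s=3600.0)
always_on = average_current_ma(active_ma=200.0, active_s=3600.0,
                               sleep_ma=0.02, period_s=3600.0)

print(f"duty-cycled: {duty_cycled:.3f} mA, "
      f"{battery_life_hours(2000.0, duty_cycled):.0f} h")
print(f"always on:   {always_on:.3f} mA, "
      f"{battery_life_hours(2000.0, always_on):.0f} h")
```

With these assumed figures, the sleep current is almost negligible and the autonomy is driven by how long the main unit stays awake, which is why a software fault that keeps the power on is so costly.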

Main unit Implementation view

In a simple implementation, the supervisor performs the following steps:

  • Check enrollment, and setup enrollment if not already done

  • Start the supervisor loop, with the following steps :

    • set up a watchdog on the power service to keep the device powered

    • ask for administrative commands, and process them

    • look for scheduled operations, and execute them

    • send status report

    • and trigger the next wakeup, using the scheduler

Supervisor code example

This example implementation is written in Python. It was used during prototyping cycles to validate some concerns and device behaviours.

import json
import logging
import time
import traceback
import xmlrpc.client
from datetime import datetime, timezone, timedelta
from http.client import HTTPConnection
from typing import Optional

from .config import SupervisorConfig
from .power_manager import PowerManager
from .network_manager import NetworkManager
from .display_manager import DisplayManager
from .enrollment_manager import EnrollmentManager



logger = logging.getLogger(__name__)


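class ProxiedTransport(xmlrpc.client.Transport):
    """XML-RPC transport with a configurable connection timeout."""

    def __init__(self, timeout = 1000):
        super().__init__()
        # forwarded to HTTPConnection, which expects a timeout in seconds
        self.timeout = timeout

    def make_connection(self, host):
        connection = HTTPConnection(host, timeout=self.timeout)
        logger.debug(f"making connection to {host}")
        # stored in the attribute the base Transport uses for its cached connection
        self._connection = (host, connection)
        return connection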



class Supervisor:
    def __init__(self):
        self.config = SupervisorConfig.from_env()
        self.admin_client = self._setup_admin_client()
        self.admin_client_fast = self._setup_admin_client_fast()
        
        self.power_manager = PowerManager(self.admin_client, self.admin_client_fast)
        self.network_manager = NetworkManager(self.admin_client)
        self.display_manager = DisplayManager(self.admin_client)
        self.enrollment_manager = EnrollmentManager(
            self.config,
            self.admin_client,
            self.display_manager,
            self.network_manager,
            self.admin_client_fast
        )

    def _setup_admin_client(self):
        """Admin client with a long timeout."""
        transport = ProxiedTransport(timeout=40000)
        return xmlrpc.client.ServerProxy("http://localhost:8003/", transport=transport)

    def _setup_admin_client_fast(self):
        """Admin client with a short timeout (the "fast" variant)."""
        transport = ProxiedTransport(timeout=1000)
        return xmlrpc.client.ServerProxy("http://localhost:8003/", transport=transport)
    
    def _get_machine_id(self):
        try:
            machine_id = json.loads(self.admin_client.get_machine_id())
            return machine_id
        except Exception as e:
            logger.error(f"SUPERVISOR : error in getting machine id: {e}")
            raise

    def run(self):
        try:
            machine_id = self._get_machine_id()
            logger.info(f"SUPERVISOR : machine id: {machine_id}")
            
            if not self.enrollment_manager.is_enrolled():
                logger.info("Device not enrolled, starting enrollment")
                self.enrollment_manager.enroll_device(machine_id)
                return
            else:
                logger.info("Device enrolled, starting supervisor cycle")

            self._run_supervisor_cycle()
        except Exception as e:
            logger.error(f"Supervisor failed: {e}")
            traceback.print_exc()
            raise

    def _run_supervisor_cycle(self):
        
        if not self.power_manager.is_power_module_available():
            logger.info("SUPERVISOR : power module not available, skipping supervisor cycle")
            return

        if not self.config.force_powered_mode:
            logger.info("SUPERVISOR : ensuring battery mode")
            self.power_manager.ensure_battery_mode()
            logger.info("SUPERVISOR : starting watchdog")
            self.power_manager.start_watchdog()

        try:
            self._process_remote_commands()
            self._process_scheduled_tasks()
            self._update_monitoring_status()
            
            next_wake_time = self._calculate_next_wake_time()
            if next_wake_time is None:
                # nothing scheduled: default to waking up in 24 hours
                next_wake_time = datetime.now(timezone.utc) + timedelta(hours=24)

            logger.info(f"SUPERVISOR : next wake time: {next_wake_time}")

            if not self.config.force_powered_mode:
                # never sleep for more than 2 days
                max_wake_time = datetime.now(timezone.utc) + timedelta(days=2)
                if next_wake_time > max_wake_time:
                    next_wake_time = max_wake_time

                logger.info(f"SUPERVISOR : scheduling powerdown and recovery at {next_wake_time}")

                self.power_manager.schedule_powerdown(next_wake_time)
        finally:
            if not self.config.force_powered_mode:
                self.power_manager.stop_watchdog()

        # in powered mode, wait a bit before the next cycle so the server is not overloaded;
        # in battery mode, the powerdown command will kill all processes
        print("SUPERVISOR : wait 10 seconds")
        time.sleep(10)

    def _process_remote_commands(self):
        try:
            result = self.admin_client.call_service("monitoring", "process_administrative_commands")
            print(f"SUPERVISOR : Monitoring process administrative commands result: {result}")
        except Exception as e:
            print(f"SUPERVISOR : error in processing remote commands: {e}")
            traceback.print_exc()

    def _process_scheduled_tasks(self):
        try:
            next_job_time_json = self.admin_client.call_service("scheduler", "get_next_execution_time_for_all_jobs")
            next_job_time = json.loads(next_job_time_json)

            current_time = datetime.now(timezone.utc)
            for job in next_job_time:
                try:
                    # see if we are in the time slots to do the job
                    print(f"SUPERVISOR : job to do : {job}")

                    assert "job_id" in job
                    assert "next_execution_time" in job
                    assert "name" in job
                    assert "executed_expression" in job

                    # get the job time
                    job_time = datetime.fromisoformat(job["next_execution_time"])
                    job_command = job["executed_expression"]   

                    print(f"SUPERVISOR : current time: {current_time}")
                    if current_time > job_time:
                        print(f"SUPERVISOR : job {job['name']} is due")
                        print(f"SUPERVISOR EXECUTING JOB {job['name']} - {job_command}")

                        result = ""
                        iserror = 0
                        try:
                            result = self.admin_client.execute_command(job_command)
                        except Exception as e:
                            iserror = 1
                            result = "ERROR:" + str(e)
                            print(f"SUPERVISOR : error in executing job: {job_command}")
                            traceback.print_exc()

                        # ack the job realization
                        self.admin_client.call_service('scheduler', 
                                            'commit_job_execution', 
                                            job["job_id"], 
                                            datetime.now(timezone.utc).isoformat(), 
                                            iserror,
                                            result)
                        
                    else:
                        print(f"SUPERVISOR : job {job['name']} is not DUE, skipped")

                except Exception as e:
                    print(f"SUPERVISOR : error in job: {job}")
                    traceback.print_exc()

        except Exception as e:
            print(f"SUPERVISOR : error in processing scheduled tasks: {e}")
            traceback.print_exc()

    def _update_monitoring_status(self):
        try:
            print("SUPERVISOR : uploading monitoring")
            self.admin_client.call_service("monitoring", "upload_monitoring")
            print("SUPERVISOR : uploading monitoring done")
        except Exception as e:
            print(f"SUPERVISOR : error in uploading monitoring: {e}")
            traceback.print_exc()

    def _calculate_next_wake_time(self) -> Optional[datetime]:
        """Return the next execution time reported by the scheduler, or None if there is none or the call fails."""
        try:
            next_job_time_json = self.admin_client.call_service("scheduler", "get_next_execution_time_for_all_jobs")
            next_job_time = json.loads(next_job_time_json)

            if next_job_time:
                # the first entry is used as the next job (assumes the scheduler returns jobs sorted by time)
                return datetime.fromisoformat(next_job_time[0]["next_execution_time"])

        except Exception as e:
            print(f"SUPERVISOR : error in calculating next wake time: {e}")
            traceback.print_exc()

        return None
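
The wake-up time rules applied in _run_supervisor_cycle (default of 24 hours when nothing is scheduled, cap at 2 days) can be isolated in a pure function, which makes them easy to test off-device. This is a sketch under assumptions: clamp_wake_time, DEFAULT_SLEEP and MAX_SLEEP are hypothetical names, and treating timestamps without a timezone as UTC is an assumption about the scheduler's output, not something the listing above guarantees.

```python
from datetime import datetime, timedelta, timezone
from typing import Optional

DEFAULT_SLEEP = timedelta(hours=24)   # used when no job is scheduled
MAX_SLEEP = timedelta(days=2)         # upper bound on a single sleep


def clamp_wake_time(requested: Optional[datetime], now: datetime) -> datetime:
    """Apply the default and the upper bound to a requested wake-up time."""
    if requested is None:
        return now + DEFAULT_SLEEP
    if requested.tzinfo is None:
        # Assumption: naive timestamps are expressed in UTC. Without this,
        # comparing them with an aware 'now' raises TypeError.
        requested = requested.replace(tzinfo=timezone.utc)
    return min(requested, now + MAX_SLEEP)


now = datetime(2024, 1, 1, tzinfo=timezone.utc)
print(clamp_wake_time(None, now))                        # default applied
print(clamp_wake_time(now + timedelta(days=5), now))     # capped at 2 days
print(clamp_wake_time(now + timedelta(hours=3), now))    # kept as requested
```

Passing the current time as an argument, rather than calling datetime.now inside the function, keeps the result deterministic in tests.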