meta-netmodule-bsp/recipes-connectivity/gnss-save-on-shutdown/files/gnss-save-on-shutdown.py

225 lines
9.3 KiB
Python
Executable File

#!/usr/bin/env python3
import subprocess
import re
import sys
import getopt
import os
from os import path
# Receiver device node used when no -d/--device option is supplied.
default_device = "/dev/gnss0"

# A device specific configuration file is looked up as
# <config_file_prepath><device name><config_file_end>.
config_file_prepath = "/etc/gnss/"
config_file_end = ".conf"

# Configuration file used when no device specific file is found.
default_config_file = "/etc/gnss/gnss0.conf"
def out(str, use_systemd_cat=None, level=None):
    """Emit a message on stdout or hand it to the systemd journal.

    Args:
        str: Message to emit. The parameter keeps its historical name (which
            shadows the builtin inside this function) so that existing
            keyword callers continue to work.
        use_systemd_cat: When truthy the message is passed to ``systemd-cat``
            instead of being printed.
        level: Priority given to ``systemd-cat -p`` (the callers in this file
            use ``'err'``, ``'warning'``, ``'info'`` and ``'debug'``).
            Defaults to ``'info'`` when omitted.
    """
    # The builtin str() is shadowed by the parameter, hence %-formatting.
    message = "%s" % str
    if not use_systemd_cat:
        print(message)
        return

    priority = "info" if level is None else "%s" % level
    try:
        # An argument vector instead of a shell command line: quotes, '$' or
        # backticks inside the message can no longer be interpreted (or
        # abused) by a shell.
        subprocess.run(["systemd-cat", "-p", priority, "echo", message], check=False)
    except OSError:
        # systemd-cat could not be started; do not lose the message.
        print(message)
def read_baud_rate(config_file, use_systemd_cat=None):
    """Return the baud rate configured in a GNSS configuration file.

    The first line starting with a baud rate key followed by ``=`` and a
    decimal number is used. Accepted spellings of the key include
    ``baudrate``, ``baud rate`` and ``br``; a single space is allowed on
    either side of the ``=``.

    Args:
        config_file: Path of the configuration file to read.
        use_systemd_cat: Forwarded to ``out`` for error reporting.

    Returns:
        The baud rate as a string of decimal digits.

    Raises:
        SystemExit: After reporting the problem, when the file does not exist
            or contains no baud rate entry.
            NOTE(review): the exit status is 0 on these error paths, as
            before; presumably so the calling service is not marked as
            failed. Confirm this is intended.
    """
    try:
        with open(config_file, 'r') as file:
            file_contents = file.read()
    except FileNotFoundError:
        # Reported through out() so that use_systemd_cat is honoured; handing
        # the bare message to systemd-cat would make it try to execute the
        # message as a command.
        out("config file %s doesn't exists" % config_file, use_systemd_cat=use_systemd_cat, level='err')
        sys.exit(0)

    pattern_baud_rate = re.compile(r"^b(aud)? ?r(ate)? ?= ?(?P<baud_rate>\d+)", flags=re.MULTILINE)
    match_baud_rate = pattern_baud_rate.search(file_contents)
    if match_baud_rate is None:
        out("unable to read baud rate from %s" % config_file, use_systemd_cat=use_systemd_cat, level='err')
        sys.exit(0)
    return match_baud_rate.group('baud_rate')
class UbxtoolWrapper:
    """Thin wrapper that runs the ``ubxtool`` command line program.

    Every call starts one process using the device, speed and verbosity given
    at construction time and returns what that process wrote to stdout.
    """

    def __init__(self, tool, device, speed, verbosity):
        """Store the invocation parameters.

        Args:
            tool: Program to run; split on whitespace, so it may carry
                leading words or fixed arguments.
            device: Device path, passed with ``-f``.
            speed: Baud rate as a string, passed with ``-s``.
            verbosity: Verbosity level, passed with ``-v``.
        """
        self.tool = tool
        # Pre-formatted option strings, kept so that code reading these
        # attributes keeps working. The process itself is started from
        # _base_args below.
        self.device = " -f " + device
        self.speed = " -s " + speed
        self.verbosity = " -v " + str(verbosity)
        # Argument vector: a device path containing whitespace stays a single
        # argument instead of being torn apart by str.split().
        self._base_args = tool.split() + ["-f", device, "-s", speed, "-v", str(verbosity)]

    def _run(self, extra_args):
        """Run the tool with *extra_args* appended and return its raw stdout bytes."""
        # subprocess.run() drains the pipe while waiting for the process.
        # Calling wait() before reading a piped stdout can block forever once
        # the output exceeds the pipe buffer.
        completed = subprocess.run(self._base_args + extra_args, stdout=subprocess.PIPE, check=False)
        return completed.stdout

    def poll_preset(self, preset):
        """Run the tool with ``-p <preset>`` and return its output as text."""
        return self._run(["-p", preset]).decode('utf-8')

    def send_command(self, ubx_class, ubx_id, payload=None):
        """Run the tool with ``-c <class>,<id>[,<payload>]`` and return its output.

        Args:
            ubx_class: Message class as a string, e.g. ``'09'``.
            ubx_id: Message id as a string, e.g. ``'14'``.
            payload: Optional comma separated payload string.

        Returns:
            The decoded stdout of the process.

        Raises:
            SystemExit: With status 0 after printing a message, when the
                output cannot be decoded as UTF-8.
        """
        fields = [ubx_class, ubx_id]
        if payload is not None:
            fields.append(payload)
        raw_output = self._run(["-c", ",".join(fields)])
        try:
            return raw_output.decode('utf-8')
        except UnicodeDecodeError:
            print('communication with ubxtool failed - retry')
            sys.exit(0)
def find_sos_response(ubx_response, command):
    """Extract the response code that follows a given command code.

    The text is scanned for entries of the form
    ``command <digit> reserved<digit> x...``. For the first entry whose digit
    equals *command*, the digit of the first
    ``response <digit> reserved<digit> x...`` entry after it is returned.

    Args:
        ubx_response: Text to scan (the output of a ubxtool call). ``None``
            or an empty string is treated as "no response".
        command: Single digit command code to look for, as ``str`` or ``int``.

    Returns:
        The response digit as a string, or ``None`` when the command entry or
        a response entry after it cannot be found.
    """
    if not ubx_response:
        return None

    # Matched digits are strings; normalising here lets callers pass 2 as
    # well as '2' instead of silently never matching.
    wanted_command = str(command)
    get_command = re.compile(r"command (?P<command>\d) reserved\d x[\w ]+")
    get_response = re.compile(r"response (?P<response>\d) reserved\d x[\w ]+")

    for command_match in get_command.finditer(ubx_response):
        if command_match.group('command') != wanted_command:
            continue
        # Only the first matching command entry is considered.
        response_match = get_response.search(ubx_response[command_match.end():])
        if response_match is None:
            return None
        return response_match.group('response')
    return None
# Persists the receiver state by dumping (parts of) the receiver RAM to its flash.
def persist_receiver_state(ubx, use_systemd_cat=None):
    """Stop the receiver, request a backup to flash and report the outcome.

    Args:
        ubx: Object providing ``send_command(ubx_class, ubx_id, payload)``.
        use_systemd_cat: Forwarded to ``out`` for message reporting.

    Raises:
        SystemExit: With status 0, after reporting an error, when the backup
            is not acknowledged or no acknowledgement is found.
    """
    # Controlled GNSS stop & Hotstart
    ubx.send_command(ubx_class='06', ubx_id='04', payload='00,00,08,00')
    # save contents of BBR to flash
    backup_reply = ubx.send_command(ubx_class='09', ubx_id='14', payload='00,00,00,00')

    # 2 = Backup File Creation Acknowledge; a response of '1' is the ACK.
    acknowledged = find_sos_response(backup_reply, '2') == '1'
    if acknowledged:
        out("successfully persisted receiver state", use_systemd_cat=use_systemd_cat, level='info')
        return

    # A missing response and any other response value are reported alike.
    out("unable to verify if receiver state is persistant", use_systemd_cat=use_systemd_cat, level='err')
    sys.exit(0)
# Verifies loading of the receiver state from flash to RAM.
def verify_load_of_receiver_state(ubx, use_systemd_cat=None):
    """Start the receiver, poll the restore status and report the outcome.

    Args:
        ubx: Object providing ``send_command(ubx_class, ubx_id, payload=None)``.
        use_systemd_cat: Forwarded to ``out`` for message reporting.

    Raises:
        SystemExit: With status 0, after reporting an error, for every
            outcome other than a successful restore.
    """
    # Controlled GNSS start & Hotstart
    ubx.send_command(ubx_class='06', ubx_id='04', payload='00,00,09,00')
    # check if restore was successful
    poll_reply = ubx.send_command(ubx_class='09', ubx_id='14')
    restore_code = find_sos_response(poll_reply, '3')  # 3 = System Restored from Backup

    if restore_code == '2':  # restored from backup
        out("successfully restored receiver state", use_systemd_cat=use_systemd_cat, level='info')
        return

    failure_messages = {
        '1': "restoring receiver state failed",  # Failed restoring from backup file
        '3': "there was no persistant receiver state to restore",  # not restored (no backup)
    }
    # No response, '0' (Unknown) and any unexpected value share one message.
    failure_text = failure_messages.get(restore_code, "unable to verify restoration of receiver state")
    out(failure_text, use_systemd_cat=use_systemd_cat, level='err')
    sys.exit(0)
# Clears the backup.
def clear_perisited_receiver_state(ubx, use_systemd_cat=None):
    """Send the command that clears the saved receiver state.

    Args:
        ubx: Object providing ``send_command(ubx_class, ubx_id, payload)``.
        use_systemd_cat: Accepted for symmetry with the other actions; not
            used, since nothing is reported here.
    """
    # clear backup (recommended by ublox)
    clear_request = {'ubx_class': '09', 'ubx_id': '14', 'payload': '01,00,00,00'}
    ubx.send_command(**clear_request)
def _config_file_for_device(device, use_systemd_cat):
    """Return the configuration file to use for *device*, falling back to the default."""
    device_match = re.search(r"\/dev\/(?P<device_name>[\w]+)", device)
    if device_match is None:
        # The path does not contain /dev/<name>, so no device specific file
        # name can be derived from it.
        out("unable to derive a config file name from %s, using default config file %s instead" % (device, default_config_file), use_systemd_cat=use_systemd_cat, level='warning')
        return default_config_file

    device_name = device_match.group('device_name')
    possible_config_file = config_file_prepath + device_name + config_file_end
    if not path.exists(possible_config_file):
        out("no config file for %s found at %s, using default config file %s instead" % (device_name, possible_config_file, default_config_file), use_systemd_cat=use_systemd_cat, level='warning')
        return default_config_file

    out("config file used for device %s: %s" % (device_name, possible_config_file), use_systemd_cat=use_systemd_cat, level='debug')
    return possible_config_file


def main(argv):
    """Parse the command line and run the requested receiver state actions.

    Supported options: ``-p/--persist``, ``-v/--verify``, ``-c/--clear``,
    ``-d/--device <path>``, ``-j/--journal`` and ``-h/--help``. Requested
    actions run in the order persist, clear, verify; persist and verify
    cannot be combined.

    Args:
        argv: Command line arguments without the program name.

    Raises:
        SystemExit: After printing help, on invalid or missing options, on an
            invalid device, or when persist and verify are combined.
            NOTE(review): the exit status is 0 on these error paths, as
            before; presumably so the calling service is not marked as
            failed. Confirm this is intended.
    """
    help_message = "\nOptions:\n" \
                   " -p, --persist Save current GNSS receiver state to flash\n" \
                   " -v, --verify Verify that peristed receiver state was loaded upon boot\n" \
                   " -c, --clear Clear saved receiver state from flash (recommended after restore)\n" \
                   " -d, --device </path/to/device> Specify the device to be used\n" \
                   " -j, --journal Use systemd-cat instead of print, recommended when running as service\n" \
                   " to get highlighted error output in systemd journal\n" \
                   " -h, --help Print help\n"
    try:
        # Options that take a value are followed by ':' (short) or '=' (long).
        opts, _ = getopt.getopt(argv, "hpvcjd:", ["help", "persist", "verify", "clear", "journal", "device="])
    except getopt.GetoptError:
        print("invalid user input")
        print(help_message)
        sys.exit(0)
    if not opts:
        print("no user input")
        print(help_message)
        sys.exit(0)

    # Decide on journal output first, so every later message honours -j no
    # matter where the flag appears on the command line.
    use_systemd_cat = any(opt in ("-j", "--journal") for opt, _ in opts)

    use_default_device = True
    device = default_device
    do_persist = False
    do_verify = False
    do_clear = False
    for opt, arg in opts:
        if opt in ("-h", "--help"):
            print(help_message)
            sys.exit(0)
        elif opt in ("-p", "--persist"):
            do_persist = True
        elif opt in ("-v", "--verify"):
            do_verify = True
        elif opt in ("-c", "--clear"):
            do_clear = True
        elif opt in ("-d", "--device"):
            # getopt reports long options with two dashes; comparing against
            # "-device" would silently ignore --device.
            use_default_device = False
            device = arg

    config_file = default_config_file
    if not use_default_device:
        if not path.exists(device):
            out("invalid device", use_systemd_cat=use_systemd_cat, level='err')
            sys.exit(0)
        config_file = _config_file_for_device(device, use_systemd_cat)

    if do_persist and do_verify:
        out("You can't persist the receiver state and verify a restore at the same time.", use_systemd_cat=use_systemd_cat, level='err')
        sys.exit(0)
    if use_default_device:
        out("no device specified, using default %s" % default_device, use_systemd_cat=use_systemd_cat, level='warning')

    baud_rate = read_baud_rate(config_file, use_systemd_cat=use_systemd_cat)
    ubx = UbxtoolWrapper(tool="ubxtool", device=device, speed=str(baud_rate), verbosity=2)
    if do_persist:
        persist_receiver_state(ubx, use_systemd_cat=use_systemd_cat)
    if do_clear:
        clear_perisited_receiver_state(ubx, use_systemd_cat=use_systemd_cat)
    if do_verify:
        verify_load_of_receiver_state(ubx, use_systemd_cat=use_systemd_cat)
# Run the command line interface only when executed as a script, not on import.
if __name__ == "__main__":
    cli_arguments = sys.argv[1:]
    main(cli_arguments)