hitl_tester.modules.bms.bms_serial
Gives access to runtime flags
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
1""" 2Gives access to runtime flags 3 4# (c) 2020-2024 TurnAround Factor, Inc. 5# 6# CUI DISTRIBUTION CONTROL 7# Controlled by: DLA J68 R&D SBIP 8# CUI Category: Small Business Research and Technology 9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15# Fort Belvoir, VA 22060-6221 16# 17# SBIR DATA RIGHTS 18# Contract No.:SP4701-23-C-0083 19# Contractor Name: TurnAround Factor, Inc. 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21# Expiration of SBIR Data Rights Period: September 24, 2029 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27# the markings. 28""" 29 30from __future__ import annotations 31 32import csv 33import datetime 34import struct 35import threading 36import time 37from collections import deque 38from contextlib import suppress 39from enum import IntEnum 40from functools import reduce 41from operator import xor 42from pathlib import Path 43from typing import Iterator, ClassVar, cast 44 45import pytest 46import serial 47import serial.tools.list_ports 48from typing_extensions import Self 49 50from hitl_tester.modules.bms_types import BMSFlags, PacketError, BMSState, ControlStatusRegister, CellState 51from hitl_tester.modules.logger import logger 52 53# Do not initialize if we are generating documentation. 54if hasattr(pytest, "flags") and isinstance(pytest.flags, BMSFlags): 55 if not pytest.flags.doc_generation: 56 from hitl_tester.modules.bms.hitl_data import ( 57 FlagProcessor, 58 Struct, 59 runtime_data_format, 60 FLAG_COLLECTION_SIZE, 61 FLAGS_SECTION, 62 ) 63 64 FLAG_PROC = FlagProcessor(FLAG_COLLECTION_SIZE, FLAGS_SECTION) 65 REPORT_PATH = Path(f"{pytest.flags.report_filename}_serial.csv") 66 67NUM_FLAGS = 63 # Total number of flags to iterate over. 68VERSION = 8 # Serial version 69 70 71class SerialPort: 72 """Manager for serial port.""" 73 74 BAUD = 115200 75 instance: ClassVar[Self | None] = None 76 77 def __new__(cls): 78 """Make BMS hardware a singleton.""" 79 if cls.instance is None: 80 cls.instance = super().__new__(cls) 81 return cls.instance 82 83 def __init__(self) -> None: 84 self.index = 0 85 self.buffer: list[int] = [] 86 87 # Get serial port 88 vid = pid = serial_number = None 89 if hasattr(pytest, "flags") and isinstance(pytest.flags, BMSFlags): 90 vid = cast(dict[str, str], pytest.flags.config["bms_serial"])["vendor_id"] 91 pid = cast(dict[str, str], pytest.flags.config["bms_serial"])["product_id"] 92 serial_number = cast(dict[str, str], pytest.flags.config["bms_serial"])["serial_id"] 93 for x in serial.tools.list_ports.comports(): 94 if (x.vid, x.pid, x.serial_number) == (vid, pid, serial_number): 95 serial_path = x.device 96 break 97 else: 98 raise RuntimeError(f"Serial device (vid:pid:serial) {vid}:{pid}:{serial_number} could not be found.") 99 self.serial = serial.Serial(port=serial_path, baudrate=SerialPort.BAUD, timeout=0) 100 101 def peek(self): 102 """Fetch the current byte.""" 103 while self.serial.in_waiting > 0 or self.index >= len(self.buffer): 104 if self.serial.in_waiting > 0: 105 # logger.write_debug_to_report(f"System serial buffer has {self.serial.in_waiting} bytes.") 106 try: 107 self.buffer += list(self.serial.read(self.serial.in_waiting)) 108 except serial.serialutil.SerialException as e: 109 logger.write_debug_to_report(f"Got error: {e}") 110 time.sleep(5) 111 time.sleep(0.1) 112 return self.buffer[self.index] 113 114 def consume(self): 115 """Fetch the current byte and advance.""" 116 result = self.peek() 117 self.index += 1 118 return result 119 120 def clear_history(self): 121 """Discard data before the current index.""" 122 self.buffer = self.buffer[self.index :] 123 self.index = 0 124 125 126class SerialMonitor: 127 """Collect data from the serial buffer.""" 128 129 instance: ClassVar[Self | None] = None 130 131 def __new__(cls, packet_buffer_size: int = 4096): 132 """Make SerialMonitor a singleton.""" 133 if cls.instance is None: 134 cls.instance = super().__new__(cls) 135 return cls.instance 136 137 def __init__(self, packet_buffer_size: int = 4096): 138 self.dropped_packets = 0 139 self.written_packets = 1 140 self.latest_packet: deque[dict[str, int | bool | str]] = deque(maxlen=1) 141 self.packet_deque: deque[dict[str, int | bool | str]] = deque(maxlen=packet_buffer_size) 142 143 if ( 144 hasattr(pytest, "flags") 145 and isinstance(pytest.flags, BMSFlags) 146 and not (pytest.flags.doc_generation or pytest.flags.dry_run) 147 and "bms_serial" in pytest.flags.config 148 ): 149 # Create log file 150 with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile: 151 csv.writer(csvfile).writerow(self.parse_serial_data(dict.fromkeys(self.get_headers(), VERSION)).keys()) 152 153 # Start serial monitor thread 154 self.serial_port = SerialPort() 155 self.daemon = threading.Thread(target=self.runtime, daemon=True) 156 self.daemon.start() 157 158 def runtime(self): 159 """Fetch packets as quickly as possible.""" 160 161 class Command(IntEnum): 162 """The commands for decoding.""" 163 164 START = 0x7C 165 ESCAPE = 0x7D 166 END = 0x7E 167 168 def fetch_data() -> bytes: 169 """Attempt to fetch a valid packet.""" 170 packet_data: list[int] = [] 171 packet_size = struct.calcsize(runtime_data_format.to_Python()) + 1 # Data + CRC 172 173 # Fetch until we have enough data 174 self.serial_port.clear_history() 175 while len(packet_data) <= packet_size: 176 if (byte := self.serial_port.consume()) == Command.START: 177 raise PacketError("Packet was truncated by another.") 178 179 if byte == Command.ESCAPE: 180 if self.serial_port.peek() ^ 0x20 in iter(Command): 181 packet_data.append(self.serial_port.consume() ^ 0x20) 182 else: 183 escaped_flags = ", ".join([f"0x{flag ^ 0x20:02X}" for flag in Command]) 184 raise PacketError(f"Escape was 0x{byte:02X}, expected one of {escaped_flags}.") 185 186 elif byte == Command.END: 187 if len(packet_data) < packet_size: 188 raise PacketError(f"Packet was too short, {len(packet_data)}/{packet_size} bytes.") 189 break 190 191 else: 192 packet_data.append(byte) 193 else: 194 raise PacketError(f"Packet was larger than {packet_size} bytes.") 195 196 # Validate CRC 197 if reduce(xor, packet_data): 198 raise PacketError(f"Invalid CRC, 0x{packet_data[-1]:02X}.") 199 return bytes(packet_data[:-1]) # Remove CRC 200 201 def fetch_packet() -> bytes: 202 """Decode serial stream to a valid packet.""" 203 while True: 204 while self.serial_port.consume() != Command.START: # Find start flag 205 logger.write_debug_to_report( 206 "Discarding bytes: " + str(self.serial_port.buffer[: self.serial_port.index]) 207 ) 208 self.serial_port.clear_history() 209 210 try: # Fetch packet data 211 # logger.write_debug_to_report("waiting for data...") 212 result = fetch_data() 213 # logger.write_debug_to_report("got data") 214 return result 215 except PacketError: 216 logger.write_debug_to_report(f"Index: {self.serial_port.index}, Buffer: {self.serial_port.buffer}") 217 self.serial_port.index = 0 218 219 runtime_data = runtime_data_format("runtime_data") 220 while True: 221 decoded_packet = fetch_packet() 222 unpacked_data = struct.unpack(runtime_data_format.to_Python(), decoded_packet) 223 c_variables = runtime_data.set_values(unpacked_data) 224 if len(self.packet_deque) == self.packet_deque.maxlen: # Deque is full 225 self.dropped_packets += 1 226 packet_data = self.create_dictionary(c_variables) | { 227 "encoded_packet": f"0x{Command.START:02X}" 228 f"{bytes(self.serial_port.buffer[:self.serial_port.index]).hex().upper()}", 229 "decoded_packet": f"0x{decoded_packet.hex().upper()}", 230 } 231 232 # Save to file 233 with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile: 234 csv.writer(csvfile).writerow(self.parse_serial_data(packet_data).values()) 235 self.written_packets += 1 236 logger.write_debug_to_report(f"Serial #{self.written_packets}") 237 238 # Save to deque 239 self.packet_deque.append(packet_data) 240 self.latest_packet.append(packet_data) 241 242 time.sleep(0.1) # Packets only come in every 0.5 seconds at most 243 244 def create_dictionary(self, c_variables: list[Struct]) -> dict[str, int | bool]: 245 """Gather all runtime flags into a dictionary.""" 246 bms_dictionary: dict[str, int | bool] = {} 247 248 for c_variable in c_variables: 249 if "runtime_data.flags.flags" in str(c_variable): 250 for i in range(0, NUM_FLAGS): 251 table = c_variable.index_path[0] # type: ignore[attr-defined] 252 base_key = f"flags.{FLAG_PROC.bit_2_name(table, i)}" 253 254 # Handle duplicates 255 count = 0 256 while (key := f"{base_key}{count or ''}") in bms_dictionary: 257 count += 1 258 bms_dictionary[key] = bool(c_variable.value & (1 << i)) # type: ignore[attr-defined] 259 else: 260 base_key = c_variable.name 261 262 # Handle duplicates 263 count = 0 264 while (key := f"{base_key}{count or ''}") in bms_dictionary: 265 count += 1 266 bms_dictionary[key] = c_variable.value # type: ignore[attr-defined] 267 268 return bms_dictionary 269 270 def read(self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None: 271 """Fetch serial data from BMS.""" 272 timeout_minutes = 10 273 start = time.perf_counter() 274 read_deque = self.latest_packet if latest else self.packet_deque 275 276 # logger.write_debug_to_report(f"Reading serial {latest=} {blocking=}") 277 while True: 278 with suppress(IndexError): 279 result = read_deque.popleft() 280 # logger.write_debug_to_report(f"Got serial {result=}") 281 return result 282 if not blocking: 283 return None 284 time.sleep(0.1) # Packets only come in every 0.5 seconds at most 285 if time.perf_counter() - start > timeout_minutes * 60: 286 raise TimeoutError(f"Serial reader waited {timeout_minutes} minutes, but got no response.") 287 288 def get_headers(self) -> list[str]: 289 """Get the headers for the struct.""" 290 runtime_data = runtime_data_format("runtime_data") 291 struct_size = struct.calcsize(runtime_data_format.to_Python()) 292 unpacked_data = struct.unpack(runtime_data_format.to_Python(), bytes(struct_size)) 293 cvars = runtime_data.set_values(unpacked_data) 294 return list(self.create_dictionary(cvars).keys()) 295 296 def parse_serial_data(self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]: 297 """Read serial data dict and parse it into human-readable text.""" 298 299 def decode_timestamp(timestamp: int) -> str: 300 """Convert timestamp to elapsed time string.""" 301 seconds, milliseconds = divmod(timestamp, 1000) 302 try: 303 relative_timestamp = datetime.datetime.fromtimestamp( 304 seconds, datetime.timezone.utc 305 ) - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc) 306 except ValueError: 307 return f"0x{timestamp:02X}" 308 return f"{relative_timestamp.total_seconds():.0f}.{milliseconds}" 309 310 def decode_date(iso_date: int) -> str: 311 """Convert date to text.""" 312 try: 313 date = datetime.datetime.strptime(str(iso_date), "%Y%m%d") 314 return date.strftime("%Y-%m-%d") 315 except ValueError: 316 return f"0x{iso_date:08X}" 317 318 def key_iter(key: str) -> Iterator[tuple[int, int | str]]: 319 """Find as many of key as possible. Format is key, key1, key2, etc.""" 320 counter = -1 321 while (data := serial_data.get(f"{key}{(counter := counter + 1) or ''}")) is not None: 322 yield counter, data 323 324 if VERSION != serial_data["version"]: 325 direction = VERSION < int(serial_data["version"]) 326 raise RuntimeError( 327 f"Expected serial version {VERSION}, but BMS was version {serial_data['version']}. Flash " 328 f"a{'n old' if direction else ' new'}er firmware, or {'up' if direction else 'down'}grade the HITL." 329 ) 330 csv_data: dict[str, str | float | None] = {} 331 332 # Raw packet for debugging 333 csv_data["Encoded Packet"] = serial_data.get("encoded_packet") 334 csv_data["Decoded Packet"] = serial_data.get("decoded_packet") 335 336 # Runtime data 337 csv_data["Version"] = serial_data["version"] 338 csv_data["Build Date"] = decode_date(int(serial_data["build_date"])) 339 340 # Raw measurements 341 csv_data["Elapsed Time Raw"] = decode_timestamp(int(serial_data["timestamp"])) 342 for index, current in key_iter("current"): 343 csv_data[f"Current {index} Raw (A)"] = float(current) / 1000 344 for index, voltage_cell in key_iter("voltage_cell"): 345 csv_data[f"Cell {index} Voltage Raw (V)"] = float(voltage_cell) / 1000 346 for index, voltage_terminal in key_iter("voltage_terminal"): 347 csv_data[f"Terminal {index} Voltage Raw (V)"] = float(voltage_terminal) / 1000 348 for index, temperature in key_iter("temperature"): 349 csv_data[f"Temperature {index} Raw (°C)"] = float(temperature) / 10 - 273 350 csv_data["Micro Temperature Raw (°C)"] = float(serial_data["micro_temperature_raw"]) 351 csv_data["Charge Enable GPIO"] = bool(serial_data["ncharge_en_gpio"]) 352 csv_data["Wakeup GPIO"] = bool(serial_data["n_wakeup_gpio"]) 353 354 # Cooked measurements 355 csv_data["Elapsed Time Measurements"] = decode_timestamp(int(serial_data["meas_timestamp"])) 356 csv_data["Elapsed Time Cooked"] = decode_timestamp(int(serial_data["timestamp1"])) 357 csv_data["Current (A)"] = float(serial_data["mamps"]) / 1000 358 for index, mvolt_battery in key_iter("mvolt_battery"): 359 csv_data[f"Battery {index} Voltage (V)"] = float(mvolt_battery) / 1000 360 for index, mvolt_cell in key_iter("mvolt_cell"): 361 csv_data[f"Cell {index} Voltage (V)"] = float(mvolt_cell) / 1000 362 for index, mvolt_terminal in key_iter("mvolt_terminal"): 363 csv_data[f"Terminal {index} Voltage (V)"] = float(mvolt_terminal) / 1000 364 for index, dk_temp in key_iter("dk_temp"): 365 csv_data[f"Temperature {index} (°C)"] = float(dk_temp) / 10 - 273 366 csv_data["Micro Temperature (°C)"] = float(serial_data["micro_temperature_dc"]) / 10 367 368 # Flags 369 csv_data["Flags"] = " | ".join( 370 key.removeprefix("flags.").title() for key in serial_data if key.startswith("flags.") and serial_data[key] 371 ) 372 csv_data["Cell State"] = str(CellState(int(serial_data["cell_state"]))) 373 csv_data["Charge (%)"] = serial_data["percent_charged"] 374 csv_data["Health (%)"] = serial_data["percent_health"] 375 csv_data["Used (Ah)"] = float(serial_data["milliamp_hour_used"]) / 1000 376 csv_data["Remaining (Ah)"] = float(serial_data["milliamp_hour_remaining"]) / 1000 377 csv_data["Charge Cycles"] = serial_data["charge_cycles"] 378 csv_data["Capacity (Ah)"] = float(serial_data["milliamp_hour_capacity"]) / 1000 379 csv_data["SOC Charge Max"] = serial_data["Q_Max"] 380 csv_data["Columb Count"] = serial_data.get("Columb_Count") 381 csv_data["BMS State"] = str(BMSState(int(serial_data.get("BMS_State", 0)))) 382 csv_data["Wakeup Counter"] = serial_data.get("Wakeup_Counter") 383 csv_data["Reset Flags"] = str(ControlStatusRegister(int(serial_data.get("Reset_Flags", 0)))) 384 csv_data["Git Hash"] = f"0x{serial_data.get('git_hash'):X}" 385 386 # User data 387 for index, user_data in key_iter("userdef_8"): 388 csv_data[f"User Data Byte {index}"] = f"0x{user_data:02X}" 389 for index, user_data in key_iter("userdef_16"): 390 csv_data[f"User Data Word {index}"] = f"0x{user_data:04X}" 391 for index, user_data in key_iter("userdef_32"): 392 csv_data[f"User Data DWord {index}"] = f"0x{user_data:08X}" 393 csv_data["User Data QWord"] = f"0x{serial_data['userdef_64']:016X}" 394 395 return csv_data 396 397 398serial_monitor = SerialMonitor()
NUM_FLAGS =
63
VERSION =
8
class
SerialPort:
72class SerialPort: 73 """Manager for serial port.""" 74 75 BAUD = 115200 76 instance: ClassVar[Self | None] = None 77 78 def __new__(cls): 79 """Make BMS hardware a singleton.""" 80 if cls.instance is None: 81 cls.instance = super().__new__(cls) 82 return cls.instance 83 84 def __init__(self) -> None: 85 self.index = 0 86 self.buffer: list[int] = [] 87 88 # Get serial port 89 vid = pid = serial_number = None 90 if hasattr(pytest, "flags") and isinstance(pytest.flags, BMSFlags): 91 vid = cast(dict[str, str], pytest.flags.config["bms_serial"])["vendor_id"] 92 pid = cast(dict[str, str], pytest.flags.config["bms_serial"])["product_id"] 93 serial_number = cast(dict[str, str], pytest.flags.config["bms_serial"])["serial_id"] 94 for x in serial.tools.list_ports.comports(): 95 if (x.vid, x.pid, x.serial_number) == (vid, pid, serial_number): 96 serial_path = x.device 97 break 98 else: 99 raise RuntimeError(f"Serial device (vid:pid:serial) {vid}:{pid}:{serial_number} could not be found.") 100 self.serial = serial.Serial(port=serial_path, baudrate=SerialPort.BAUD, timeout=0) 101 102 def peek(self): 103 """Fetch the current byte.""" 104 while self.serial.in_waiting > 0 or self.index >= len(self.buffer): 105 if self.serial.in_waiting > 0: 106 # logger.write_debug_to_report(f"System serial buffer has {self.serial.in_waiting} bytes.") 107 try: 108 self.buffer += list(self.serial.read(self.serial.in_waiting)) 109 except serial.serialutil.SerialException as e: 110 logger.write_debug_to_report(f"Got error: {e}") 111 time.sleep(5) 112 time.sleep(0.1) 113 return self.buffer[self.index] 114 115 def consume(self): 116 """Fetch the current byte and advance.""" 117 result = self.peek() 118 self.index += 1 119 return result 120 121 def clear_history(self): 122 """Discard data before the current index.""" 123 self.buffer = self.buffer[self.index :] 124 self.index = 0
Manager for serial port.
SerialPort()
78 def __new__(cls): 79 """Make BMS hardware a singleton.""" 80 if cls.instance is None: 81 cls.instance = super().__new__(cls) 82 return cls.instance
Make BMS hardware a singleton.
def
peek(self):
102 def peek(self): 103 """Fetch the current byte.""" 104 while self.serial.in_waiting > 0 or self.index >= len(self.buffer): 105 if self.serial.in_waiting > 0: 106 # logger.write_debug_to_report(f"System serial buffer has {self.serial.in_waiting} bytes.") 107 try: 108 self.buffer += list(self.serial.read(self.serial.in_waiting)) 109 except serial.serialutil.SerialException as e: 110 logger.write_debug_to_report(f"Got error: {e}") 111 time.sleep(5) 112 time.sleep(0.1) 113 return self.buffer[self.index]
Fetch the current byte.
class
SerialMonitor:
127class SerialMonitor: 128 """Collect data from the serial buffer.""" 129 130 instance: ClassVar[Self | None] = None 131 132 def __new__(cls, packet_buffer_size: int = 4096): 133 """Make SerialMonitor a singleton.""" 134 if cls.instance is None: 135 cls.instance = super().__new__(cls) 136 return cls.instance 137 138 def __init__(self, packet_buffer_size: int = 4096): 139 self.dropped_packets = 0 140 self.written_packets = 1 141 self.latest_packet: deque[dict[str, int | bool | str]] = deque(maxlen=1) 142 self.packet_deque: deque[dict[str, int | bool | str]] = deque(maxlen=packet_buffer_size) 143 144 if ( 145 hasattr(pytest, "flags") 146 and isinstance(pytest.flags, BMSFlags) 147 and not (pytest.flags.doc_generation or pytest.flags.dry_run) 148 and "bms_serial" in pytest.flags.config 149 ): 150 # Create log file 151 with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile: 152 csv.writer(csvfile).writerow(self.parse_serial_data(dict.fromkeys(self.get_headers(), VERSION)).keys()) 153 154 # Start serial monitor thread 155 self.serial_port = SerialPort() 156 self.daemon = threading.Thread(target=self.runtime, daemon=True) 157 self.daemon.start() 158 159 def runtime(self): 160 """Fetch packets as quickly as possible.""" 161 162 class Command(IntEnum): 163 """The commands for decoding.""" 164 165 START = 0x7C 166 ESCAPE = 0x7D 167 END = 0x7E 168 169 def fetch_data() -> bytes: 170 """Attempt to fetch a valid packet.""" 171 packet_data: list[int] = [] 172 packet_size = struct.calcsize(runtime_data_format.to_Python()) + 1 # Data + CRC 173 174 # Fetch until we have enough data 175 self.serial_port.clear_history() 176 while len(packet_data) <= packet_size: 177 if (byte := self.serial_port.consume()) == Command.START: 178 raise PacketError("Packet was truncated by another.") 179 180 if byte == Command.ESCAPE: 181 if self.serial_port.peek() ^ 0x20 in iter(Command): 182 packet_data.append(self.serial_port.consume() ^ 0x20) 183 else: 184 escaped_flags = ", ".join([f"0x{flag ^ 0x20:02X}" for flag in Command]) 185 raise PacketError(f"Escape was 0x{byte:02X}, expected one of {escaped_flags}.") 186 187 elif byte == Command.END: 188 if len(packet_data) < packet_size: 189 raise PacketError(f"Packet was too short, {len(packet_data)}/{packet_size} bytes.") 190 break 191 192 else: 193 packet_data.append(byte) 194 else: 195 raise PacketError(f"Packet was larger than {packet_size} bytes.") 196 197 # Validate CRC 198 if reduce(xor, packet_data): 199 raise PacketError(f"Invalid CRC, 0x{packet_data[-1]:02X}.") 200 return bytes(packet_data[:-1]) # Remove CRC 201 202 def fetch_packet() -> bytes: 203 """Decode serial stream to a valid packet.""" 204 while True: 205 while self.serial_port.consume() != Command.START: # Find start flag 206 logger.write_debug_to_report( 207 "Discarding bytes: " + str(self.serial_port.buffer[: self.serial_port.index]) 208 ) 209 self.serial_port.clear_history() 210 211 try: # Fetch packet data 212 # logger.write_debug_to_report("waiting for data...") 213 result = fetch_data() 214 # logger.write_debug_to_report("got data") 215 return result 216 except PacketError: 217 logger.write_debug_to_report(f"Index: {self.serial_port.index}, Buffer: {self.serial_port.buffer}") 218 self.serial_port.index = 0 219 220 runtime_data = runtime_data_format("runtime_data") 221 while True: 222 decoded_packet = fetch_packet() 223 unpacked_data = struct.unpack(runtime_data_format.to_Python(), decoded_packet) 224 c_variables = runtime_data.set_values(unpacked_data) 225 if len(self.packet_deque) == self.packet_deque.maxlen: # Deque is full 226 self.dropped_packets += 1 227 packet_data = self.create_dictionary(c_variables) | { 228 "encoded_packet": f"0x{Command.START:02X}" 229 f"{bytes(self.serial_port.buffer[:self.serial_port.index]).hex().upper()}", 230 "decoded_packet": f"0x{decoded_packet.hex().upper()}", 231 } 232 233 # Save to file 234 with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile: 235 csv.writer(csvfile).writerow(self.parse_serial_data(packet_data).values()) 236 self.written_packets += 1 237 logger.write_debug_to_report(f"Serial #{self.written_packets}") 238 239 # Save to deque 240 self.packet_deque.append(packet_data) 241 self.latest_packet.append(packet_data) 242 243 time.sleep(0.1) # Packets only come in every 0.5 seconds at most 244 245 def create_dictionary(self, c_variables: list[Struct]) -> dict[str, int | bool]: 246 """Gather all runtime flags into a dictionary.""" 247 bms_dictionary: dict[str, int | bool] = {} 248 249 for c_variable in c_variables: 250 if "runtime_data.flags.flags" in str(c_variable): 251 for i in range(0, NUM_FLAGS): 252 table = c_variable.index_path[0] # type: ignore[attr-defined] 253 base_key = f"flags.{FLAG_PROC.bit_2_name(table, i)}" 254 255 # Handle duplicates 256 count = 0 257 while (key := f"{base_key}{count or ''}") in bms_dictionary: 258 count += 1 259 bms_dictionary[key] = bool(c_variable.value & (1 << i)) # type: ignore[attr-defined] 260 else: 261 base_key = c_variable.name 262 263 # Handle duplicates 264 count = 0 265 while (key := f"{base_key}{count or ''}") in bms_dictionary: 266 count += 1 267 bms_dictionary[key] = c_variable.value # type: ignore[attr-defined] 268 269 return bms_dictionary 270 271 def read(self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None: 272 """Fetch serial data from BMS.""" 273 timeout_minutes = 10 274 start = time.perf_counter() 275 read_deque = self.latest_packet if latest else self.packet_deque 276 277 # logger.write_debug_to_report(f"Reading serial {latest=} {blocking=}") 278 while True: 279 with suppress(IndexError): 280 result = read_deque.popleft() 281 # logger.write_debug_to_report(f"Got serial {result=}") 282 return result 283 if not blocking: 284 return None 285 time.sleep(0.1) # Packets only come in every 0.5 seconds at most 286 if time.perf_counter() - start > timeout_minutes * 60: 287 raise TimeoutError(f"Serial reader waited {timeout_minutes} minutes, but got no response.") 288 289 def get_headers(self) -> list[str]: 290 """Get the headers for the struct.""" 291 runtime_data = runtime_data_format("runtime_data") 292 struct_size = struct.calcsize(runtime_data_format.to_Python()) 293 unpacked_data = struct.unpack(runtime_data_format.to_Python(), bytes(struct_size)) 294 cvars = runtime_data.set_values(unpacked_data) 295 return list(self.create_dictionary(cvars).keys()) 296 297 def parse_serial_data(self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]: 298 """Read serial data dict and parse it into human-readable text.""" 299 300 def decode_timestamp(timestamp: int) -> str: 301 """Convert timestamp to elapsed time string.""" 302 seconds, milliseconds = divmod(timestamp, 1000) 303 try: 304 relative_timestamp = datetime.datetime.fromtimestamp( 305 seconds, datetime.timezone.utc 306 ) - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc) 307 except ValueError: 308 return f"0x{timestamp:02X}" 309 return f"{relative_timestamp.total_seconds():.0f}.{milliseconds}" 310 311 def decode_date(iso_date: int) -> str: 312 """Convert date to text.""" 313 try: 314 date = datetime.datetime.strptime(str(iso_date), "%Y%m%d") 315 return date.strftime("%Y-%m-%d") 316 except ValueError: 317 return f"0x{iso_date:08X}" 318 319 def key_iter(key: str) -> Iterator[tuple[int, int | str]]: 320 """Find as many of key as possible. Format is key, key1, key2, etc.""" 321 counter = -1 322 while (data := serial_data.get(f"{key}{(counter := counter + 1) or ''}")) is not None: 323 yield counter, data 324 325 if VERSION != serial_data["version"]: 326 direction = VERSION < int(serial_data["version"]) 327 raise RuntimeError( 328 f"Expected serial version {VERSION}, but BMS was version {serial_data['version']}. Flash " 329 f"a{'n old' if direction else ' new'}er firmware, or {'up' if direction else 'down'}grade the HITL." 330 ) 331 csv_data: dict[str, str | float | None] = {} 332 333 # Raw packet for debugging 334 csv_data["Encoded Packet"] = serial_data.get("encoded_packet") 335 csv_data["Decoded Packet"] = serial_data.get("decoded_packet") 336 337 # Runtime data 338 csv_data["Version"] = serial_data["version"] 339 csv_data["Build Date"] = decode_date(int(serial_data["build_date"])) 340 341 # Raw measurements 342 csv_data["Elapsed Time Raw"] = decode_timestamp(int(serial_data["timestamp"])) 343 for index, current in key_iter("current"): 344 csv_data[f"Current {index} Raw (A)"] = float(current) / 1000 345 for index, voltage_cell in key_iter("voltage_cell"): 346 csv_data[f"Cell {index} Voltage Raw (V)"] = float(voltage_cell) / 1000 347 for index, voltage_terminal in key_iter("voltage_terminal"): 348 csv_data[f"Terminal {index} Voltage Raw (V)"] = float(voltage_terminal) / 1000 349 for index, temperature in key_iter("temperature"): 350 csv_data[f"Temperature {index} Raw (°C)"] = float(temperature) / 10 - 273 351 csv_data["Micro Temperature Raw (°C)"] = float(serial_data["micro_temperature_raw"]) 352 csv_data["Charge Enable GPIO"] = bool(serial_data["ncharge_en_gpio"]) 353 csv_data["Wakeup GPIO"] = bool(serial_data["n_wakeup_gpio"]) 354 355 # Cooked measurements 356 csv_data["Elapsed Time Measurements"] = decode_timestamp(int(serial_data["meas_timestamp"])) 357 csv_data["Elapsed Time Cooked"] = decode_timestamp(int(serial_data["timestamp1"])) 358 csv_data["Current (A)"] = float(serial_data["mamps"]) / 1000 359 for index, mvolt_battery in key_iter("mvolt_battery"): 360 csv_data[f"Battery {index} Voltage (V)"] = float(mvolt_battery) / 1000 361 for index, mvolt_cell in key_iter("mvolt_cell"): 362 csv_data[f"Cell {index} Voltage (V)"] = float(mvolt_cell) / 1000 363 for index, mvolt_terminal in key_iter("mvolt_terminal"): 364 csv_data[f"Terminal {index} Voltage (V)"] = float(mvolt_terminal) / 1000 365 for index, dk_temp in key_iter("dk_temp"): 366 csv_data[f"Temperature {index} (°C)"] = float(dk_temp) / 10 - 273 367 csv_data["Micro Temperature (°C)"] = float(serial_data["micro_temperature_dc"]) / 10 368 369 # Flags 370 csv_data["Flags"] = " | ".join( 371 key.removeprefix("flags.").title() for key in serial_data if key.startswith("flags.") and serial_data[key] 372 ) 373 csv_data["Cell State"] = str(CellState(int(serial_data["cell_state"]))) 374 csv_data["Charge (%)"] = serial_data["percent_charged"] 375 csv_data["Health (%)"] = serial_data["percent_health"] 376 csv_data["Used (Ah)"] = float(serial_data["milliamp_hour_used"]) / 1000 377 csv_data["Remaining (Ah)"] = float(serial_data["milliamp_hour_remaining"]) / 1000 378 csv_data["Charge Cycles"] = serial_data["charge_cycles"] 379 csv_data["Capacity (Ah)"] = float(serial_data["milliamp_hour_capacity"]) / 1000 380 csv_data["SOC Charge Max"] = serial_data["Q_Max"] 381 csv_data["Columb Count"] = serial_data.get("Columb_Count") 382 csv_data["BMS State"] = str(BMSState(int(serial_data.get("BMS_State", 0)))) 383 csv_data["Wakeup Counter"] = serial_data.get("Wakeup_Counter") 384 csv_data["Reset Flags"] = str(ControlStatusRegister(int(serial_data.get("Reset_Flags", 0)))) 385 csv_data["Git Hash"] = f"0x{serial_data.get('git_hash'):X}" 386 387 # User data 388 for index, user_data in key_iter("userdef_8"): 389 csv_data[f"User Data Byte {index}"] = f"0x{user_data:02X}" 390 for index, user_data in key_iter("userdef_16"): 391 csv_data[f"User Data Word {index}"] = f"0x{user_data:04X}" 392 for index, user_data in key_iter("userdef_32"): 393 csv_data[f"User Data DWord {index}"] = f"0x{user_data:08X}" 394 csv_data["User Data QWord"] = f"0x{serial_data['userdef_64']:016X}" 395 396 return csv_data
Collect data from the serial buffer.
SerialMonitor(packet_buffer_size: int = 4096)
132 def __new__(cls, packet_buffer_size: int = 4096): 133 """Make SerialMonitor a singleton.""" 134 if cls.instance is None: 135 cls.instance = super().__new__(cls) 136 return cls.instance
Make SerialMonitor a singleton.
def
runtime(self):
159 def runtime(self): 160 """Fetch packets as quickly as possible.""" 161 162 class Command(IntEnum): 163 """The commands for decoding.""" 164 165 START = 0x7C 166 ESCAPE = 0x7D 167 END = 0x7E 168 169 def fetch_data() -> bytes: 170 """Attempt to fetch a valid packet.""" 171 packet_data: list[int] = [] 172 packet_size = struct.calcsize(runtime_data_format.to_Python()) + 1 # Data + CRC 173 174 # Fetch until we have enough data 175 self.serial_port.clear_history() 176 while len(packet_data) <= packet_size: 177 if (byte := self.serial_port.consume()) == Command.START: 178 raise PacketError("Packet was truncated by another.") 179 180 if byte == Command.ESCAPE: 181 if self.serial_port.peek() ^ 0x20 in iter(Command): 182 packet_data.append(self.serial_port.consume() ^ 0x20) 183 else: 184 escaped_flags = ", ".join([f"0x{flag ^ 0x20:02X}" for flag in Command]) 185 raise PacketError(f"Escape was 0x{byte:02X}, expected one of {escaped_flags}.") 186 187 elif byte == Command.END: 188 if len(packet_data) < packet_size: 189 raise PacketError(f"Packet was too short, {len(packet_data)}/{packet_size} bytes.") 190 break 191 192 else: 193 packet_data.append(byte) 194 else: 195 raise PacketError(f"Packet was larger than {packet_size} bytes.") 196 197 # Validate CRC 198 if reduce(xor, packet_data): 199 raise PacketError(f"Invalid CRC, 0x{packet_data[-1]:02X}.") 200 return bytes(packet_data[:-1]) # Remove CRC 201 202 def fetch_packet() -> bytes: 203 """Decode serial stream to a valid packet.""" 204 while True: 205 while self.serial_port.consume() != Command.START: # Find start flag 206 logger.write_debug_to_report( 207 "Discarding bytes: " + str(self.serial_port.buffer[: self.serial_port.index]) 208 ) 209 self.serial_port.clear_history() 210 211 try: # Fetch packet data 212 # logger.write_debug_to_report("waiting for data...") 213 result = fetch_data() 214 # logger.write_debug_to_report("got data") 215 return result 216 except PacketError: 217 logger.write_debug_to_report(f"Index: {self.serial_port.index}, Buffer: {self.serial_port.buffer}") 218 self.serial_port.index = 0 219 220 runtime_data = runtime_data_format("runtime_data") 221 while True: 222 decoded_packet = fetch_packet() 223 unpacked_data = struct.unpack(runtime_data_format.to_Python(), decoded_packet) 224 c_variables = runtime_data.set_values(unpacked_data) 225 if len(self.packet_deque) == self.packet_deque.maxlen: # Deque is full 226 self.dropped_packets += 1 227 packet_data = self.create_dictionary(c_variables) | { 228 "encoded_packet": f"0x{Command.START:02X}" 229 f"{bytes(self.serial_port.buffer[:self.serial_port.index]).hex().upper()}", 230 "decoded_packet": f"0x{decoded_packet.hex().upper()}", 231 } 232 233 # Save to file 234 with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile: 235 csv.writer(csvfile).writerow(self.parse_serial_data(packet_data).values()) 236 self.written_packets += 1 237 logger.write_debug_to_report(f"Serial #{self.written_packets}") 238 239 # Save to deque 240 self.packet_deque.append(packet_data) 241 self.latest_packet.append(packet_data) 242 243 time.sleep(0.1) # Packets only come in every 0.5 seconds at most
Fetch packets as quickly as possible.
def
create_dictionary(self, c_variables: 'list[Struct]') -> dict[str, int | bool]:
245 def create_dictionary(self, c_variables: list[Struct]) -> dict[str, int | bool]: 246 """Gather all runtime flags into a dictionary.""" 247 bms_dictionary: dict[str, int | bool] = {} 248 249 for c_variable in c_variables: 250 if "runtime_data.flags.flags" in str(c_variable): 251 for i in range(0, NUM_FLAGS): 252 table = c_variable.index_path[0] # type: ignore[attr-defined] 253 base_key = f"flags.{FLAG_PROC.bit_2_name(table, i)}" 254 255 # Handle duplicates 256 count = 0 257 while (key := f"{base_key}{count or ''}") in bms_dictionary: 258 count += 1 259 bms_dictionary[key] = bool(c_variable.value & (1 << i)) # type: ignore[attr-defined] 260 else: 261 base_key = c_variable.name 262 263 # Handle duplicates 264 count = 0 265 while (key := f"{base_key}{count or ''}") in bms_dictionary: 266 count += 1 267 bms_dictionary[key] = c_variable.value # type: ignore[attr-defined] 268 269 return bms_dictionary
Gather all runtime flags into a dictionary.
def
read( self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None:
271 def read(self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None: 272 """Fetch serial data from BMS.""" 273 timeout_minutes = 10 274 start = time.perf_counter() 275 read_deque = self.latest_packet if latest else self.packet_deque 276 277 # logger.write_debug_to_report(f"Reading serial {latest=} {blocking=}") 278 while True: 279 with suppress(IndexError): 280 result = read_deque.popleft() 281 # logger.write_debug_to_report(f"Got serial {result=}") 282 return result 283 if not blocking: 284 return None 285 time.sleep(0.1) # Packets only come in every 0.5 seconds at most 286 if time.perf_counter() - start > timeout_minutes * 60: 287 raise TimeoutError(f"Serial reader waited {timeout_minutes} minutes, but got no response.")
Fetch serial data from BMS.
def
get_headers(self) -> list[str]:
289 def get_headers(self) -> list[str]: 290 """Get the headers for the struct.""" 291 runtime_data = runtime_data_format("runtime_data") 292 struct_size = struct.calcsize(runtime_data_format.to_Python()) 293 unpacked_data = struct.unpack(runtime_data_format.to_Python(), bytes(struct_size)) 294 cvars = runtime_data.set_values(unpacked_data) 295 return list(self.create_dictionary(cvars).keys())
Get the headers for the struct.
def
parse_serial_data( self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]:
297 def parse_serial_data(self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]: 298 """Read serial data dict and parse it into human-readable text.""" 299 300 def decode_timestamp(timestamp: int) -> str: 301 """Convert timestamp to elapsed time string.""" 302 seconds, milliseconds = divmod(timestamp, 1000) 303 try: 304 relative_timestamp = datetime.datetime.fromtimestamp( 305 seconds, datetime.timezone.utc 306 ) - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc) 307 except ValueError: 308 return f"0x{timestamp:02X}" 309 return f"{relative_timestamp.total_seconds():.0f}.{milliseconds}" 310 311 def decode_date(iso_date: int) -> str: 312 """Convert date to text.""" 313 try: 314 date = datetime.datetime.strptime(str(iso_date), "%Y%m%d") 315 return date.strftime("%Y-%m-%d") 316 except ValueError: 317 return f"0x{iso_date:08X}" 318 319 def key_iter(key: str) -> Iterator[tuple[int, int | str]]: 320 """Find as many of key as possible. Format is key, key1, key2, etc.""" 321 counter = -1 322 while (data := serial_data.get(f"{key}{(counter := counter + 1) or ''}")) is not None: 323 yield counter, data 324 325 if VERSION != serial_data["version"]: 326 direction = VERSION < int(serial_data["version"]) 327 raise RuntimeError( 328 f"Expected serial version {VERSION}, but BMS was version {serial_data['version']}. Flash " 329 f"a{'n old' if direction else ' new'}er firmware, or {'up' if direction else 'down'}grade the HITL." 330 ) 331 csv_data: dict[str, str | float | None] = {} 332 333 # Raw packet for debugging 334 csv_data["Encoded Packet"] = serial_data.get("encoded_packet") 335 csv_data["Decoded Packet"] = serial_data.get("decoded_packet") 336 337 # Runtime data 338 csv_data["Version"] = serial_data["version"] 339 csv_data["Build Date"] = decode_date(int(serial_data["build_date"])) 340 341 # Raw measurements 342 csv_data["Elapsed Time Raw"] = decode_timestamp(int(serial_data["timestamp"])) 343 for index, current in key_iter("current"): 344 csv_data[f"Current {index} Raw (A)"] = float(current) / 1000 345 for index, voltage_cell in key_iter("voltage_cell"): 346 csv_data[f"Cell {index} Voltage Raw (V)"] = float(voltage_cell) / 1000 347 for index, voltage_terminal in key_iter("voltage_terminal"): 348 csv_data[f"Terminal {index} Voltage Raw (V)"] = float(voltage_terminal) / 1000 349 for index, temperature in key_iter("temperature"): 350 csv_data[f"Temperature {index} Raw (°C)"] = float(temperature) / 10 - 273 351 csv_data["Micro Temperature Raw (°C)"] = float(serial_data["micro_temperature_raw"]) 352 csv_data["Charge Enable GPIO"] = bool(serial_data["ncharge_en_gpio"]) 353 csv_data["Wakeup GPIO"] = bool(serial_data["n_wakeup_gpio"]) 354 355 # Cooked measurements 356 csv_data["Elapsed Time Measurements"] = decode_timestamp(int(serial_data["meas_timestamp"])) 357 csv_data["Elapsed Time Cooked"] = decode_timestamp(int(serial_data["timestamp1"])) 358 csv_data["Current (A)"] = float(serial_data["mamps"]) / 1000 359 for index, mvolt_battery in key_iter("mvolt_battery"): 360 csv_data[f"Battery {index} Voltage (V)"] = float(mvolt_battery) / 1000 361 for index, mvolt_cell in key_iter("mvolt_cell"): 362 csv_data[f"Cell {index} Voltage (V)"] = float(mvolt_cell) / 1000 363 for index, mvolt_terminal in key_iter("mvolt_terminal"): 364 csv_data[f"Terminal {index} Voltage (V)"] = float(mvolt_terminal) / 1000 365 for index, dk_temp in key_iter("dk_temp"): 366 csv_data[f"Temperature {index} (°C)"] = float(dk_temp) / 10 - 273 367 csv_data["Micro Temperature (°C)"] = float(serial_data["micro_temperature_dc"]) / 10 368 369 # Flags 370 csv_data["Flags"] = " | ".join( 371 key.removeprefix("flags.").title() for key in serial_data if key.startswith("flags.") and serial_data[key] 372 ) 373 csv_data["Cell State"] = str(CellState(int(serial_data["cell_state"]))) 374 csv_data["Charge (%)"] = serial_data["percent_charged"] 375 csv_data["Health (%)"] = serial_data["percent_health"] 376 csv_data["Used (Ah)"] = float(serial_data["milliamp_hour_used"]) / 1000 377 csv_data["Remaining (Ah)"] = float(serial_data["milliamp_hour_remaining"]) / 1000 378 csv_data["Charge Cycles"] = serial_data["charge_cycles"] 379 csv_data["Capacity (Ah)"] = float(serial_data["milliamp_hour_capacity"]) / 1000 380 csv_data["SOC Charge Max"] = serial_data["Q_Max"] 381 csv_data["Columb Count"] = serial_data.get("Columb_Count") 382 csv_data["BMS State"] = str(BMSState(int(serial_data.get("BMS_State", 0)))) 383 csv_data["Wakeup Counter"] = serial_data.get("Wakeup_Counter") 384 csv_data["Reset Flags"] = str(ControlStatusRegister(int(serial_data.get("Reset_Flags", 0)))) 385 csv_data["Git Hash"] = f"0x{serial_data.get('git_hash'):X}" 386 387 # User data 388 for index, user_data in key_iter("userdef_8"): 389 csv_data[f"User Data Byte {index}"] = f"0x{user_data:02X}" 390 for index, user_data in key_iter("userdef_16"): 391 csv_data[f"User Data Word {index}"] = f"0x{user_data:04X}" 392 for index, user_data in key_iter("userdef_32"): 393 csv_data[f"User Data DWord {index}"] = f"0x{user_data:08X}" 394 csv_data["User Data QWord"] = f"0x{serial_data['userdef_64']:016X}" 395 396 return csv_data
Read serial data dict and parse it into human-readable text.
serial_monitor =
<SerialMonitor object>