hitl_tester.modules.bms.bms_serial

Gives access to runtime flags

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

  1"""
  2Gives access to runtime flags
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28"""
 29
 30from __future__ import annotations
 31
 32import csv
 33import datetime
 34import struct
 35import threading
 36import time
 37from collections import deque
 38from contextlib import suppress
 39from enum import IntEnum
 40from functools import reduce
 41from operator import xor
 42from pathlib import Path
 43from typing import Iterator, ClassVar, cast
 44
 45import pytest
 46import serial
 47import serial.tools.list_ports
 48from typing_extensions import Self
 49
 50from hitl_tester.modules.bms_types import BMSFlags, PacketError, BMSState, ControlStatusRegister, CellState
 51from hitl_tester.modules.logger import logger
 52
 53# Do not initialize if we are generating documentation.
 54if hasattr(pytest, "flags") and isinstance(pytest.flags, BMSFlags):
 55    if not pytest.flags.doc_generation:
 56        from hitl_tester.modules.bms.hitl_data import (
 57            FlagProcessor,
 58            Struct,
 59            runtime_data_format,
 60            FLAG_COLLECTION_SIZE,
 61            FLAGS_SECTION,
 62        )
 63
 64        FLAG_PROC = FlagProcessor(FLAG_COLLECTION_SIZE, FLAGS_SECTION)
 65    REPORT_PATH = Path(f"{pytest.flags.report_filename}_serial.csv")
 66
 67NUM_FLAGS = 63  # Total number of flags to iterate over.
 68VERSION = 8  # Serial version
 69
 70
 71class SerialPort:
 72    """Manager for serial port."""
 73
 74    BAUD = 115200
 75    instance: ClassVar[Self | None] = None
 76
 77    def __new__(cls):
 78        """Make BMS hardware a singleton."""
 79        if cls.instance is None:
 80            cls.instance = super().__new__(cls)
 81        return cls.instance
 82
 83    def __init__(self) -> None:
 84        self.index = 0
 85        self.buffer: list[int] = []
 86
 87        # Get serial port
 88        vid = pid = serial_number = None
 89        if hasattr(pytest, "flags") and isinstance(pytest.flags, BMSFlags):
 90            vid = cast(dict[str, str], pytest.flags.config["bms_serial"])["vendor_id"]
 91            pid = cast(dict[str, str], pytest.flags.config["bms_serial"])["product_id"]
 92            serial_number = cast(dict[str, str], pytest.flags.config["bms_serial"])["serial_id"]
 93        for x in serial.tools.list_ports.comports():
 94            if (x.vid, x.pid, x.serial_number) == (vid, pid, serial_number):
 95                serial_path = x.device
 96                break
 97        else:
 98            raise RuntimeError(f"Serial device (vid:pid:serial) {vid}:{pid}:{serial_number} could not be found.")
 99        self.serial = serial.Serial(port=serial_path, baudrate=SerialPort.BAUD, timeout=0)
100
101    def peek(self):
102        """Fetch the current byte."""
103        while self.serial.in_waiting > 0 or self.index >= len(self.buffer):
104            if self.serial.in_waiting > 0:
105                # logger.write_debug_to_report(f"System serial buffer has {self.serial.in_waiting} bytes.")
106                try:
107                    self.buffer += list(self.serial.read(self.serial.in_waiting))
108                except serial.serialutil.SerialException as e:
109                    logger.write_debug_to_report(f"Got error: {e}")
110                    time.sleep(5)
111            time.sleep(0.1)
112        return self.buffer[self.index]
113
114    def consume(self):
115        """Fetch the current byte and advance."""
116        result = self.peek()
117        self.index += 1
118        return result
119
120    def clear_history(self):
121        """Discard data before the current index."""
122        self.buffer = self.buffer[self.index :]
123        self.index = 0
124
125
126class SerialMonitor:
127    """Collect data from the serial buffer."""
128
129    instance: ClassVar[Self | None] = None
130
131    def __new__(cls, packet_buffer_size: int = 4096):
132        """Make SerialMonitor a singleton."""
133        if cls.instance is None:
134            cls.instance = super().__new__(cls)
135        return cls.instance
136
137    def __init__(self, packet_buffer_size: int = 4096):
138        self.dropped_packets = 0
139        self.written_packets = 1
140        self.latest_packet: deque[dict[str, int | bool | str]] = deque(maxlen=1)
141        self.packet_deque: deque[dict[str, int | bool | str]] = deque(maxlen=packet_buffer_size)
142
143        if (
144            hasattr(pytest, "flags")
145            and isinstance(pytest.flags, BMSFlags)
146            and not (pytest.flags.doc_generation or pytest.flags.dry_run)
147            and "bms_serial" in pytest.flags.config
148        ):
149            # Create log file
150            with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile:
151                csv.writer(csvfile).writerow(self.parse_serial_data(dict.fromkeys(self.get_headers(), VERSION)).keys())
152
153            # Start serial monitor thread
154            self.serial_port = SerialPort()
155            self.daemon = threading.Thread(target=self.runtime, daemon=True)
156            self.daemon.start()
157
158    def runtime(self):
159        """Fetch packets as quickly as possible."""
160
161        class Command(IntEnum):
162            """The commands for decoding."""
163
164            START = 0x7C
165            ESCAPE = 0x7D
166            END = 0x7E
167
168        def fetch_data() -> bytes:
169            """Attempt to fetch a valid packet."""
170            packet_data: list[int] = []
171            packet_size = struct.calcsize(runtime_data_format.to_Python()) + 1  # Data + CRC
172
173            # Fetch until we have enough data
174            self.serial_port.clear_history()
175            while len(packet_data) <= packet_size:
176                if (byte := self.serial_port.consume()) == Command.START:
177                    raise PacketError("Packet was truncated by another.")
178
179                if byte == Command.ESCAPE:
180                    if self.serial_port.peek() ^ 0x20 in iter(Command):
181                        packet_data.append(self.serial_port.consume() ^ 0x20)
182                    else:
183                        escaped_flags = ", ".join([f"0x{flag ^ 0x20:02X}" for flag in Command])
184                        raise PacketError(f"Escape was 0x{byte:02X}, expected one of {escaped_flags}.")
185
186                elif byte == Command.END:
187                    if len(packet_data) < packet_size:
188                        raise PacketError(f"Packet was too short, {len(packet_data)}/{packet_size} bytes.")
189                    break
190
191                else:
192                    packet_data.append(byte)
193            else:
194                raise PacketError(f"Packet was larger than {packet_size} bytes.")
195
196            # Validate CRC
197            if reduce(xor, packet_data):
198                raise PacketError(f"Invalid CRC, 0x{packet_data[-1]:02X}.")
199            return bytes(packet_data[:-1])  # Remove CRC
200
201        def fetch_packet() -> bytes:
202            """Decode serial stream to a valid packet."""
203            while True:
204                while self.serial_port.consume() != Command.START:  # Find start flag
205                    logger.write_debug_to_report(
206                        "Discarding bytes: " + str(self.serial_port.buffer[: self.serial_port.index])
207                    )
208                    self.serial_port.clear_history()
209
210                try:  # Fetch packet data
211                    # logger.write_debug_to_report("waiting for data...")
212                    result = fetch_data()
213                    # logger.write_debug_to_report("got data")
214                    return result
215                except PacketError:
216                    logger.write_debug_to_report(f"Index: {self.serial_port.index}, Buffer: {self.serial_port.buffer}")
217                    self.serial_port.index = 0
218
219        runtime_data = runtime_data_format("runtime_data")
220        while True:
221            decoded_packet = fetch_packet()
222            unpacked_data = struct.unpack(runtime_data_format.to_Python(), decoded_packet)
223            c_variables = runtime_data.set_values(unpacked_data)
224            if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
225                self.dropped_packets += 1
226            packet_data = self.create_dictionary(c_variables) | {
227                "encoded_packet": f"0x{Command.START:02X}"
228                f"{bytes(self.serial_port.buffer[:self.serial_port.index]).hex().upper()}",
229                "decoded_packet": f"0x{decoded_packet.hex().upper()}",
230            }
231
232            # Save to file
233            with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile:
234                csv.writer(csvfile).writerow(self.parse_serial_data(packet_data).values())
235                self.written_packets += 1
236                logger.write_debug_to_report(f"Serial #{self.written_packets}")
237
238            # Save to deque
239            self.packet_deque.append(packet_data)
240            self.latest_packet.append(packet_data)
241
242            time.sleep(0.1)  # Packets only come in every 0.5 seconds at most
243
244    def create_dictionary(self, c_variables: list[Struct]) -> dict[str, int | bool]:
245        """Gather all runtime flags into a dictionary."""
246        bms_dictionary: dict[str, int | bool] = {}
247
248        for c_variable in c_variables:
249            if "runtime_data.flags.flags" in str(c_variable):
250                for i in range(0, NUM_FLAGS):
251                    table = c_variable.index_path[0]  # type: ignore[attr-defined]
252                    base_key = f"flags.{FLAG_PROC.bit_2_name(table, i)}"
253
254                    # Handle duplicates
255                    count = 0
256                    while (key := f"{base_key}{count or ''}") in bms_dictionary:
257                        count += 1
258                    bms_dictionary[key] = bool(c_variable.value & (1 << i))  # type: ignore[attr-defined]
259            else:
260                base_key = c_variable.name
261
262                # Handle duplicates
263                count = 0
264                while (key := f"{base_key}{count or ''}") in bms_dictionary:
265                    count += 1
266                bms_dictionary[key] = c_variable.value  # type: ignore[attr-defined]
267
268        return bms_dictionary
269
270    def read(self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None:
271        """Fetch serial data from BMS."""
272        timeout_minutes = 10
273        start = time.perf_counter()
274        read_deque = self.latest_packet if latest else self.packet_deque
275
276        # logger.write_debug_to_report(f"Reading serial {latest=} {blocking=}")
277        while True:
278            with suppress(IndexError):
279                result = read_deque.popleft()
280                # logger.write_debug_to_report(f"Got serial {result=}")
281                return result
282            if not blocking:
283                return None
284            time.sleep(0.1)  # Packets only come in every 0.5 seconds at most
285            if time.perf_counter() - start > timeout_minutes * 60:
286                raise TimeoutError(f"Serial reader waited {timeout_minutes} minutes, but got no response.")
287
288    def get_headers(self) -> list[str]:
289        """Get the headers for the struct."""
290        runtime_data = runtime_data_format("runtime_data")
291        struct_size = struct.calcsize(runtime_data_format.to_Python())
292        unpacked_data = struct.unpack(runtime_data_format.to_Python(), bytes(struct_size))
293        cvars = runtime_data.set_values(unpacked_data)
294        return list(self.create_dictionary(cvars).keys())
295
296    def parse_serial_data(self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]:
297        """Read serial data dict and parse it into human-readable text."""
298
299        def decode_timestamp(timestamp: int) -> str:
300            """Convert timestamp to elapsed time string."""
301            seconds, milliseconds = divmod(timestamp, 1000)
302            try:
303                relative_timestamp = datetime.datetime.fromtimestamp(
304                    seconds, datetime.timezone.utc
305                ) - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc)
306            except ValueError:
307                return f"0x{timestamp:02X}"
308            return f"{relative_timestamp.total_seconds():.0f}.{milliseconds}"
309
310        def decode_date(iso_date: int) -> str:
311            """Convert date to text."""
312            try:
313                date = datetime.datetime.strptime(str(iso_date), "%Y%m%d")
314                return date.strftime("%Y-%m-%d")
315            except ValueError:
316                return f"0x{iso_date:08X}"
317
318        def key_iter(key: str) -> Iterator[tuple[int, int | str]]:
319            """Find as many of key as possible. Format is key, key1, key2, etc."""
320            counter = -1
321            while (data := serial_data.get(f"{key}{(counter := counter + 1) or ''}")) is not None:
322                yield counter, data
323
324        if VERSION != serial_data["version"]:
325            direction = VERSION < int(serial_data["version"])
326            raise RuntimeError(
327                f"Expected serial version {VERSION}, but BMS was version {serial_data['version']}. Flash "
328                f"a{'n old' if direction else ' new'}er firmware, or {'up' if direction else 'down'}grade the HITL."
329            )
330        csv_data: dict[str, str | float | None] = {}
331
332        # Raw packet for debugging
333        csv_data["Encoded Packet"] = serial_data.get("encoded_packet")
334        csv_data["Decoded Packet"] = serial_data.get("decoded_packet")
335
336        # Runtime data
337        csv_data["Version"] = serial_data["version"]
338        csv_data["Build Date"] = decode_date(int(serial_data["build_date"]))
339
340        # Raw measurements
341        csv_data["Elapsed Time Raw"] = decode_timestamp(int(serial_data["timestamp"]))
342        for index, current in key_iter("current"):
343            csv_data[f"Current {index} Raw (A)"] = float(current) / 1000
344        for index, voltage_cell in key_iter("voltage_cell"):
345            csv_data[f"Cell {index} Voltage Raw (V)"] = float(voltage_cell) / 1000
346        for index, voltage_terminal in key_iter("voltage_terminal"):
347            csv_data[f"Terminal {index} Voltage Raw (V)"] = float(voltage_terminal) / 1000
348        for index, temperature in key_iter("temperature"):
349            csv_data[f"Temperature {index} Raw (°C)"] = float(temperature) / 10 - 273
350        csv_data["Micro Temperature Raw (°C)"] = float(serial_data["micro_temperature_raw"])
351        csv_data["Charge Enable GPIO"] = bool(serial_data["ncharge_en_gpio"])
352        csv_data["Wakeup GPIO"] = bool(serial_data["n_wakeup_gpio"])
353
354        # Cooked measurements
355        csv_data["Elapsed Time Measurements"] = decode_timestamp(int(serial_data["meas_timestamp"]))
356        csv_data["Elapsed Time Cooked"] = decode_timestamp(int(serial_data["timestamp1"]))
357        csv_data["Current (A)"] = float(serial_data["mamps"]) / 1000
358        for index, mvolt_battery in key_iter("mvolt_battery"):
359            csv_data[f"Battery {index} Voltage (V)"] = float(mvolt_battery) / 1000
360        for index, mvolt_cell in key_iter("mvolt_cell"):
361            csv_data[f"Cell {index} Voltage (V)"] = float(mvolt_cell) / 1000
362        for index, mvolt_terminal in key_iter("mvolt_terminal"):
363            csv_data[f"Terminal {index} Voltage (V)"] = float(mvolt_terminal) / 1000
364        for index, dk_temp in key_iter("dk_temp"):
365            csv_data[f"Temperature {index} (°C)"] = float(dk_temp) / 10 - 273
366        csv_data["Micro Temperature (°C)"] = float(serial_data["micro_temperature_dc"]) / 10
367
368        # Flags
369        csv_data["Flags"] = " | ".join(
370            key.removeprefix("flags.").title() for key in serial_data if key.startswith("flags.") and serial_data[key]
371        )
372        csv_data["Cell State"] = str(CellState(int(serial_data["cell_state"])))
373        csv_data["Charge (%)"] = serial_data["percent_charged"]
374        csv_data["Health (%)"] = serial_data["percent_health"]
375        csv_data["Used (Ah)"] = float(serial_data["milliamp_hour_used"]) / 1000
376        csv_data["Remaining (Ah)"] = float(serial_data["milliamp_hour_remaining"]) / 1000
377        csv_data["Charge Cycles"] = serial_data["charge_cycles"]
378        csv_data["Capacity (Ah)"] = float(serial_data["milliamp_hour_capacity"]) / 1000
379        csv_data["SOC Charge Max"] = serial_data["Q_Max"]
380        csv_data["Columb Count"] = serial_data.get("Columb_Count")
381        csv_data["BMS State"] = str(BMSState(int(serial_data.get("BMS_State", 0))))
382        csv_data["Wakeup Counter"] = serial_data.get("Wakeup_Counter")
383        csv_data["Reset Flags"] = str(ControlStatusRegister(int(serial_data.get("Reset_Flags", 0))))
384        csv_data["Git Hash"] = f"0x{serial_data.get('git_hash'):X}"
385
386        # User data
387        for index, user_data in key_iter("userdef_8"):
388            csv_data[f"User Data Byte {index}"] = f"0x{user_data:02X}"
389        for index, user_data in key_iter("userdef_16"):
390            csv_data[f"User Data Word {index}"] = f"0x{user_data:04X}"
391        for index, user_data in key_iter("userdef_32"):
392            csv_data[f"User Data DWord {index}"] = f"0x{user_data:08X}"
393        csv_data["User Data QWord"] = f"0x{serial_data['userdef_64']:016X}"
394
395        return csv_data
396
397
398serial_monitor = SerialMonitor()
NUM_FLAGS = 63
VERSION = 8
class SerialPort:
 72class SerialPort:
 73    """Manager for serial port."""
 74
 75    BAUD = 115200
 76    instance: ClassVar[Self | None] = None
 77
 78    def __new__(cls):
 79        """Make BMS hardware a singleton."""
 80        if cls.instance is None:
 81            cls.instance = super().__new__(cls)
 82        return cls.instance
 83
 84    def __init__(self) -> None:
 85        self.index = 0
 86        self.buffer: list[int] = []
 87
 88        # Get serial port
 89        vid = pid = serial_number = None
 90        if hasattr(pytest, "flags") and isinstance(pytest.flags, BMSFlags):
 91            vid = cast(dict[str, str], pytest.flags.config["bms_serial"])["vendor_id"]
 92            pid = cast(dict[str, str], pytest.flags.config["bms_serial"])["product_id"]
 93            serial_number = cast(dict[str, str], pytest.flags.config["bms_serial"])["serial_id"]
 94        for x in serial.tools.list_ports.comports():
 95            if (x.vid, x.pid, x.serial_number) == (vid, pid, serial_number):
 96                serial_path = x.device
 97                break
 98        else:
 99            raise RuntimeError(f"Serial device (vid:pid:serial) {vid}:{pid}:{serial_number} could not be found.")
100        self.serial = serial.Serial(port=serial_path, baudrate=SerialPort.BAUD, timeout=0)
101
102    def peek(self):
103        """Fetch the current byte."""
104        while self.serial.in_waiting > 0 or self.index >= len(self.buffer):
105            if self.serial.in_waiting > 0:
106                # logger.write_debug_to_report(f"System serial buffer has {self.serial.in_waiting} bytes.")
107                try:
108                    self.buffer += list(self.serial.read(self.serial.in_waiting))
109                except serial.serialutil.SerialException as e:
110                    logger.write_debug_to_report(f"Got error: {e}")
111                    time.sleep(5)
112            time.sleep(0.1)
113        return self.buffer[self.index]
114
115    def consume(self):
116        """Fetch the current byte and advance."""
117        result = self.peek()
118        self.index += 1
119        return result
120
121    def clear_history(self):
122        """Discard data before the current index."""
123        self.buffer = self.buffer[self.index :]
124        self.index = 0

Manager for serial port.

SerialPort()
78    def __new__(cls):
79        """Make BMS hardware a singleton."""
80        if cls.instance is None:
81            cls.instance = super().__new__(cls)
82        return cls.instance

Make BMS hardware a singleton.

BAUD = 115200
instance: ClassVar[Optional[Self]] = None
index
buffer: list[int]
serial
def peek(self):
102    def peek(self):
103        """Fetch the current byte."""
104        while self.serial.in_waiting > 0 or self.index >= len(self.buffer):
105            if self.serial.in_waiting > 0:
106                # logger.write_debug_to_report(f"System serial buffer has {self.serial.in_waiting} bytes.")
107                try:
108                    self.buffer += list(self.serial.read(self.serial.in_waiting))
109                except serial.serialutil.SerialException as e:
110                    logger.write_debug_to_report(f"Got error: {e}")
111                    time.sleep(5)
112            time.sleep(0.1)
113        return self.buffer[self.index]

Fetch the current byte.

def consume(self):
115    def consume(self):
116        """Fetch the current byte and advance."""
117        result = self.peek()
118        self.index += 1
119        return result

Fetch the current byte and advance.

def clear_history(self):
121    def clear_history(self):
122        """Discard data before the current index."""
123        self.buffer = self.buffer[self.index :]
124        self.index = 0

Discard data before the current index.

class SerialMonitor:
127class SerialMonitor:
128    """Collect data from the serial buffer."""
129
130    instance: ClassVar[Self | None] = None
131
132    def __new__(cls, packet_buffer_size: int = 4096):
133        """Make SerialMonitor a singleton."""
134        if cls.instance is None:
135            cls.instance = super().__new__(cls)
136        return cls.instance
137
138    def __init__(self, packet_buffer_size: int = 4096):
139        self.dropped_packets = 0
140        self.written_packets = 1
141        self.latest_packet: deque[dict[str, int | bool | str]] = deque(maxlen=1)
142        self.packet_deque: deque[dict[str, int | bool | str]] = deque(maxlen=packet_buffer_size)
143
144        if (
145            hasattr(pytest, "flags")
146            and isinstance(pytest.flags, BMSFlags)
147            and not (pytest.flags.doc_generation or pytest.flags.dry_run)
148            and "bms_serial" in pytest.flags.config
149        ):
150            # Create log file
151            with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile:
152                csv.writer(csvfile).writerow(self.parse_serial_data(dict.fromkeys(self.get_headers(), VERSION)).keys())
153
154            # Start serial monitor thread
155            self.serial_port = SerialPort()
156            self.daemon = threading.Thread(target=self.runtime, daemon=True)
157            self.daemon.start()
158
159    def runtime(self):
160        """Fetch packets as quickly as possible."""
161
162        class Command(IntEnum):
163            """The commands for decoding."""
164
165            START = 0x7C
166            ESCAPE = 0x7D
167            END = 0x7E
168
169        def fetch_data() -> bytes:
170            """Attempt to fetch a valid packet."""
171            packet_data: list[int] = []
172            packet_size = struct.calcsize(runtime_data_format.to_Python()) + 1  # Data + CRC
173
174            # Fetch until we have enough data
175            self.serial_port.clear_history()
176            while len(packet_data) <= packet_size:
177                if (byte := self.serial_port.consume()) == Command.START:
178                    raise PacketError("Packet was truncated by another.")
179
180                if byte == Command.ESCAPE:
181                    if self.serial_port.peek() ^ 0x20 in iter(Command):
182                        packet_data.append(self.serial_port.consume() ^ 0x20)
183                    else:
184                        escaped_flags = ", ".join([f"0x{flag ^ 0x20:02X}" for flag in Command])
185                        raise PacketError(f"Escape was 0x{byte:02X}, expected one of {escaped_flags}.")
186
187                elif byte == Command.END:
188                    if len(packet_data) < packet_size:
189                        raise PacketError(f"Packet was too short, {len(packet_data)}/{packet_size} bytes.")
190                    break
191
192                else:
193                    packet_data.append(byte)
194            else:
195                raise PacketError(f"Packet was larger than {packet_size} bytes.")
196
197            # Validate CRC
198            if reduce(xor, packet_data):
199                raise PacketError(f"Invalid CRC, 0x{packet_data[-1]:02X}.")
200            return bytes(packet_data[:-1])  # Remove CRC
201
202        def fetch_packet() -> bytes:
203            """Decode serial stream to a valid packet."""
204            while True:
205                while self.serial_port.consume() != Command.START:  # Find start flag
206                    logger.write_debug_to_report(
207                        "Discarding bytes: " + str(self.serial_port.buffer[: self.serial_port.index])
208                    )
209                    self.serial_port.clear_history()
210
211                try:  # Fetch packet data
212                    # logger.write_debug_to_report("waiting for data...")
213                    result = fetch_data()
214                    # logger.write_debug_to_report("got data")
215                    return result
216                except PacketError:
217                    logger.write_debug_to_report(f"Index: {self.serial_port.index}, Buffer: {self.serial_port.buffer}")
218                    self.serial_port.index = 0
219
220        runtime_data = runtime_data_format("runtime_data")
221        while True:
222            decoded_packet = fetch_packet()
223            unpacked_data = struct.unpack(runtime_data_format.to_Python(), decoded_packet)
224            c_variables = runtime_data.set_values(unpacked_data)
225            if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
226                self.dropped_packets += 1
227            packet_data = self.create_dictionary(c_variables) | {
228                "encoded_packet": f"0x{Command.START:02X}"
229                f"{bytes(self.serial_port.buffer[:self.serial_port.index]).hex().upper()}",
230                "decoded_packet": f"0x{decoded_packet.hex().upper()}",
231            }
232
233            # Save to file
234            with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile:
235                csv.writer(csvfile).writerow(self.parse_serial_data(packet_data).values())
236                self.written_packets += 1
237                logger.write_debug_to_report(f"Serial #{self.written_packets}")
238
239            # Save to deque
240            self.packet_deque.append(packet_data)
241            self.latest_packet.append(packet_data)
242
243            time.sleep(0.1)  # Packets only come in every 0.5 seconds at most
244
245    def create_dictionary(self, c_variables: list[Struct]) -> dict[str, int | bool]:
246        """Gather all runtime flags into a dictionary."""
247        bms_dictionary: dict[str, int | bool] = {}
248
249        for c_variable in c_variables:
250            if "runtime_data.flags.flags" in str(c_variable):
251                for i in range(0, NUM_FLAGS):
252                    table = c_variable.index_path[0]  # type: ignore[attr-defined]
253                    base_key = f"flags.{FLAG_PROC.bit_2_name(table, i)}"
254
255                    # Handle duplicates
256                    count = 0
257                    while (key := f"{base_key}{count or ''}") in bms_dictionary:
258                        count += 1
259                    bms_dictionary[key] = bool(c_variable.value & (1 << i))  # type: ignore[attr-defined]
260            else:
261                base_key = c_variable.name
262
263                # Handle duplicates
264                count = 0
265                while (key := f"{base_key}{count or ''}") in bms_dictionary:
266                    count += 1
267                bms_dictionary[key] = c_variable.value  # type: ignore[attr-defined]
268
269        return bms_dictionary
270
271    def read(self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None:
272        """Fetch serial data from BMS."""
273        timeout_minutes = 10
274        start = time.perf_counter()
275        read_deque = self.latest_packet if latest else self.packet_deque
276
277        # logger.write_debug_to_report(f"Reading serial {latest=} {blocking=}")
278        while True:
279            with suppress(IndexError):
280                result = read_deque.popleft()
281                # logger.write_debug_to_report(f"Got serial {result=}")
282                return result
283            if not blocking:
284                return None
285            time.sleep(0.1)  # Packets only come in every 0.5 seconds at most
286            if time.perf_counter() - start > timeout_minutes * 60:
287                raise TimeoutError(f"Serial reader waited {timeout_minutes} minutes, but got no response.")
288
289    def get_headers(self) -> list[str]:
290        """Get the headers for the struct."""
291        runtime_data = runtime_data_format("runtime_data")
292        struct_size = struct.calcsize(runtime_data_format.to_Python())
293        unpacked_data = struct.unpack(runtime_data_format.to_Python(), bytes(struct_size))
294        cvars = runtime_data.set_values(unpacked_data)
295        return list(self.create_dictionary(cvars).keys())
296
297    def parse_serial_data(self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]:
298        """Read serial data dict and parse it into human-readable text."""
299
300        def decode_timestamp(timestamp: int) -> str:
301            """Convert timestamp to elapsed time string."""
302            seconds, milliseconds = divmod(timestamp, 1000)
303            try:
304                relative_timestamp = datetime.datetime.fromtimestamp(
305                    seconds, datetime.timezone.utc
306                ) - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc)
307            except ValueError:
308                return f"0x{timestamp:02X}"
309            return f"{relative_timestamp.total_seconds():.0f}.{milliseconds}"
310
311        def decode_date(iso_date: int) -> str:
312            """Convert date to text."""
313            try:
314                date = datetime.datetime.strptime(str(iso_date), "%Y%m%d")
315                return date.strftime("%Y-%m-%d")
316            except ValueError:
317                return f"0x{iso_date:08X}"
318
319        def key_iter(key: str) -> Iterator[tuple[int, int | str]]:
320            """Find as many of key as possible. Format is key, key1, key2, etc."""
321            counter = -1
322            while (data := serial_data.get(f"{key}{(counter := counter + 1) or ''}")) is not None:
323                yield counter, data
324
325        if VERSION != serial_data["version"]:
326            direction = VERSION < int(serial_data["version"])
327            raise RuntimeError(
328                f"Expected serial version {VERSION}, but BMS was version {serial_data['version']}. Flash "
329                f"a{'n old' if direction else ' new'}er firmware, or {'up' if direction else 'down'}grade the HITL."
330            )
331        csv_data: dict[str, str | float | None] = {}
332
333        # Raw packet for debugging
334        csv_data["Encoded Packet"] = serial_data.get("encoded_packet")
335        csv_data["Decoded Packet"] = serial_data.get("decoded_packet")
336
337        # Runtime data
338        csv_data["Version"] = serial_data["version"]
339        csv_data["Build Date"] = decode_date(int(serial_data["build_date"]))
340
341        # Raw measurements
342        csv_data["Elapsed Time Raw"] = decode_timestamp(int(serial_data["timestamp"]))
343        for index, current in key_iter("current"):
344            csv_data[f"Current {index} Raw (A)"] = float(current) / 1000
345        for index, voltage_cell in key_iter("voltage_cell"):
346            csv_data[f"Cell {index} Voltage Raw (V)"] = float(voltage_cell) / 1000
347        for index, voltage_terminal in key_iter("voltage_terminal"):
348            csv_data[f"Terminal {index} Voltage Raw (V)"] = float(voltage_terminal) / 1000
349        for index, temperature in key_iter("temperature"):
350            csv_data[f"Temperature {index} Raw (°C)"] = float(temperature) / 10 - 273
351        csv_data["Micro Temperature Raw (°C)"] = float(serial_data["micro_temperature_raw"])
352        csv_data["Charge Enable GPIO"] = bool(serial_data["ncharge_en_gpio"])
353        csv_data["Wakeup GPIO"] = bool(serial_data["n_wakeup_gpio"])
354
355        # Cooked measurements
356        csv_data["Elapsed Time Measurements"] = decode_timestamp(int(serial_data["meas_timestamp"]))
357        csv_data["Elapsed Time Cooked"] = decode_timestamp(int(serial_data["timestamp1"]))
358        csv_data["Current (A)"] = float(serial_data["mamps"]) / 1000
359        for index, mvolt_battery in key_iter("mvolt_battery"):
360            csv_data[f"Battery {index} Voltage (V)"] = float(mvolt_battery) / 1000
361        for index, mvolt_cell in key_iter("mvolt_cell"):
362            csv_data[f"Cell {index} Voltage (V)"] = float(mvolt_cell) / 1000
363        for index, mvolt_terminal in key_iter("mvolt_terminal"):
364            csv_data[f"Terminal {index} Voltage (V)"] = float(mvolt_terminal) / 1000
365        for index, dk_temp in key_iter("dk_temp"):
366            csv_data[f"Temperature {index} (°C)"] = float(dk_temp) / 10 - 273
367        csv_data["Micro Temperature (°C)"] = float(serial_data["micro_temperature_dc"]) / 10
368
369        # Flags
370        csv_data["Flags"] = " | ".join(
371            key.removeprefix("flags.").title() for key in serial_data if key.startswith("flags.") and serial_data[key]
372        )
373        csv_data["Cell State"] = str(CellState(int(serial_data["cell_state"])))
374        csv_data["Charge (%)"] = serial_data["percent_charged"]
375        csv_data["Health (%)"] = serial_data["percent_health"]
376        csv_data["Used (Ah)"] = float(serial_data["milliamp_hour_used"]) / 1000
377        csv_data["Remaining (Ah)"] = float(serial_data["milliamp_hour_remaining"]) / 1000
378        csv_data["Charge Cycles"] = serial_data["charge_cycles"]
379        csv_data["Capacity (Ah)"] = float(serial_data["milliamp_hour_capacity"]) / 1000
380        csv_data["SOC Charge Max"] = serial_data["Q_Max"]
381        csv_data["Columb Count"] = serial_data.get("Columb_Count")
382        csv_data["BMS State"] = str(BMSState(int(serial_data.get("BMS_State", 0))))
383        csv_data["Wakeup Counter"] = serial_data.get("Wakeup_Counter")
384        csv_data["Reset Flags"] = str(ControlStatusRegister(int(serial_data.get("Reset_Flags", 0))))
385        csv_data["Git Hash"] = f"0x{serial_data.get('git_hash'):X}"
386
387        # User data
388        for index, user_data in key_iter("userdef_8"):
389            csv_data[f"User Data Byte {index}"] = f"0x{user_data:02X}"
390        for index, user_data in key_iter("userdef_16"):
391            csv_data[f"User Data Word {index}"] = f"0x{user_data:04X}"
392        for index, user_data in key_iter("userdef_32"):
393            csv_data[f"User Data DWord {index}"] = f"0x{user_data:08X}"
394        csv_data["User Data QWord"] = f"0x{serial_data['userdef_64']:016X}"
395
396        return csv_data

Collect data from the serial buffer.

SerialMonitor(packet_buffer_size: int = 4096)
132    def __new__(cls, packet_buffer_size: int = 4096):
133        """Make SerialMonitor a singleton."""
134        if cls.instance is None:
135            cls.instance = super().__new__(cls)
136        return cls.instance

Make SerialMonitor a singleton.

instance: ClassVar[Optional[Self]] = <SerialMonitor object>
dropped_packets
written_packets
latest_packet: collections.deque[dict[str, int | bool | str]]
packet_deque: collections.deque[dict[str, int | bool | str]]
def runtime(self):
159    def runtime(self):
160        """Fetch packets as quickly as possible."""
161
162        class Command(IntEnum):
163            """The commands for decoding."""
164
165            START = 0x7C
166            ESCAPE = 0x7D
167            END = 0x7E
168
169        def fetch_data() -> bytes:
170            """Attempt to fetch a valid packet."""
171            packet_data: list[int] = []
172            packet_size = struct.calcsize(runtime_data_format.to_Python()) + 1  # Data + CRC
173
174            # Fetch until we have enough data
175            self.serial_port.clear_history()
176            while len(packet_data) <= packet_size:
177                if (byte := self.serial_port.consume()) == Command.START:
178                    raise PacketError("Packet was truncated by another.")
179
180                if byte == Command.ESCAPE:
181                    if self.serial_port.peek() ^ 0x20 in iter(Command):
182                        packet_data.append(self.serial_port.consume() ^ 0x20)
183                    else:
184                        escaped_flags = ", ".join([f"0x{flag ^ 0x20:02X}" for flag in Command])
185                        raise PacketError(f"Escape was 0x{byte:02X}, expected one of {escaped_flags}.")
186
187                elif byte == Command.END:
188                    if len(packet_data) < packet_size:
189                        raise PacketError(f"Packet was too short, {len(packet_data)}/{packet_size} bytes.")
190                    break
191
192                else:
193                    packet_data.append(byte)
194            else:
195                raise PacketError(f"Packet was larger than {packet_size} bytes.")
196
197            # Validate CRC
198            if reduce(xor, packet_data):
199                raise PacketError(f"Invalid CRC, 0x{packet_data[-1]:02X}.")
200            return bytes(packet_data[:-1])  # Remove CRC
201
202        def fetch_packet() -> bytes:
203            """Decode serial stream to a valid packet."""
204            while True:
205                while self.serial_port.consume() != Command.START:  # Find start flag
206                    logger.write_debug_to_report(
207                        "Discarding bytes: " + str(self.serial_port.buffer[: self.serial_port.index])
208                    )
209                    self.serial_port.clear_history()
210
211                try:  # Fetch packet data
212                    # logger.write_debug_to_report("waiting for data...")
213                    result = fetch_data()
214                    # logger.write_debug_to_report("got data")
215                    return result
216                except PacketError:
217                    logger.write_debug_to_report(f"Index: {self.serial_port.index}, Buffer: {self.serial_port.buffer}")
218                    self.serial_port.index = 0
219
220        runtime_data = runtime_data_format("runtime_data")
221        while True:
222            decoded_packet = fetch_packet()
223            unpacked_data = struct.unpack(runtime_data_format.to_Python(), decoded_packet)
224            c_variables = runtime_data.set_values(unpacked_data)
225            if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
226                self.dropped_packets += 1
227            packet_data = self.create_dictionary(c_variables) | {
228                "encoded_packet": f"0x{Command.START:02X}"
229                f"{bytes(self.serial_port.buffer[:self.serial_port.index]).hex().upper()}",
230                "decoded_packet": f"0x{decoded_packet.hex().upper()}",
231            }
232
233            # Save to file
234            with open(REPORT_PATH, "a", encoding="UTF-8") as csvfile:
235                csv.writer(csvfile).writerow(self.parse_serial_data(packet_data).values())
236                self.written_packets += 1
237                logger.write_debug_to_report(f"Serial #{self.written_packets}")
238
239            # Save to deque
240            self.packet_deque.append(packet_data)
241            self.latest_packet.append(packet_data)
242
243            time.sleep(0.1)  # Packets only come in every 0.5 seconds at most

Fetch packets as quickly as possible.

def create_dictionary(self, c_variables: 'list[Struct]') -> dict[str, int | bool]:
245    def create_dictionary(self, c_variables: list[Struct]) -> dict[str, int | bool]:
246        """Gather all runtime flags into a dictionary."""
247        bms_dictionary: dict[str, int | bool] = {}
248
249        for c_variable in c_variables:
250            if "runtime_data.flags.flags" in str(c_variable):
251                for i in range(0, NUM_FLAGS):
252                    table = c_variable.index_path[0]  # type: ignore[attr-defined]
253                    base_key = f"flags.{FLAG_PROC.bit_2_name(table, i)}"
254
255                    # Handle duplicates
256                    count = 0
257                    while (key := f"{base_key}{count or ''}") in bms_dictionary:
258                        count += 1
259                    bms_dictionary[key] = bool(c_variable.value & (1 << i))  # type: ignore[attr-defined]
260            else:
261                base_key = c_variable.name
262
263                # Handle duplicates
264                count = 0
265                while (key := f"{base_key}{count or ''}") in bms_dictionary:
266                    count += 1
267                bms_dictionary[key] = c_variable.value  # type: ignore[attr-defined]
268
269        return bms_dictionary

Gather all runtime flags into a dictionary.

def read( self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None:
271    def read(self, latest: bool = True, blocking: bool = True) -> dict[str, int | bool | str] | None:
272        """Fetch serial data from BMS."""
273        timeout_minutes = 10
274        start = time.perf_counter()
275        read_deque = self.latest_packet if latest else self.packet_deque
276
277        # logger.write_debug_to_report(f"Reading serial {latest=} {blocking=}")
278        while True:
279            with suppress(IndexError):
280                result = read_deque.popleft()
281                # logger.write_debug_to_report(f"Got serial {result=}")
282                return result
283            if not blocking:
284                return None
285            time.sleep(0.1)  # Packets only come in every 0.5 seconds at most
286            if time.perf_counter() - start > timeout_minutes * 60:
287                raise TimeoutError(f"Serial reader waited {timeout_minutes} minutes, but got no response.")

Fetch serial data from BMS.

def get_headers(self) -> list[str]:
289    def get_headers(self) -> list[str]:
290        """Get the headers for the struct."""
291        runtime_data = runtime_data_format("runtime_data")
292        struct_size = struct.calcsize(runtime_data_format.to_Python())
293        unpacked_data = struct.unpack(runtime_data_format.to_Python(), bytes(struct_size))
294        cvars = runtime_data.set_values(unpacked_data)
295        return list(self.create_dictionary(cvars).keys())

Get the headers for the struct.

def parse_serial_data( self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]:
297    def parse_serial_data(self, serial_data: dict[str, int | bool | str]) -> dict[str, str | float | None]:
298        """Read serial data dict and parse it into human-readable text."""
299
300        def decode_timestamp(timestamp: int) -> str:
301            """Convert timestamp to elapsed time string."""
302            seconds, milliseconds = divmod(timestamp, 1000)
303            try:
304                relative_timestamp = datetime.datetime.fromtimestamp(
305                    seconds, datetime.timezone.utc
306                ) - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc)
307            except ValueError:
308                return f"0x{timestamp:02X}"
309            return f"{relative_timestamp.total_seconds():.0f}.{milliseconds}"
310
311        def decode_date(iso_date: int) -> str:
312            """Convert date to text."""
313            try:
314                date = datetime.datetime.strptime(str(iso_date), "%Y%m%d")
315                return date.strftime("%Y-%m-%d")
316            except ValueError:
317                return f"0x{iso_date:08X}"
318
319        def key_iter(key: str) -> Iterator[tuple[int, int | str]]:
320            """Find as many of key as possible. Format is key, key1, key2, etc."""
321            counter = -1
322            while (data := serial_data.get(f"{key}{(counter := counter + 1) or ''}")) is not None:
323                yield counter, data
324
325        if VERSION != serial_data["version"]:
326            direction = VERSION < int(serial_data["version"])
327            raise RuntimeError(
328                f"Expected serial version {VERSION}, but BMS was version {serial_data['version']}. Flash "
329                f"a{'n old' if direction else ' new'}er firmware, or {'up' if direction else 'down'}grade the HITL."
330            )
331        csv_data: dict[str, str | float | None] = {}
332
333        # Raw packet for debugging
334        csv_data["Encoded Packet"] = serial_data.get("encoded_packet")
335        csv_data["Decoded Packet"] = serial_data.get("decoded_packet")
336
337        # Runtime data
338        csv_data["Version"] = serial_data["version"]
339        csv_data["Build Date"] = decode_date(int(serial_data["build_date"]))
340
341        # Raw measurements
342        csv_data["Elapsed Time Raw"] = decode_timestamp(int(serial_data["timestamp"]))
343        for index, current in key_iter("current"):
344            csv_data[f"Current {index} Raw (A)"] = float(current) / 1000
345        for index, voltage_cell in key_iter("voltage_cell"):
346            csv_data[f"Cell {index} Voltage Raw (V)"] = float(voltage_cell) / 1000
347        for index, voltage_terminal in key_iter("voltage_terminal"):
348            csv_data[f"Terminal {index} Voltage Raw (V)"] = float(voltage_terminal) / 1000
349        for index, temperature in key_iter("temperature"):
350            csv_data[f"Temperature {index} Raw (°C)"] = float(temperature) / 10 - 273
351        csv_data["Micro Temperature Raw (°C)"] = float(serial_data["micro_temperature_raw"])
352        csv_data["Charge Enable GPIO"] = bool(serial_data["ncharge_en_gpio"])
353        csv_data["Wakeup GPIO"] = bool(serial_data["n_wakeup_gpio"])
354
355        # Cooked measurements
356        csv_data["Elapsed Time Measurements"] = decode_timestamp(int(serial_data["meas_timestamp"]))
357        csv_data["Elapsed Time Cooked"] = decode_timestamp(int(serial_data["timestamp1"]))
358        csv_data["Current (A)"] = float(serial_data["mamps"]) / 1000
359        for index, mvolt_battery in key_iter("mvolt_battery"):
360            csv_data[f"Battery {index} Voltage (V)"] = float(mvolt_battery) / 1000
361        for index, mvolt_cell in key_iter("mvolt_cell"):
362            csv_data[f"Cell {index} Voltage (V)"] = float(mvolt_cell) / 1000
363        for index, mvolt_terminal in key_iter("mvolt_terminal"):
364            csv_data[f"Terminal {index} Voltage (V)"] = float(mvolt_terminal) / 1000
365        for index, dk_temp in key_iter("dk_temp"):
366            csv_data[f"Temperature {index} (°C)"] = float(dk_temp) / 10 - 273
367        csv_data["Micro Temperature (°C)"] = float(serial_data["micro_temperature_dc"]) / 10
368
369        # Flags
370        csv_data["Flags"] = " | ".join(
371            key.removeprefix("flags.").title() for key in serial_data if key.startswith("flags.") and serial_data[key]
372        )
373        csv_data["Cell State"] = str(CellState(int(serial_data["cell_state"])))
374        csv_data["Charge (%)"] = serial_data["percent_charged"]
375        csv_data["Health (%)"] = serial_data["percent_health"]
376        csv_data["Used (Ah)"] = float(serial_data["milliamp_hour_used"]) / 1000
377        csv_data["Remaining (Ah)"] = float(serial_data["milliamp_hour_remaining"]) / 1000
378        csv_data["Charge Cycles"] = serial_data["charge_cycles"]
379        csv_data["Capacity (Ah)"] = float(serial_data["milliamp_hour_capacity"]) / 1000
380        csv_data["SOC Charge Max"] = serial_data["Q_Max"]
381        csv_data["Columb Count"] = serial_data.get("Columb_Count")
382        csv_data["BMS State"] = str(BMSState(int(serial_data.get("BMS_State", 0))))
383        csv_data["Wakeup Counter"] = serial_data.get("Wakeup_Counter")
384        csv_data["Reset Flags"] = str(ControlStatusRegister(int(serial_data.get("Reset_Flags", 0))))
385        csv_data["Git Hash"] = f"0x{serial_data.get('git_hash'):X}"
386
387        # User data
388        for index, user_data in key_iter("userdef_8"):
389            csv_data[f"User Data Byte {index}"] = f"0x{user_data:02X}"
390        for index, user_data in key_iter("userdef_16"):
391            csv_data[f"User Data Word {index}"] = f"0x{user_data:04X}"
392        for index, user_data in key_iter("userdef_32"):
393            csv_data[f"User Data DWord {index}"] = f"0x{user_data:08X}"
394        csv_data["User Data QWord"] = f"0x{serial_data['userdef_64']:016X}"
395
396        return csv_data

Read serial data dict and parse it into human-readable text.

serial_monitor = <SerialMonitor object>