hitl_tester.modules.bms.event_watcher

Continually polls for data, recording any changes in an event log.

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

  1"""
  2Continually polls for data, recording any changes in an event log.
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28"""
 29
 30from __future__ import annotations
 31
 32import datetime
 33import threading
 34import time
 35from abc import ABC, abstractmethod
 36from dataclasses import dataclass
 37from enum import Enum, auto
 38from typing import Any
 39
 40from hitl_tester.modules.bms.bms_serial import serial_monitor
 41from hitl_tester.modules.bms.bms_hw import BMSHardware
 42from hitl_tester.modules.bms.plateset import Plateset
 43from hitl_tester.modules.bms.smbus import SMBus
 44from hitl_tester.modules.bms_types import DischargeType
 45from hitl_tester.modules.logger import logger
 46
 47bms_hardware = BMSHardware()
 48plateset = Plateset()
 49smbus = SMBus()
 50
 51# Allowable errors in measurements
 52VOLTAGE_ERROR_MV = (1000, 1000)
 53CURRENT_ERROR_MA = (30, 30)
 54TEMPERATURE_ERROR_DK = (10, 10)
 55CELL_VOLTAGE_ERROR_MV = (300, 300)
 56
 57
 58@dataclass
 59class Event:
 60    """A log of one flag event."""
 61
 62    value: Any
 63    time: float
 64    bms_time: datetime.timedelta
 65
 66
 67class MeasurementMode(Enum):
 68    """
 69    The action that occurred before we took the measurement. Used to decide what measurements are expected. For
 70    example, current should read zero after a fault).
 71    """
 72
 73    NO_FAULT = auto()
 74    CHARGING_FAULT = auto()
 75    DISCHARGING_FAULT = auto()
 76
 77
 78class Watcher(ABC):
 79    """Base class for any watcher."""
 80
 81    def __init__(self) -> None:
 82        self.stop_event = threading.Event()
 83        self.daemon: threading.Thread | None = None
 84        self.events: dict[str, list[Event]] = {}
 85        self.last_packet_timestamp: float = time.perf_counter()
 86        self.csv_state = ""
 87
 88    @abstractmethod
 89    def check_flags(self, csv: bool):
 90        """Poll flags and update events."""
 91
 92    def start(self, csv: bool = False):
 93        """Start daemon thread."""
 94        self.events.clear()
 95        self.daemon = threading.Thread(target=self.check_flags, daemon=True, args=(csv,))
 96        self.stop_event.clear()
 97        self.daemon.start()
 98
 99    def stop(self):
100        """Kill daemon thread."""
101        if self.daemon and self.daemon.is_alive():
102            self.stop_event.set()
103            self.daemon.join()
104
105    def assert_true(
106        self,
107        flag: str,
108        value: Any,
109        event_count: int = 0,
110        wait_time: float = 300,
111        error: tuple[float, float] = (0, 0),
112        log_active: bool = True,
113    ):  # FIXME(JA): Change time limit to frame limit (i.e. wait for x new frames)
114        """Wait some amount of seconds until the assertion is true, or raise AssertionError."""
115        logger.write_info_to_report(f"Waiting for {'' if value else '!'}{flag}")
116        if self.daemon:
117            flag_events: list[Event] = []
118            start_time = time.perf_counter()
119            old_packet_time = self.last_packet_timestamp - 2  # Only accept packets 2s after assertions
120
121            # Poll for some amount of seconds (waiting for at least one new packet within 5 minutes)
122            while (
123                time.perf_counter() - start_time < wait_time
124                or (start_time > self.last_packet_timestamp and time.perf_counter() - start_time < 5.5 * 60)
125                or old_packet_time != self.last_packet_timestamp
126            ):
127                old_packet_time = self.last_packet_timestamp
128                flag_events = self.events.get(flag, [])
129                if len(flag_events) >= max(1, event_count) and (
130                    value - error[0] <= flag_events[event_count - 1].value <= value + error[1]
131                ):
132                    if log_active and flag.startswith("flags."):
133                        logger.write_result_to_html_report(f"{flag} was {flag_events[event_count - 1].value}")
134                    is_permanent = flag.startswith("flags.permanent")
135                    if (flag.startswith("flags.fault") or is_permanent) and event_count != 1:
136                        self.assert_true("flags.measure_output_fets_disabled", value, wait_time=600)
137                        if not value:
138                            self.assert_measurements(not is_permanent, MeasurementMode.NO_FAULT)
139                        elif plateset.charger_switch:
140                            self.assert_measurements(not is_permanent, MeasurementMode.CHARGING_FAULT)
141                        else:
142                            self.assert_measurements(not is_permanent, MeasurementMode.DISCHARGING_FAULT)
143
144                    return
145
146            if len(flag_events) == 0:
147                if log_active:
148                    logger.write_result_to_html_report('<font color="#990000"> No data received </font>')
149                raise AssertionError("No data received.")
150
151            message = (
152                f"{flag} was {flag_events[-1].value} with {len(flag_events)} event(s), expected "
153                f"{value} with {max(1, event_count)} event(s)"
154            )
155            if log_active:
156                logger.write_result_to_html_report(f'<font color="#990000">{message}</font>')
157            raise AssertionError(f"{message}: {flag_events}")
158
159    def assert_false(
160        self,
161        flag: str,
162        value: Any,
163        event_count: int = 0,
164        wait_time: int = 60,
165        error: tuple[float, float] = (0, 0),
166    ):
167        """Wait some amount of seconds until the assertion is false, or raise AssertionError."""
168        try:
169            self.assert_true(flag, value, event_count, wait_time, error, log_active=False)
170        except AssertionError:
171            return
172        message = f"{flag} was {value} after {event_count} events."
173        logger.write_result_to_html_report(f'<font color="#990000>{message}</font>')
174        raise AssertionError(message)
175
176    def assert_measurements(self, check_serial: bool = True, mode: MeasurementMode = MeasurementMode.NO_FAULT):
177        """Verify voltage, current, and temperature match what was set."""
178        global VOLTAGE_ERROR_MV
179
180        # Expected values
181        if mode is MeasurementMode.NO_FAULT:
182            expected_voltage_v = sum(cell.volts for cell in bms_hardware.cells.values())
183            if len(bms_hardware.cells) == 0:
184                VOLTAGE_ERROR_MV = (4000, 4000)
185                expected_voltage_v = 13
186            expected_current_a = 0.0
187            if plateset.load_switch:
188                assert bms_hardware.load
189                expected_current_a = bms_hardware.load.target_amps
190            elif plateset.charger_switch:
191                assert bms_hardware.charger
192                expected_current_a = bms_hardware.charger.target_amps
193        elif mode is MeasurementMode.DISCHARGING_FAULT:
194            expected_voltage_v = 0.0
195            expected_current_a = 0.0
196        elif mode is MeasurementMode.CHARGING_FAULT:
197            assert bms_hardware.charger
198            expected_voltage_v = bms_hardware.charger.target_volts
199            expected_current_a = 0.0
200        logger.write_info_to_report(f"Expecting {expected_voltage_v} V, {expected_current_a} A")
201
202        # Measurement check
203        assert bms_hardware.dmm
204
205        if not (plateset.load_switch or plateset.charger_switch):
206            assert bms_hardware.load
207            with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000):
208                dmm_volts = bms_hardware.dmm.volts
209        else:
210            dmm_volts = bms_hardware.dmm.volts
211
212        logger.write_info_to_report(
213            f"Voltage: {expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000} V "
214            f"<= {dmm_volts} V <= {expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000} V"
215        )
216
217        assert (
218            expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000
219            <= dmm_volts
220            <= expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000
221        )
222
223        assert bms_hardware.load and bms_hardware.charger
224        if plateset.load_switch or plateset.charger_switch:
225            if plateset.load_switch:
226                measured_current = -bms_hardware.load.amps
227            elif plateset.charger_switch:
228                measured_current = bms_hardware.charger.amps
229            logger.write_info_to_report(
230                f"Current: {expected_current_a - CURRENT_ERROR_MA[0] / 1000} A "
231                f"<= {measured_current} A <= {expected_current_a + CURRENT_ERROR_MA[1] / 1000} A"
232            )
233            assert (
234                expected_current_a - CURRENT_ERROR_MA[0] / 1000
235                <= measured_current
236                <= expected_current_a + CURRENT_ERROR_MA[1] / 1000
237            )
238
239        # TODO(JA): Check cell sims and themisters with ADC plates 0 and 1
240
241        # Serial check
242        if check_serial:
243
244            if not (plateset.load_switch or plateset.charger_switch):
245                assert bms_hardware.load
246                with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000):
247                    logger.write_info_to_report(f"{expected_voltage_v * 1000}")
248                    self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV)
249            else:
250                logger.write_info_to_report(f"{expected_voltage_v * 1000}")
251                self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV)
252
253            logger.write_info_to_report(f"{expected_current_a * 1000}")
254            self.assert_true("mamps", expected_current_a * 1000, error=CURRENT_ERROR_MA)
255
256            if len(bms_hardware.cells):
257                logger.write_info_to_report(f"{(plateset.thermistor1 + 273)*10}")
258                self.assert_true("dk_temp", (plateset.thermistor1 + 273) * 10, error=TEMPERATURE_ERROR_DK)
259                logger.write_info_to_report(f"{(plateset.thermistor2 + 273)*10}")
260                self.assert_true("dk_temp1", (plateset.thermistor2 + 273) * 10, error=TEMPERATURE_ERROR_DK)
261
262            for cell_id, cell in bms_hardware.cells.items():
263                logger.write_info_to_report(f"{cell.measured_volts=}")
264            for cell_id, cell in bms_hardware.cells.items():
265                flag_name = f"mvolt_cell{cell_id-1 if cell_id != 1 else ''}"
266                logger.write_info_to_report(f"{(cell.measured_volts - 0.08)*1000}")
267                self.assert_true(flag_name, (cell.measured_volts - 0.08) * 1000, error=CELL_VOLTAGE_ERROR_MV)
268
269
270class SMBusWatcher(Watcher):
271    """Polls the SMBus for data, recording any updates as events."""
272
273    def check_flags(self, csv: bool):
274        """Poll flags and update events."""
275        while not self.stop_event.is_set():
276            smbus_data = smbus.parse_smbus_data()
277            logger.write_debug_to_report(smbus_data)
278            smbus_time = time.perf_counter()
279            for flag, value in smbus_data.items():
280                if self.events.get(flag) is None:
281                    self.events[flag] = [Event(value, smbus_time, datetime.timedelta())]
282                elif self.events[flag][-1].value != value:  # Flag changed
283                    self.events[flag].append(Event(value, smbus_time, datetime.timedelta()))
284                    logger.write_debug_to_report(f"Flag {flag}: {value}")
285            self.last_packet_timestamp = time.perf_counter()
286
287
288class SerialWatcher(Watcher):
289    """Polls the serial connection for data, recording any updates as events."""
290
291    def check_flags(self, csv: bool):
292        """Poll flags and update events."""
293        start_time = time.perf_counter()
294        while not self.stop_event.is_set():
295            serial_data = serial_monitor.read(latest=False)
296            if serial_data:
297                bms_seconds, bms_milliseconds = divmod(int(serial_data["timestamp"]), 1000)
298                bms_time = (
299                    datetime.datetime.fromtimestamp(bms_seconds, datetime.timezone.utc)
300                    - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc)
301                    + datetime.timedelta(milliseconds=bms_milliseconds)
302                )
303                logger.write_debug_to_report(str(serial_data))
304                serial_time = time.perf_counter()
305                for flag, value in serial_data.items():
306                    if self.events.get(flag) is None:
307                        self.events[flag] = [Event(value, serial_time, bms_time)]
308                    elif self.events[flag][-1].value != value:  # Flag changed
309                        self.events[flag].append(Event(value, serial_time, bms_time))
310                        if serial_data[flag] in (0, 1):
311                            logger.write_debug_to_report(f"Flag {flag}: {value}")
312                self.last_packet_timestamp = time.perf_counter()
313                if csv:
314                    elapsed_time = time.perf_counter() - start_time
315                    latest_i = 0.0
316                    if plateset.charger_switch:
317                        assert bms_hardware.charger
318                        latest_i = bms_hardware.charger.amps
319                    elif plateset.load_switch:
320                        assert bms_hardware.load
321                        latest_i = bms_hardware.load.amps
322                    bms_hardware.csv.cycle_smbus.record(elapsed_time, latest_i, None, 0, self.csv_state, serial_data)
VOLTAGE_ERROR_MV = (1000, 1000)
CURRENT_ERROR_MA = (30, 30)
TEMPERATURE_ERROR_DK = (10, 10)
CELL_VOLTAGE_ERROR_MV = (300, 300)
@dataclass
class Event:
59@dataclass
60class Event:
61    """A log of one flag event."""
62
63    value: Any
64    time: float
65    bms_time: datetime.timedelta

A log of one flag event.

Event(value: Any, time: float, bms_time: datetime.timedelta)
value: Any
time: float
bms_time: datetime.timedelta
class MeasurementMode(enum.Enum):
68class MeasurementMode(Enum):
69    """
70    The action that occurred before we took the measurement. Used to decide what measurements are expected. For
71    example, current should read zero after a fault).
72    """
73
74    NO_FAULT = auto()
75    CHARGING_FAULT = auto()
76    DISCHARGING_FAULT = auto()

The action that occurred before we took the measurement. Used to decide what measurements are expected. For example, current should read zero after a fault).

NO_FAULT = <MeasurementMode.NO_FAULT: 1>
CHARGING_FAULT = <MeasurementMode.CHARGING_FAULT: 2>
DISCHARGING_FAULT = <MeasurementMode.DISCHARGING_FAULT: 3>
Inherited Members
enum.Enum
name
value
class Watcher(abc.ABC):
 79class Watcher(ABC):
 80    """Base class for any watcher."""
 81
 82    def __init__(self) -> None:
 83        self.stop_event = threading.Event()
 84        self.daemon: threading.Thread | None = None
 85        self.events: dict[str, list[Event]] = {}
 86        self.last_packet_timestamp: float = time.perf_counter()
 87        self.csv_state = ""
 88
 89    @abstractmethod
 90    def check_flags(self, csv: bool):
 91        """Poll flags and update events."""
 92
 93    def start(self, csv: bool = False):
 94        """Start daemon thread."""
 95        self.events.clear()
 96        self.daemon = threading.Thread(target=self.check_flags, daemon=True, args=(csv,))
 97        self.stop_event.clear()
 98        self.daemon.start()
 99
100    def stop(self):
101        """Kill daemon thread."""
102        if self.daemon and self.daemon.is_alive():
103            self.stop_event.set()
104            self.daemon.join()
105
106    def assert_true(
107        self,
108        flag: str,
109        value: Any,
110        event_count: int = 0,
111        wait_time: float = 300,
112        error: tuple[float, float] = (0, 0),
113        log_active: bool = True,
114    ):  # FIXME(JA): Change time limit to frame limit (i.e. wait for x new frames)
115        """Wait some amount of seconds until the assertion is true, or raise AssertionError."""
116        logger.write_info_to_report(f"Waiting for {'' if value else '!'}{flag}")
117        if self.daemon:
118            flag_events: list[Event] = []
119            start_time = time.perf_counter()
120            old_packet_time = self.last_packet_timestamp - 2  # Only accept packets 2s after assertions
121
122            # Poll for some amount of seconds (waiting for at least one new packet within 5 minutes)
123            while (
124                time.perf_counter() - start_time < wait_time
125                or (start_time > self.last_packet_timestamp and time.perf_counter() - start_time < 5.5 * 60)
126                or old_packet_time != self.last_packet_timestamp
127            ):
128                old_packet_time = self.last_packet_timestamp
129                flag_events = self.events.get(flag, [])
130                if len(flag_events) >= max(1, event_count) and (
131                    value - error[0] <= flag_events[event_count - 1].value <= value + error[1]
132                ):
133                    if log_active and flag.startswith("flags."):
134                        logger.write_result_to_html_report(f"{flag} was {flag_events[event_count - 1].value}")
135                    is_permanent = flag.startswith("flags.permanent")
136                    if (flag.startswith("flags.fault") or is_permanent) and event_count != 1:
137                        self.assert_true("flags.measure_output_fets_disabled", value, wait_time=600)
138                        if not value:
139                            self.assert_measurements(not is_permanent, MeasurementMode.NO_FAULT)
140                        elif plateset.charger_switch:
141                            self.assert_measurements(not is_permanent, MeasurementMode.CHARGING_FAULT)
142                        else:
143                            self.assert_measurements(not is_permanent, MeasurementMode.DISCHARGING_FAULT)
144
145                    return
146
147            if len(flag_events) == 0:
148                if log_active:
149                    logger.write_result_to_html_report('<font color="#990000"> No data received </font>')
150                raise AssertionError("No data received.")
151
152            message = (
153                f"{flag} was {flag_events[-1].value} with {len(flag_events)} event(s), expected "
154                f"{value} with {max(1, event_count)} event(s)"
155            )
156            if log_active:
157                logger.write_result_to_html_report(f'<font color="#990000">{message}</font>')
158            raise AssertionError(f"{message}: {flag_events}")
159
160    def assert_false(
161        self,
162        flag: str,
163        value: Any,
164        event_count: int = 0,
165        wait_time: int = 60,
166        error: tuple[float, float] = (0, 0),
167    ):
168        """Wait some amount of seconds until the assertion is false, or raise AssertionError."""
169        try:
170            self.assert_true(flag, value, event_count, wait_time, error, log_active=False)
171        except AssertionError:
172            return
173        message = f"{flag} was {value} after {event_count} events."
174        logger.write_result_to_html_report(f'<font color="#990000>{message}</font>')
175        raise AssertionError(message)
176
177    def assert_measurements(self, check_serial: bool = True, mode: MeasurementMode = MeasurementMode.NO_FAULT):
178        """Verify voltage, current, and temperature match what was set."""
179        global VOLTAGE_ERROR_MV
180
181        # Expected values
182        if mode is MeasurementMode.NO_FAULT:
183            expected_voltage_v = sum(cell.volts for cell in bms_hardware.cells.values())
184            if len(bms_hardware.cells) == 0:
185                VOLTAGE_ERROR_MV = (4000, 4000)
186                expected_voltage_v = 13
187            expected_current_a = 0.0
188            if plateset.load_switch:
189                assert bms_hardware.load
190                expected_current_a = bms_hardware.load.target_amps
191            elif plateset.charger_switch:
192                assert bms_hardware.charger
193                expected_current_a = bms_hardware.charger.target_amps
194        elif mode is MeasurementMode.DISCHARGING_FAULT:
195            expected_voltage_v = 0.0
196            expected_current_a = 0.0
197        elif mode is MeasurementMode.CHARGING_FAULT:
198            assert bms_hardware.charger
199            expected_voltage_v = bms_hardware.charger.target_volts
200            expected_current_a = 0.0
201        logger.write_info_to_report(f"Expecting {expected_voltage_v} V, {expected_current_a} A")
202
203        # Measurement check
204        assert bms_hardware.dmm
205
206        if not (plateset.load_switch or plateset.charger_switch):
207            assert bms_hardware.load
208            with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000):
209                dmm_volts = bms_hardware.dmm.volts
210        else:
211            dmm_volts = bms_hardware.dmm.volts
212
213        logger.write_info_to_report(
214            f"Voltage: {expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000} V "
215            f"<= {dmm_volts} V <= {expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000} V"
216        )
217
218        assert (
219            expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000
220            <= dmm_volts
221            <= expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000
222        )
223
224        assert bms_hardware.load and bms_hardware.charger
225        if plateset.load_switch or plateset.charger_switch:
226            if plateset.load_switch:
227                measured_current = -bms_hardware.load.amps
228            elif plateset.charger_switch:
229                measured_current = bms_hardware.charger.amps
230            logger.write_info_to_report(
231                f"Current: {expected_current_a - CURRENT_ERROR_MA[0] / 1000} A "
232                f"<= {measured_current} A <= {expected_current_a + CURRENT_ERROR_MA[1] / 1000} A"
233            )
234            assert (
235                expected_current_a - CURRENT_ERROR_MA[0] / 1000
236                <= measured_current
237                <= expected_current_a + CURRENT_ERROR_MA[1] / 1000
238            )
239
240        # TODO(JA): Check cell sims and themisters with ADC plates 0 and 1
241
242        # Serial check
243        if check_serial:
244
245            if not (plateset.load_switch or plateset.charger_switch):
246                assert bms_hardware.load
247                with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000):
248                    logger.write_info_to_report(f"{expected_voltage_v * 1000}")
249                    self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV)
250            else:
251                logger.write_info_to_report(f"{expected_voltage_v * 1000}")
252                self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV)
253
254            logger.write_info_to_report(f"{expected_current_a * 1000}")
255            self.assert_true("mamps", expected_current_a * 1000, error=CURRENT_ERROR_MA)
256
257            if len(bms_hardware.cells):
258                logger.write_info_to_report(f"{(plateset.thermistor1 + 273)*10}")
259                self.assert_true("dk_temp", (plateset.thermistor1 + 273) * 10, error=TEMPERATURE_ERROR_DK)
260                logger.write_info_to_report(f"{(plateset.thermistor2 + 273)*10}")
261                self.assert_true("dk_temp1", (plateset.thermistor2 + 273) * 10, error=TEMPERATURE_ERROR_DK)
262
263            for cell_id, cell in bms_hardware.cells.items():
264                logger.write_info_to_report(f"{cell.measured_volts=}")
265            for cell_id, cell in bms_hardware.cells.items():
266                flag_name = f"mvolt_cell{cell_id-1 if cell_id != 1 else ''}"
267                logger.write_info_to_report(f"{(cell.measured_volts - 0.08)*1000}")
268                self.assert_true(flag_name, (cell.measured_volts - 0.08) * 1000, error=CELL_VOLTAGE_ERROR_MV)

Base class for any watcher.

stop_event
daemon: threading.Thread | None
events: dict[str, list[Event]]
last_packet_timestamp: float
csv_state
@abstractmethod
def check_flags(self, csv: bool):
89    @abstractmethod
90    def check_flags(self, csv: bool):
91        """Poll flags and update events."""

Poll flags and update events.

def start(self, csv: bool = False):
93    def start(self, csv: bool = False):
94        """Start daemon thread."""
95        self.events.clear()
96        self.daemon = threading.Thread(target=self.check_flags, daemon=True, args=(csv,))
97        self.stop_event.clear()
98        self.daemon.start()

Start daemon thread.

def stop(self):
100    def stop(self):
101        """Kill daemon thread."""
102        if self.daemon and self.daemon.is_alive():
103            self.stop_event.set()
104            self.daemon.join()

Kill daemon thread.

def assert_true( self, flag: str, value: Any, event_count: int = 0, wait_time: float = 300, error: tuple[float, float] = (0, 0), log_active: bool = True):
106    def assert_true(
107        self,
108        flag: str,
109        value: Any,
110        event_count: int = 0,
111        wait_time: float = 300,
112        error: tuple[float, float] = (0, 0),
113        log_active: bool = True,
114    ):  # FIXME(JA): Change time limit to frame limit (i.e. wait for x new frames)
115        """Wait some amount of seconds until the assertion is true, or raise AssertionError."""
116        logger.write_info_to_report(f"Waiting for {'' if value else '!'}{flag}")
117        if self.daemon:
118            flag_events: list[Event] = []
119            start_time = time.perf_counter()
120            old_packet_time = self.last_packet_timestamp - 2  # Only accept packets 2s after assertions
121
122            # Poll for some amount of seconds (waiting for at least one new packet within 5 minutes)
123            while (
124                time.perf_counter() - start_time < wait_time
125                or (start_time > self.last_packet_timestamp and time.perf_counter() - start_time < 5.5 * 60)
126                or old_packet_time != self.last_packet_timestamp
127            ):
128                old_packet_time = self.last_packet_timestamp
129                flag_events = self.events.get(flag, [])
130                if len(flag_events) >= max(1, event_count) and (
131                    value - error[0] <= flag_events[event_count - 1].value <= value + error[1]
132                ):
133                    if log_active and flag.startswith("flags."):
134                        logger.write_result_to_html_report(f"{flag} was {flag_events[event_count - 1].value}")
135                    is_permanent = flag.startswith("flags.permanent")
136                    if (flag.startswith("flags.fault") or is_permanent) and event_count != 1:
137                        self.assert_true("flags.measure_output_fets_disabled", value, wait_time=600)
138                        if not value:
139                            self.assert_measurements(not is_permanent, MeasurementMode.NO_FAULT)
140                        elif plateset.charger_switch:
141                            self.assert_measurements(not is_permanent, MeasurementMode.CHARGING_FAULT)
142                        else:
143                            self.assert_measurements(not is_permanent, MeasurementMode.DISCHARGING_FAULT)
144
145                    return
146
147            if len(flag_events) == 0:
148                if log_active:
149                    logger.write_result_to_html_report('<font color="#990000"> No data received </font>')
150                raise AssertionError("No data received.")
151
152            message = (
153                f"{flag} was {flag_events[-1].value} with {len(flag_events)} event(s), expected "
154                f"{value} with {max(1, event_count)} event(s)"
155            )
156            if log_active:
157                logger.write_result_to_html_report(f'<font color="#990000">{message}</font>')
158            raise AssertionError(f"{message}: {flag_events}")

Wait some amount of seconds until the assertion is true, or raise AssertionError.

def assert_false( self, flag: str, value: Any, event_count: int = 0, wait_time: int = 60, error: tuple[float, float] = (0, 0)):
160    def assert_false(
161        self,
162        flag: str,
163        value: Any,
164        event_count: int = 0,
165        wait_time: int = 60,
166        error: tuple[float, float] = (0, 0),
167    ):
168        """Wait some amount of seconds until the assertion is false, or raise AssertionError."""
169        try:
170            self.assert_true(flag, value, event_count, wait_time, error, log_active=False)
171        except AssertionError:
172            return
173        message = f"{flag} was {value} after {event_count} events."
174        logger.write_result_to_html_report(f'<font color="#990000>{message}</font>')
175        raise AssertionError(message)

Wait some amount of seconds until the assertion is false, or raise AssertionError.

def assert_measurements( self, check_serial: bool = True, mode: MeasurementMode = <MeasurementMode.NO_FAULT: 1>):
177    def assert_measurements(self, check_serial: bool = True, mode: MeasurementMode = MeasurementMode.NO_FAULT):
178        """Verify voltage, current, and temperature match what was set."""
179        global VOLTAGE_ERROR_MV
180
181        # Expected values
182        if mode is MeasurementMode.NO_FAULT:
183            expected_voltage_v = sum(cell.volts for cell in bms_hardware.cells.values())
184            if len(bms_hardware.cells) == 0:
185                VOLTAGE_ERROR_MV = (4000, 4000)
186                expected_voltage_v = 13
187            expected_current_a = 0.0
188            if plateset.load_switch:
189                assert bms_hardware.load
190                expected_current_a = bms_hardware.load.target_amps
191            elif plateset.charger_switch:
192                assert bms_hardware.charger
193                expected_current_a = bms_hardware.charger.target_amps
194        elif mode is MeasurementMode.DISCHARGING_FAULT:
195            expected_voltage_v = 0.0
196            expected_current_a = 0.0
197        elif mode is MeasurementMode.CHARGING_FAULT:
198            assert bms_hardware.charger
199            expected_voltage_v = bms_hardware.charger.target_volts
200            expected_current_a = 0.0
201        logger.write_info_to_report(f"Expecting {expected_voltage_v} V, {expected_current_a} A")
202
203        # Measurement check
204        assert bms_hardware.dmm
205
206        if not (plateset.load_switch or plateset.charger_switch):
207            assert bms_hardware.load
208            with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000):
209                dmm_volts = bms_hardware.dmm.volts
210        else:
211            dmm_volts = bms_hardware.dmm.volts
212
213        logger.write_info_to_report(
214            f"Voltage: {expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000} V "
215            f"<= {dmm_volts} V <= {expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000} V"
216        )
217
218        assert (
219            expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000
220            <= dmm_volts
221            <= expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000
222        )
223
224        assert bms_hardware.load and bms_hardware.charger
225        if plateset.load_switch or plateset.charger_switch:
226            if plateset.load_switch:
227                measured_current = -bms_hardware.load.amps
228            elif plateset.charger_switch:
229                measured_current = bms_hardware.charger.amps
230            logger.write_info_to_report(
231                f"Current: {expected_current_a - CURRENT_ERROR_MA[0] / 1000} A "
232                f"<= {measured_current} A <= {expected_current_a + CURRENT_ERROR_MA[1] / 1000} A"
233            )
234            assert (
235                expected_current_a - CURRENT_ERROR_MA[0] / 1000
236                <= measured_current
237                <= expected_current_a + CURRENT_ERROR_MA[1] / 1000
238            )
239
240        # TODO(JA): Check cell sims and themisters with ADC plates 0 and 1
241
242        # Serial check
243        if check_serial:
244
245            if not (plateset.load_switch or plateset.charger_switch):
246                assert bms_hardware.load
247                with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000):
248                    logger.write_info_to_report(f"{expected_voltage_v * 1000}")
249                    self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV)
250            else:
251                logger.write_info_to_report(f"{expected_voltage_v * 1000}")
252                self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV)
253
254            logger.write_info_to_report(f"{expected_current_a * 1000}")
255            self.assert_true("mamps", expected_current_a * 1000, error=CURRENT_ERROR_MA)
256
257            if len(bms_hardware.cells):
258                logger.write_info_to_report(f"{(plateset.thermistor1 + 273)*10}")
259                self.assert_true("dk_temp", (plateset.thermistor1 + 273) * 10, error=TEMPERATURE_ERROR_DK)
260                logger.write_info_to_report(f"{(plateset.thermistor2 + 273)*10}")
261                self.assert_true("dk_temp1", (plateset.thermistor2 + 273) * 10, error=TEMPERATURE_ERROR_DK)
262
263            for cell_id, cell in bms_hardware.cells.items():
264                logger.write_info_to_report(f"{cell.measured_volts=}")
265            for cell_id, cell in bms_hardware.cells.items():
266                flag_name = f"mvolt_cell{cell_id-1 if cell_id != 1 else ''}"
267                logger.write_info_to_report(f"{(cell.measured_volts - 0.08)*1000}")
268                self.assert_true(flag_name, (cell.measured_volts - 0.08) * 1000, error=CELL_VOLTAGE_ERROR_MV)

Verify voltage, current, and temperature match what was set.

class SMBusWatcher(Watcher):
271class SMBusWatcher(Watcher):
272    """Polls the SMBus for data, recording any updates as events."""
273
274    def check_flags(self, csv: bool):
275        """Poll flags and update events."""
276        while not self.stop_event.is_set():
277            smbus_data = smbus.parse_smbus_data()
278            logger.write_debug_to_report(smbus_data)
279            smbus_time = time.perf_counter()
280            for flag, value in smbus_data.items():
281                if self.events.get(flag) is None:
282                    self.events[flag] = [Event(value, smbus_time, datetime.timedelta())]
283                elif self.events[flag][-1].value != value:  # Flag changed
284                    self.events[flag].append(Event(value, smbus_time, datetime.timedelta()))
285                    logger.write_debug_to_report(f"Flag {flag}: {value}")
286            self.last_packet_timestamp = time.perf_counter()

Polls the SMBus for data, recording any updates as events.

def check_flags(self, csv: bool):
274    def check_flags(self, csv: bool):
275        """Poll flags and update events."""
276        while not self.stop_event.is_set():
277            smbus_data = smbus.parse_smbus_data()
278            logger.write_debug_to_report(smbus_data)
279            smbus_time = time.perf_counter()
280            for flag, value in smbus_data.items():
281                if self.events.get(flag) is None:
282                    self.events[flag] = [Event(value, smbus_time, datetime.timedelta())]
283                elif self.events[flag][-1].value != value:  # Flag changed
284                    self.events[flag].append(Event(value, smbus_time, datetime.timedelta()))
285                    logger.write_debug_to_report(f"Flag {flag}: {value}")
286            self.last_packet_timestamp = time.perf_counter()

Poll flags and update events.

class SerialWatcher(Watcher):
289class SerialWatcher(Watcher):
290    """Polls the serial connection for data, recording any updates as events."""
291
292    def check_flags(self, csv: bool):
293        """Poll flags and update events."""
294        start_time = time.perf_counter()
295        while not self.stop_event.is_set():
296            serial_data = serial_monitor.read(latest=False)
297            if serial_data:
298                bms_seconds, bms_milliseconds = divmod(int(serial_data["timestamp"]), 1000)
299                bms_time = (
300                    datetime.datetime.fromtimestamp(bms_seconds, datetime.timezone.utc)
301                    - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc)
302                    + datetime.timedelta(milliseconds=bms_milliseconds)
303                )
304                logger.write_debug_to_report(str(serial_data))
305                serial_time = time.perf_counter()
306                for flag, value in serial_data.items():
307                    if self.events.get(flag) is None:
308                        self.events[flag] = [Event(value, serial_time, bms_time)]
309                    elif self.events[flag][-1].value != value:  # Flag changed
310                        self.events[flag].append(Event(value, serial_time, bms_time))
311                        if serial_data[flag] in (0, 1):
312                            logger.write_debug_to_report(f"Flag {flag}: {value}")
313                self.last_packet_timestamp = time.perf_counter()
314                if csv:
315                    elapsed_time = time.perf_counter() - start_time
316                    latest_i = 0.0
317                    if plateset.charger_switch:
318                        assert bms_hardware.charger
319                        latest_i = bms_hardware.charger.amps
320                    elif plateset.load_switch:
321                        assert bms_hardware.load
322                        latest_i = bms_hardware.load.amps
323                    bms_hardware.csv.cycle_smbus.record(elapsed_time, latest_i, None, 0, self.csv_state, serial_data)

Polls the serial connection for data, recording any updates as events.

def check_flags(self, csv: bool):
292    def check_flags(self, csv: bool):
293        """Poll flags and update events."""
294        start_time = time.perf_counter()
295        while not self.stop_event.is_set():
296            serial_data = serial_monitor.read(latest=False)
297            if serial_data:
298                bms_seconds, bms_milliseconds = divmod(int(serial_data["timestamp"]), 1000)
299                bms_time = (
300                    datetime.datetime.fromtimestamp(bms_seconds, datetime.timezone.utc)
301                    - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc)
302                    + datetime.timedelta(milliseconds=bms_milliseconds)
303                )
304                logger.write_debug_to_report(str(serial_data))
305                serial_time = time.perf_counter()
306                for flag, value in serial_data.items():
307                    if self.events.get(flag) is None:
308                        self.events[flag] = [Event(value, serial_time, bms_time)]
309                    elif self.events[flag][-1].value != value:  # Flag changed
310                        self.events[flag].append(Event(value, serial_time, bms_time))
311                        if serial_data[flag] in (0, 1):
312                            logger.write_debug_to_report(f"Flag {flag}: {value}")
313                self.last_packet_timestamp = time.perf_counter()
314                if csv:
315                    elapsed_time = time.perf_counter() - start_time
316                    latest_i = 0.0
317                    if plateset.charger_switch:
318                        assert bms_hardware.charger
319                        latest_i = bms_hardware.charger.amps
320                    elif plateset.load_switch:
321                        assert bms_hardware.load
322                        latest_i = bms_hardware.load.amps
323                    bms_hardware.csv.cycle_smbus.record(elapsed_time, latest_i, None, 0, self.csv_state, serial_data)

Poll flags and update events.