hitl_tester.modules.bms.event_watcher
Continually polls for data, recording any changes in an event log.
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
1""" 2Continually polls for data, recording any changes in an event log. 3 4# (c) 2020-2024 TurnAround Factor, Inc. 5# 6# CUI DISTRIBUTION CONTROL 7# Controlled by: DLA J68 R&D SBIP 8# CUI Category: Small Business Research and Technology 9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15# Fort Belvoir, VA 22060-6221 16# 17# SBIR DATA RIGHTS 18# Contract No.:SP4701-23-C-0083 19# Contractor Name: TurnAround Factor, Inc. 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21# Expiration of SBIR Data Rights Period: September 24, 2029 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27# the markings. 28""" 29 30from __future__ import annotations 31 32import datetime 33import threading 34import time 35from abc import ABC, abstractmethod 36from dataclasses import dataclass 37from enum import Enum, auto 38from typing import Any 39 40from hitl_tester.modules.bms.bms_serial import serial_monitor 41from hitl_tester.modules.bms.bms_hw import BMSHardware 42from hitl_tester.modules.bms.plateset import Plateset 43from hitl_tester.modules.bms.smbus import SMBus 44from hitl_tester.modules.bms_types import DischargeType 45from hitl_tester.modules.logger import logger 46 47bms_hardware = BMSHardware() 48plateset = Plateset() 49smbus = SMBus() 50 51# Allowable errors in measurements 52VOLTAGE_ERROR_MV = (1000, 1000) 53CURRENT_ERROR_MA = (30, 30) 54TEMPERATURE_ERROR_DK = (10, 10) 55CELL_VOLTAGE_ERROR_MV = (300, 300) 56 57 58@dataclass 59class Event: 60 """A log of one flag event.""" 61 62 value: Any 63 time: float 64 bms_time: datetime.timedelta 65 66 67class MeasurementMode(Enum): 68 """ 69 The action that occurred before we took the measurement. Used to decide what measurements are expected. For 70 example, current should read zero after a fault). 71 """ 72 73 NO_FAULT = auto() 74 CHARGING_FAULT = auto() 75 DISCHARGING_FAULT = auto() 76 77 78class Watcher(ABC): 79 """Base class for any watcher.""" 80 81 def __init__(self) -> None: 82 self.stop_event = threading.Event() 83 self.daemon: threading.Thread | None = None 84 self.events: dict[str, list[Event]] = {} 85 self.last_packet_timestamp: float = time.perf_counter() 86 self.csv_state = "" 87 88 @abstractmethod 89 def check_flags(self, csv: bool): 90 """Poll flags and update events.""" 91 92 def start(self, csv: bool = False): 93 """Start daemon thread.""" 94 self.events.clear() 95 self.daemon = threading.Thread(target=self.check_flags, daemon=True, args=(csv,)) 96 self.stop_event.clear() 97 self.daemon.start() 98 99 def stop(self): 100 """Kill daemon thread.""" 101 if self.daemon and self.daemon.is_alive(): 102 self.stop_event.set() 103 self.daemon.join() 104 105 def assert_true( 106 self, 107 flag: str, 108 value: Any, 109 event_count: int = 0, 110 wait_time: float = 300, 111 error: tuple[float, float] = (0, 0), 112 log_active: bool = True, 113 ): # FIXME(JA): Change time limit to frame limit (i.e. wait for x new frames) 114 """Wait some amount of seconds until the assertion is true, or raise AssertionError.""" 115 logger.write_info_to_report(f"Waiting for {'' if value else '!'}{flag}") 116 if self.daemon: 117 flag_events: list[Event] = [] 118 start_time = time.perf_counter() 119 old_packet_time = self.last_packet_timestamp - 2 # Only accept packets 2s after assertions 120 121 # Poll for some amount of seconds (waiting for at least one new packet within 5 minutes) 122 while ( 123 time.perf_counter() - start_time < wait_time 124 or (start_time > self.last_packet_timestamp and time.perf_counter() - start_time < 5.5 * 60) 125 or old_packet_time != self.last_packet_timestamp 126 ): 127 old_packet_time = self.last_packet_timestamp 128 flag_events = self.events.get(flag, []) 129 if len(flag_events) >= max(1, event_count) and ( 130 value - error[0] <= flag_events[event_count - 1].value <= value + error[1] 131 ): 132 if log_active and flag.startswith("flags."): 133 logger.write_result_to_html_report(f"{flag} was {flag_events[event_count - 1].value}") 134 is_permanent = flag.startswith("flags.permanent") 135 if (flag.startswith("flags.fault") or is_permanent) and event_count != 1: 136 self.assert_true("flags.measure_output_fets_disabled", value, wait_time=600) 137 if not value: 138 self.assert_measurements(not is_permanent, MeasurementMode.NO_FAULT) 139 elif plateset.charger_switch: 140 self.assert_measurements(not is_permanent, MeasurementMode.CHARGING_FAULT) 141 else: 142 self.assert_measurements(not is_permanent, MeasurementMode.DISCHARGING_FAULT) 143 144 return 145 146 if len(flag_events) == 0: 147 if log_active: 148 logger.write_result_to_html_report('<font color="#990000"> No data received </font>') 149 raise AssertionError("No data received.") 150 151 message = ( 152 f"{flag} was {flag_events[-1].value} with {len(flag_events)} event(s), expected " 153 f"{value} with {max(1, event_count)} event(s)" 154 ) 155 if log_active: 156 logger.write_result_to_html_report(f'<font color="#990000">{message}</font>') 157 raise AssertionError(f"{message}: {flag_events}") 158 159 def assert_false( 160 self, 161 flag: str, 162 value: Any, 163 event_count: int = 0, 164 wait_time: int = 60, 165 error: tuple[float, float] = (0, 0), 166 ): 167 """Wait some amount of seconds until the assertion is false, or raise AssertionError.""" 168 try: 169 self.assert_true(flag, value, event_count, wait_time, error, log_active=False) 170 except AssertionError: 171 return 172 message = f"{flag} was {value} after {event_count} events." 173 logger.write_result_to_html_report(f'<font color="#990000>{message}</font>') 174 raise AssertionError(message) 175 176 def assert_measurements(self, check_serial: bool = True, mode: MeasurementMode = MeasurementMode.NO_FAULT): 177 """Verify voltage, current, and temperature match what was set.""" 178 global VOLTAGE_ERROR_MV 179 180 # Expected values 181 if mode is MeasurementMode.NO_FAULT: 182 expected_voltage_v = sum(cell.volts for cell in bms_hardware.cells.values()) 183 if len(bms_hardware.cells) == 0: 184 VOLTAGE_ERROR_MV = (4000, 4000) 185 expected_voltage_v = 13 186 expected_current_a = 0.0 187 if plateset.load_switch: 188 assert bms_hardware.load 189 expected_current_a = bms_hardware.load.target_amps 190 elif plateset.charger_switch: 191 assert bms_hardware.charger 192 expected_current_a = bms_hardware.charger.target_amps 193 elif mode is MeasurementMode.DISCHARGING_FAULT: 194 expected_voltage_v = 0.0 195 expected_current_a = 0.0 196 elif mode is MeasurementMode.CHARGING_FAULT: 197 assert bms_hardware.charger 198 expected_voltage_v = bms_hardware.charger.target_volts 199 expected_current_a = 0.0 200 logger.write_info_to_report(f"Expecting {expected_voltage_v} V, {expected_current_a} A") 201 202 # Measurement check 203 assert bms_hardware.dmm 204 205 if not (plateset.load_switch or plateset.charger_switch): 206 assert bms_hardware.load 207 with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000): 208 dmm_volts = bms_hardware.dmm.volts 209 else: 210 dmm_volts = bms_hardware.dmm.volts 211 212 logger.write_info_to_report( 213 f"Voltage: {expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000} V " 214 f"<= {dmm_volts} V <= {expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000} V" 215 ) 216 217 assert ( 218 expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000 219 <= dmm_volts 220 <= expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000 221 ) 222 223 assert bms_hardware.load and bms_hardware.charger 224 if plateset.load_switch or plateset.charger_switch: 225 if plateset.load_switch: 226 measured_current = -bms_hardware.load.amps 227 elif plateset.charger_switch: 228 measured_current = bms_hardware.charger.amps 229 logger.write_info_to_report( 230 f"Current: {expected_current_a - CURRENT_ERROR_MA[0] / 1000} A " 231 f"<= {measured_current} A <= {expected_current_a + CURRENT_ERROR_MA[1] / 1000} A" 232 ) 233 assert ( 234 expected_current_a - CURRENT_ERROR_MA[0] / 1000 235 <= measured_current 236 <= expected_current_a + CURRENT_ERROR_MA[1] / 1000 237 ) 238 239 # TODO(JA): Check cell sims and themisters with ADC plates 0 and 1 240 241 # Serial check 242 if check_serial: 243 244 if not (plateset.load_switch or plateset.charger_switch): 245 assert bms_hardware.load 246 with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000): 247 logger.write_info_to_report(f"{expected_voltage_v * 1000}") 248 self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV) 249 else: 250 logger.write_info_to_report(f"{expected_voltage_v * 1000}") 251 self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV) 252 253 logger.write_info_to_report(f"{expected_current_a * 1000}") 254 self.assert_true("mamps", expected_current_a * 1000, error=CURRENT_ERROR_MA) 255 256 if len(bms_hardware.cells): 257 logger.write_info_to_report(f"{(plateset.thermistor1 + 273)*10}") 258 self.assert_true("dk_temp", (plateset.thermistor1 + 273) * 10, error=TEMPERATURE_ERROR_DK) 259 logger.write_info_to_report(f"{(plateset.thermistor2 + 273)*10}") 260 self.assert_true("dk_temp1", (plateset.thermistor2 + 273) * 10, error=TEMPERATURE_ERROR_DK) 261 262 for cell_id, cell in bms_hardware.cells.items(): 263 logger.write_info_to_report(f"{cell.measured_volts=}") 264 for cell_id, cell in bms_hardware.cells.items(): 265 flag_name = f"mvolt_cell{cell_id-1 if cell_id != 1 else ''}" 266 logger.write_info_to_report(f"{(cell.measured_volts - 0.08)*1000}") 267 self.assert_true(flag_name, (cell.measured_volts - 0.08) * 1000, error=CELL_VOLTAGE_ERROR_MV) 268 269 270class SMBusWatcher(Watcher): 271 """Polls the SMBus for data, recording any updates as events.""" 272 273 def check_flags(self, csv: bool): 274 """Poll flags and update events.""" 275 while not self.stop_event.is_set(): 276 smbus_data = smbus.parse_smbus_data() 277 logger.write_debug_to_report(smbus_data) 278 smbus_time = time.perf_counter() 279 for flag, value in smbus_data.items(): 280 if self.events.get(flag) is None: 281 self.events[flag] = [Event(value, smbus_time, datetime.timedelta())] 282 elif self.events[flag][-1].value != value: # Flag changed 283 self.events[flag].append(Event(value, smbus_time, datetime.timedelta())) 284 logger.write_debug_to_report(f"Flag {flag}: {value}") 285 self.last_packet_timestamp = time.perf_counter() 286 287 288class SerialWatcher(Watcher): 289 """Polls the serial connection for data, recording any updates as events.""" 290 291 def check_flags(self, csv: bool): 292 """Poll flags and update events.""" 293 start_time = time.perf_counter() 294 while not self.stop_event.is_set(): 295 serial_data = serial_monitor.read(latest=False) 296 if serial_data: 297 bms_seconds, bms_milliseconds = divmod(int(serial_data["timestamp"]), 1000) 298 bms_time = ( 299 datetime.datetime.fromtimestamp(bms_seconds, datetime.timezone.utc) 300 - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc) 301 + datetime.timedelta(milliseconds=bms_milliseconds) 302 ) 303 logger.write_debug_to_report(str(serial_data)) 304 serial_time = time.perf_counter() 305 for flag, value in serial_data.items(): 306 if self.events.get(flag) is None: 307 self.events[flag] = [Event(value, serial_time, bms_time)] 308 elif self.events[flag][-1].value != value: # Flag changed 309 self.events[flag].append(Event(value, serial_time, bms_time)) 310 if serial_data[flag] in (0, 1): 311 logger.write_debug_to_report(f"Flag {flag}: {value}") 312 self.last_packet_timestamp = time.perf_counter() 313 if csv: 314 elapsed_time = time.perf_counter() - start_time 315 latest_i = 0.0 316 if plateset.charger_switch: 317 assert bms_hardware.charger 318 latest_i = bms_hardware.charger.amps 319 elif plateset.load_switch: 320 assert bms_hardware.load 321 latest_i = bms_hardware.load.amps 322 bms_hardware.csv.cycle_smbus.record(elapsed_time, latest_i, None, 0, self.csv_state, serial_data)
bms_hardware =
<hitl_tester.modules.bms.bms_hw.BMSHardware object>
plateset =
<hitl_tester.modules.bms.plateset.Plateset object>
smbus =
<hitl_tester.modules.bms.smbus.SMBus object>
VOLTAGE_ERROR_MV =
(1000, 1000)
CURRENT_ERROR_MA =
(30, 30)
TEMPERATURE_ERROR_DK =
(10, 10)
CELL_VOLTAGE_ERROR_MV =
(300, 300)
@dataclass
class
Event:
59@dataclass 60class Event: 61 """A log of one flag event.""" 62 63 value: Any 64 time: float 65 bms_time: datetime.timedelta
A log of one flag event.
class
MeasurementMode(enum.Enum):
68class MeasurementMode(Enum): 69 """ 70 The action that occurred before we took the measurement. Used to decide what measurements are expected. For 71 example, current should read zero after a fault). 72 """ 73 74 NO_FAULT = auto() 75 CHARGING_FAULT = auto() 76 DISCHARGING_FAULT = auto()
The action that occurred before we took the measurement. Used to decide what measurements are expected. For example, current should read zero after a fault).
NO_FAULT =
<MeasurementMode.NO_FAULT: 1>
CHARGING_FAULT =
<MeasurementMode.CHARGING_FAULT: 2>
DISCHARGING_FAULT =
<MeasurementMode.DISCHARGING_FAULT: 3>
Inherited Members
- enum.Enum
- name
- value
class
Watcher(abc.ABC):
79class Watcher(ABC): 80 """Base class for any watcher.""" 81 82 def __init__(self) -> None: 83 self.stop_event = threading.Event() 84 self.daemon: threading.Thread | None = None 85 self.events: dict[str, list[Event]] = {} 86 self.last_packet_timestamp: float = time.perf_counter() 87 self.csv_state = "" 88 89 @abstractmethod 90 def check_flags(self, csv: bool): 91 """Poll flags and update events.""" 92 93 def start(self, csv: bool = False): 94 """Start daemon thread.""" 95 self.events.clear() 96 self.daemon = threading.Thread(target=self.check_flags, daemon=True, args=(csv,)) 97 self.stop_event.clear() 98 self.daemon.start() 99 100 def stop(self): 101 """Kill daemon thread.""" 102 if self.daemon and self.daemon.is_alive(): 103 self.stop_event.set() 104 self.daemon.join() 105 106 def assert_true( 107 self, 108 flag: str, 109 value: Any, 110 event_count: int = 0, 111 wait_time: float = 300, 112 error: tuple[float, float] = (0, 0), 113 log_active: bool = True, 114 ): # FIXME(JA): Change time limit to frame limit (i.e. wait for x new frames) 115 """Wait some amount of seconds until the assertion is true, or raise AssertionError.""" 116 logger.write_info_to_report(f"Waiting for {'' if value else '!'}{flag}") 117 if self.daemon: 118 flag_events: list[Event] = [] 119 start_time = time.perf_counter() 120 old_packet_time = self.last_packet_timestamp - 2 # Only accept packets 2s after assertions 121 122 # Poll for some amount of seconds (waiting for at least one new packet within 5 minutes) 123 while ( 124 time.perf_counter() - start_time < wait_time 125 or (start_time > self.last_packet_timestamp and time.perf_counter() - start_time < 5.5 * 60) 126 or old_packet_time != self.last_packet_timestamp 127 ): 128 old_packet_time = self.last_packet_timestamp 129 flag_events = self.events.get(flag, []) 130 if len(flag_events) >= max(1, event_count) and ( 131 value - error[0] <= flag_events[event_count - 1].value <= value + error[1] 132 ): 133 if log_active and flag.startswith("flags."): 134 logger.write_result_to_html_report(f"{flag} was {flag_events[event_count - 1].value}") 135 is_permanent = flag.startswith("flags.permanent") 136 if (flag.startswith("flags.fault") or is_permanent) and event_count != 1: 137 self.assert_true("flags.measure_output_fets_disabled", value, wait_time=600) 138 if not value: 139 self.assert_measurements(not is_permanent, MeasurementMode.NO_FAULT) 140 elif plateset.charger_switch: 141 self.assert_measurements(not is_permanent, MeasurementMode.CHARGING_FAULT) 142 else: 143 self.assert_measurements(not is_permanent, MeasurementMode.DISCHARGING_FAULT) 144 145 return 146 147 if len(flag_events) == 0: 148 if log_active: 149 logger.write_result_to_html_report('<font color="#990000"> No data received </font>') 150 raise AssertionError("No data received.") 151 152 message = ( 153 f"{flag} was {flag_events[-1].value} with {len(flag_events)} event(s), expected " 154 f"{value} with {max(1, event_count)} event(s)" 155 ) 156 if log_active: 157 logger.write_result_to_html_report(f'<font color="#990000">{message}</font>') 158 raise AssertionError(f"{message}: {flag_events}") 159 160 def assert_false( 161 self, 162 flag: str, 163 value: Any, 164 event_count: int = 0, 165 wait_time: int = 60, 166 error: tuple[float, float] = (0, 0), 167 ): 168 """Wait some amount of seconds until the assertion is false, or raise AssertionError.""" 169 try: 170 self.assert_true(flag, value, event_count, wait_time, error, log_active=False) 171 except AssertionError: 172 return 173 message = f"{flag} was {value} after {event_count} events." 174 logger.write_result_to_html_report(f'<font color="#990000>{message}</font>') 175 raise AssertionError(message) 176 177 def assert_measurements(self, check_serial: bool = True, mode: MeasurementMode = MeasurementMode.NO_FAULT): 178 """Verify voltage, current, and temperature match what was set.""" 179 global VOLTAGE_ERROR_MV 180 181 # Expected values 182 if mode is MeasurementMode.NO_FAULT: 183 expected_voltage_v = sum(cell.volts for cell in bms_hardware.cells.values()) 184 if len(bms_hardware.cells) == 0: 185 VOLTAGE_ERROR_MV = (4000, 4000) 186 expected_voltage_v = 13 187 expected_current_a = 0.0 188 if plateset.load_switch: 189 assert bms_hardware.load 190 expected_current_a = bms_hardware.load.target_amps 191 elif plateset.charger_switch: 192 assert bms_hardware.charger 193 expected_current_a = bms_hardware.charger.target_amps 194 elif mode is MeasurementMode.DISCHARGING_FAULT: 195 expected_voltage_v = 0.0 196 expected_current_a = 0.0 197 elif mode is MeasurementMode.CHARGING_FAULT: 198 assert bms_hardware.charger 199 expected_voltage_v = bms_hardware.charger.target_volts 200 expected_current_a = 0.0 201 logger.write_info_to_report(f"Expecting {expected_voltage_v} V, {expected_current_a} A") 202 203 # Measurement check 204 assert bms_hardware.dmm 205 206 if not (plateset.load_switch or plateset.charger_switch): 207 assert bms_hardware.load 208 with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000): 209 dmm_volts = bms_hardware.dmm.volts 210 else: 211 dmm_volts = bms_hardware.dmm.volts 212 213 logger.write_info_to_report( 214 f"Voltage: {expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000} V " 215 f"<= {dmm_volts} V <= {expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000} V" 216 ) 217 218 assert ( 219 expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000 220 <= dmm_volts 221 <= expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000 222 ) 223 224 assert bms_hardware.load and bms_hardware.charger 225 if plateset.load_switch or plateset.charger_switch: 226 if plateset.load_switch: 227 measured_current = -bms_hardware.load.amps 228 elif plateset.charger_switch: 229 measured_current = bms_hardware.charger.amps 230 logger.write_info_to_report( 231 f"Current: {expected_current_a - CURRENT_ERROR_MA[0] / 1000} A " 232 f"<= {measured_current} A <= {expected_current_a + CURRENT_ERROR_MA[1] / 1000} A" 233 ) 234 assert ( 235 expected_current_a - CURRENT_ERROR_MA[0] / 1000 236 <= measured_current 237 <= expected_current_a + CURRENT_ERROR_MA[1] / 1000 238 ) 239 240 # TODO(JA): Check cell sims and themisters with ADC plates 0 and 1 241 242 # Serial check 243 if check_serial: 244 245 if not (plateset.load_switch or plateset.charger_switch): 246 assert bms_hardware.load 247 with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000): 248 logger.write_info_to_report(f"{expected_voltage_v * 1000}") 249 self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV) 250 else: 251 logger.write_info_to_report(f"{expected_voltage_v * 1000}") 252 self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV) 253 254 logger.write_info_to_report(f"{expected_current_a * 1000}") 255 self.assert_true("mamps", expected_current_a * 1000, error=CURRENT_ERROR_MA) 256 257 if len(bms_hardware.cells): 258 logger.write_info_to_report(f"{(plateset.thermistor1 + 273)*10}") 259 self.assert_true("dk_temp", (plateset.thermistor1 + 273) * 10, error=TEMPERATURE_ERROR_DK) 260 logger.write_info_to_report(f"{(plateset.thermistor2 + 273)*10}") 261 self.assert_true("dk_temp1", (plateset.thermistor2 + 273) * 10, error=TEMPERATURE_ERROR_DK) 262 263 for cell_id, cell in bms_hardware.cells.items(): 264 logger.write_info_to_report(f"{cell.measured_volts=}") 265 for cell_id, cell in bms_hardware.cells.items(): 266 flag_name = f"mvolt_cell{cell_id-1 if cell_id != 1 else ''}" 267 logger.write_info_to_report(f"{(cell.measured_volts - 0.08)*1000}") 268 self.assert_true(flag_name, (cell.measured_volts - 0.08) * 1000, error=CELL_VOLTAGE_ERROR_MV)
Base class for any watcher.
events: dict[str, list[Event]]
def
start(self, csv: bool = False):
93 def start(self, csv: bool = False): 94 """Start daemon thread.""" 95 self.events.clear() 96 self.daemon = threading.Thread(target=self.check_flags, daemon=True, args=(csv,)) 97 self.stop_event.clear() 98 self.daemon.start()
Start daemon thread.
def
stop(self):
100 def stop(self): 101 """Kill daemon thread.""" 102 if self.daemon and self.daemon.is_alive(): 103 self.stop_event.set() 104 self.daemon.join()
Kill daemon thread.
def
assert_true( self, flag: str, value: Any, event_count: int = 0, wait_time: float = 300, error: tuple[float, float] = (0, 0), log_active: bool = True):
106 def assert_true( 107 self, 108 flag: str, 109 value: Any, 110 event_count: int = 0, 111 wait_time: float = 300, 112 error: tuple[float, float] = (0, 0), 113 log_active: bool = True, 114 ): # FIXME(JA): Change time limit to frame limit (i.e. wait for x new frames) 115 """Wait some amount of seconds until the assertion is true, or raise AssertionError.""" 116 logger.write_info_to_report(f"Waiting for {'' if value else '!'}{flag}") 117 if self.daemon: 118 flag_events: list[Event] = [] 119 start_time = time.perf_counter() 120 old_packet_time = self.last_packet_timestamp - 2 # Only accept packets 2s after assertions 121 122 # Poll for some amount of seconds (waiting for at least one new packet within 5 minutes) 123 while ( 124 time.perf_counter() - start_time < wait_time 125 or (start_time > self.last_packet_timestamp and time.perf_counter() - start_time < 5.5 * 60) 126 or old_packet_time != self.last_packet_timestamp 127 ): 128 old_packet_time = self.last_packet_timestamp 129 flag_events = self.events.get(flag, []) 130 if len(flag_events) >= max(1, event_count) and ( 131 value - error[0] <= flag_events[event_count - 1].value <= value + error[1] 132 ): 133 if log_active and flag.startswith("flags."): 134 logger.write_result_to_html_report(f"{flag} was {flag_events[event_count - 1].value}") 135 is_permanent = flag.startswith("flags.permanent") 136 if (flag.startswith("flags.fault") or is_permanent) and event_count != 1: 137 self.assert_true("flags.measure_output_fets_disabled", value, wait_time=600) 138 if not value: 139 self.assert_measurements(not is_permanent, MeasurementMode.NO_FAULT) 140 elif plateset.charger_switch: 141 self.assert_measurements(not is_permanent, MeasurementMode.CHARGING_FAULT) 142 else: 143 self.assert_measurements(not is_permanent, MeasurementMode.DISCHARGING_FAULT) 144 145 return 146 147 if len(flag_events) == 0: 148 if log_active: 149 logger.write_result_to_html_report('<font color="#990000"> No data received </font>') 150 raise AssertionError("No data received.") 151 152 message = ( 153 f"{flag} was {flag_events[-1].value} with {len(flag_events)} event(s), expected " 154 f"{value} with {max(1, event_count)} event(s)" 155 ) 156 if log_active: 157 logger.write_result_to_html_report(f'<font color="#990000">{message}</font>') 158 raise AssertionError(f"{message}: {flag_events}")
Wait some amount of seconds until the assertion is true, or raise AssertionError.
def
assert_false( self, flag: str, value: Any, event_count: int = 0, wait_time: int = 60, error: tuple[float, float] = (0, 0)):
160 def assert_false( 161 self, 162 flag: str, 163 value: Any, 164 event_count: int = 0, 165 wait_time: int = 60, 166 error: tuple[float, float] = (0, 0), 167 ): 168 """Wait some amount of seconds until the assertion is false, or raise AssertionError.""" 169 try: 170 self.assert_true(flag, value, event_count, wait_time, error, log_active=False) 171 except AssertionError: 172 return 173 message = f"{flag} was {value} after {event_count} events." 174 logger.write_result_to_html_report(f'<font color="#990000>{message}</font>') 175 raise AssertionError(message)
Wait some amount of seconds until the assertion is false, or raise AssertionError.
def
assert_measurements( self, check_serial: bool = True, mode: MeasurementMode = <MeasurementMode.NO_FAULT: 1>):
177 def assert_measurements(self, check_serial: bool = True, mode: MeasurementMode = MeasurementMode.NO_FAULT): 178 """Verify voltage, current, and temperature match what was set.""" 179 global VOLTAGE_ERROR_MV 180 181 # Expected values 182 if mode is MeasurementMode.NO_FAULT: 183 expected_voltage_v = sum(cell.volts for cell in bms_hardware.cells.values()) 184 if len(bms_hardware.cells) == 0: 185 VOLTAGE_ERROR_MV = (4000, 4000) 186 expected_voltage_v = 13 187 expected_current_a = 0.0 188 if plateset.load_switch: 189 assert bms_hardware.load 190 expected_current_a = bms_hardware.load.target_amps 191 elif plateset.charger_switch: 192 assert bms_hardware.charger 193 expected_current_a = bms_hardware.charger.target_amps 194 elif mode is MeasurementMode.DISCHARGING_FAULT: 195 expected_voltage_v = 0.0 196 expected_current_a = 0.0 197 elif mode is MeasurementMode.CHARGING_FAULT: 198 assert bms_hardware.charger 199 expected_voltage_v = bms_hardware.charger.target_volts 200 expected_current_a = 0.0 201 logger.write_info_to_report(f"Expecting {expected_voltage_v} V, {expected_current_a} A") 202 203 # Measurement check 204 assert bms_hardware.dmm 205 206 if not (plateset.load_switch or plateset.charger_switch): 207 assert bms_hardware.load 208 with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000): 209 dmm_volts = bms_hardware.dmm.volts 210 else: 211 dmm_volts = bms_hardware.dmm.volts 212 213 logger.write_info_to_report( 214 f"Voltage: {expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000} V " 215 f"<= {dmm_volts} V <= {expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000} V" 216 ) 217 218 assert ( 219 expected_voltage_v - VOLTAGE_ERROR_MV[0] / 1000 220 <= dmm_volts 221 <= expected_voltage_v + VOLTAGE_ERROR_MV[1] / 1000 222 ) 223 224 assert bms_hardware.load and bms_hardware.charger 225 if plateset.load_switch or plateset.charger_switch: 226 if plateset.load_switch: 227 measured_current = -bms_hardware.load.amps 228 elif plateset.charger_switch: 229 measured_current = bms_hardware.charger.amps 230 logger.write_info_to_report( 231 f"Current: {expected_current_a - CURRENT_ERROR_MA[0] / 1000} A " 232 f"<= {measured_current} A <= {expected_current_a + CURRENT_ERROR_MA[1] / 1000} A" 233 ) 234 assert ( 235 expected_current_a - CURRENT_ERROR_MA[0] / 1000 236 <= measured_current 237 <= expected_current_a + CURRENT_ERROR_MA[1] / 1000 238 ) 239 240 # TODO(JA): Check cell sims and themisters with ADC plates 0 and 1 241 242 # Serial check 243 if check_serial: 244 245 if not (plateset.load_switch or plateset.charger_switch): 246 assert bms_hardware.load 247 with bms_hardware.load(0.0, DischargeType.CONSTANT_RESISTANCE, 50_000): 248 logger.write_info_to_report(f"{expected_voltage_v * 1000}") 249 self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV) 250 else: 251 logger.write_info_to_report(f"{expected_voltage_v * 1000}") 252 self.assert_true("mvolt_terminal", expected_voltage_v * 1000, error=VOLTAGE_ERROR_MV) 253 254 logger.write_info_to_report(f"{expected_current_a * 1000}") 255 self.assert_true("mamps", expected_current_a * 1000, error=CURRENT_ERROR_MA) 256 257 if len(bms_hardware.cells): 258 logger.write_info_to_report(f"{(plateset.thermistor1 + 273)*10}") 259 self.assert_true("dk_temp", (plateset.thermistor1 + 273) * 10, error=TEMPERATURE_ERROR_DK) 260 logger.write_info_to_report(f"{(plateset.thermistor2 + 273)*10}") 261 self.assert_true("dk_temp1", (plateset.thermistor2 + 273) * 10, error=TEMPERATURE_ERROR_DK) 262 263 for cell_id, cell in bms_hardware.cells.items(): 264 logger.write_info_to_report(f"{cell.measured_volts=}") 265 for cell_id, cell in bms_hardware.cells.items(): 266 flag_name = f"mvolt_cell{cell_id-1 if cell_id != 1 else ''}" 267 logger.write_info_to_report(f"{(cell.measured_volts - 0.08)*1000}") 268 self.assert_true(flag_name, (cell.measured_volts - 0.08) * 1000, error=CELL_VOLTAGE_ERROR_MV)
Verify voltage, current, and temperature match what was set.
271class SMBusWatcher(Watcher): 272 """Polls the SMBus for data, recording any updates as events.""" 273 274 def check_flags(self, csv: bool): 275 """Poll flags and update events.""" 276 while not self.stop_event.is_set(): 277 smbus_data = smbus.parse_smbus_data() 278 logger.write_debug_to_report(smbus_data) 279 smbus_time = time.perf_counter() 280 for flag, value in smbus_data.items(): 281 if self.events.get(flag) is None: 282 self.events[flag] = [Event(value, smbus_time, datetime.timedelta())] 283 elif self.events[flag][-1].value != value: # Flag changed 284 self.events[flag].append(Event(value, smbus_time, datetime.timedelta())) 285 logger.write_debug_to_report(f"Flag {flag}: {value}") 286 self.last_packet_timestamp = time.perf_counter()
Polls the SMBus for data, recording any updates as events.
def
check_flags(self, csv: bool):
274 def check_flags(self, csv: bool): 275 """Poll flags and update events.""" 276 while not self.stop_event.is_set(): 277 smbus_data = smbus.parse_smbus_data() 278 logger.write_debug_to_report(smbus_data) 279 smbus_time = time.perf_counter() 280 for flag, value in smbus_data.items(): 281 if self.events.get(flag) is None: 282 self.events[flag] = [Event(value, smbus_time, datetime.timedelta())] 283 elif self.events[flag][-1].value != value: # Flag changed 284 self.events[flag].append(Event(value, smbus_time, datetime.timedelta())) 285 logger.write_debug_to_report(f"Flag {flag}: {value}") 286 self.last_packet_timestamp = time.perf_counter()
Poll flags and update events.
Inherited Members
289class SerialWatcher(Watcher): 290 """Polls the serial connection for data, recording any updates as events.""" 291 292 def check_flags(self, csv: bool): 293 """Poll flags and update events.""" 294 start_time = time.perf_counter() 295 while not self.stop_event.is_set(): 296 serial_data = serial_monitor.read(latest=False) 297 if serial_data: 298 bms_seconds, bms_milliseconds = divmod(int(serial_data["timestamp"]), 1000) 299 bms_time = ( 300 datetime.datetime.fromtimestamp(bms_seconds, datetime.timezone.utc) 301 - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc) 302 + datetime.timedelta(milliseconds=bms_milliseconds) 303 ) 304 logger.write_debug_to_report(str(serial_data)) 305 serial_time = time.perf_counter() 306 for flag, value in serial_data.items(): 307 if self.events.get(flag) is None: 308 self.events[flag] = [Event(value, serial_time, bms_time)] 309 elif self.events[flag][-1].value != value: # Flag changed 310 self.events[flag].append(Event(value, serial_time, bms_time)) 311 if serial_data[flag] in (0, 1): 312 logger.write_debug_to_report(f"Flag {flag}: {value}") 313 self.last_packet_timestamp = time.perf_counter() 314 if csv: 315 elapsed_time = time.perf_counter() - start_time 316 latest_i = 0.0 317 if plateset.charger_switch: 318 assert bms_hardware.charger 319 latest_i = bms_hardware.charger.amps 320 elif plateset.load_switch: 321 assert bms_hardware.load 322 latest_i = bms_hardware.load.amps 323 bms_hardware.csv.cycle_smbus.record(elapsed_time, latest_i, None, 0, self.csv_state, serial_data)
Polls the serial connection for data, recording any updates as events.
def
check_flags(self, csv: bool):
292 def check_flags(self, csv: bool): 293 """Poll flags and update events.""" 294 start_time = time.perf_counter() 295 while not self.stop_event.is_set(): 296 serial_data = serial_monitor.read(latest=False) 297 if serial_data: 298 bms_seconds, bms_milliseconds = divmod(int(serial_data["timestamp"]), 1000) 299 bms_time = ( 300 datetime.datetime.fromtimestamp(bms_seconds, datetime.timezone.utc) 301 - datetime.datetime(2015, 1, 1, tzinfo=datetime.timezone.utc) 302 + datetime.timedelta(milliseconds=bms_milliseconds) 303 ) 304 logger.write_debug_to_report(str(serial_data)) 305 serial_time = time.perf_counter() 306 for flag, value in serial_data.items(): 307 if self.events.get(flag) is None: 308 self.events[flag] = [Event(value, serial_time, bms_time)] 309 elif self.events[flag][-1].value != value: # Flag changed 310 self.events[flag].append(Event(value, serial_time, bms_time)) 311 if serial_data[flag] in (0, 1): 312 logger.write_debug_to_report(f"Flag {flag}: {value}") 313 self.last_packet_timestamp = time.perf_counter() 314 if csv: 315 elapsed_time = time.perf_counter() - start_time 316 latest_i = 0.0 317 if plateset.charger_switch: 318 assert bms_hardware.charger 319 latest_i = bms_hardware.charger.amps 320 elif plateset.load_switch: 321 assert bms_hardware.load 322 latest_i = bms_hardware.load.amps 323 bms_hardware.csv.cycle_smbus.record(elapsed_time, latest_i, None, 0, self.csv_state, serial_data)
Poll flags and update events.