hitl_tester.modules.bms.csv_tables
Functions for the Open Circuit Voltage recording.
(c) 2020-2024 TurnAround Factor, Inc.
CUI DISTRIBUTION CONTROL Controlled by: DLA J68 R&D SBIP CUI Category: Small Business Research and Technology Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS POC: GOV SBIP Program Manager Denise Price, 571-767-0111 Distribution authorized to U.S. Government Agencies only, to protect information not owned by the U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, Fort Belvoir, VA 22060-6221
SBIR DATA RIGHTS Contract No.:SP4701-23-C-0083 Contractor Name: TurnAround Factor, Inc. Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 Expiration of SBIR Data Rights Period: September 24, 2029 The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause contained in the above identified contract. No restrictions apply after the expiration date shown above. Any reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce the markings.
1""" 2Functions for the Open Circuit Voltage recording. 3 4(c) 2020-2024 TurnAround Factor, Inc. 5 6CUI DISTRIBUTION CONTROL 7Controlled by: DLA J68 R&D SBIP 8CUI Category: Small Business Research and Technology 9Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15Fort Belvoir, VA 22060-6221 16 17SBIR DATA RIGHTS 18Contract No.:SP4701-23-C-0083 19Contractor Name: TurnAround Factor, Inc. 20Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21Expiration of SBIR Data Rights Period: September 24, 2029 22The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27the markings. 28""" 29 30from __future__ import annotations 31 32import csv 33import inspect 34import math 35import time 36from abc import ABC 37from abc import abstractmethod 38from contextlib import suppress 39from pathlib import Path 40from typing import Any, cast, TYPE_CHECKING 41 42import pytest 43 44from hitl_tester.modules.bms.bms_serial import serial_monitor, VERSION 45from hitl_tester.modules.bms.adc_plate import ADCPlate 46from hitl_tester.modules.logger import logger 47from hitl_tester.modules.bms.smbus import SMBus 48from hitl_tester.modules.bms.smbus_types import SMBusError 49from hitl_tester.modules.bms_types import BMSFlags 50 51if TYPE_CHECKING: # Avoid circular import issue 52 from hitl_tester.modules.bms.bms_hw import BMSHardware 53 54 55class CSVBase(ABC): 56 """Base class for csv writers.""" 57 58 def __init__(self, bms_hardware: BMSHardware): 59 """Create the csv object.""" 60 self.bms_hardware = bms_hardware 61 self.filename: Path | None = None 62 self.postfix_fn = lambda: ... 63 64 @property 65 @abstractmethod 66 def header(self) -> list[str]: 67 """The csv header.""" 68 69 def create_file(self, prefix: str = "", postfix: str = "") -> Path: 70 """Create a csv file.""" 71 72 # Return file if it was already created 73 if self.filename or ( 74 hasattr(pytest, "flags") 75 and isinstance(pytest.flags, BMSFlags) 76 and (pytest.flags.doc_generation or pytest.flags.dry_run) 77 ): 78 return self.filename or Path(".") 79 80 # Generate filename 81 path = Path(self.bms_hardware.report_filename) 82 path = path.with_stem(f"{prefix}{path.stem}") 83 filename = type(self).__name__.lower() 84 self.filename = Path(f"{path}_{filename}{postfix}.csv") 85 86 # Create file 87 with open(self.filename, "w", encoding="utf-8") as csv_file: 88 csv.writer(csv_file).writerow(self.header) 89 return self.filename 90 91 @abstractmethod 92 def record(self): 93 """Generate row data.""" 94 95 def write_row(self, row: list[Any]): 96 """Write the row.""" 97 self.create_file() # Create the file if it does not exist 98 assert self.filename # Appease pylint 99 100 with open(self.filename, "a", encoding="utf-8") as csv_file: 101 csv.writer(csv_file).writerow(row) 102 103 self.postfix_fn() 104 105 106class OCV(CSVBase): 107 """Functions for the Open Circuit Voltage recording.""" 108 109 header = ["Temperature (C)", "Capacity (%)", "Voltage (V)", "Impedance (mΩ)", "Remaining Capacity (mAh)"] 110 111 def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float: 112 """Calculate the impedance from the raw voltages.""" 113 for i, voltage in enumerate(raw_voltage_data): 114 if i > 0 and raw_voltage_data[i - 1] - voltage > 0.003: # If the voltage drops by at least 3 mV 115 # Impedance = (High voltage - 3rd low voltage) 116 return (raw_voltage_data[i - 1] - raw_voltage_data[i + 2]) * 1000 / current_a 117 return math.nan 118 119 def measure_impedance_data(self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]: 120 """Record the voltage drop with a current pulse.""" 121 assert self.bms_hardware.load and self.bms_hardware.dmm 122 self.bms_hardware.load.configure_pulse_trigger(pulse_current) 123 self.bms_hardware.dmm.configure_voltage_trigger(timestamps) 124 self.bms_hardware.load.enable() 125 self.bms_hardware.dmm.send_trigger() # Begin voltage measurements 126 self.bms_hardware.load.send_trigger() # Pulse the current 127 self.bms_hardware.load.wait_for_pulse() # Wait for load pulse to complete 128 self.bms_hardware.load.disable() 129 result = self.bms_hardware.dmm.read_internal_memory() # Read voltage measurements 130 self.bms_hardware.dmm.configure_voltage_normal() 131 return result 132 133 def record(self, remaining_capacity_ah: float = 0.0): 134 """Record the OCV.""" 135 logger.write_info_to_report("Recording OCV") 136 137 # FIXME(JA): determine if timestamps are useful or not 138 impedance = 0.0 139 impedance_timestamps = 0.0 140 assert self.bms_hardware.dmm 141 config = self.bms_hardware.dmm.impedance_config 142 for i in range(config.total_readings): 143 raw_voltage_data = self.measure_impedance_data(config.pulse_current) 144 raw_voltage_and_time = self.measure_impedance_data(config.pulse_current, timestamps=True) 145 146 # Cumulative average of impedance 147 impedance = (impedance * i + self.calculate_impedance(raw_voltage_data, config.pulse_current)) / (i + 1) 148 impedance_timestamps = ( 149 impedance_timestamps * i + self.calculate_impedance(raw_voltage_and_time, config.pulse_current) 150 ) / (i + 1) 151 152 # For debugging impedance calculations 153 self.bms_hardware.csv.raw_impedance.record(impedance, raw_voltage_data) 154 self.bms_hardware.csv.raw_impedance_time.create_file(postfix="_time") 155 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time) # Both 156 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[1::2]) # Time 157 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[::2]) # Voltage 158 159 assert self.bms_hardware.dmm 160 self.write_row( 161 [ 162 self.bms_hardware.temperature, 163 self.bms_hardware.remaining_capacity_percentage, 164 self.bms_hardware.dmm.volts, 165 impedance, 166 remaining_capacity_ah * 1000, 167 ] 168 ) 169 170 171class Cycle(CSVBase): 172 """Functions for recording any run cycle.""" 173 174 header = [ 175 "Cycle", 176 "Initial Capacity (%)", 177 "Target Capacity (%)", 178 "Capacity (Ah)", 179 "Current (A)", 180 "Voltage (V)", 181 "Temperature (C)", 182 "Resistance (Ω)", 183 "Elapsed Time (s)", 184 ] 185 186 def record(self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0): 187 """Record various cycle data.""" 188 assert self.bms_hardware.dmm 189 self.write_row( 190 [ 191 inspect.stack()[1].function, 192 self.bms_hardware.remaining_capacity_percentage, 193 self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100, 194 amp_hrs, 195 current, 196 self.bms_hardware.dmm.volts, 197 self.bms_hardware.temperature, 198 resistance, 199 elapsed_time, 200 ] 201 ) 202 203 204class CycleSMBus(CSVBase): 205 """Functions for recording any run cycle, as well as the smbus.""" 206 207 def __init__(self, bms_hardware: BMSHardware): 208 """Add variable to detect first run.""" 209 super().__init__(bms_hardware) 210 self.smbus = SMBus() 211 self.adc_plate = ADCPlate() 212 self._header_cache: list[str] = [] 213 self.last_row: dict[str, float | str | None] = {} 214 self.last_serial_data: dict[str, int | str] = {} 215 self.last_cell_data: dict[int, dict[str, int]] = {} 216 217 @property 218 def header(self) -> list[str]: 219 if self._header_cache: 220 return self._header_cache 221 222 header_text = [ 223 "Cycle", 224 "Initial Capacity (%)", 225 "Target Capacity (%)", 226 "HITL Capacity (Ah)", 227 "HITL Current (A)", 228 "HITL Voltage (V)", 229 "ADC Plate Terminal Voltage (V)", 230 "HITL Temperature (C)", 231 "HITL Resistance (Ω)", 232 "HITL Elapsed Time (s)", 233 ] 234 235 # Create dynamic headers 236 for cell_id in self.adc_plate.cell_ids: 237 header_text.append(f"ADC Plate Cell {cell_id} Voltage (V)") 238 239 for name in ("SOC (%)", "Volts (V)", "Measured Volts (V)", "Current (A)", "Ohms (Ω)"): 240 for i in sorted(self.bms_hardware.cells): 241 header_text.append(f"Cell Sim {i} {name}") 242 243 header_text.append("|SERIAL|") 244 header_text += serial_monitor.parse_serial_data(dict.fromkeys(serial_monitor.get_headers(), VERSION)) 245 246 header_text.append("|SMBUS|") 247 with suppress(SMBusError): 248 header_text += self.smbus.parse_smbus_data() 249 250 self._header_cache = header_text 251 return header_text 252 253 def record( 254 self, 255 elapsed_time: float = 0, 256 current: float = 0, 257 amp_hrs: float | None = None, 258 resistance: float = 0, 259 state: str = "", 260 serial_data: dict[str, int | str] | None = None, 261 suppress_smbus: bool = False, 262 ): 263 """Record various cycle data.""" 264 265 # Get cell simulator data 266 sim_data = [] 267 self.last_cell_data = {} 268 for attr in ("state_of_charge", "volts", "measured_volts", "amps", "ohms"): 269 for cell in self.bms_hardware.cells.values(): 270 self.last_cell_data[cell.id] = self.last_cell_data.get(cell.id, {}) 271 self.last_cell_data[cell.id][attr] = getattr(cell, attr) 272 sim_data.append( 273 f"{self.last_cell_data[cell.id][attr]:.2%}" 274 if attr == "state_of_charge" 275 else str(self.last_cell_data[cell.id][attr]) 276 ) 277 278 # Get serial data 279 if serial_data is None: 280 serial_data = serial_monitor.read() 281 assert serial_data is not None 282 self.last_serial_data = serial_data 283 parsed_serial_data = serial_monitor.parse_serial_data(serial_data) 284 285 # Get smbus data 286 try: 287 parsed_smbus_data = self.smbus.parse_smbus_data() 288 except SMBusError as e: 289 parsed_smbus_data = {} 290 if not suppress_smbus: 291 logger.write_error_to_report(str(e)) 292 293 assert self.bms_hardware.dmm 294 row = [ 295 state or inspect.stack()[1].function, 296 self.bms_hardware.remaining_capacity_percentage, 297 self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100, 298 amp_hrs, 299 current, 300 self.bms_hardware.dmm.volts, 301 self.adc_plate.terminal_volts, 302 self.bms_hardware.temperature, 303 resistance, 304 elapsed_time, 305 *[self.adc_plate.cell_volts(cell_id) for cell_id in self.adc_plate.cell_ids], 306 *sim_data, 307 None, # Spacing 308 *parsed_serial_data.values(), 309 None, # Spacing 310 *parsed_smbus_data.values(), 311 ] 312 self.last_row = dict(zip(self.header, cast(list[str | float | None], row))) 313 self.write_row(row) 314 315 316class RawImpedance(CSVBase): 317 """Functions for recording the Impedance.""" 318 319 header = ["Capacity (%)", "Impedance (mΩ)", "Raw Voltages (V)"] 320 321 def record(self, impedance: float = math.nan, raw_voltage_data: list[float] | None = None): 322 """Record the raw impedance data.""" 323 logger.write_info_to_report("Recording Raw Impedance") 324 325 if raw_voltage_data is None: 326 raw_voltage_data = [] 327 self.write_row( 328 [ 329 self.bms_hardware.remaining_capacity_percentage, 330 impedance, 331 *raw_voltage_data, 332 ] 333 ) 334 335 336class RawVoltages(CSVBase): 337 """Functions for recording the voltages.""" 338 339 header = ["Elapsed time (s)", "Measurement time (s)", "Raw Voltages (V)"] 340 341 def record(self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None): 342 """Record the raw voltage data.""" 343 logger.write_info_to_report("Recording Raw Voltages") 344 345 if raw_voltage_data is None: 346 raw_voltage_data = [] 347 self.write_row( 348 [ 349 elapsed_time, 350 measurement_time, 351 *raw_voltage_data, 352 ] 353 ) 354 355 356class RawVoltagesAverage(CSVBase): 357 """Functions for recording the voltages.""" 358 359 header = [ 360 "Elapsed time (s)", 361 "Measurement time (s)", 362 "Average Voltage (V)", 363 "Max Voltage (V)", 364 "Min Voltage (V)", 365 "Total Measurements", 366 ] 367 368 def record( 369 self, 370 elapsed_time: float = 0, 371 measurement_time: float = 0, 372 average_volts: float = 0, 373 max_volts: float = 0, 374 min_volts: float = 0, 375 measurement_count: float = 0, 376 ): 377 """Record the raw voltage data.""" 378 logger.write_info_to_report("Recording Raw Voltages") 379 380 self.write_row( 381 [ 382 elapsed_time, 383 measurement_time, 384 average_volts, 385 max_volts, 386 min_volts, 387 measurement_count, 388 ] 389 ) 390 391 392class NiCdQC(CSVBase): 393 """Functions for recording a NiCd cell during QC.""" 394 395 header = [ 396 "Elapsed time (s)", 397 "Elapsed time discharge (s)", 398 "Board", 399 "Channel", 400 "Voltage (V)", 401 "State", 402 "1.21V Time (s)", 403 "1.20V Time (s)", 404 ] 405 406 def record( 407 self, 408 elapsed_time: float = 0, 409 elapsed_time_discharge: float = 0, 410 board: int = 0, 411 channel: int = 0, 412 voltage: float = 0.0, 413 state: Any = None, 414 time_121: float = 0.0, 415 time_120: float = 0.0, 416 ): 417 """Record the NiCd cell data.""" 418 self.write_row([elapsed_time, elapsed_time_discharge, board, channel, voltage, state, time_121, time_120]) 419 420 421class DMMAC(CSVBase): 422 """Functions for recording the DMM AC current.""" 423 424 header = [ 425 "Elapsed time (s)", 426 "AC Current (A)", 427 ] 428 429 def record(self, elapsed_time: float = 0, current: float = 0): 430 """Record the DMM AC current.""" 431 self.write_row([elapsed_time, current]) 432 433 434class NiCdKorad(CSVBase): 435 """Functions for recording the DMM AC current.""" 436 437 header = [ 438 "Elapsed time (s)", 439 "Status flags", 440 "Current (A)", 441 "Voltage (V)", 442 ] 443 444 def record(self, elapsed_time: float = 0, status_flags: str = "", current: float = 0, voltage: float = 0): 445 """Record the DMM AC current.""" 446 self.write_row([elapsed_time, status_flags, current, voltage]) 447 448 449class Thermo(CSVBase): 450 """Functions for recording the thermocouples.""" 451 452 @property 453 def header(self) -> list[str]: 454 header_list: list[str] = ["Elapsed Time (s)", "Seconds since epoch (s)"] 455 header_list.extend(f"Channel {thermocouple_id}" for thermocouple_id in self.bms_hardware.thermo_couples) 456 return header_list 457 458 def record(self, elapsed_time: float = 0): 459 """Record the DMM AC current.""" 460 temps: list[float] = [] 461 for thermocouple_id, thermocouple in self.bms_hardware.thermo_couples.items(): 462 temps.append(thermocouple.temperature) 463 logger.write_debug_to_report(f"Thermocouple channel {thermocouple_id}: {temps[-1]} °C") 464 self.write_row([elapsed_time, time.time(), *temps]) 465 466 467class CSVRecorders: 468 """Access to CSV recorders.""" 469 470 def __init__(self, bms_hardware: BMSHardware): 471 """Initialize CSV recorders.""" 472 self.ocv = OCV(bms_hardware) 473 self.cycle: CSVBase = Cycle(bms_hardware) 474 self.raw_voltage = RawVoltages(bms_hardware) 475 self.raw_voltage_average = RawVoltagesAverage(bms_hardware) 476 self.cycle_smbus = CycleSMBus(bms_hardware) 477 self.nicd = NiCdQC(bms_hardware) 478 self.dmm_ac = DMMAC(bms_hardware) 479 self.nicd_korad = NiCdKorad(bms_hardware) 480 self.raw_impedance = RawImpedance(bms_hardware) 481 self.raw_impedance_time = RawImpedance(bms_hardware) 482 self.thermo = Thermo(bms_hardware) 483 484 # This can be done dynamically, but of course the type checker really doesn't like it 485 # for csv_child in CSVBase.__subclasses__(): 486 # if not inspect.isabstract(csv_child): # Child is confirmed concrete 487 # setattr(self, csv_child.__name__.lower(), csv_child(bms_hardware)) # type: ignore 488 489 # def __getattr__(self, name): 490 # """Called for undefined attributes.""" 491 # raise AttributeError(f"'{self.__name__}' object has no attribute '{name}'")
56class CSVBase(ABC): 57 """Base class for csv writers.""" 58 59 def __init__(self, bms_hardware: BMSHardware): 60 """Create the csv object.""" 61 self.bms_hardware = bms_hardware 62 self.filename: Path | None = None 63 self.postfix_fn = lambda: ... 64 65 @property 66 @abstractmethod 67 def header(self) -> list[str]: 68 """The csv header.""" 69 70 def create_file(self, prefix: str = "", postfix: str = "") -> Path: 71 """Create a csv file.""" 72 73 # Return file if it was already created 74 if self.filename or ( 75 hasattr(pytest, "flags") 76 and isinstance(pytest.flags, BMSFlags) 77 and (pytest.flags.doc_generation or pytest.flags.dry_run) 78 ): 79 return self.filename or Path(".") 80 81 # Generate filename 82 path = Path(self.bms_hardware.report_filename) 83 path = path.with_stem(f"{prefix}{path.stem}") 84 filename = type(self).__name__.lower() 85 self.filename = Path(f"{path}_{filename}{postfix}.csv") 86 87 # Create file 88 with open(self.filename, "w", encoding="utf-8") as csv_file: 89 csv.writer(csv_file).writerow(self.header) 90 return self.filename 91 92 @abstractmethod 93 def record(self): 94 """Generate row data.""" 95 96 def write_row(self, row: list[Any]): 97 """Write the row.""" 98 self.create_file() # Create the file if it does not exist 99 assert self.filename # Appease pylint 100 101 with open(self.filename, "a", encoding="utf-8") as csv_file: 102 csv.writer(csv_file).writerow(row) 103 104 self.postfix_fn()
Base class for csv writers.
59 def __init__(self, bms_hardware: BMSHardware): 60 """Create the csv object.""" 61 self.bms_hardware = bms_hardware 62 self.filename: Path | None = None 63 self.postfix_fn = lambda: ...
Create the csv object.
70 def create_file(self, prefix: str = "", postfix: str = "") -> Path: 71 """Create a csv file.""" 72 73 # Return file if it was already created 74 if self.filename or ( 75 hasattr(pytest, "flags") 76 and isinstance(pytest.flags, BMSFlags) 77 and (pytest.flags.doc_generation or pytest.flags.dry_run) 78 ): 79 return self.filename or Path(".") 80 81 # Generate filename 82 path = Path(self.bms_hardware.report_filename) 83 path = path.with_stem(f"{prefix}{path.stem}") 84 filename = type(self).__name__.lower() 85 self.filename = Path(f"{path}_{filename}{postfix}.csv") 86 87 # Create file 88 with open(self.filename, "w", encoding="utf-8") as csv_file: 89 csv.writer(csv_file).writerow(self.header) 90 return self.filename
Create a csv file.
96 def write_row(self, row: list[Any]): 97 """Write the row.""" 98 self.create_file() # Create the file if it does not exist 99 assert self.filename # Appease pylint 100 101 with open(self.filename, "a", encoding="utf-8") as csv_file: 102 csv.writer(csv_file).writerow(row) 103 104 self.postfix_fn()
Write the row.
107class OCV(CSVBase): 108 """Functions for the Open Circuit Voltage recording.""" 109 110 header = ["Temperature (C)", "Capacity (%)", "Voltage (V)", "Impedance (mΩ)", "Remaining Capacity (mAh)"] 111 112 def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float: 113 """Calculate the impedance from the raw voltages.""" 114 for i, voltage in enumerate(raw_voltage_data): 115 if i > 0 and raw_voltage_data[i - 1] - voltage > 0.003: # If the voltage drops by at least 3 mV 116 # Impedance = (High voltage - 3rd low voltage) 117 return (raw_voltage_data[i - 1] - raw_voltage_data[i + 2]) * 1000 / current_a 118 return math.nan 119 120 def measure_impedance_data(self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]: 121 """Record the voltage drop with a current pulse.""" 122 assert self.bms_hardware.load and self.bms_hardware.dmm 123 self.bms_hardware.load.configure_pulse_trigger(pulse_current) 124 self.bms_hardware.dmm.configure_voltage_trigger(timestamps) 125 self.bms_hardware.load.enable() 126 self.bms_hardware.dmm.send_trigger() # Begin voltage measurements 127 self.bms_hardware.load.send_trigger() # Pulse the current 128 self.bms_hardware.load.wait_for_pulse() # Wait for load pulse to complete 129 self.bms_hardware.load.disable() 130 result = self.bms_hardware.dmm.read_internal_memory() # Read voltage measurements 131 self.bms_hardware.dmm.configure_voltage_normal() 132 return result 133 134 def record(self, remaining_capacity_ah: float = 0.0): 135 """Record the OCV.""" 136 logger.write_info_to_report("Recording OCV") 137 138 # FIXME(JA): determine if timestamps are useful or not 139 impedance = 0.0 140 impedance_timestamps = 0.0 141 assert self.bms_hardware.dmm 142 config = self.bms_hardware.dmm.impedance_config 143 for i in range(config.total_readings): 144 raw_voltage_data = self.measure_impedance_data(config.pulse_current) 145 raw_voltage_and_time = self.measure_impedance_data(config.pulse_current, timestamps=True) 146 147 # Cumulative average of impedance 148 impedance = (impedance * i + self.calculate_impedance(raw_voltage_data, config.pulse_current)) / (i + 1) 149 impedance_timestamps = ( 150 impedance_timestamps * i + self.calculate_impedance(raw_voltage_and_time, config.pulse_current) 151 ) / (i + 1) 152 153 # For debugging impedance calculations 154 self.bms_hardware.csv.raw_impedance.record(impedance, raw_voltage_data) 155 self.bms_hardware.csv.raw_impedance_time.create_file(postfix="_time") 156 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time) # Both 157 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[1::2]) # Time 158 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[::2]) # Voltage 159 160 assert self.bms_hardware.dmm 161 self.write_row( 162 [ 163 self.bms_hardware.temperature, 164 self.bms_hardware.remaining_capacity_percentage, 165 self.bms_hardware.dmm.volts, 166 impedance, 167 remaining_capacity_ah * 1000, 168 ] 169 )
Functions for the Open Circuit Voltage recording.
The csv header.
112 def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float: 113 """Calculate the impedance from the raw voltages.""" 114 for i, voltage in enumerate(raw_voltage_data): 115 if i > 0 and raw_voltage_data[i - 1] - voltage > 0.003: # If the voltage drops by at least 3 mV 116 # Impedance = (High voltage - 3rd low voltage) 117 return (raw_voltage_data[i - 1] - raw_voltage_data[i + 2]) * 1000 / current_a 118 return math.nan
Calculate the impedance from the raw voltages.
120 def measure_impedance_data(self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]: 121 """Record the voltage drop with a current pulse.""" 122 assert self.bms_hardware.load and self.bms_hardware.dmm 123 self.bms_hardware.load.configure_pulse_trigger(pulse_current) 124 self.bms_hardware.dmm.configure_voltage_trigger(timestamps) 125 self.bms_hardware.load.enable() 126 self.bms_hardware.dmm.send_trigger() # Begin voltage measurements 127 self.bms_hardware.load.send_trigger() # Pulse the current 128 self.bms_hardware.load.wait_for_pulse() # Wait for load pulse to complete 129 self.bms_hardware.load.disable() 130 result = self.bms_hardware.dmm.read_internal_memory() # Read voltage measurements 131 self.bms_hardware.dmm.configure_voltage_normal() 132 return result
Record the voltage drop with a current pulse.
134 def record(self, remaining_capacity_ah: float = 0.0): 135 """Record the OCV.""" 136 logger.write_info_to_report("Recording OCV") 137 138 # FIXME(JA): determine if timestamps are useful or not 139 impedance = 0.0 140 impedance_timestamps = 0.0 141 assert self.bms_hardware.dmm 142 config = self.bms_hardware.dmm.impedance_config 143 for i in range(config.total_readings): 144 raw_voltage_data = self.measure_impedance_data(config.pulse_current) 145 raw_voltage_and_time = self.measure_impedance_data(config.pulse_current, timestamps=True) 146 147 # Cumulative average of impedance 148 impedance = (impedance * i + self.calculate_impedance(raw_voltage_data, config.pulse_current)) / (i + 1) 149 impedance_timestamps = ( 150 impedance_timestamps * i + self.calculate_impedance(raw_voltage_and_time, config.pulse_current) 151 ) / (i + 1) 152 153 # For debugging impedance calculations 154 self.bms_hardware.csv.raw_impedance.record(impedance, raw_voltage_data) 155 self.bms_hardware.csv.raw_impedance_time.create_file(postfix="_time") 156 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time) # Both 157 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[1::2]) # Time 158 self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[::2]) # Voltage 159 160 assert self.bms_hardware.dmm 161 self.write_row( 162 [ 163 self.bms_hardware.temperature, 164 self.bms_hardware.remaining_capacity_percentage, 165 self.bms_hardware.dmm.volts, 166 impedance, 167 remaining_capacity_ah * 1000, 168 ] 169 )
Record the OCV.
Inherited Members
172class Cycle(CSVBase): 173 """Functions for recording any run cycle.""" 174 175 header = [ 176 "Cycle", 177 "Initial Capacity (%)", 178 "Target Capacity (%)", 179 "Capacity (Ah)", 180 "Current (A)", 181 "Voltage (V)", 182 "Temperature (C)", 183 "Resistance (Ω)", 184 "Elapsed Time (s)", 185 ] 186 187 def record(self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0): 188 """Record various cycle data.""" 189 assert self.bms_hardware.dmm 190 self.write_row( 191 [ 192 inspect.stack()[1].function, 193 self.bms_hardware.remaining_capacity_percentage, 194 self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100, 195 amp_hrs, 196 current, 197 self.bms_hardware.dmm.volts, 198 self.bms_hardware.temperature, 199 resistance, 200 elapsed_time, 201 ] 202 )
Functions for recording any run cycle.
The csv header.
187 def record(self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0): 188 """Record various cycle data.""" 189 assert self.bms_hardware.dmm 190 self.write_row( 191 [ 192 inspect.stack()[1].function, 193 self.bms_hardware.remaining_capacity_percentage, 194 self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100, 195 amp_hrs, 196 current, 197 self.bms_hardware.dmm.volts, 198 self.bms_hardware.temperature, 199 resistance, 200 elapsed_time, 201 ] 202 )
Record various cycle data.
Inherited Members
205class CycleSMBus(CSVBase): 206 """Functions for recording any run cycle, as well as the smbus.""" 207 208 def __init__(self, bms_hardware: BMSHardware): 209 """Add variable to detect first run.""" 210 super().__init__(bms_hardware) 211 self.smbus = SMBus() 212 self.adc_plate = ADCPlate() 213 self._header_cache: list[str] = [] 214 self.last_row: dict[str, float | str | None] = {} 215 self.last_serial_data: dict[str, int | str] = {} 216 self.last_cell_data: dict[int, dict[str, int]] = {} 217 218 @property 219 def header(self) -> list[str]: 220 if self._header_cache: 221 return self._header_cache 222 223 header_text = [ 224 "Cycle", 225 "Initial Capacity (%)", 226 "Target Capacity (%)", 227 "HITL Capacity (Ah)", 228 "HITL Current (A)", 229 "HITL Voltage (V)", 230 "ADC Plate Terminal Voltage (V)", 231 "HITL Temperature (C)", 232 "HITL Resistance (Ω)", 233 "HITL Elapsed Time (s)", 234 ] 235 236 # Create dynamic headers 237 for cell_id in self.adc_plate.cell_ids: 238 header_text.append(f"ADC Plate Cell {cell_id} Voltage (V)") 239 240 for name in ("SOC (%)", "Volts (V)", "Measured Volts (V)", "Current (A)", "Ohms (Ω)"): 241 for i in sorted(self.bms_hardware.cells): 242 header_text.append(f"Cell Sim {i} {name}") 243 244 header_text.append("|SERIAL|") 245 header_text += serial_monitor.parse_serial_data(dict.fromkeys(serial_monitor.get_headers(), VERSION)) 246 247 header_text.append("|SMBUS|") 248 with suppress(SMBusError): 249 header_text += self.smbus.parse_smbus_data() 250 251 self._header_cache = header_text 252 return header_text 253 254 def record( 255 self, 256 elapsed_time: float = 0, 257 current: float = 0, 258 amp_hrs: float | None = None, 259 resistance: float = 0, 260 state: str = "", 261 serial_data: dict[str, int | str] | None = None, 262 suppress_smbus: bool = False, 263 ): 264 """Record various cycle data.""" 265 266 # Get cell simulator data 267 sim_data = [] 268 self.last_cell_data = {} 269 for attr in ("state_of_charge", "volts", "measured_volts", "amps", "ohms"): 270 for cell in self.bms_hardware.cells.values(): 271 self.last_cell_data[cell.id] = self.last_cell_data.get(cell.id, {}) 272 self.last_cell_data[cell.id][attr] = getattr(cell, attr) 273 sim_data.append( 274 f"{self.last_cell_data[cell.id][attr]:.2%}" 275 if attr == "state_of_charge" 276 else str(self.last_cell_data[cell.id][attr]) 277 ) 278 279 # Get serial data 280 if serial_data is None: 281 serial_data = serial_monitor.read() 282 assert serial_data is not None 283 self.last_serial_data = serial_data 284 parsed_serial_data = serial_monitor.parse_serial_data(serial_data) 285 286 # Get smbus data 287 try: 288 parsed_smbus_data = self.smbus.parse_smbus_data() 289 except SMBusError as e: 290 parsed_smbus_data = {} 291 if not suppress_smbus: 292 logger.write_error_to_report(str(e)) 293 294 assert self.bms_hardware.dmm 295 row = [ 296 state or inspect.stack()[1].function, 297 self.bms_hardware.remaining_capacity_percentage, 298 self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100, 299 amp_hrs, 300 current, 301 self.bms_hardware.dmm.volts, 302 self.adc_plate.terminal_volts, 303 self.bms_hardware.temperature, 304 resistance, 305 elapsed_time, 306 *[self.adc_plate.cell_volts(cell_id) for cell_id in self.adc_plate.cell_ids], 307 *sim_data, 308 None, # Spacing 309 *parsed_serial_data.values(), 310 None, # Spacing 311 *parsed_smbus_data.values(), 312 ] 313 self.last_row = dict(zip(self.header, cast(list[str | float | None], row))) 314 self.write_row(row)
Functions for recording any run cycle, as well as the smbus.
208 def __init__(self, bms_hardware: BMSHardware): 209 """Add variable to detect first run.""" 210 super().__init__(bms_hardware) 211 self.smbus = SMBus() 212 self.adc_plate = ADCPlate() 213 self._header_cache: list[str] = [] 214 self.last_row: dict[str, float | str | None] = {} 215 self.last_serial_data: dict[str, int | str] = {} 216 self.last_cell_data: dict[int, dict[str, int]] = {}
Add variable to detect first run.
218 @property 219 def header(self) -> list[str]: 220 if self._header_cache: 221 return self._header_cache 222 223 header_text = [ 224 "Cycle", 225 "Initial Capacity (%)", 226 "Target Capacity (%)", 227 "HITL Capacity (Ah)", 228 "HITL Current (A)", 229 "HITL Voltage (V)", 230 "ADC Plate Terminal Voltage (V)", 231 "HITL Temperature (C)", 232 "HITL Resistance (Ω)", 233 "HITL Elapsed Time (s)", 234 ] 235 236 # Create dynamic headers 237 for cell_id in self.adc_plate.cell_ids: 238 header_text.append(f"ADC Plate Cell {cell_id} Voltage (V)") 239 240 for name in ("SOC (%)", "Volts (V)", "Measured Volts (V)", "Current (A)", "Ohms (Ω)"): 241 for i in sorted(self.bms_hardware.cells): 242 header_text.append(f"Cell Sim {i} {name}") 243 244 header_text.append("|SERIAL|") 245 header_text += serial_monitor.parse_serial_data(dict.fromkeys(serial_monitor.get_headers(), VERSION)) 246 247 header_text.append("|SMBUS|") 248 with suppress(SMBusError): 249 header_text += self.smbus.parse_smbus_data() 250 251 self._header_cache = header_text 252 return header_text
The csv header.
254 def record( 255 self, 256 elapsed_time: float = 0, 257 current: float = 0, 258 amp_hrs: float | None = None, 259 resistance: float = 0, 260 state: str = "", 261 serial_data: dict[str, int | str] | None = None, 262 suppress_smbus: bool = False, 263 ): 264 """Record various cycle data.""" 265 266 # Get cell simulator data 267 sim_data = [] 268 self.last_cell_data = {} 269 for attr in ("state_of_charge", "volts", "measured_volts", "amps", "ohms"): 270 for cell in self.bms_hardware.cells.values(): 271 self.last_cell_data[cell.id] = self.last_cell_data.get(cell.id, {}) 272 self.last_cell_data[cell.id][attr] = getattr(cell, attr) 273 sim_data.append( 274 f"{self.last_cell_data[cell.id][attr]:.2%}" 275 if attr == "state_of_charge" 276 else str(self.last_cell_data[cell.id][attr]) 277 ) 278 279 # Get serial data 280 if serial_data is None: 281 serial_data = serial_monitor.read() 282 assert serial_data is not None 283 self.last_serial_data = serial_data 284 parsed_serial_data = serial_monitor.parse_serial_data(serial_data) 285 286 # Get smbus data 287 try: 288 parsed_smbus_data = self.smbus.parse_smbus_data() 289 except SMBusError as e: 290 parsed_smbus_data = {} 291 if not suppress_smbus: 292 logger.write_error_to_report(str(e)) 293 294 assert self.bms_hardware.dmm 295 row = [ 296 state or inspect.stack()[1].function, 297 self.bms_hardware.remaining_capacity_percentage, 298 self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100, 299 amp_hrs, 300 current, 301 self.bms_hardware.dmm.volts, 302 self.adc_plate.terminal_volts, 303 self.bms_hardware.temperature, 304 resistance, 305 elapsed_time, 306 *[self.adc_plate.cell_volts(cell_id) for cell_id in self.adc_plate.cell_ids], 307 *sim_data, 308 None, # Spacing 309 *parsed_serial_data.values(), 310 None, # Spacing 311 *parsed_smbus_data.values(), 312 ] 313 self.last_row = dict(zip(self.header, cast(list[str | float | None], row))) 314 self.write_row(row)
Record various cycle data.
Inherited Members
317class RawImpedance(CSVBase): 318 """Functions for recording the Impedance.""" 319 320 header = ["Capacity (%)", "Impedance (mΩ)", "Raw Voltages (V)"] 321 322 def record(self, impedance: float = math.nan, raw_voltage_data: list[float] | None = None): 323 """Record the raw impedance data.""" 324 logger.write_info_to_report("Recording Raw Impedance") 325 326 if raw_voltage_data is None: 327 raw_voltage_data = [] 328 self.write_row( 329 [ 330 self.bms_hardware.remaining_capacity_percentage, 331 impedance, 332 *raw_voltage_data, 333 ] 334 )
Functions for recording the Impedance.
322 def record(self, impedance: float = math.nan, raw_voltage_data: list[float] | None = None): 323 """Record the raw impedance data.""" 324 logger.write_info_to_report("Recording Raw Impedance") 325 326 if raw_voltage_data is None: 327 raw_voltage_data = [] 328 self.write_row( 329 [ 330 self.bms_hardware.remaining_capacity_percentage, 331 impedance, 332 *raw_voltage_data, 333 ] 334 )
Record the raw impedance data.
Inherited Members
337class RawVoltages(CSVBase): 338 """Functions for recording the voltages.""" 339 340 header = ["Elapsed time (s)", "Measurement time (s)", "Raw Voltages (V)"] 341 342 def record(self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None): 343 """Record the raw voltage data.""" 344 logger.write_info_to_report("Recording Raw Voltages") 345 346 if raw_voltage_data is None: 347 raw_voltage_data = [] 348 self.write_row( 349 [ 350 elapsed_time, 351 measurement_time, 352 *raw_voltage_data, 353 ] 354 )
Functions for recording the voltages.
342 def record(self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None): 343 """Record the raw voltage data.""" 344 logger.write_info_to_report("Recording Raw Voltages") 345 346 if raw_voltage_data is None: 347 raw_voltage_data = [] 348 self.write_row( 349 [ 350 elapsed_time, 351 measurement_time, 352 *raw_voltage_data, 353 ] 354 )
Record the raw voltage data.
Inherited Members
357class RawVoltagesAverage(CSVBase): 358 """Functions for recording the voltages.""" 359 360 header = [ 361 "Elapsed time (s)", 362 "Measurement time (s)", 363 "Average Voltage (V)", 364 "Max Voltage (V)", 365 "Min Voltage (V)", 366 "Total Measurements", 367 ] 368 369 def record( 370 self, 371 elapsed_time: float = 0, 372 measurement_time: float = 0, 373 average_volts: float = 0, 374 max_volts: float = 0, 375 min_volts: float = 0, 376 measurement_count: float = 0, 377 ): 378 """Record the raw voltage data.""" 379 logger.write_info_to_report("Recording Raw Voltages") 380 381 self.write_row( 382 [ 383 elapsed_time, 384 measurement_time, 385 average_volts, 386 max_volts, 387 min_volts, 388 measurement_count, 389 ] 390 )
Functions for recording the voltages.
The csv header.
369 def record( 370 self, 371 elapsed_time: float = 0, 372 measurement_time: float = 0, 373 average_volts: float = 0, 374 max_volts: float = 0, 375 min_volts: float = 0, 376 measurement_count: float = 0, 377 ): 378 """Record the raw voltage data.""" 379 logger.write_info_to_report("Recording Raw Voltages") 380 381 self.write_row( 382 [ 383 elapsed_time, 384 measurement_time, 385 average_volts, 386 max_volts, 387 min_volts, 388 measurement_count, 389 ] 390 )
Record the raw voltage data.
Inherited Members
393class NiCdQC(CSVBase): 394 """Functions for recording a NiCd cell during QC.""" 395 396 header = [ 397 "Elapsed time (s)", 398 "Elapsed time discharge (s)", 399 "Board", 400 "Channel", 401 "Voltage (V)", 402 "State", 403 "1.21V Time (s)", 404 "1.20V Time (s)", 405 ] 406 407 def record( 408 self, 409 elapsed_time: float = 0, 410 elapsed_time_discharge: float = 0, 411 board: int = 0, 412 channel: int = 0, 413 voltage: float = 0.0, 414 state: Any = None, 415 time_121: float = 0.0, 416 time_120: float = 0.0, 417 ): 418 """Record the NiCd cell data.""" 419 self.write_row([elapsed_time, elapsed_time_discharge, board, channel, voltage, state, time_121, time_120])
Functions for recording a NiCd cell during QC.
The csv header.
407 def record( 408 self, 409 elapsed_time: float = 0, 410 elapsed_time_discharge: float = 0, 411 board: int = 0, 412 channel: int = 0, 413 voltage: float = 0.0, 414 state: Any = None, 415 time_121: float = 0.0, 416 time_120: float = 0.0, 417 ): 418 """Record the NiCd cell data.""" 419 self.write_row([elapsed_time, elapsed_time_discharge, board, channel, voltage, state, time_121, time_120])
Record the NiCd cell data.
Inherited Members
422class DMMAC(CSVBase): 423 """Functions for recording the DMM AC current.""" 424 425 header = [ 426 "Elapsed time (s)", 427 "AC Current (A)", 428 ] 429 430 def record(self, elapsed_time: float = 0, current: float = 0): 431 """Record the DMM AC current.""" 432 self.write_row([elapsed_time, current])
Functions for recording the DMM AC current.
430 def record(self, elapsed_time: float = 0, current: float = 0): 431 """Record the DMM AC current.""" 432 self.write_row([elapsed_time, current])
Record the DMM AC current.
Inherited Members
435class NiCdKorad(CSVBase): 436 """Functions for recording the DMM AC current.""" 437 438 header = [ 439 "Elapsed time (s)", 440 "Status flags", 441 "Current (A)", 442 "Voltage (V)", 443 ] 444 445 def record(self, elapsed_time: float = 0, status_flags: str = "", current: float = 0, voltage: float = 0): 446 """Record the DMM AC current.""" 447 self.write_row([elapsed_time, status_flags, current, voltage])
Functions for recording the DMM AC current.
445 def record(self, elapsed_time: float = 0, status_flags: str = "", current: float = 0, voltage: float = 0): 446 """Record the DMM AC current.""" 447 self.write_row([elapsed_time, status_flags, current, voltage])
Record the DMM AC current.
Inherited Members
450class Thermo(CSVBase): 451 """Functions for recording the thermocouples.""" 452 453 @property 454 def header(self) -> list[str]: 455 header_list: list[str] = ["Elapsed Time (s)", "Seconds since epoch (s)"] 456 header_list.extend(f"Channel {thermocouple_id}" for thermocouple_id in self.bms_hardware.thermo_couples) 457 return header_list 458 459 def record(self, elapsed_time: float = 0): 460 """Record the DMM AC current.""" 461 temps: list[float] = [] 462 for thermocouple_id, thermocouple in self.bms_hardware.thermo_couples.items(): 463 temps.append(thermocouple.temperature) 464 logger.write_debug_to_report(f"Thermocouple channel {thermocouple_id}: {temps[-1]} °C") 465 self.write_row([elapsed_time, time.time(), *temps])
Functions for recording the thermocouples.
453 @property 454 def header(self) -> list[str]: 455 header_list: list[str] = ["Elapsed Time (s)", "Seconds since epoch (s)"] 456 header_list.extend(f"Channel {thermocouple_id}" for thermocouple_id in self.bms_hardware.thermo_couples) 457 return header_list
The csv header.
459 def record(self, elapsed_time: float = 0): 460 """Record the DMM AC current.""" 461 temps: list[float] = [] 462 for thermocouple_id, thermocouple in self.bms_hardware.thermo_couples.items(): 463 temps.append(thermocouple.temperature) 464 logger.write_debug_to_report(f"Thermocouple channel {thermocouple_id}: {temps[-1]} °C") 465 self.write_row([elapsed_time, time.time(), *temps])
Record the DMM AC current.
Inherited Members
468class CSVRecorders: 469 """Access to CSV recorders.""" 470 471 def __init__(self, bms_hardware: BMSHardware): 472 """Initialize CSV recorders.""" 473 self.ocv = OCV(bms_hardware) 474 self.cycle: CSVBase = Cycle(bms_hardware) 475 self.raw_voltage = RawVoltages(bms_hardware) 476 self.raw_voltage_average = RawVoltagesAverage(bms_hardware) 477 self.cycle_smbus = CycleSMBus(bms_hardware) 478 self.nicd = NiCdQC(bms_hardware) 479 self.dmm_ac = DMMAC(bms_hardware) 480 self.nicd_korad = NiCdKorad(bms_hardware) 481 self.raw_impedance = RawImpedance(bms_hardware) 482 self.raw_impedance_time = RawImpedance(bms_hardware) 483 self.thermo = Thermo(bms_hardware) 484 485 # This can be done dynamically, but of course the type checker really doesn't like it 486 # for csv_child in CSVBase.__subclasses__(): 487 # if not inspect.isabstract(csv_child): # Child is confirmed concrete 488 # setattr(self, csv_child.__name__.lower(), csv_child(bms_hardware)) # type: ignore 489 490 # def __getattr__(self, name): 491 # """Called for undefined attributes.""" 492 # raise AttributeError(f"'{self.__name__}' object has no attribute '{name}'")
Access to CSV recorders.
471 def __init__(self, bms_hardware: BMSHardware): 472 """Initialize CSV recorders.""" 473 self.ocv = OCV(bms_hardware) 474 self.cycle: CSVBase = Cycle(bms_hardware) 475 self.raw_voltage = RawVoltages(bms_hardware) 476 self.raw_voltage_average = RawVoltagesAverage(bms_hardware) 477 self.cycle_smbus = CycleSMBus(bms_hardware) 478 self.nicd = NiCdQC(bms_hardware) 479 self.dmm_ac = DMMAC(bms_hardware) 480 self.nicd_korad = NiCdKorad(bms_hardware) 481 self.raw_impedance = RawImpedance(bms_hardware) 482 self.raw_impedance_time = RawImpedance(bms_hardware) 483 self.thermo = Thermo(bms_hardware) 484 485 # This can be done dynamically, but of course the type checker really doesn't like it 486 # for csv_child in CSVBase.__subclasses__(): 487 # if not inspect.isabstract(csv_child): # Child is confirmed concrete 488 # setattr(self, csv_child.__name__.lower(), csv_child(bms_hardware)) # type: ignore
Initialize CSV recorders.