hitl_tester.modules.bms.csv_tables

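CSV recorders for HITL test data: Open Circuit Voltage, run cycles (with and without SMBus/serial data), raw voltages and impedance, NiCd QC, DMM AC current, and thermocouples.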

(c) 2020-2024 TurnAround Factor, Inc.

CUI DISTRIBUTION CONTROL Controlled by: DLA J68 R&D SBIP CUI Category: Small Business Research and Technology Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS POC: GOV SBIP Program Manager Denise Price, 571-767-0111 Distribution authorized to U.S. Government Agencies only, to protect information not owned by the U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, Fort Belvoir, VA 22060-6221

SBIR DATA RIGHTS Contract No.:SP4701-23-C-0083 Contractor Name: TurnAround Factor, Inc. Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 Expiration of SBIR Data Rights Period: September 24, 2029 The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause contained in the above identified contract. No restrictions apply after the expiration date shown above. Any reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce the markings.

  1"""
  2Functions for the Open Circuit Voltage recording.
  3
  4(c) 2020-2024 TurnAround Factor, Inc.
  5
  6CUI DISTRIBUTION CONTROL
  7Controlled by: DLA J68 R&D SBIP
  8CUI Category: Small Business Research and Technology
  9Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15Fort Belvoir, VA 22060-6221
 16
 17SBIR DATA RIGHTS
 18Contract No.:SP4701-23-C-0083
 19Contractor Name: TurnAround Factor, Inc.
 20Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21Expiration of SBIR Data Rights Period: September 24, 2029
 22The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27the markings.
 28"""
 29
 30from __future__ import annotations
 31
 32import csv
 33import inspect
 34import math
 35import time
 36from abc import ABC
 37from abc import abstractmethod
 38from contextlib import suppress
 39from pathlib import Path
 40from typing import Any, cast, TYPE_CHECKING
 41
 42import pytest
 43
 44from hitl_tester.modules.bms.bms_serial import serial_monitor, VERSION
 45from hitl_tester.modules.bms.adc_plate import ADCPlate
 46from hitl_tester.modules.logger import logger
 47from hitl_tester.modules.bms.smbus import SMBus
 48from hitl_tester.modules.bms.smbus_types import SMBusError
 49from hitl_tester.modules.bms_types import BMSFlags
 50
 51if TYPE_CHECKING:  # Avoid circular import issue
 52    from hitl_tester.modules.bms.bms_hw import BMSHardware
 53
 54
 55class CSVBase(ABC):
 56    """Base class for csv writers."""
 57
 58    def __init__(self, bms_hardware: BMSHardware):
 59        """Create the csv object."""
 60        self.bms_hardware = bms_hardware
 61        self.filename: Path | None = None
 62        self.postfix_fn = lambda: ...
 63
 64    @property
 65    @abstractmethod
 66    def header(self) -> list[str]:
 67        """The csv header."""
 68
 69    def create_file(self, prefix: str = "", postfix: str = "") -> Path:
 70        """Create a csv file."""
 71
 72        # Return file if it was already created
 73        if self.filename or (
 74            hasattr(pytest, "flags")
 75            and isinstance(pytest.flags, BMSFlags)
 76            and (pytest.flags.doc_generation or pytest.flags.dry_run)
 77        ):
 78            return self.filename or Path(".")
 79
 80        # Generate filename
 81        path = Path(self.bms_hardware.report_filename)
 82        path = path.with_stem(f"{prefix}{path.stem}")
 83        filename = type(self).__name__.lower()
 84        self.filename = Path(f"{path}_{filename}{postfix}.csv")
 85
 86        # Create file
 87        with open(self.filename, "w", encoding="utf-8") as csv_file:
 88            csv.writer(csv_file).writerow(self.header)
 89        return self.filename
 90
 91    @abstractmethod
 92    def record(self):
 93        """Generate row data."""
 94
 95    def write_row(self, row: list[Any]):
 96        """Write the row."""
 97        self.create_file()  # Create the file if it does not exist
 98        assert self.filename  # Appease pylint
 99
100        with open(self.filename, "a", encoding="utf-8") as csv_file:
101            csv.writer(csv_file).writerow(row)
102
103        self.postfix_fn()
104
105
class OCV(CSVBase):
    """Functions for the Open Circuit Voltage recording."""

    header = ["Temperature (C)", "Capacity (%)", "Voltage (V)", "Impedance (mΩ)", "Remaining Capacity (mAh)"]

    def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float:
        """Calculate the impedance from the raw voltages.

        The first sample-to-sample drop of more than 3 mV is taken as the start of the pulse. The
        impedance is the difference between the last high sample and the third low sample, scaled by
        1000 and divided by ``current_a``.

        :param raw_voltage_data: consecutive voltage samples.
        :param current_a: the pulse current used as the divisor.
        :return: the impedance, or ``math.nan`` when no drop is found or fewer than three samples
            follow the drop.
        """
        for i in range(1, len(raw_voltage_data)):
            high_voltage = raw_voltage_data[i - 1]
            if high_voltage - raw_voltage_data[i] > 0.003:  # If the voltage drops by at least 3 mV
                third_low = i + 2
                if third_low >= len(raw_voltage_data):
                    return math.nan  # Drop seen too close to the end of the capture
                # Impedance = (High voltage - 3rd low voltage)
                return (high_voltage - raw_voltage_data[third_low]) * 1000 / current_a
        return math.nan

    def measure_impedance_data(self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]:
        """Record the voltage drop with a current pulse.

        :param pulse_current: passed to the load's ``configure_pulse_trigger``.
        :param timestamps: passed to the DMM's ``configure_voltage_trigger``.
        :return: whatever the DMM's ``read_internal_memory`` returns.
        """
        assert self.bms_hardware.load and self.bms_hardware.dmm
        load = self.bms_hardware.load
        dmm = self.bms_hardware.dmm

        load.configure_pulse_trigger(pulse_current)
        dmm.configure_voltage_trigger(timestamps)
        load.enable()
        try:
            dmm.send_trigger()  # Begin voltage measurements
            load.send_trigger()  # Pulse the current
            load.wait_for_pulse()  # Wait for load pulse to complete
        finally:
            load.disable()  # Never leave the load enabled, even if a step above failed
        result = dmm.read_internal_memory()  # Read voltage measurements
        dmm.configure_voltage_normal()
        return result

    def record(self, remaining_capacity_ah: float = 0.0):
        """Record the OCV.

        Takes ``impedance_config.total_readings`` pulse measurements (each once without and once with
        timestamps), keeps a running average of the impedance, logs the raw data to the raw-impedance
        recorders, and finally writes one OCV row.

        :param remaining_capacity_ah: remaining capacity; written to the row multiplied by 1000.
        """
        logger.write_info_to_report("Recording OCV")

        # FIXME(JA): determine if timestamps are useful or not
        impedance = 0.0
        impedance_timestamps = 0.0
        assert self.bms_hardware.dmm
        config = self.bms_hardware.dmm.impedance_config
        for i in range(config.total_readings):
            raw_voltage_data = self.measure_impedance_data(config.pulse_current)
            raw_voltage_and_time = self.measure_impedance_data(config.pulse_current, timestamps=True)

            # Cumulative average of impedance
            impedance = (impedance * i + self.calculate_impedance(raw_voltage_data, config.pulse_current)) / (i + 1)
            # NOTE(review): the slices below treat raw_voltage_and_time as interleaved voltage/time
            # values, yet the whole list is handed to calculate_impedance here - confirm intent.
            impedance_timestamps = (
                impedance_timestamps * i + self.calculate_impedance(raw_voltage_and_time, config.pulse_current)
            ) / (i + 1)

            # For debugging impedance calculations
            self.bms_hardware.csv.raw_impedance.record(impedance, raw_voltage_data)
            self.bms_hardware.csv.raw_impedance_time.create_file(postfix="_time")
            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time)  # Both
            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[1::2])  # Time
            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[::2])  # Voltage

        assert self.bms_hardware.dmm
        self.write_row(
            [
                self.bms_hardware.temperature,
                self.bms_hardware.remaining_capacity_percentage,
                self.bms_hardware.dmm.volts,
                impedance,
                remaining_capacity_ah * 1000,
            ]
        )
169
170
class Cycle(CSVBase):
    """Recorder that writes one row of run-cycle data per call."""

    header = [
        "Cycle",
        "Initial Capacity (%)",
        "Target Capacity (%)",
        "Capacity (Ah)",
        "Current (A)",
        "Voltage (V)",
        "Temperature (C)",
        "Resistance (Ω)",
        "Elapsed Time (s)",
    ]

    def record(
        self,
        elapsed_time: float = 0,
        current: float = 0,
        amp_hrs: float | None = None,
        resistance: float = 0,
    ):
        """Write one cycle row.

        The "Cycle" column is the name of the function that called ``record``. The target capacity is
        the remaining capacity percentage minus ``percent_discharge * 100``.

        :param elapsed_time: value for the "Elapsed Time (s)" column.
        :param current: value for the "Current (A)" column.
        :param amp_hrs: value for the "Capacity (Ah)" column.
        :param resistance: value for the "Resistance (Ω)" column.
        """
        hardware = self.bms_hardware
        assert hardware.dmm

        caller_name = inspect.stack()[1].function  # Frame 1 is whoever invoked record()
        initial_capacity = hardware.remaining_capacity_percentage
        target_capacity = hardware.remaining_capacity_percentage - hardware.percent_discharge * 100
        volts = hardware.dmm.volts
        temperature = hardware.temperature

        self.write_row(
            [
                caller_name,
                initial_capacity,
                target_capacity,
                amp_hrs,
                current,
                volts,
                temperature,
                resistance,
                elapsed_time,
            ]
        )
202
203
class CycleSMBus(CSVBase):
    """Functions for recording any run cycle, as well as the smbus."""

    def __init__(self, bms_hardware: BMSHardware):
        """Create the recorder, its SMBus / ADC plate helpers, and the "last seen" caches."""
        super().__init__(bms_hardware)
        self.smbus = SMBus()
        self.adc_plate = ADCPlate()
        self._header_cache: list[str] = []  # Filled on first access of ``header``
        self.last_row: dict[str, float | str | None] = {}  # Header -> value of the latest row
        self.last_serial_data: dict[str, int | str] = {}  # Unparsed serial data of the latest row
        self.last_cell_data: dict[int, dict[str, int]] = {}  # cell.id -> attribute -> value

    @property
    def header(self) -> list[str]:
        """The csv header, built on first access and cached afterwards.

        Fixed columns come first, followed by ADC plate cell columns, cell simulator columns
        (grouped by quantity, cells in sorted key order), serial columns and SMBus columns. If the
        SMBus cannot be parsed while building, the SMBus columns are left out of the cached header.
        """
        if self._header_cache:
            return self._header_cache

        header_text = [
            "Cycle",
            "Initial Capacity (%)",
            "Target Capacity (%)",
            "HITL Capacity (Ah)",
            "HITL Current (A)",
            "HITL Voltage (V)",
            "ADC Plate Terminal Voltage (V)",
            "HITL Temperature (C)",
            "HITL Resistance (Ω)",
            "HITL Elapsed Time (s)",
        ]

        # Create dynamic headers
        header_text.extend(f"ADC Plate Cell {cell_id} Voltage (V)" for cell_id in self.adc_plate.cell_ids)

        for name in ("SOC (%)", "Volts (V)", "Measured Volts (V)", "Current (A)", "Ohms (Ω)"):
            for i in sorted(self.bms_hardware.cells):
                header_text.append(f"Cell Sim {i} {name}")

        header_text.append("|SERIAL|")
        header_text += serial_monitor.parse_serial_data(dict.fromkeys(serial_monitor.get_headers(), VERSION))

        header_text.append("|SMBUS|")
        with suppress(SMBusError):
            header_text += self.smbus.parse_smbus_data()

        self._header_cache = header_text
        return header_text

    def record(
        self,
        elapsed_time: float = 0,
        current: float = 0,
        amp_hrs: float | None = None,
        resistance: float = 0,
        state: str = "",
        serial_data: dict[str, int | str] | None = None,
        suppress_smbus: bool = False,
    ):
        """Record various cycle data.

        :param elapsed_time: value for the "HITL Elapsed Time (s)" column.
        :param current: value for the "HITL Current (A)" column.
        :param amp_hrs: value for the "HITL Capacity (Ah)" column.
        :param resistance: value for the "HITL Resistance (Ω)" column.
        :param state: value for the "Cycle" column; the calling function's name is used when empty.
        :param serial_data: serial data to record; read from ``serial_monitor`` when ``None``.
        :param suppress_smbus: when true, an ``SMBusError`` is not written to the report.
        """

        # Get cell simulator data. Cells are visited in sorted key order so that the values line up
        # with the "Cell Sim {i} ..." columns, which ``header`` also emits in sorted key order.
        cells = self.bms_hardware.cells
        ordered_cells = [cells[key] for key in sorted(cells)]
        sim_data = []
        self.last_cell_data = {}
        for attr in ("state_of_charge", "volts", "measured_volts", "amps", "ohms"):
            for cell in ordered_cells:
                value = getattr(cell, attr)
                self.last_cell_data.setdefault(cell.id, {})[attr] = value
                sim_data.append(f"{value:.2%}" if attr == "state_of_charge" else str(value))

        # Get serial data
        if serial_data is None:
            serial_data = serial_monitor.read()
            assert serial_data is not None
        self.last_serial_data = serial_data
        parsed_serial_data = serial_monitor.parse_serial_data(serial_data)

        # Get smbus data
        try:
            parsed_smbus_data = self.smbus.parse_smbus_data()
        except SMBusError as e:
            parsed_smbus_data = {}  # The row simply ends after the |SMBUS| spacer
            if not suppress_smbus:
                logger.write_error_to_report(str(e))

        assert self.bms_hardware.dmm
        row = [
            state or inspect.stack()[1].function,
            self.bms_hardware.remaining_capacity_percentage,
            self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100,
            amp_hrs,
            current,
            self.bms_hardware.dmm.volts,
            self.adc_plate.terminal_volts,
            self.bms_hardware.temperature,
            resistance,
            elapsed_time,
            *[self.adc_plate.cell_volts(cell_id) for cell_id in self.adc_plate.cell_ids],
            *sim_data,
            None,  # Spacing (|SERIAL| column)
            *parsed_serial_data.values(),
            None,  # Spacing (|SMBUS| column)
            *parsed_smbus_data.values(),
        ]
        self.last_row = dict(zip(self.header, cast(list[str | float | None], row)))
        self.write_row(row)
314
315
class RawImpedance(CSVBase):
    """Recorder for an impedance value together with the raw voltages behind it."""

    header = ["Capacity (%)", "Impedance (mΩ)", "Raw Voltages (V)"]

    def record(self, impedance: float = math.nan, raw_voltage_data: list[float] | None = None):
        """Write one row: capacity percentage, impedance, then every raw value in its own column.

        :param impedance: value for the "Impedance (mΩ)" column.
        :param raw_voltage_data: raw values appended after the impedance; ``None`` means no values.
        """
        logger.write_info_to_report("Recording Raw Impedance")

        samples = [] if raw_voltage_data is None else raw_voltage_data
        row = [self.bms_hardware.remaining_capacity_percentage, impedance]
        row.extend(samples)
        self.write_row(row)
334
335
class RawVoltages(CSVBase):
    """Recorder for raw voltage samples."""

    header = ["Elapsed time (s)", "Measurement time (s)", "Raw Voltages (V)"]

    def record(self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None):
        """Write one row: both times, then every raw voltage in its own column.

        :param elapsed_time: value for the "Elapsed time (s)" column.
        :param measurement_time: value for the "Measurement time (s)" column.
        :param raw_voltage_data: raw voltages appended after the times; ``None`` means no values.
        """
        logger.write_info_to_report("Recording Raw Voltages")

        samples = [] if raw_voltage_data is None else raw_voltage_data
        row = [elapsed_time, measurement_time]
        row.extend(samples)
        self.write_row(row)
354
355
class RawVoltagesAverage(CSVBase):
    """Recorder for summary statistics of voltage measurements."""

    header = [
        "Elapsed time (s)",
        "Measurement time (s)",
        "Average Voltage (V)",
        "Max Voltage (V)",
        "Min Voltage (V)",
        "Total Measurements",
    ]

    def record(
        self,
        elapsed_time: float = 0,
        measurement_time: float = 0,
        average_volts: float = 0,
        max_volts: float = 0,
        min_volts: float = 0,
        measurement_count: float = 0,
    ):
        """Write one row holding the arguments in header order.

        :param elapsed_time: value for the "Elapsed time (s)" column.
        :param measurement_time: value for the "Measurement time (s)" column.
        :param average_volts: value for the "Average Voltage (V)" column.
        :param max_volts: value for the "Max Voltage (V)" column.
        :param min_volts: value for the "Min Voltage (V)" column.
        :param measurement_count: value for the "Total Measurements" column.
        """
        logger.write_info_to_report("Recording Raw Voltages")

        row = [elapsed_time, measurement_time, average_volts, max_volts, min_volts, measurement_count]
        self.write_row(row)
390
391
class NiCdQC(CSVBase):
    """Recorder for a NiCd cell during QC."""

    header = [
        "Elapsed time (s)",
        "Elapsed time discharge (s)",
        "Board",
        "Channel",
        "Voltage (V)",
        "State",
        "1.21V Time (s)",
        "1.20V Time (s)",
    ]

    def record(
        self,
        elapsed_time: float = 0,
        elapsed_time_discharge: float = 0,
        board: int = 0,
        channel: int = 0,
        voltage: float = 0.0,
        state: Any = None,
        time_121: float = 0.0,
        time_120: float = 0.0,
    ):
        """Write one row holding the arguments in header order.

        :param elapsed_time: value for the "Elapsed time (s)" column.
        :param elapsed_time_discharge: value for the "Elapsed time discharge (s)" column.
        :param board: value for the "Board" column.
        :param channel: value for the "Channel" column.
        :param voltage: value for the "Voltage (V)" column.
        :param state: value for the "State" column.
        :param time_121: value for the "1.21V Time (s)" column.
        :param time_120: value for the "1.20V Time (s)" column.
        """
        row = [
            elapsed_time,
            elapsed_time_discharge,
            board,
            channel,
            voltage,
            state,
            time_121,
            time_120,
        ]
        self.write_row(row)
419
420
class DMMAC(CSVBase):
    """Recorder for the DMM AC current."""

    header = [
        "Elapsed time (s)",
        "AC Current (A)",
    ]

    def record(self, elapsed_time: float = 0, current: float = 0):
        """Write one row: elapsed time followed by the AC current.

        :param elapsed_time: value for the "Elapsed time (s)" column.
        :param current: value for the "AC Current (A)" column.
        """
        row = [elapsed_time, current]
        self.write_row(row)
432
433
class NiCdKorad(CSVBase):
    """Recorder for NiCd Korad readings: status flags, current and voltage."""

    header = [
        "Elapsed time (s)",
        "Status flags",
        "Current (A)",
        "Voltage (V)",
    ]

    def record(self, elapsed_time: float = 0, status_flags: str = "", current: float = 0, voltage: float = 0):
        """Write one row holding the arguments in header order.

        :param elapsed_time: value for the "Elapsed time (s)" column.
        :param status_flags: value for the "Status flags" column.
        :param current: value for the "Current (A)" column.
        :param voltage: value for the "Voltage (V)" column.
        """
        row = [elapsed_time, status_flags, current, voltage]
        self.write_row(row)
447
448
class Thermo(CSVBase):
    """Recorder for the thermocouples."""

    @property
    def header(self) -> list[str]:
        """Two fixed time columns followed by one "Channel <id>" column per thermocouple."""
        fixed_columns: list[str] = ["Elapsed Time (s)", "Seconds since epoch (s)"]
        channel_columns = [f"Channel {thermocouple_id}" for thermocouple_id in self.bms_hardware.thermo_couples]
        return fixed_columns + channel_columns

    def record(self, elapsed_time: float = 0):
        """Read every thermocouple, log each reading at debug level, and write one row.

        :param elapsed_time: value for the "Elapsed Time (s)" column.
        """
        temps: list[float] = []
        for thermocouple_id, thermocouple in self.bms_hardware.thermo_couples.items():
            reading = thermocouple.temperature
            temps.append(reading)
            logger.write_debug_to_report(f"Thermocouple channel {thermocouple_id}: {reading} °C")
        row = [elapsed_time, time.time()]
        row.extend(temps)
        self.write_row(row)
465
466
class CSVRecorders:
    """Container giving attribute access to one instance of each CSV recorder."""

    def __init__(self, bms_hardware: BMSHardware):
        """Build every recorder, all sharing the same ``bms_hardware``."""
        self.ocv = OCV(bms_hardware)
        self.cycle: CSVBase = Cycle(bms_hardware)
        self.raw_voltage = RawVoltages(bms_hardware)
        self.raw_voltage_average = RawVoltagesAverage(bms_hardware)
        self.cycle_smbus = CycleSMBus(bms_hardware)
        self.nicd = NiCdQC(bms_hardware)
        self.dmm_ac = DMMAC(bms_hardware)
        self.nicd_korad = NiCdKorad(bms_hardware)
        self.raw_impedance = RawImpedance(bms_hardware)
        # Same recorder class as raw_impedance; OCV.record gives it its own file via
        # create_file(postfix="_time") before recording to it.
        self.raw_impedance_time = RawImpedance(bms_hardware)
        self.thermo = Thermo(bms_hardware)

        # The recorders could be registered dynamically, but the type checker cannot follow it:
        # for csv_child in CSVBase.__subclasses__():
        #     if not inspect.isabstract(csv_child):  # Child is confirmed concrete
        #         setattr(self, csv_child.__name__.lower(), csv_child(bms_hardware))  # type: ignore

    # def __getattr__(self, name):
    #     """Called for undefined attributes."""
    #     raise AttributeError(f"'{self.__name__}' object has no attribute '{name}'")
class CSVBase(abc.ABC):
 56class CSVBase(ABC):
 57    """Base class for csv writers."""
 58
 59    def __init__(self, bms_hardware: BMSHardware):
 60        """Create the csv object."""
 61        self.bms_hardware = bms_hardware
 62        self.filename: Path | None = None
 63        self.postfix_fn = lambda: ...
 64
 65    @property
 66    @abstractmethod
 67    def header(self) -> list[str]:
 68        """The csv header."""
 69
 70    def create_file(self, prefix: str = "", postfix: str = "") -> Path:
 71        """Create a csv file."""
 72
 73        # Return file if it was already created
 74        if self.filename or (
 75            hasattr(pytest, "flags")
 76            and isinstance(pytest.flags, BMSFlags)
 77            and (pytest.flags.doc_generation or pytest.flags.dry_run)
 78        ):
 79            return self.filename or Path(".")
 80
 81        # Generate filename
 82        path = Path(self.bms_hardware.report_filename)
 83        path = path.with_stem(f"{prefix}{path.stem}")
 84        filename = type(self).__name__.lower()
 85        self.filename = Path(f"{path}_{filename}{postfix}.csv")
 86
 87        # Create file
 88        with open(self.filename, "w", encoding="utf-8") as csv_file:
 89            csv.writer(csv_file).writerow(self.header)
 90        return self.filename
 91
 92    @abstractmethod
 93    def record(self):
 94        """Generate row data."""
 95
 96    def write_row(self, row: list[Any]):
 97        """Write the row."""
 98        self.create_file()  # Create the file if it does not exist
 99        assert self.filename  # Appease pylint
100
101        with open(self.filename, "a", encoding="utf-8") as csv_file:
102            csv.writer(csv_file).writerow(row)
103
104        self.postfix_fn()

Base class for csv writers.

CSVBase(bms_hardware: hitl_tester.modules.bms.bms_hw.BMSHardware)
59    def __init__(self, bms_hardware: BMSHardware):
60        """Create the csv object."""
61        self.bms_hardware = bms_hardware
62        self.filename: Path | None = None
63        self.postfix_fn = lambda: ...

Create the csv object.

bms_hardware
filename: pathlib.Path | None
postfix_fn
header: list[str]
65    @property
66    @abstractmethod
67    def header(self) -> list[str]:
68        """The csv header."""

The csv header.

def create_file(self, prefix: str = '', postfix: str = '') -> pathlib.Path:
70    def create_file(self, prefix: str = "", postfix: str = "") -> Path:
71        """Create a csv file."""
72
73        # Return file if it was already created
74        if self.filename or (
75            hasattr(pytest, "flags")
76            and isinstance(pytest.flags, BMSFlags)
77            and (pytest.flags.doc_generation or pytest.flags.dry_run)
78        ):
79            return self.filename or Path(".")
80
81        # Generate filename
82        path = Path(self.bms_hardware.report_filename)
83        path = path.with_stem(f"{prefix}{path.stem}")
84        filename = type(self).__name__.lower()
85        self.filename = Path(f"{path}_{filename}{postfix}.csv")
86
87        # Create file
88        with open(self.filename, "w", encoding="utf-8") as csv_file:
89            csv.writer(csv_file).writerow(self.header)
90        return self.filename

Create a csv file.

@abstractmethod
def record(self):
92    @abstractmethod
93    def record(self):
94        """Generate row data."""

Generate row data.

def write_row(self, row: list[typing.Any]):
 96    def write_row(self, row: list[Any]):
 97        """Write the row."""
 98        self.create_file()  # Create the file if it does not exist
 99        assert self.filename  # Appease pylint
100
101        with open(self.filename, "a", encoding="utf-8") as csv_file:
102            csv.writer(csv_file).writerow(row)
103
104        self.postfix_fn()

Write the row.

class OCV(CSVBase):
class OCV(CSVBase):
    """Functions for the Open Circuit Voltage recording."""

    header = ["Temperature (C)", "Capacity (%)", "Voltage (V)", "Impedance (mΩ)", "Remaining Capacity (mAh)"]

    def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float:
        """Calculate the impedance from the raw voltages.

        The first sample-to-sample drop of more than 3 mV is taken as the start of the pulse. The
        impedance is the difference between the last high sample and the third low sample, scaled by
        1000 and divided by ``current_a``.

        :param raw_voltage_data: consecutive voltage samples.
        :param current_a: the pulse current used as the divisor.
        :return: the impedance, or ``math.nan`` when no drop is found or fewer than three samples
            follow the drop.
        """
        for i in range(1, len(raw_voltage_data)):
            high_voltage = raw_voltage_data[i - 1]
            if high_voltage - raw_voltage_data[i] > 0.003:  # If the voltage drops by at least 3 mV
                third_low = i + 2
                if third_low >= len(raw_voltage_data):
                    return math.nan  # Drop seen too close to the end of the capture
                # Impedance = (High voltage - 3rd low voltage)
                return (high_voltage - raw_voltage_data[third_low]) * 1000 / current_a
        return math.nan

    def measure_impedance_data(self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]:
        """Record the voltage drop with a current pulse.

        :param pulse_current: passed to the load's ``configure_pulse_trigger``.
        :param timestamps: passed to the DMM's ``configure_voltage_trigger``.
        :return: whatever the DMM's ``read_internal_memory`` returns.
        """
        assert self.bms_hardware.load and self.bms_hardware.dmm
        load = self.bms_hardware.load
        dmm = self.bms_hardware.dmm

        load.configure_pulse_trigger(pulse_current)
        dmm.configure_voltage_trigger(timestamps)
        load.enable()
        try:
            dmm.send_trigger()  # Begin voltage measurements
            load.send_trigger()  # Pulse the current
            load.wait_for_pulse()  # Wait for load pulse to complete
        finally:
            load.disable()  # Never leave the load enabled, even if a step above failed
        result = dmm.read_internal_memory()  # Read voltage measurements
        dmm.configure_voltage_normal()
        return result

    def record(self, remaining_capacity_ah: float = 0.0):
        """Record the OCV.

        Takes ``impedance_config.total_readings`` pulse measurements (each once without and once with
        timestamps), keeps a running average of the impedance, logs the raw data to the raw-impedance
        recorders, and finally writes one OCV row.

        :param remaining_capacity_ah: remaining capacity; written to the row multiplied by 1000.
        """
        logger.write_info_to_report("Recording OCV")

        # FIXME(JA): determine if timestamps are useful or not
        impedance = 0.0
        impedance_timestamps = 0.0
        assert self.bms_hardware.dmm
        config = self.bms_hardware.dmm.impedance_config
        for i in range(config.total_readings):
            raw_voltage_data = self.measure_impedance_data(config.pulse_current)
            raw_voltage_and_time = self.measure_impedance_data(config.pulse_current, timestamps=True)

            # Cumulative average of impedance
            impedance = (impedance * i + self.calculate_impedance(raw_voltage_data, config.pulse_current)) / (i + 1)
            # NOTE(review): the slices below treat raw_voltage_and_time as interleaved voltage/time
            # values, yet the whole list is handed to calculate_impedance here - confirm intent.
            impedance_timestamps = (
                impedance_timestamps * i + self.calculate_impedance(raw_voltage_and_time, config.pulse_current)
            ) / (i + 1)

            # For debugging impedance calculations
            self.bms_hardware.csv.raw_impedance.record(impedance, raw_voltage_data)
            self.bms_hardware.csv.raw_impedance_time.create_file(postfix="_time")
            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time)  # Both
            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[1::2])  # Time
            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[::2])  # Voltage

        assert self.bms_hardware.dmm
        self.write_row(
            [
                self.bms_hardware.temperature,
                self.bms_hardware.remaining_capacity_percentage,
                self.bms_hardware.dmm.volts,
                impedance,
                remaining_capacity_ah * 1000,
            ]
        )

Functions for the Open Circuit Voltage recording.

header = ['Temperature (C)', 'Capacity (%)', 'Voltage (V)', 'Impedance (mΩ)', 'Remaining Capacity (mAh)']

The csv header.

def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float:
112    def calculate_impedance(self, raw_voltage_data: list[float], current_a: float = 1.0) -> float:
113        """Calculate the impedance from the raw voltages."""
114        for i, voltage in enumerate(raw_voltage_data):
115            if i > 0 and raw_voltage_data[i - 1] - voltage > 0.003:  # If the voltage drops by at least 3 mV
116                # Impedance = (High voltage - 3rd low voltage)
117                return (raw_voltage_data[i - 1] - raw_voltage_data[i + 2]) * 1000 / current_a
118        return math.nan

Calculate the impedance from the raw voltages.

def measure_impedance_data( self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]:
120    def measure_impedance_data(self, pulse_current: float = 1.0, timestamps: bool = False) -> list[float]:
121        """Record the voltage drop with a current pulse."""
122        assert self.bms_hardware.load and self.bms_hardware.dmm
123        self.bms_hardware.load.configure_pulse_trigger(pulse_current)
124        self.bms_hardware.dmm.configure_voltage_trigger(timestamps)
125        self.bms_hardware.load.enable()
126        self.bms_hardware.dmm.send_trigger()  # Begin voltage measurements
127        self.bms_hardware.load.send_trigger()  # Pulse the current
128        self.bms_hardware.load.wait_for_pulse()  # Wait for load pulse to complete
129        self.bms_hardware.load.disable()
130        result = self.bms_hardware.dmm.read_internal_memory()  # Read voltage measurements
131        self.bms_hardware.dmm.configure_voltage_normal()
132        return result

Record the voltage drop with a current pulse.

def record(self, remaining_capacity_ah: float = 0.0):
134    def record(self, remaining_capacity_ah: float = 0.0):
135        """Record the OCV."""
136        logger.write_info_to_report("Recording OCV")
137
138        # FIXME(JA): determine if timestamps are useful or not
139        impedance = 0.0
140        impedance_timestamps = 0.0
141        assert self.bms_hardware.dmm
142        config = self.bms_hardware.dmm.impedance_config
143        for i in range(config.total_readings):
144            raw_voltage_data = self.measure_impedance_data(config.pulse_current)
145            raw_voltage_and_time = self.measure_impedance_data(config.pulse_current, timestamps=True)
146
147            # Cumulative average of impedance
148            impedance = (impedance * i + self.calculate_impedance(raw_voltage_data, config.pulse_current)) / (i + 1)
149            impedance_timestamps = (
150                impedance_timestamps * i + self.calculate_impedance(raw_voltage_and_time, config.pulse_current)
151            ) / (i + 1)
152
153            # For debugging impedance calculations
154            self.bms_hardware.csv.raw_impedance.record(impedance, raw_voltage_data)
155            self.bms_hardware.csv.raw_impedance_time.create_file(postfix="_time")
156            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time)  # Both
157            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[1::2])  # Time
158            self.bms_hardware.csv.raw_impedance_time.record(impedance_timestamps, raw_voltage_and_time[::2])  # Voltage
159
160        assert self.bms_hardware.dmm
161        self.write_row(
162            [
163                self.bms_hardware.temperature,
164                self.bms_hardware.remaining_capacity_percentage,
165                self.bms_hardware.dmm.volts,
166                impedance,
167                remaining_capacity_ah * 1000,
168            ]
169        )

Record the OCV.

class Cycle(CSVBase):
class Cycle(CSVBase):
    """CSV recorder for the data of any run cycle."""

    header = [
        "Cycle",
        "Initial Capacity (%)",
        "Target Capacity (%)",
        "Capacity (Ah)",
        "Current (A)",
        "Voltage (V)",
        "Temperature (C)",
        "Resistance (Ω)",
        "Elapsed Time (s)",
    ]

    def record(self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0):
        """
        Write one row of cycle data.

        The "Cycle" column is the name of the function that called this method. Capacity, voltage, and
        temperature are read from the BMS hardware; the remaining columns come from the arguments.
        """
        assert self.bms_hardware.dmm
        hardware = self.bms_hardware

        # Values are gathered in column order, one hardware read per column
        cycle_name = inspect.stack()[1].function  # Must be looked up here so frame 1 is the caller
        initial_capacity = hardware.remaining_capacity_percentage
        target_capacity = hardware.remaining_capacity_percentage - hardware.percent_discharge * 100
        volts = hardware.dmm.volts
        temperature = hardware.temperature

        self.write_row(
            [cycle_name, initial_capacity, target_capacity, amp_hrs, current, volts, temperature, resistance, elapsed_time]
        )

Functions for recording any run cycle.

header = ['Cycle', 'Initial Capacity (%)', 'Target Capacity (%)', 'Capacity (Ah)', 'Current (A)', 'Voltage (V)', 'Temperature (C)', 'Resistance (Ω)', 'Elapsed Time (s)']

The csv header.

def record( self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0):
187    def record(self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0):
188        """Record various cycle data."""
189        assert self.bms_hardware.dmm
190        self.write_row(
191            [
192                inspect.stack()[1].function,
193                self.bms_hardware.remaining_capacity_percentage,
194                self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100,
195                amp_hrs,
196                current,
197                self.bms_hardware.dmm.volts,
198                self.bms_hardware.temperature,
199                resistance,
200                elapsed_time,
201            ]
202        )

Record various cycle data.

class CycleSMBus(CSVBase):
class CycleSMBus(CSVBase):
    """Functions for recording any run cycle, as well as the smbus."""

    def __init__(self, bms_hardware: BMSHardware):
        """Create the SMBus and ADC plate interfaces and the caches for the header and the last recorded data."""
        super().__init__(bms_hardware)
        self.smbus = SMBus()
        self.adc_plate = ADCPlate()
        self._header_cache: list[str] = []
        self.last_row: dict[str, float | str | None] = {}
        self.last_serial_data: dict[str, int | str] = {}
        self.last_cell_data: dict[int, dict[str, int]] = {}

    @property
    def header(self) -> list[str]:
        """
        The csv header, built on first access and cached afterwards.

        The SMBus columns are left out if reading the SMBus raises SMBusError while the header is built.
        """
        if self._header_cache:
            return self._header_cache

        header_text = [
            "Cycle",
            "Initial Capacity (%)",
            "Target Capacity (%)",
            "HITL Capacity (Ah)",
            "HITL Current (A)",
            "HITL Voltage (V)",
            "ADC Plate Terminal Voltage (V)",
            "HITL Temperature (C)",
            "HITL Resistance (Ω)",
            "HITL Elapsed Time (s)",
        ]

        # Create dynamic headers
        for cell_id in self.adc_plate.cell_ids:
            header_text.append(f"ADC Plate Cell {cell_id} Voltage (V)")

        for name in ("SOC (%)", "Volts (V)", "Measured Volts (V)", "Current (A)", "Ohms (Ω)"):
            for i in sorted(self.bms_hardware.cells):
                header_text.append(f"Cell Sim {i} {name}")

        header_text.append("|SERIAL|")
        header_text += serial_monitor.parse_serial_data(dict.fromkeys(serial_monitor.get_headers(), VERSION))

        header_text.append("|SMBUS|")
        with suppress(SMBusError):
            header_text += self.smbus.parse_smbus_data()

        self._header_cache = header_text
        return header_text

    def record(
        self,
        elapsed_time: float = 0,
        current: float = 0,
        amp_hrs: float | None = None,
        resistance: float = 0,
        state: str = "",
        serial_data: dict[str, int | str] | None = None,
        suppress_smbus: bool = False,
    ):
        """
        Record various cycle data, along with cell simulator, serial, and SMBus data.

        ``state`` fills the "Cycle" column; when empty, the name of the calling function is used instead.
        ``serial_data`` is read from the serial monitor when it is None.
        ``suppress_smbus`` stops an SMBusError from being logged; the SMBus columns are left out either way.

        The row and the raw serial and cell data are kept in ``last_row``, ``last_serial_data``, and
        ``last_cell_data``.
        """

        # Get cell simulator data. Cells are visited in sorted key order so the values line up with the
        # "Cell Sim" columns, which the header also generates in sorted key order.
        cells = self.bms_hardware.cells
        ordered_cells = [cells[cell_key] for cell_key in sorted(cells)]
        sim_data = []
        self.last_cell_data = {}
        for attr in ("state_of_charge", "volts", "measured_volts", "amps", "ohms"):
            for cell in ordered_cells:
                value = getattr(cell, attr)
                self.last_cell_data.setdefault(cell.id, {})[attr] = value
                sim_data.append(f"{value:.2%}" if attr == "state_of_charge" else str(value))

        # Get serial data
        if serial_data is None:
            serial_data = serial_monitor.read()
            assert serial_data is not None
        self.last_serial_data = serial_data
        parsed_serial_data = serial_monitor.parse_serial_data(serial_data)

        # Get smbus data
        try:
            parsed_smbus_data = self.smbus.parse_smbus_data()
        except SMBusError as e:
            parsed_smbus_data = {}
            if not suppress_smbus:
                logger.write_error_to_report(str(e))

        assert self.bms_hardware.dmm
        row = [
            state or inspect.stack()[1].function,
            self.bms_hardware.remaining_capacity_percentage,
            self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100,
            amp_hrs,
            current,
            self.bms_hardware.dmm.volts,
            self.adc_plate.terminal_volts,
            self.bms_hardware.temperature,
            resistance,
            elapsed_time,
            *[self.adc_plate.cell_volts(cell_id) for cell_id in self.adc_plate.cell_ids],
            *sim_data,
            None,  # Spacing
            *parsed_serial_data.values(),
            None,  # Spacing
            *parsed_smbus_data.values(),
        ]
        self.last_row = dict(zip(self.header, cast(list[str | float | None], row)))
        self.write_row(row)

Functions for recording any run cycle, as well as the smbus.

CycleSMBus(bms_hardware: hitl_tester.modules.bms.bms_hw.BMSHardware)
208    def __init__(self, bms_hardware: BMSHardware):
209        """Add variable to detect first run."""
210        super().__init__(bms_hardware)
211        self.smbus = SMBus()
212        self.adc_plate = ADCPlate()
213        self._header_cache: list[str] = []
214        self.last_row: dict[str, float | str | None] = {}
215        self.last_serial_data: dict[str, int | str] = {}
216        self.last_cell_data: dict[int, dict[str, int]] = {}

Create the SMBus and ADC plate interfaces and the caches for the header and the last recorded data.

smbus
adc_plate
last_row: dict[str, float | str | None]
last_serial_data: dict[str, int | str]
last_cell_data: dict[int, dict[str, int]]
header: list[str]
218    @property
219    def header(self) -> list[str]:
220        if self._header_cache:
221            return self._header_cache
222
223        header_text = [
224            "Cycle",
225            "Initial Capacity (%)",
226            "Target Capacity (%)",
227            "HITL Capacity (Ah)",
228            "HITL Current (A)",
229            "HITL Voltage (V)",
230            "ADC Plate Terminal Voltage (V)",
231            "HITL Temperature (C)",
232            "HITL Resistance (Ω)",
233            "HITL Elapsed Time (s)",
234        ]
235
236        # Create dynamic headers
237        for cell_id in self.adc_plate.cell_ids:
238            header_text.append(f"ADC Plate Cell {cell_id} Voltage (V)")
239
240        for name in ("SOC (%)", "Volts (V)", "Measured Volts (V)", "Current (A)", "Ohms (Ω)"):
241            for i in sorted(self.bms_hardware.cells):
242                header_text.append(f"Cell Sim {i} {name}")
243
244        header_text.append("|SERIAL|")
245        header_text += serial_monitor.parse_serial_data(dict.fromkeys(serial_monitor.get_headers(), VERSION))
246
247        header_text.append("|SMBUS|")
248        with suppress(SMBusError):
249            header_text += self.smbus.parse_smbus_data()
250
251        self._header_cache = header_text
252        return header_text

The csv header.

def record( self, elapsed_time: float = 0, current: float = 0, amp_hrs: float | None = None, resistance: float = 0, state: str = '', serial_data: dict[str, int | str] | None = None, suppress_smbus: bool = False):
254    def record(
255        self,
256        elapsed_time: float = 0,
257        current: float = 0,
258        amp_hrs: float | None = None,
259        resistance: float = 0,
260        state: str = "",
261        serial_data: dict[str, int | str] | None = None,
262        suppress_smbus: bool = False,
263    ):
264        """Record various cycle data."""
265
266        # Get cell simulator data
267        sim_data = []
268        self.last_cell_data = {}
269        for attr in ("state_of_charge", "volts", "measured_volts", "amps", "ohms"):
270            for cell in self.bms_hardware.cells.values():
271                self.last_cell_data[cell.id] = self.last_cell_data.get(cell.id, {})
272                self.last_cell_data[cell.id][attr] = getattr(cell, attr)
273                sim_data.append(
274                    f"{self.last_cell_data[cell.id][attr]:.2%}"
275                    if attr == "state_of_charge"
276                    else str(self.last_cell_data[cell.id][attr])
277                )
278
279        # Get serial data
280        if serial_data is None:
281            serial_data = serial_monitor.read()
282            assert serial_data is not None
283        self.last_serial_data = serial_data
284        parsed_serial_data = serial_monitor.parse_serial_data(serial_data)
285
286        # Get smbus data
287        try:
288            parsed_smbus_data = self.smbus.parse_smbus_data()
289        except SMBusError as e:
290            parsed_smbus_data = {}
291            if not suppress_smbus:
292                logger.write_error_to_report(str(e))
293
294        assert self.bms_hardware.dmm
295        row = [
296            state or inspect.stack()[1].function,
297            self.bms_hardware.remaining_capacity_percentage,
298            self.bms_hardware.remaining_capacity_percentage - self.bms_hardware.percent_discharge * 100,
299            amp_hrs,
300            current,
301            self.bms_hardware.dmm.volts,
302            self.adc_plate.terminal_volts,
303            self.bms_hardware.temperature,
304            resistance,
305            elapsed_time,
306            *[self.adc_plate.cell_volts(cell_id) for cell_id in self.adc_plate.cell_ids],
307            *sim_data,
308            None,  # Spacing
309            *parsed_serial_data.values(),
310            None,  # Spacing
311            *parsed_smbus_data.values(),
312        ]
313        self.last_row = dict(zip(self.header, cast(list[str | float | None], row)))
314        self.write_row(row)

Record various cycle data.

class RawImpedance(CSVBase):
class RawImpedance(CSVBase):
    """CSV recorder for impedance values and the raw data they were calculated from."""

    header = ["Capacity (%)", "Impedance (mΩ)", "Raw Voltages (V)"]

    def record(self, impedance: float = math.nan, raw_voltage_data: list[float] | None = None):
        """Write the remaining capacity and impedance, followed by one column per raw data value."""
        logger.write_info_to_report("Recording Raw Impedance")

        raw_values = [] if raw_voltage_data is None else raw_voltage_data
        row = [self.bms_hardware.remaining_capacity_percentage, impedance]
        row.extend(raw_values)
        self.write_row(row)

Functions for recording the Impedance.

header = ['Capacity (%)', 'Impedance (mΩ)', 'Raw Voltages (V)']

The csv header.

def record( self, impedance: float = nan, raw_voltage_data: list[float] | None = None):
322    def record(self, impedance: float = math.nan, raw_voltage_data: list[float] | None = None):
323        """Record the raw impedance data."""
324        logger.write_info_to_report("Recording Raw Impedance")
325
326        if raw_voltage_data is None:
327            raw_voltage_data = []
328        self.write_row(
329            [
330                self.bms_hardware.remaining_capacity_percentage,
331                impedance,
332                *raw_voltage_data,
333            ]
334        )

Record the raw impedance data.

class RawVoltages(CSVBase):
class RawVoltages(CSVBase):
    """CSV recorder for raw voltage readings."""

    header = ["Elapsed time (s)", "Measurement time (s)", "Raw Voltages (V)"]

    def record(self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None):
        """Write the two time values, followed by one column per raw voltage reading."""
        logger.write_info_to_report("Recording Raw Voltages")

        raw_values = [] if raw_voltage_data is None else raw_voltage_data
        row = [elapsed_time, measurement_time]
        row.extend(raw_values)
        self.write_row(row)

Functions for recording the voltages.

header = ['Elapsed time (s)', 'Measurement time (s)', 'Raw Voltages (V)']

The csv header.

def record( self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None):
342    def record(self, elapsed_time: float = 0, measurement_time: float = 0, raw_voltage_data: list[float] | None = None):
343        """Record the raw voltage data."""
344        logger.write_info_to_report("Recording Raw Voltages")
345
346        if raw_voltage_data is None:
347            raw_voltage_data = []
348        self.write_row(
349            [
350                elapsed_time,
351                measurement_time,
352                *raw_voltage_data,
353            ]
354        )

Record the raw voltage data.

class RawVoltagesAverage(CSVBase):
class RawVoltagesAverage(CSVBase):
    """CSV recorder for averaged voltage statistics."""

    header = [
        "Elapsed time (s)",
        "Measurement time (s)",
        "Average Voltage (V)",
        "Max Voltage (V)",
        "Min Voltage (V)",
        "Total Measurements",
    ]

    def record(
        self,
        elapsed_time: float = 0,
        measurement_time: float = 0,
        average_volts: float = 0,
        max_volts: float = 0,
        min_volts: float = 0,
        measurement_count: float = 0,
    ):
        """Write the average, maximum, and minimum voltage along with the number of measurements."""
        logger.write_info_to_report("Recording Raw Voltages")

        row = [elapsed_time, measurement_time, average_volts, max_volts, min_volts, measurement_count]
        self.write_row(row)

Functions for recording averaged voltage statistics.

header = ['Elapsed time (s)', 'Measurement time (s)', 'Average Voltage (V)', 'Max Voltage (V)', 'Min Voltage (V)', 'Total Measurements']

The csv header.

def record( self, elapsed_time: float = 0, measurement_time: float = 0, average_volts: float = 0, max_volts: float = 0, min_volts: float = 0, measurement_count: float = 0):
369    def record(
370        self,
371        elapsed_time: float = 0,
372        measurement_time: float = 0,
373        average_volts: float = 0,
374        max_volts: float = 0,
375        min_volts: float = 0,
376        measurement_count: float = 0,
377    ):
378        """Record the raw voltage data."""
379        logger.write_info_to_report("Recording Raw Voltages")
380
381        self.write_row(
382            [
383                elapsed_time,
384                measurement_time,
385                average_volts,
386                max_volts,
387                min_volts,
388                measurement_count,
389            ]
390        )

Record the average, maximum, and minimum voltage along with the number of measurements.

class NiCdQC(CSVBase):
class NiCdQC(CSVBase):
    """CSV recorder for a NiCd cell during QC."""

    header = [
        "Elapsed time (s)",
        "Elapsed time discharge (s)",
        "Board",
        "Channel",
        "Voltage (V)",
        "State",
        "1.21V Time (s)",
        "1.20V Time (s)",
    ]

    def record(
        self,
        elapsed_time: float = 0,
        elapsed_time_discharge: float = 0,
        board: int = 0,
        channel: int = 0,
        voltage: float = 0.0,
        state: Any = None,
        time_121: float = 0.0,
        time_120: float = 0.0,
    ):
        """Write one row of NiCd cell data; the arguments are written in header order."""
        row = [
            elapsed_time,
            elapsed_time_discharge,
            board,
            channel,
            voltage,
            state,
            time_121,
            time_120,
        ]
        self.write_row(row)

Functions for recording a NiCd cell during QC.

header = ['Elapsed time (s)', 'Elapsed time discharge (s)', 'Board', 'Channel', 'Voltage (V)', 'State', '1.21V Time (s)', '1.20V Time (s)']

The csv header.

def record( self, elapsed_time: float = 0, elapsed_time_discharge: float = 0, board: int = 0, channel: int = 0, voltage: float = 0.0, state: Any = None, time_121: float = 0.0, time_120: float = 0.0):
407    def record(
408        self,
409        elapsed_time: float = 0,
410        elapsed_time_discharge: float = 0,
411        board: int = 0,
412        channel: int = 0,
413        voltage: float = 0.0,
414        state: Any = None,
415        time_121: float = 0.0,
416        time_120: float = 0.0,
417    ):
418        """Record the NiCd cell data."""
419        self.write_row([elapsed_time, elapsed_time_discharge, board, channel, voltage, state, time_121, time_120])

Record the NiCd cell data.

class DMMAC(CSVBase):
class DMMAC(CSVBase):
    """CSV recorder for the DMM AC current."""

    header = [
        "Elapsed time (s)",
        "AC Current (A)",
    ]

    def record(self, elapsed_time: float = 0, current: float = 0):
        """Write the elapsed time and the AC current."""
        row = [elapsed_time, current]
        self.write_row(row)

Functions for recording the DMM AC current.

header = ['Elapsed time (s)', 'AC Current (A)']

The csv header.

def record(self, elapsed_time: float = 0, current: float = 0):
430    def record(self, elapsed_time: float = 0, current: float = 0):
431        """Record the DMM AC current."""
432        self.write_row([elapsed_time, current])

Record the DMM AC current.

class NiCdKorad(CSVBase):
class NiCdKorad(CSVBase):
    """CSV recorder for the NiCd Korad status flags, current, and voltage."""

    header = [
        "Elapsed time (s)",
        "Status flags",
        "Current (A)",
        "Voltage (V)",
    ]

    def record(self, elapsed_time: float = 0, status_flags: str = "", current: float = 0, voltage: float = 0):
        """Write the elapsed time, status flags, current, and voltage."""
        row = [elapsed_time, status_flags, current, voltage]
        self.write_row(row)

Functions for recording the NiCd Korad status flags, current, and voltage.

header = ['Elapsed time (s)', 'Status flags', 'Current (A)', 'Voltage (V)']

The csv header.

def record( self, elapsed_time: float = 0, status_flags: str = '', current: float = 0, voltage: float = 0):
445    def record(self, elapsed_time: float = 0, status_flags: str = "", current: float = 0, voltage: float = 0):
446        """Record the DMM AC current."""
447        self.write_row([elapsed_time, status_flags, current, voltage])

Record the elapsed time, status flags, current, and voltage.

class Thermo(CSVBase):
class Thermo(CSVBase):
    """CSV recorder for the thermocouples."""

    @property
    def header(self) -> list[str]:
        """The csv header: the two time columns followed by one column per thermocouple channel."""
        channel_columns = [f"Channel {thermocouple_id}" for thermocouple_id in self.bms_hardware.thermo_couples]
        return ["Elapsed Time (s)", "Seconds since epoch (s)", *channel_columns]

    def record(self, elapsed_time: float = 0):
        """Read every thermocouple, log each reading, and write them as one row."""
        temps: list[float] = []
        for thermocouple_id, thermocouple in self.bms_hardware.thermo_couples.items():
            reading = thermocouple.temperature  # Read once; the same value is logged and recorded
            temps.append(reading)
            logger.write_debug_to_report(f"Thermocouple channel {thermocouple_id}: {reading} °C")

        row = [elapsed_time, time.time()]
        row.extend(temps)
        self.write_row(row)

Functions for recording the thermocouples.

header: list[str]
453    @property
454    def header(self) -> list[str]:
455        header_list: list[str] = ["Elapsed Time (s)", "Seconds since epoch (s)"]
456        header_list.extend(f"Channel {thermocouple_id}" for thermocouple_id in self.bms_hardware.thermo_couples)
457        return header_list

The csv header.

def record(self, elapsed_time: float = 0):
459    def record(self, elapsed_time: float = 0):
460        """Record the DMM AC current."""
461        temps: list[float] = []
462        for thermocouple_id, thermocouple in self.bms_hardware.thermo_couples.items():
463            temps.append(thermocouple.temperature)
464            logger.write_debug_to_report(f"Thermocouple channel {thermocouple_id}: {temps[-1]} °C")
465        self.write_row([elapsed_time, time.time(), *temps])

Record the temperature of every thermocouple.

class CSVRecorders:
class CSVRecorders:
    """Container giving access to one instance of each CSV recorder."""

    def __init__(self, bms_hardware: BMSHardware):
        """Create every CSV recorder, all sharing the same BMS hardware object."""
        # Cycle and OCV recorders
        self.ocv = OCV(bms_hardware)
        self.cycle: CSVBase = Cycle(bms_hardware)

        # Raw voltage recorders
        self.raw_voltage = RawVoltages(bms_hardware)
        self.raw_voltage_average = RawVoltagesAverage(bms_hardware)

        self.cycle_smbus = CycleSMBus(bms_hardware)

        # NiCd and DMM recorders
        self.nicd = NiCdQC(bms_hardware)
        self.dmm_ac = DMMAC(bms_hardware)
        self.nicd_korad = NiCdKorad(bms_hardware)

        # Two independent raw impedance recorders
        self.raw_impedance = RawImpedance(bms_hardware)
        self.raw_impedance_time = RawImpedance(bms_hardware)

        self.thermo = Thermo(bms_hardware)

        # NOTE: the recorders could be created dynamically, but the type checker does not accept it:
        # for csv_child in CSVBase.__subclasses__():
        #     if not inspect.isabstract(csv_child):  # Child is confirmed concrete
        #         setattr(self, csv_child.__name__.lower(), csv_child(bms_hardware))  # type: ignore

    # Disabled hook for undefined attributes:
    # def __getattr__(self, name):
    #     """Called for undefined attributes."""
    #     raise AttributeError(f"'{self.__name__}' object has no attribute '{name}'")

Access to CSV recorders.

CSVRecorders(bms_hardware: hitl_tester.modules.bms.bms_hw.BMSHardware)
471    def __init__(self, bms_hardware: BMSHardware):
472        """Initialize CSV recorders."""
473        self.ocv = OCV(bms_hardware)
474        self.cycle: CSVBase = Cycle(bms_hardware)
475        self.raw_voltage = RawVoltages(bms_hardware)
476        self.raw_voltage_average = RawVoltagesAverage(bms_hardware)
477        self.cycle_smbus = CycleSMBus(bms_hardware)
478        self.nicd = NiCdQC(bms_hardware)
479        self.dmm_ac = DMMAC(bms_hardware)
480        self.nicd_korad = NiCdKorad(bms_hardware)
481        self.raw_impedance = RawImpedance(bms_hardware)
482        self.raw_impedance_time = RawImpedance(bms_hardware)
483        self.thermo = Thermo(bms_hardware)
484
485        # This can be done dynamically, but of course the type checker really doesn't like it
486        # for csv_child in CSVBase.__subclasses__():
487        #     if not inspect.isabstract(csv_child):  # Child is confirmed concrete
488        #         setattr(self, csv_child.__name__.lower(), csv_child(bms_hardware))  # type: ignore

Initialize CSV recorders.

ocv
cycle: CSVBase
raw_voltage
raw_voltage_average
cycle_smbus
nicd
dmm_ac
nicd_korad
raw_impedance
raw_impedance_time
thermo