hitl_tester.modules.bms.bms_hw

The main API for accessing all hardware.

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

```mermaid
flowchart LR

classDef highlighted stroke:#f00, stroke-width:3px

HITL_TESTER[hitl_tester.py] --> PLAN(Test Plan)
PLAN -->|Args| CASE1[Test Case 1.py]
PLAN -->|Args| CASE2[Test Case 2.py]
PLAN -->|Args| CASE3[Test Case 3.py]

CASE1 --> BMS_HW[bms_hw.py]:::highlighted
CASE2 --> BMS_HW
CASE3 --> BMS_HW

CONFIG(Configuration File) --> BMS_HW

BMS_HW --> HW(Hardware) & LOG(Logging)

click HITL_TESTER "../hitl_tester.html"
click CASE1 "../test_cases.html"
click CASE2 "../test_cases.html"
click CASE3 "../test_cases.html"
click BMS_HW "bms_hw.html"
```
  1"""
  2The main API for accessing all hardware.
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28
 29```mermaid
 30flowchart LR
 31
 32classDef highlighted stroke:#f00, stroke-width:3px
 33
 34HITL_TESTER[hitl_tester.py] --> PLAN(Test Plan)
 35PLAN -->|Args| CASE1[Test Case 1.py]
 36PLAN -->|Args| CASE2[Test Case 2.py]
 37PLAN -->|Args| CASE3[Test Case 3.py]
 38
 39CASE1 --> BMS_HW[bms_hw.py]:::highlighted
 40CASE2 --> BMS_HW
 41CASE3 --> BMS_HW
 42
 43CONFIG(Configuration File) --> BMS_HW
 44
 45BMS_HW --> HW(Hardware) & LOG(Logging)
 46
 47click HITL_TESTER "../hitl_tester.html"
 48click CASE1 "../test_cases.html"
 49click CASE2 "../test_cases.html"
 50click CASE3 "../test_cases.html"
 51click BMS_HW "bms_hw.html"
 52```
 53"""
 54
 55from __future__ import annotations
 56
 57import inspect
 58import os
 59import pathlib
 60import signal
 61import time
 62from collections import deque
 63from threading import Event
 64from types import FrameType
 65from typing import ClassVar, Any, cast
 66
 67import pytest
 68import pyvisa as visa
 69from typing_extensions import Self
 70
 71import hitl_tester.modules.bms.bms_hw
 72from hitl_tester.modules.bms import pseudo_hardware
 73from hitl_tester.modules.bms.cell import Cell
 74from hitl_tester.modules.bms.charger import Charger
 75from hitl_tester.modules.bms.chroma_sim import ChromaCell
 76from hitl_tester.modules.bms.dmm import Dmm
 77from hitl_tester.modules.bms.korad import Korad
 78from hitl_tester.modules.bms.load import Load
 79from hitl_tester.modules.bms.m300_dmm import M300Dmm
 80from hitl_tester.modules.bms.ngi_sim import NGICell
 81from hitl_tester.modules.bms.plateset import Plateset
 82from hitl_tester.modules.bms.thermal_chamber_modbus import ThermalChamber
 83from hitl_tester.modules.bms.thermoplate import ThermoCouple
 84from hitl_tester.modules.bms_types import (
 85    OverVoltageError,
 86    UnderVoltageError,
 87    BMSFlags,
 88    TimeoutExceededError,
 89    ValueLogError,
 90    ChargerOrLoad,
 91    ResourceNotFoundError,
 92    SafeResource,
 93    StopWatch,
 94    BatteryType,
 95    NiCdChargeCycle,
 96    DischargeType,
 97    OverTemperatureError,
 98)
 99from hitl_tester.modules.bms.csv_tables import CSVRecorders
100from hitl_tester.modules.file_lock import FileEvent
101from hitl_tester.modules.logger import logger
102
# Program Constants
# Directory that contains this module; every other path below is expressed relative to it.
ABS_PATH = pathlib.Path(__file__).parent.resolve()
# The ".." component is kept literally (the joined paths are not re-resolved).
HITL_CONFIG = ABS_PATH.joinpath("..", "hitl_config.yml")
TEST_CASES_PATH = ABS_PATH.joinpath("..", "test_cases")
TEST_PLANS_PATH = ABS_PATH.joinpath("..", "test_plans")
# Default over-temperature threshold in degrees Celsius (copied into BMSHardware.overtemp_c).
OVERTEMP_C = 65
109
110
class SafeShutdown:
    """Signal handling used to stop the test suite when the process is told to terminate."""

    # Signals routed to `handler`.
    SIGNALS = [signal.SIGABRT, signal.SIGHUP, signal.SIGINT, signal.SIGTERM]

    @staticmethod
    def handler(_signo, _stack_frame):
        """
        Signal handler invoked on normal / abnormal termination (e.g. the ssh
        connection drops out) so the test setup can transition to a safe state.

        When called with a truthy signal number, every signal currently routed
        to this handler (except SIGINT) is switched to "ignore" so that further
        signals cannot interrupt the shutdown. The function always finishes by
        calling `pytest.exit`.
        """
        # pylint: disable=comparison-with-callable  # The handler function itself is compared below

        def kill_on_suspend(_suspend_signo, _suspend_frame):
            """Escalate a CTRL-Z received during shutdown to SIGKILL."""
            signal.raise_signal(signal.SIGKILL)

        # Failsafe: CTRL-Z sends kill
        signal.signal(signal.SIGTSTP, kill_on_suspend)

        if _signo:
            # Already shutting down: ignore all future signals that would re-enter this handler.
            for candidate in signal.valid_signals():
                routed_here = signal.getsignal(candidate) == SafeShutdown.handler
                if routed_here and candidate != signal.SIGINT:
                    logger.write_info_to_report(f"Ignoring {signal.Signals(candidate).name}")
                    signal.signal(candidate, signal.SIG_IGN)
            logger.write_critical_to_report(f"Caught {signal.Signals(_signo).name}, Powering OFF the BMS!!!")

        pytest.exit("BMS died, exiting test suite early", 1)  # Exit gracefully
133
134
class UserInterrupt:
    """Suspend / continue the program on CTRL-Z events."""

    # Set   -> the test is allowed to run.
    # Clear -> a suspension has been requested (a fresh Event starts cleared).
    interrupt_requested = Event()

    @staticmethod
    def init():
        """Mark the test as running, i.e. do not suspend by default."""
        UserInterrupt.interrupt_requested.set()

    @staticmethod
    def handler(_signo, _stack_frame):
        """Signal handler that flips between the suspended and the running state."""
        run_flag = UserInterrupt.interrupt_requested
        if not run_flag.is_set():
            logger.write_info_to_report("User requested test continuation!")
            run_flag.set()
            return
        logger.write_info_to_report("User requested test suspension!")
        run_flag.clear()

    @staticmethod
    def force_pause():
        """Request a suspension from code and block until it is lifted."""
        logger.write_info_to_report("Program forced test suspension!")
        UserInterrupt.interrupt_requested.clear()
        UserInterrupt.check_user_interrupt()

    @staticmethod
    def check_user_interrupt(*relays: ChargerOrLoad):
        """
        Cooperatively check if a user interrupt was received.

        If a suspension is pending, every relay passed in is disabled, the
        hardware timer is stopped, and the call blocks until the flag is set
        again; the timer is then restarted and the relays re-enabled. Returns
        immediately when no suspension is pending.
        """
        assert BMSHardware.instance is not None

        if UserInterrupt.interrupt_requested.is_set():
            return  # Nothing pending, keep running

        logger.write_info_to_report("Suspending test")
        for relay in relays:
            relay.disable()
        BMSHardware.instance.timer.stop()  # Suspended time does not count toward the test time

        UserInterrupt.interrupt_requested.wait()

        BMSHardware.instance.timer.start()
        logger.write_info_to_report("Continuing test")
        for relay in relays:
            relay.enable()
177
178
179class BMSHardware:
180    """The main API for accessing all hardware."""
181
182    instance: ClassVar[Self | None] = None
183    initialized: bool = False
184
185    def __new__(cls, flags: BMSFlags | None = None):
186        """Make BMS hardware a singleton."""
187        if cls.instance is None:
188            cls.instance = super().__new__(cls)
189        return cls.instance
190
191    def __init__(self, flags: BMSFlags | None = None):  # TODO(JA): document attributes
192        """Initialize the BMS hardware."""
193        if flags is None:
194            return
195
196        # Battery attributes
197        self.remaining_capacity_percentage = 0.0
198        self.battery_type: BatteryType = BatteryType.UNDEFINED  # Initialize to unknown
199        self.max_time = 0.0
200        self.sample_interval = 0.0
201        self.voltage = 0.0
202        self.current = 0.0
203        self.current_limit = 10.0
204        self.overtemp_c = OVERTEMP_C
205
206        # Charging attributes
207        self.minimum_readings = 0  # How many readings to take before passing (for erroneous readings).
208        self.nicd_charge_type: NiCdChargeCycle = NiCdChargeCycle.UNDEFINED
209        self.nicd_cell_count = 0
210        self.termination_current = 0.0
211        self.ov_protection = 0.0
212
213        # Discharging attributes
214        self.resistance = 0.0
215        self.discharge_type: DischargeType = DischargeType.UNDEFINED
216        self.discharge_until_undervoltage = False
217        self.percent_discharge = 0.0
218        self.total_ah = 0.0
219        self.uv_protection = 0.0
220
221        # Hardware attributes
222        self.charger: Charger | None = None
223        self.load: Load | None = None
224        self.dmm: Dmm | M300Dmm | None = None
225        self.power_dmm: Dmm | None = None
226        self.korads: dict[int, Korad] = {}
227        self.thermal_chamber = pseudo_hardware.ThermalChamber() if flags.dry_run else ThermalChamber()
228        self.plateset = Plateset()
229        self.plateset_id = flags.plateset_id
230        self.thermo_couples: dict[int, ThermoCouple] = {}
231        self.cells: dict[int, Cell | NGICell | ChromaCell] = {}
232        self.cell_chemistry = flags.cell_chemistry
233        self.dry_run = flags.dry_run
234
235        # Track total test time
236        self.timer = StopWatch()
237
238        # Logging / Documentation attributes
239        self.config = flags.config
240        self.doc_generation = flags.doc_generation
241        self.report_filename = flags.report_filename
242        self.csv: CSVRecorders = CSVRecorders(cast(hitl_tester.modules.bms.bms_hw.BMSHardware, self))  # Must occur last
243
244        # Modify globals of caller
245        assert isinstance(current_frame := inspect.currentframe(), FrameType)
246        assert isinstance(caller := current_frame.f_back, FrameType)
247        if flags.properties:
248            for global_var, value in flags.properties.items():
249                caller.f_globals[global_var] = value  # Globals must occur before init is called
250
    def init(self):
        """
        Initialize the BMSHardware object: switch the relays off, install the
        signal handlers and discover the configured instruments.

        The method does its work only on the first call; later calls return
        immediately. When `doc_generation` is set, nothing beyond marking the
        object as initialized happens.

        In a dry run, pseudo hardware objects are created from the config. In a
        real run, every PyVISA resource is queried with `*IDN?` and matched
        against the configured identifiers.

        Raises:
            ResourceNotFoundError: No PyVISA resources exist, or a configured
                instrument did not answer during discovery.
            RuntimeError: A required embedded config value is missing or cannot
                be converted (see `get_embed`).
        """
        if self.initialized:
            return
        # NOTE(review): the flag is set before any hardware is touched, so a call that fails
        # part-way (e.g. ResourceNotFoundError) is not retried by a later init() - confirm intended.
        self.initialized = True

        def get_embed(header: str, key: str, result_type: type) -> Any:
            """
            Get embedded value from config (i.e. value in dict in list).

            Scans the list stored under `header` and returns the first truthy
            value found for `key` in a dict entry, converted with `result_type`.
            Falsy values (0, "", None) are skipped by the truthiness test.

            Raises:
                RuntimeError: The key was not found, or the conversion failed.
            """
            default_item: list[str | dict[str, Any]] = []
            items = self.config.get(header, default_item)
            if isinstance(items, list):
                for item in items:
                    if isinstance(item, dict) and (result := item.get(key)):
                        try:
                            return result_type(result)
                        except (ValueError, TypeError) as exc:
                            raise RuntimeError(
                                f"Expected {result_type.__name__} type, got {type(result).__name__} type"
                            ) from exc
            raise RuntimeError(f"Could not find {key} key in {header}")

        if self.doc_generation:  # Skip if generating docs
            return

        # Make sure the charger and the load switches are OFF before doing any testing
        logger.write_info_to_report("Turning off charger and load switch")
        self.plateset.charger_switch = False
        self.plateset.load_switch = False

        # setup some signals to a signal handler so that we can gracefully
        # shutdown in the event that test is somehow interrupted
        logger.write_debug_to_report("Setting up Safe Shutdown Handlers")
        for signal_flag in SafeShutdown.SIGNALS:
            signal.signal(signal_flag, SafeShutdown.handler)

        # Set up signal terminal stop handler (CTRL-Z)
        UserInterrupt.init()
        signal.signal(signal.SIGTSTP, UserInterrupt.handler)

        # FIXME(JA): device classes should manage their own configs/initializations
        # FIXME(JA): don't assume config is correct

        logger.write_info_to_report("Polling hardware")
        if self.dry_run:
            # Dry run: no instrument is contacted. A pseudo device is created for each of the
            # three IDs below unless the config sets that ID explicitly to None.
            if self.config.get("power_supply_id", "") is not None:
                logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DP711,DP7A252000573,00.01.05")
                logger.write_info_to_report("Found DP711 Power Supply")
                self.charger = pseudo_hardware.Charger()

            if self.config.get("load_id", "") is not None:
                logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DL3031A,DL3D243000300,00.01.05.00.01")
                logger.write_info_to_report("Found DL3031A Electronic load")
                self.load = pseudo_hardware.Load()

            if self.config.get("dmm_id", "") is not None:
                logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00")
                logger.write_info_to_report("Found DM3068 DMM")
                self.dmm = pseudo_hardware.Dmm()

            # An "m300_dmm" entry replaces the DMM created just above.
            # NOTE(review): the messages below repeat the DM3068 text - looks copy-pasted, confirm.
            if self.config.get("m300_dmm") is not None:
                logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00")
                logger.write_info_to_report("Found DM3068 DMM")
                self.dmm = pseudo_hardware.M300Dmm(self.config["m300_dmm"])

            for korad in self.config.get("korads", []):
                logger.write_debug_to_report(f"Resource ID: KORAD, {korad['part_id']}")
                logger.write_info_to_report("Found Korad")
                new_korad = pseudo_hardware.Korad(korad["id"])
                self.korads[new_korad.id] = new_korad

            # Only the "agilent" simulator model gets pseudo cells here; the entry that carries
            # the "type" key is a descriptor, not a cell, and is skipped.
            cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else None
            if cell_model == "agilent":
                for cell in self.config.get("cell_simulators", []):
                    if "type" not in cell:
                        logger.write_debug_to_report("Resource ID: Cell Sim, 04.08.15.16.23.42")
                        logger.write_info_to_report("Found Cell Sim")
                        new_cell = pseudo_hardware.Cell(cell["id"], self.cell_chemistry)
                        self.cells[new_cell.id] = new_cell

        else:
            resource_manager = visa.ResourceManager()
            resources = resource_manager.list_resources()
            if not resources:
                raise ResourceNotFoundError("No PyVISA resources found")

            # Identifier substrings looked for in each *IDN? reply. The first three fall back to
            # a default model string, so those instruments are required unless the config sets
            # the ID explicitly to None; the power DMM is optional.
            power_supply_id = self.config.get("power_supply_id", "RIGOL TECHNOLOGIES,DP711")
            load_id = self.config.get("load_id", "RIGOL TECHNOLOGIES,DL3031A")
            dmm_id = self.config.get("dmm_id", "Rigol Technologies,DM3068")
            power_dmm_id = self.config.get("power_dmm_id")
            if "m300_dmm" in self.config:
                dmm_id = "M300"

            cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else ""
            if cell_model == "chroma":  # Add chroma to resources
                address = get_embed("cell_simulators", "address", str)
                # Appended as a tuple rather than a plain address string; presumably the two
                # "\n" items are terminations understood by SafeResource - TODO confirm.
                resources = (*resources, (f"TCPIP::{address}::60000::SOCKET", "\n", "\n"))
            # Cell entries proper, i.e. everything except the descriptor holding the "type" key.
            config_cells = [cell for cell in self.config.get("cell_simulators", []) if "type" not in cell]

            for resource_address in resources:
                safe_resource = SafeResource(resource_address)
                for baud_rate in (9600, 115200):
                    safe_resource.baud_rate = baud_rate
                    # A reply different from the resource's default result counts as "answered".
                    if (idn := safe_resource.query("*IDN?")) != safe_resource.default_result:
                        # chr(10) is the newline character
                        logger.write_debug_to_report(f"Resource ID: {idn.strip(chr(10))}")

                        if power_supply_id is not None and power_supply_id in idn:  # Power supply
                            logger.write_info_to_report("Found DP711 Power Supply")
                            self.charger = Charger(safe_resource)

                        elif load_id is not None and load_id in idn:  # Electronic load
                            logger.write_info_to_report("Found DL3031A Electronic load")
                            self.load = Load(safe_resource)

                        elif dmm_id is not None and dmm_id in idn:  # DMM
                            logger.write_info_to_report("Found DMM")
                            if "m300_dmm" in self.config:
                                self.dmm = M300Dmm(safe_resource, self.config["m300_dmm"])
                            else:
                                self.dmm = Dmm(safe_resource)

                        elif power_dmm_id is not None and power_dmm_id in idn:  # DMM
                            logger.write_info_to_report("Found Power DMM")
                            self.power_dmm = Dmm(safe_resource)

                        # `match` is bound by the walrus to the config entry that made any() true.
                        elif any((match := korad)["part_id"] in idn for korad in self.config.get("korads", [])):
                            logger.write_info_to_report("Found KA6003P Power Supply")
                            new_korad = Korad(match["id"], safe_resource)
                            self.korads[new_korad.id] = new_korad

                        elif cell_model == "agilent" and any(
                            (match := cell)["part_id"] in idn for cell in config_cells
                        ):
                            logger.write_info_to_report("Found Agilent Cell Sim")
                            new_cell = Cell(match["id"], safe_resource, self.cell_chemistry)
                            new_cell.reset()
                            self.cells[new_cell.id] = new_cell

                        elif cell_model == "ngi" and "NGI,N83624" in idn:
                            logger.write_info_to_report("Found NGI Cell Sim")
                            cell_count = get_embed("cell_simulators", "cell_count", int)
                            # All cells (IDs 1..cell_count) share this one resource; reset() is
                            # issued once, through the last cell created.
                            for cell_id in range(1, cell_count + 1):
                                new_cell = NGICell(cell_id, safe_resource, self.cell_chemistry)
                                self.cells[new_cell.id] = new_cell
                                if cell_id == cell_count:
                                    new_cell.reset()

                        elif cell_model == "chroma" and "Chroma,87001" in idn:
                            logger.write_info_to_report("Found Chroma Cell Sim")
                            new_cell = None
                            # The cell ID passed in is the entry's 0-based position in config_cells;
                            # entries without a "cell_slot" are skipped.
                            for cell_id, cell in enumerate(config_cells):
                                if "cell_slot" in cell:
                                    new_cell = ChromaCell(
                                        cell_id, safe_resource, self.cell_chemistry, cell["cell_slot"]
                                    )
                                    self.cells[new_cell.id] = new_cell
                            if new_cell:
                                new_cell.reset()  # FIXME(JA): does this cause problems for the other side if running?

                        else:
                            logger.write_warning_to_report("Unknown Resource")

                        # The resource answered at this baud rate; do not try the next one.
                        break

                    # Reached only when the query got no answer at this baud rate.
                    # NOTE(review): also logged after the last rate, when no higher rate remains.
                    logger.write_warning_to_report(f"Failed to connect to: {resource_address}")
                    logger.write_debug_to_report("Attempting higher baud rate")

            # Every instrument whose identifier is configured (not None) must have been found.
            if power_supply_id is not None and self.charger is None:
                raise ResourceNotFoundError("Power Supply Not Found")

            if load_id is not None and self.load is None:
                raise ResourceNotFoundError("Load Simulator Not Found")

            if dmm_id is not None and self.dmm is None:
                raise ResourceNotFoundError("DMM Not Found")

            if power_dmm_id is not None and self.power_dmm is None:
                raise ResourceNotFoundError("Power DMM Not Found")

            # Report the first configured Korad that did not answer.
            if len(self.korads) < len(self.config.get("korads", [])):
                for korad in self.config["korads"]:
                    if korad["id"] not in self.korads:
                        raise ResourceNotFoundError(f'Unable to locate Korad {korad["id"]}')
            self.korads = dict(sorted(self.korads.items()))  # Sort korads by id

            if cell_model == "agilent" and len(self.cells) < len(config_cells):
                for cell in config_cells:
                    if cell["id"] not in self.cells:
                        raise ResourceNotFoundError(f'Unable to locate Cell {cell["id"]} Simulator')
            if cell_model == "ngi" and len(self.cells) == 0:
                raise ResourceNotFoundError("Unable to locate NGI Cell Simulator")
            if cell_model == "chroma" and len(self.cells) == 0:
                raise ResourceNotFoundError("Unable to locate Chroma Cell Simulator")
            self.cells = dict(sorted(self.cells.items()))  # Sort cells by id

        # Now go through the ThermoCouples
        for thermo_couple in self.config.get("thermo_couples", []):
            new_thermo_couple = ThermoCouple(thermo_couple["id"], thermo_couple["board"], thermo_couple["channel"])
            self.thermo_couples[new_thermo_couple.thermocouple_id] = new_thermo_couple

        logger.write_debug_to_report("Setting up SIGCHLD Handler")  # Must be set up afterward due to VISA raising it
        signal.signal(signal.SIGCHLD, SafeShutdown.handler)
452
453    # FIXME(JA): move into its own module, use exceptions
454
455    @property
456    def temperature(self) -> float:
457        """Temperature measurement used by run cycles."""
458        return float(self.thermo_couples[1].temperature) if 1 in self.thermo_couples else self.plateset.temperature
459
460    def run_li_charge_cycle(self, digits=6):
461        """Run a Li charge cycle."""
462        if self.voltage == 0:
463            raise ValueLogError("Charging Voltage Invalid")
464
465        if self.ov_protection == 0:
466            raise ValueLogError("Over-Voltage Protection Invalid")
467
468        if self.current == 0:
469            raise ValueLogError("Charge Current Invalid")
470
471        if self.termination_current == 0:
472            raise ValueLogError("Termination Current Invalid")
473
474        if self.max_time == 0:
475            raise ValueLogError("Max Time Invalid")
476
477        if self.sample_interval == 0:
478            raise ValueLogError("Charging Sample Time Invalid")
479
480        if self.minimum_readings == 0:
481            raise ValueLogError("Minimum Readings Invalid")
482
483        self.percent_discharge = -1  # For capacity calculations
484
485        # Create a queue to hold the previous n readings. This is used to avoid erroneous errors.
486        current_measurement_queue = deque(maxlen=self.minimum_readings)
487
488        logger.write_info_to_report(
489            f"Starting charge cycle (CV: {self.voltage}V, OV: {self.ov_protection}V, "
490            f"CI: {self.current}A, TI: {self.termination_current}A, MT: {self.max_time}s, "
491            f"SI: {self.sample_interval}s, MR: {self.minimum_readings}"
492            ")"
493        )
494
495        with self.charger(self.voltage, self.current):
496
497            # Starting values for the while loop
498            latest_i = self.termination_current + 0.1  # placeholder for the latest charging current measurement
499            current_measurement_queue.append(latest_i)
500            amp_hrs = 0  # amp hours
501            latest_v = 0  # placeholder for the latest charging voltage measurement
502            elapsed_time = 0
503            temp_c = 0
504
505            self.timer.reset()  # Keep track of runtime
506
507            # charge until termination current is reached or ov_protection or max_time
508            while any(i >= self.termination_current for i in current_measurement_queue):
509                if latest_v >= self.ov_protection:
510                    self.charger.disable()
511                    raise OverVoltageError(
512                        f"Overvoltage protection triggered at {time.strftime('%x %X')}. "
513                        f"Voltage {latest_v} is higher than {self.ov_protection}."
514                    )
515                if elapsed_time >= self.max_time:
516                    self.charger.disable()
517                    raise TimeoutExceededError(
518                        f"Termination current of {self.termination_current}A was "
519                        f"not reached after {self.max_time} seconds"
520                    )
521                if temp_c >= self.overtemp_c:
522                    self.charger.disable()
523                    raise OverTemperatureError(
524                        f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
525                        f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
526                    )
527
528                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
529
530                t1 = elapsed_time  # previous elapsed time for calculating delta t
531                elapsed_time = round(self.timer.elapsed_time, digits)
532                dt = round(elapsed_time - t1, digits)  # delta t
533
534                latest_v = self.dmm.volts
535
536                i1 = latest_i  # previous current reading
537                latest_i = self.charger.amps
538                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
539                current_measurement_queue.append(latest_i)
540
541                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # milliamp hour calculation
542                temp_c = round(self.temperature, digits)  # grab the temperature
543
544                logger.write_info_to_report(
545                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, "
546                    f"Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}"
547                )
548
549                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
550                time.sleep(self.sample_interval)
551
552        self.remaining_capacity_percentage = 100
553
554    def run_nicd_charge_cycle(self, digits=6):
555        """Run a NiCad charge cycle."""
556
557        if self.nicd_cell_count == 0:
558            raise ValueLogError("Cell Count Invalid")
559
560        if self.max_time == 0:
561            raise ValueLogError("Charge Time Invalid")
562
563        if self.sample_interval == 0:
564            raise ValueLogError("Charge Sample Interval Invalid")
565
566        if self.current == 0:
567            raise ValueLogError("Charge Current Invalid")
568
569        if self.nicd_charge_type is NiCdChargeCycle.UNDEFINED:
570            raise ValueLogError("Charge Type Invalid")
571
572        self.percent_discharge = -1  # For capacity calculations
573        self.ov_protection = min(30.0, self.nicd_cell_count * 1.95)
574
575        logger.write_info_to_report(
576            f"Starting {self.nicd_charge_type.name.lower()} NiCd charge cycle ("
577            f"CV: {self.ov_protection}V, OV: {self.ov_protection}V, CI: {self.current}A, MT: {self.max_time}s, "
578            f"SI: {self.sample_interval}s)"
579        )
580
581        self.charger.set_profile(self.ov_protection, self.current)
582        self.charger.enable()  # Enables the output of the charger
583
584        # initialize charge control variables
585        latest_i = 0
586        amp_hrs = 0
587        latest_v = 0
588        elapsed_time = 0
589        max_v = 0
590        first_cycle = True
591
592        self.timer.reset()  # Keep track of runtime
593
594        if self.nicd_charge_type in (NiCdChargeCycle.STANDARD, NiCdChargeCycle.CUSTOM):
595            while latest_v < self.ov_protection and elapsed_time < self.max_time:
596                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
597                t1 = elapsed_time
598                elapsed_time = round(self.timer.elapsed_time, digits)
599                dt = round(elapsed_time - t1, digits)
600
601                # This section allows for fairly precise sample interval
602                if dt < self.sample_interval and not first_cycle:
603                    while dt < self.sample_interval:
604                        elapsed_time = round(self.timer.elapsed_time, digits)
605                        dt = round(elapsed_time - t1, digits)  # delta t
606                        time.sleep(0.0005)
607
608                latest_v = self.dmm.volts
609
610                # keep track of the highest cell voltage seen
611                max_v = max(max_v, latest_v)
612
613                i1 = latest_i  # previous current reading
614                latest_i = self.charger.amps  # current reading
615
616                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
617                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
618                temp_c = round(self.temperature, digits)  # grab the temperature
619
620                logger.write_info_to_report(
621                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, "
622                    f"Ah: {amp_hrs}, Temp: {temp_c}"
623                )
624                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
625                first_cycle = False
626
627        elif self.nicd_charge_type == NiCdChargeCycle.DV_DT:
628            dv = 0.005 * self.nicd_cell_count
629            while (max_v - latest_v) < dv and latest_v < self.ov_protection and elapsed_time < self.max_time:
630                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
631                t1 = elapsed_time  # previous elapsed time for calculating delta t
632                elapsed_time = round(self.timer.elapsed_time, digits)
633                dt = round(elapsed_time - t1, digits)  # delta t
634
635                # This section allows for fairly precise sample interval
636                if dt < self.sample_interval and not first_cycle:
637                    while dt < self.sample_interval:
638                        elapsed_time = round(self.timer.elapsed_time, digits)
639                        dt = round(elapsed_time - t1, digits)  # delta t
640                        time.sleep(0.0005)
641
642                latest_v = self.dmm.volts
643                max_v = max(max_v, latest_v)  # keep track of highest cell voltage seen
644                i1 = latest_i  # previous current reading
645                latest_i = self.charger.amps  # current reading
646                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
647                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
648                temp_c = round(self.temperature, digits)  # grab the temperature
649
650                logger.write_info_to_report(
651                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, "
652                    f"Ah: {amp_hrs}, Temp: {temp_c}"
653                )
654                first_cycle = False
655
656        self.charger.disable()  # Charging is complete, turn off the charger
657        self.remaining_capacity_percentage = 100
658
659        logger.write_info_to_report(f"Charge Cycle complete, Ah: {amp_hrs}")
660        return amp_hrs
661
662    def run_discharge_step_cycle(self, digits=6):
663        """Run a discharge step cycle."""
664        if self.max_time == 0:
665            raise ValueLogError("Max Time Invalid")
666
667        if self.current == 0:
668            raise ValueLogError("Discharge rate Invalid")
669
670        if self.sample_interval == 0:
671            raise ValueLogError("Sample Interval Invalid")
672
673        if self.uv_protection == 0:
674            raise ValueLogError("Undervoltage Protection Invalid")
675
676        if self.percent_discharge == 0:
677            raise ValueLogError("Percent Discharge Invalid")
678
679        if self.total_ah == 0:
680            raise ValueLogError("Total Ah Invalid")
681
682        delta_ah = self.total_ah * self.percent_discharge
683        # discharge_time = ((self.total_ah * self.percent_discharge) / self.discharge_i) * 3600
684        logger.write_info_to_report(
685            f"Starting discharge step cycle ("
686            f"DA: {delta_ah}Ah, UV: {self.uv_protection}V, DI: {self.current}A, "
687            f"PD: {self.percent_discharge * 100}%, TA: {self.total_ah}Ah, SI: {self.sample_interval}s, "
688            f"RC: {self.remaining_capacity_percentage}, MT: {self.max_time}, DV: {self.discharge_until_undervoltage})"
689        )
690
691        self.load.mode_cc()
692        if self.current > 6:
693            self.load.amps_range = 60  # if current is > 6A set to high range
694        self.load.amps = self.current
695        self.load.enable()
696
697        # Place holder values
698        latest_i = 0
699        amp_hrs = 0  # mA hours
700        latest_v = self.uv_protection + 0.01
701        elapsed_time = 0
702        first_cycle = True
703        temp_c = 0
704        # resistance = 0
705
706        self.timer.reset()  # Keep track of runtime
707
708        # discharge until Ah reached or uv_protection or max_time
709        while abs(amp_hrs) < delta_ah or self.discharge_until_undervoltage:
710            if latest_v <= self.uv_protection:
711                exception = UnderVoltageError(
712                    f"Undervoltage protection triggered at {time.strftime('%x %X')}. "
713                    f"Voltage {latest_v} is lower than {self.uv_protection}"
714                )
715                if self.discharge_until_undervoltage:
716                    logger.write_info_to_report(f"{type(exception).__name__} - Continuing test")
717                    break
718                raise exception
719
720            if elapsed_time >= self.max_time:
721                raise TimeoutExceededError(f"Delta Ah of {delta_ah} was not reached after {self.max_time} seconds")
722
723            if temp_c >= self.overtemp_c:
724                raise OverTemperatureError(
725                    f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
726                    f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
727                )
728
729            t1 = elapsed_time  # previous elapsed time for calculating delta t
730            elapsed_time = round(self.timer.elapsed_time, digits)
731            dt = round(elapsed_time - t1, digits)  # delta t
732
733            # This section allows for fairly precise sample interval
734            if dt < self.sample_interval and not first_cycle:
735                while dt < self.sample_interval:
736                    elapsed_time = round(self.timer.elapsed_time, digits)
737                    dt = round(elapsed_time - t1, digits)  # delta t
738                    time.sleep(0.0005)
739
740            latest_v = self.dmm.volts
741            # load_v = self.load.volts  # for measuring wiring harness resistance
742
743            i1 = latest_i  # previous current reading
744            # load_ohms = self.load.ohms
745            latest_i = round(-1 * self.load.amps, digits)
746
747            # if not first_cycle:  # skip the first cycle so that there are no divide by 0's
748            #     # wire harness resistance
749            #     resistance = round((latest_v - load_v) / (-1 * latest_i) + load_ohms, digits)
750
751            i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
752            amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
753            temp_c = round(self.temperature, digits)  # grab the temperature
754
755            logger.write_info_to_report(
756                f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}"
757            )
758            self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
759            first_cycle = False
760
761        self.load.disable()
762        self.remaining_capacity_percentage = round(
763            self.remaining_capacity_percentage - self.percent_discharge * 100, digits
764        )
765
766    def run_resting_cycle(self):
767        """Run a resting cycle"""
768        # Guard against calls before init
769        assert self.dmm is not None and self.csv is not None
770
771        if self.max_time == 0:
772            raise ValueLogError("Resting time Invalid")
773
774        if self.sample_interval == 0:
775            raise ValueLogError("Resting sample interval Invalid")
776
777        if self.remaining_capacity_percentage < 0:
778            raise ValueLogError("Remaining capacity percentage Invalid")
779
780        # Place holder values
781        self.percent_discharge = 0  # For capacity calculations
782        elapsed_time = 0.0
783        first_cycle = True
784
785        self.timer.reset()  # Keep track of runtime
786
787        logger.write_info_to_report(f"Starting rest cycle (RT: {self.max_time}, SI:{self.sample_interval})")
788
789        while elapsed_time < self.max_time:
790            UserInterrupt.check_user_interrupt()
791
792            t1 = elapsed_time  # previous elapsed time for calculating delta t
793            elapsed_time = self.timer.elapsed_time
794            dt = elapsed_time - t1  # delta t
795
796            # This section allows for fairly precise sample interval
797            if dt < self.sample_interval and not first_cycle:
798                while dt < self.sample_interval:
799                    elapsed_time = self.timer.elapsed_time
800                    dt = elapsed_time - t1  # delta t
801                    time.sleep(0.0005)
802
803            latest_v = self.dmm.volts
804            temp_c = self.temperature  # grab the temperature
805
806            logger.write_info_to_report(f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Temp: {temp_c}")
807            self.csv.cycle.record(elapsed_time)
808            first_cycle = False
809
810    def run_discharge_cycle(self, digits=6):
811        """Run a discharge cycle."""
812
813        if self.max_time == 0:
814            raise ValueLogError("Max Time Invalid")
815
816        if self.sample_interval == 0:
817            raise ValueLogError("Discharge Sample Interval Invalid")
818
819        if self.discharge_type == DischargeType.UNDEFINED:
820            raise ValueLogError("Discharge Mode Invalid")
821
822        if self.voltage == 0:
823            raise ValueLogError("Discharge Voltage Invalid")
824
825        if self.uv_protection == 0:
826            raise ValueLogError("Under Voltage Protection Invalid")
827
828        if self.discharge_type == DischargeType.CONSTANT_CURRENT:
829            # we're in current discharge mode, so make sure there is a valid current
830            if self.current == 0:
831                raise ValueLogError("Discharge Current mode selected, but Current is invalid")
832
833        elif self.discharge_type == DischargeType.CONSTANT_RESISTANCE:
834            # we're in resistance discharge mode so make sure there is a valid resistance.
835            if self.resistance == 0:
836                raise ValueLogError("Discharge Resistance mode selected, but Resistance is invalid")
837
838        logger.write_info_to_report(
839            f"Starting discharge cycle ("
840            f"DV: {self.voltage}V, UV: {self.uv_protection}V, DI: {self.current}A, DR: {self.resistance}Ohms, "
841            f"DM: {self.discharge_type}, MT: {self.max_time}s, SI: {self.sample_interval}s"
842            ")"
843        )
844
845        with self.load(self.current, self.discharge_type, self.resistance):
846
847            self.percent_discharge = 1  # For capacity calculations
848
849            # Place holder values
850            latest_i = 0
851            resistance = 0
852            amp_hrs = 0  # Amp hours
853            latest_v = self.voltage + 0.01
854            elapsed_time = 0
855            first_run = True
856            temp_c = 0
857
858            self.timer.reset()  # Keep track of runtime
859            kill_event = FileEvent("kill_discharge")
860            discharge_event = FileEvent("nicd_discharging")
861            discharge_event.set()
862
863            # discharge until discharge voltage is reached or uv_protection or max_time
864            while not (self.uv_protection < latest_v <= self.voltage or kill_event.is_set()):
865                if latest_v <= self.uv_protection:
866                    self.load.disable()
867                    raise UnderVoltageError(
868                        f"Undervoltage protection triggered at {time.strftime('%x %X')}. "
869                        f"Voltage {latest_v} is lower than {self.uv_protection}."
870                    )
871                if elapsed_time >= self.max_time:
872                    self.load.disable()
873                    raise TimeoutExceededError(
874                        f"Voltage of {self.voltage} was not reached after {self.max_time} seconds"
875                    )
876                if temp_c >= self.overtemp_c:
877                    self.load.disable()
878                    raise OverTemperatureError(
879                        f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
880                        f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
881                    )
882
883                UserInterrupt.check_user_interrupt(self.load)  # Pause if necessary, turning off load relay
884
885                t1 = elapsed_time  # previous elapsed time for calculating delta t
886                elapsed_time = round(self.timer.elapsed_time, digits)
887                dt = round(elapsed_time - t1, digits)  # delta t
888
889                # This section allows for fairly precise sample interval
890                if dt < self.sample_interval and not first_run:
891                    while dt < self.sample_interval:
892                        elapsed_time = round(self.timer.elapsed_time, digits)
893                        dt = round(elapsed_time - t1, digits)  # delta t
894                        time.sleep(0.0005)
895
896                latest_v = self.dmm.volts
897                i1 = latest_i  # previous current reading
898                load_ohms = self.load.ohms
899                latest_i = round(-1 * self.load.amps, digits)
900
901                # skip the first cycle so that there are no divide by 0's
902                if not first_run and self.discharge_type == DischargeType.CONSTANT_RESISTANCE:
903                    load_v = self.load.volts  # for measuring wiring harness resistance
904                    try:
905                        resistance = (latest_v - load_v) / (-1 * latest_i) + load_ohms  # wire harness resistance
906                    except ZeroDivisionError:
907                        logger.write_error_to_report(
908                            "Zero division when calculating resistance. "
909                            f"Load Voltage: {load_v}, Current: {latest_i}, Resistance: {load_ohms}."
910                        )
911
912                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
913                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
914                temp_c = round(self.temperature, digits)  # grab the temperature
915
916                logger.write_info_to_report(
917                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, "
918                    f"Resistance: {resistance}, Temp: {temp_c}"
919                )
920                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs, resistance)
921                first_run = False
922
923        self.remaining_capacity_percentage = 0
924        return abs(amp_hrs)  # Return a positive Ah
925
    def delay(self, seconds: int | float) -> None:
        """
        Sleep that counts towards elapsed time.

        Blocks the caller with ``time.sleep`` for ``seconds`` seconds.
        NOTE(review): nothing here stops the test timer, which is presumably why the delay counts towards
        elapsed time — confirm against the timer implementation.
        """
        time.sleep(seconds)
929
930    def pause_test(self, seconds_to_pause: float = 0, *relays: ChargerOrLoad):
931        """Pause the test for some amount of seconds."""
932        for relay in relays:
933            relay.disable()
934        if seconds_to_pause == 0:  # Wait for user input to continue
935            logger.write_info_to_report("Pausing: waiting for user input")
936            statuscode = 0xFFFF
937            while not os.WIFEXITED(statuscode):  # process terminated normally?
938                statuscode = os.system(
939                    '/bin/bash -c \'select CHOICE in "Type CONTINUE to resume test"; do '
940                    'if [ "$REPLY" = "CONTINUE" ]; then break; fi; done\''
941                )
942        else:
943            logger.write_info_to_report(f"Pausing test (S: {seconds_to_pause})")
944            time.sleep(seconds_to_pause)
945        for relay in relays:
946            relay.enable()
ABS_PATH = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/bms')
HITL_CONFIG = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/bms/../hitl_config.yml')
TEST_CASES_PATH = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/bms/../test_cases')
TEST_PLANS_PATH = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/bms/../test_plans')
OVERTEMP_C = 65
class SafeShutdown:
class SafeShutdown:
    """Manage shutting down safely."""

    # Signals that are routed to SafeShutdown.handler
    SIGNALS = [signal.SIGABRT, signal.SIGHUP, signal.SIGINT, signal.SIGTERM]

    @staticmethod
    def handler(_signo, _stack_frame):
        """
        Exit the test suite when the program is being terminated.

        Called on normal or abnormal termination (e.g. the ssh connection drops out). Once a signal has
        been caught, every other signal still routed to this handler, except SIGINT, is ignored, and
        CTRL-Z is rebound to kill the process as a failsafe. The pytest session is then exited.
        """
        # pylint: disable=comparison-with-callable  # We want to check handler function

        def kill_process(_sig, _frame):
            """Failsafe: terminate immediately."""
            signal.raise_signal(signal.SIGKILL)

        # Failsafe: CTRL-Z sends kill
        signal.signal(signal.SIGTSTP, kill_process)

        if _signo:
            # Ignore all future signals if already shutting down
            for candidate in signal.valid_signals():
                if signal.getsignal(candidate) != SafeShutdown.handler or candidate == signal.SIGINT:
                    continue
                logger.write_info_to_report(f"Ignoring {signal.Signals(candidate).name}")
                signal.signal(candidate, signal.SIG_IGN)
            logger.write_critical_to_report(f"Caught {signal.Signals(_signo).name}, Powering OFF the BMS!!!")

        pytest.exit("BMS died, exiting test suite early", 1)  # Exit gracefully

Manage shutting down safely.

SIGNALS = [<Signals.SIGABRT: 6>, <Signals.SIGHUP: 1>, <Signals.SIGINT: 2>, <Signals.SIGTERM: 15>]
@staticmethod
def handler(_signo, _stack_frame):
117    @staticmethod
118    def handler(_signo, _stack_frame):
119        """
120        This handler will be called during normal / abnormal termination.
121        i.e. ssh connection drops out. In this event, we're going to
122        ensure that the test setup is transitioned to a safe state.
123        """
124        # pylint: disable=comparison-with-callable  # We want to check handler function
125        # Failsafe: CTRL-Z sends kill
126        signal.signal(signal.SIGTSTP, lambda _signo, _stack_frame: signal.raise_signal(signal.SIGKILL))
127        if _signo:
128            for signal_no in signal.valid_signals():  # Ignore all future signals if already shutting down
129                if signal.getsignal(signal_no) == SafeShutdown.handler and signal_no != signal.SIGINT:
130                    logger.write_info_to_report(f"Ignoring {signal.Signals(signal_no).name}")
131                    signal.signal(signal_no, signal.SIG_IGN)
132            logger.write_critical_to_report(f"Caught {signal.Signals(_signo).name}, Powering OFF the BMS!!!")
133        pytest.exit("BMS died, exiting test suite early", 1)  # Exit gracefully

This handler will be called during normal or abnormal termination (e.g. when the ssh connection drops out). In that event, we're going to ensure that the test setup is transitioned to a safe state.

class UserInterrupt:
class UserInterrupt:
    """Suspend / Continue the program on CTRL-Z events."""

    # The event is set while the test may run and cleared while a suspension is requested.
    # A new Event starts cleared, hence "suspend by default" until init() is called.
    interrupt_requested = Event()  # Suspend by default

    @staticmethod
    def init():
        """Do not suspend by default."""
        UserInterrupt.interrupt_requested.set()

    @staticmethod
    def handler(_signo, _stack_frame):
        """Toggle program suspension."""
        run_gate = UserInterrupt.interrupt_requested
        if not run_gate.is_set():
            logger.write_info_to_report("User requested test continuation!")
            run_gate.set()
        else:
            logger.write_info_to_report("User requested test suspension!")
            run_gate.clear()

    @staticmethod
    def force_pause():
        """Pause the program until a continuation is requested."""
        logger.write_info_to_report("Program forced test suspension!")
        UserInterrupt.interrupt_requested.clear()
        UserInterrupt.check_user_interrupt()

    @staticmethod
    def check_user_interrupt(*relays: ChargerOrLoad):
        """
        Cooperatively check if a user interrupt was received.

        When a suspension is pending, the given relays are disabled and the test timer is stopped; the
        call then blocks until continuation is requested, after which the timer is restarted and the
        relays are enabled again. Returns immediately when no suspension is pending.
        """
        assert BMSHardware.instance is not None

        if UserInterrupt.interrupt_requested.is_set():
            return  # Nothing requested, keep running

        logger.write_info_to_report("Suspending test")
        for relay in relays:
            relay.disable()
        BMSHardware.instance.timer.stop()

        UserInterrupt.interrupt_requested.wait()  # Blocks until the event is set again

        BMSHardware.instance.timer.start()
        logger.write_info_to_report("Continuing test")
        for relay in relays:
            relay.enable()

Suspend / Continue the program on CTRL-Z events.

interrupt_requested = <threading.Event at 0x7f34798d27b0: unset>
@staticmethod
def init():
141    @staticmethod
142    def init():
143        """Do not suspend by default."""
144        UserInterrupt.interrupt_requested.set()

Do not suspend by default.

@staticmethod
def handler(_signo, _stack_frame):
146    @staticmethod
147    def handler(_signo, _stack_frame):
148        """Toggle program suspension."""
149        if UserInterrupt.interrupt_requested.is_set():
150            logger.write_info_to_report("User requested test suspension!")
151            UserInterrupt.interrupt_requested.clear()
152        else:
153            logger.write_info_to_report("User requested test continuation!")
154            UserInterrupt.interrupt_requested.set()

Toggle program suspension.

@staticmethod
def force_pause():
156    @staticmethod
157    def force_pause():
158        """Pause the program."""
159        logger.write_info_to_report("Program forced test suspension!")
160        UserInterrupt.interrupt_requested.clear()
161        UserInterrupt.check_user_interrupt()

Pause the program.

@staticmethod
def check_user_interrupt(*relays: hitl_tester.modules.bms_types.ChargerOrLoad):
163    @staticmethod
164    def check_user_interrupt(*relays: ChargerOrLoad):
165        """Cooperatively check if a user interrupt was received."""
166        assert BMSHardware.instance is not None
167
168        if not UserInterrupt.interrupt_requested.is_set():
169            logger.write_info_to_report("Suspending test")
170            for relay in relays:
171                relay.disable()
172            BMSHardware.instance.timer.stop()
173            UserInterrupt.interrupt_requested.wait()
174            BMSHardware.instance.timer.start()
175            logger.write_info_to_report("Continuing test")
176            for relay in relays:
177                relay.enable()

Cooperatively check if a user interrupt was received.

class BMSHardware:
180class BMSHardware:
181    """The main API for accessing all hardware."""
182
183    instance: ClassVar[Self | None] = None
184    initialized: bool = False
185
186    def __new__(cls, flags: BMSFlags | None = None):
187        """Make BMS hardware a singleton."""
188        if cls.instance is None:
189            cls.instance = super().__new__(cls)
190        return cls.instance
191
192    def __init__(self, flags: BMSFlags | None = None):  # TODO(JA): document attributes
193        """Initialize the BMS hardware."""
194        if flags is None:
195            return
196
197        # Battery attributes
198        self.remaining_capacity_percentage = 0.0
199        self.battery_type: BatteryType = BatteryType.UNDEFINED  # Initialize to unknown
200        self.max_time = 0.0
201        self.sample_interval = 0.0
202        self.voltage = 0.0
203        self.current = 0.0
204        self.current_limit = 10.0
205        self.overtemp_c = OVERTEMP_C
206
207        # Charging attributes
208        self.minimum_readings = 0  # How many readings to take before passing (for erroneous readings).
209        self.nicd_charge_type: NiCdChargeCycle = NiCdChargeCycle.UNDEFINED
210        self.nicd_cell_count = 0
211        self.termination_current = 0.0
212        self.ov_protection = 0.0
213
214        # Discharging attributes
215        self.resistance = 0.0
216        self.discharge_type: DischargeType = DischargeType.UNDEFINED
217        self.discharge_until_undervoltage = False
218        self.percent_discharge = 0.0
219        self.total_ah = 0.0
220        self.uv_protection = 0.0
221
222        # Hardware attributes
223        self.charger: Charger | None = None
224        self.load: Load | None = None
225        self.dmm: Dmm | M300Dmm | None = None
226        self.power_dmm: Dmm | None = None
227        self.korads: dict[int, Korad] = {}
228        self.thermal_chamber = pseudo_hardware.ThermalChamber() if flags.dry_run else ThermalChamber()
229        self.plateset = Plateset()
230        self.plateset_id = flags.plateset_id
231        self.thermo_couples: dict[int, ThermoCouple] = {}
232        self.cells: dict[int, Cell | NGICell | ChromaCell] = {}
233        self.cell_chemistry = flags.cell_chemistry
234        self.dry_run = flags.dry_run
235
236        # Track total test time
237        self.timer = StopWatch()
238
239        # Logging / Documentation attributes
240        self.config = flags.config
241        self.doc_generation = flags.doc_generation
242        self.report_filename = flags.report_filename
243        self.csv: CSVRecorders = CSVRecorders(cast(hitl_tester.modules.bms.bms_hw.BMSHardware, self))  # Must occur last
244
245        # Modify globals of caller
246        assert isinstance(current_frame := inspect.currentframe(), FrameType)
247        assert isinstance(caller := current_frame.f_back, FrameType)
248        if flags.properties:
249            for global_var, value in flags.properties.items():
250                caller.f_globals[global_var] = value  # Globals must occur before init is called
251
252    def init(self):
253        """Initialize the BMSHardware object."""
254        if self.initialized:
255            return
256        self.initialized = True
257
258        def get_embed(header: str, key: str, result_type: type) -> Any:
259            """get embedded value from config (i.e. value in dict in list)."""
260            default_item: list[str | dict[str, Any]] = []
261            items = self.config.get(header, default_item)
262            if isinstance(items, list):
263                for item in items:
264                    if isinstance(item, dict) and (result := item.get(key)):
265                        try:
266                            return result_type(result)
267                        except (ValueError, TypeError) as exc:
268                            raise RuntimeError(
269                                f"Expected {result_type.__name__} type, got {type(result).__name__} type"
270                            ) from exc
271            raise RuntimeError(f"Could not find {key} key in {header}")
272
273        if self.doc_generation:  # Skip if generating docs
274            return
275
276        # Make sure the charger and the load switches are OFF before doing any testing
277        logger.write_info_to_report("Turning off charger and load switch")
278        self.plateset.charger_switch = False
279        self.plateset.load_switch = False
280
281        # setup some signals to a signal handler so that we can gracefully
282        # shutdown in the event that test is somehow interrupted
283        logger.write_debug_to_report("Setting up Safe Shutdown Handlers")
284        for signal_flag in SafeShutdown.SIGNALS:
285            signal.signal(signal_flag, SafeShutdown.handler)
286
287        # Set up signal terminal stop handler (CTRL-Z)
288        UserInterrupt.init()
289        signal.signal(signal.SIGTSTP, UserInterrupt.handler)
290
291        # FIXME(JA): device classes should manage their own configs/initializations
292        # FIXME(JA): don't assume config is correct
293
294        logger.write_info_to_report("Polling hardware")
295        if self.dry_run:
296            if self.config.get("power_supply_id", "") is not None:
297                logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DP711,DP7A252000573,00.01.05")
298                logger.write_info_to_report("Found DP711 Power Supply")
299                self.charger = pseudo_hardware.Charger()
300
301            if self.config.get("load_id", "") is not None:
302                logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DL3031A,DL3D243000300,00.01.05.00.01")
303                logger.write_info_to_report("Found DL3031A Electronic load")
304                self.load = pseudo_hardware.Load()
305
306            if self.config.get("dmm_id", "") is not None:
307                logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00")
308                logger.write_info_to_report("Found DM3068 DMM")
309                self.dmm = pseudo_hardware.Dmm()
310
311            if self.config.get("m300_dmm") is not None:
312                logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00")
313                logger.write_info_to_report("Found DM3068 DMM")
314                self.dmm = pseudo_hardware.M300Dmm(self.config["m300_dmm"])
315
316            for korad in self.config.get("korads", []):
317                logger.write_debug_to_report(f"Resource ID: KORAD, {korad['part_id']}")
318                logger.write_info_to_report("Found Korad")
319                new_korad = pseudo_hardware.Korad(korad["id"])
320                self.korads[new_korad.id] = new_korad
321
322            cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else None
323            if cell_model == "agilent":
324                for cell in self.config.get("cell_simulators", []):
325                    if "type" not in cell:
326                        logger.write_debug_to_report("Resource ID: Cell Sim, 04.08.15.16.23.42")
327                        logger.write_info_to_report("Found Cell Sim")
328                        new_cell = pseudo_hardware.Cell(cell["id"], self.cell_chemistry)
329                        self.cells[new_cell.id] = new_cell
330
331        else:
332            resource_manager = visa.ResourceManager()
333            resources = resource_manager.list_resources()
334            if not resources:
335                raise ResourceNotFoundError("No PyVISA resources found")
336
337            power_supply_id = self.config.get("power_supply_id", "RIGOL TECHNOLOGIES,DP711")
338            load_id = self.config.get("load_id", "RIGOL TECHNOLOGIES,DL3031A")
339            dmm_id = self.config.get("dmm_id", "Rigol Technologies,DM3068")
340            power_dmm_id = self.config.get("power_dmm_id")
341            if "m300_dmm" in self.config:
342                dmm_id = "M300"
343
344            cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else ""
345            if cell_model == "chroma":  # Add chroma to resources
346                address = get_embed("cell_simulators", "address", str)
347                resources = (*resources, (f"TCPIP::{address}::60000::SOCKET", "\n", "\n"))
348            config_cells = [cell for cell in self.config.get("cell_simulators", []) if "type" not in cell]
349
350            for resource_address in resources:
351                safe_resource = SafeResource(resource_address)
352                for baud_rate in (9600, 115200):
353                    safe_resource.baud_rate = baud_rate
354                    if (idn := safe_resource.query("*IDN?")) != safe_resource.default_result:
355                        logger.write_debug_to_report(f"Resource ID: {idn.strip(chr(10))}")
356
357                        if power_supply_id is not None and power_supply_id in idn:  # Power supply
358                            logger.write_info_to_report("Found DP711 Power Supply")
359                            self.charger = Charger(safe_resource)
360
361                        elif load_id is not None and load_id in idn:  # Electronic load
362                            logger.write_info_to_report("Found DL3031A Electronic load")
363                            self.load = Load(safe_resource)
364
365                        elif dmm_id is not None and dmm_id in idn:  # DMM
366                            logger.write_info_to_report("Found DMM")
367                            if "m300_dmm" in self.config:
368                                self.dmm = M300Dmm(safe_resource, self.config["m300_dmm"])
369                            else:
370                                self.dmm = Dmm(safe_resource)
371
372                        elif power_dmm_id is not None and power_dmm_id in idn:  # DMM
373                            logger.write_info_to_report("Found Power DMM")
374                            self.power_dmm = Dmm(safe_resource)
375
376                        elif any((match := korad)["part_id"] in idn for korad in self.config.get("korads", [])):
377                            logger.write_info_to_report("Found KA6003P Power Supply")
378                            new_korad = Korad(match["id"], safe_resource)
379                            self.korads[new_korad.id] = new_korad
380
381                        elif cell_model == "agilent" and any(
382                            (match := cell)["part_id"] in idn for cell in config_cells
383                        ):
384                            logger.write_info_to_report("Found Agilent Cell Sim")
385                            new_cell = Cell(match["id"], safe_resource, self.cell_chemistry)
386                            new_cell.reset()
387                            self.cells[new_cell.id] = new_cell
388
389                        elif cell_model == "ngi" and "NGI,N83624" in idn:
390                            logger.write_info_to_report("Found NGI Cell Sim")
391                            cell_count = get_embed("cell_simulators", "cell_count", int)
392                            for cell_id in range(1, cell_count + 1):
393                                new_cell = NGICell(cell_id, safe_resource, self.cell_chemistry)
394                                self.cells[new_cell.id] = new_cell
395                                if cell_id == cell_count:
396                                    new_cell.reset()
397
398                        elif cell_model == "chroma" and "Chroma,87001" in idn:
399                            logger.write_info_to_report("Found Chroma Cell Sim")
400                            new_cell = None
401                            for cell_id, cell in enumerate(config_cells):
402                                if "cell_slot" in cell:
403                                    new_cell = ChromaCell(
404                                        cell_id, safe_resource, self.cell_chemistry, cell["cell_slot"]
405                                    )
406                                    self.cells[new_cell.id] = new_cell
407                            if new_cell:
408                                new_cell.reset()  # FIXME(JA): does this cause problems for the other side if running?
409
410                        else:
411                            logger.write_warning_to_report("Unknown Resource")
412
413                        break
414
415                    logger.write_warning_to_report(f"Failed to connect to: {resource_address}")
416                    logger.write_debug_to_report("Attempting higher baud rate")
417
418            if power_supply_id is not None and self.charger is None:
419                raise ResourceNotFoundError("Power Supply Not Found")
420
421            if load_id is not None and self.load is None:
422                raise ResourceNotFoundError("Load Simulator Not Found")
423
424            if dmm_id is not None and self.dmm is None:
425                raise ResourceNotFoundError("DMM Not Found")
426
427            if power_dmm_id is not None and self.power_dmm is None:
428                raise ResourceNotFoundError("Power DMM Not Found")
429
430            if len(self.korads) < len(self.config.get("korads", [])):
431                for korad in self.config["korads"]:
432                    if korad["id"] not in self.korads:
433                        raise ResourceNotFoundError(f'Unable to locate Korad {korad["id"]}')
434            self.korads = dict(sorted(self.korads.items()))  # Sort korads by id
435
436            if cell_model == "agilent" and len(self.cells) < len(config_cells):
437                for cell in config_cells:
438                    if cell["id"] not in self.cells:
439                        raise ResourceNotFoundError(f'Unable to locate Cell {cell["id"]} Simulator')
440            if cell_model == "ngi" and len(self.cells) == 0:
441                raise ResourceNotFoundError("Unable to locate NGI Cell Simulator")
442            if cell_model == "chroma" and len(self.cells) == 0:
443                raise ResourceNotFoundError("Unable to locate Chroma Cell Simulator")
444            self.cells = dict(sorted(self.cells.items()))  # Sort cells by id
445
446        # Now go through the ThermoCouples
447        for thermo_couple in self.config.get("thermo_couples", []):
448            new_thermo_couple = ThermoCouple(thermo_couple["id"], thermo_couple["board"], thermo_couple["channel"])
449            self.thermo_couples[new_thermo_couple.thermocouple_id] = new_thermo_couple
450
451        logger.write_debug_to_report("Setting up SIGCHLD Handler")  # Must be set up afterward due to VISA raising it
452        signal.signal(signal.SIGCHLD, SafeShutdown.handler)
453
454    # FIXME(JA): move into its own module, use exceptions
455
456    @property
457    def temperature(self) -> float:
458        """Temperature measurement used by run cycles."""
459        return float(self.thermo_couples[1].temperature) if 1 in self.thermo_couples else self.plateset.temperature
460
461    def run_li_charge_cycle(self, digits=6):
462        """Run a Li charge cycle."""
463        if self.voltage == 0:
464            raise ValueLogError("Charging Voltage Invalid")
465
466        if self.ov_protection == 0:
467            raise ValueLogError("Over-Voltage Protection Invalid")
468
469        if self.current == 0:
470            raise ValueLogError("Charge Current Invalid")
471
472        if self.termination_current == 0:
473            raise ValueLogError("Termination Current Invalid")
474
475        if self.max_time == 0:
476            raise ValueLogError("Max Time Invalid")
477
478        if self.sample_interval == 0:
479            raise ValueLogError("Charging Sample Time Invalid")
480
481        if self.minimum_readings == 0:
482            raise ValueLogError("Minimum Readings Invalid")
483
484        self.percent_discharge = -1  # For capacity calculations
485
486        # Create a queue to hold the previous n readings. This is used to avoid erroneous errors.
487        current_measurement_queue = deque(maxlen=self.minimum_readings)
488
489        logger.write_info_to_report(
490            f"Starting charge cycle (CV: {self.voltage}V, OV: {self.ov_protection}V, "
491            f"CI: {self.current}A, TI: {self.termination_current}A, MT: {self.max_time}s, "
492            f"SI: {self.sample_interval}s, MR: {self.minimum_readings}"
493            ")"
494        )
495
496        with self.charger(self.voltage, self.current):
497
498            # Starting values for the while loop
499            latest_i = self.termination_current + 0.1  # placeholder for the latest charging current measurement
500            current_measurement_queue.append(latest_i)
501            amp_hrs = 0  # amp hours
502            latest_v = 0  # placeholder for the latest charging voltage measurement
503            elapsed_time = 0
504            temp_c = 0
505
506            self.timer.reset()  # Keep track of runtime
507
508            # charge until termination current is reached or ov_protection or max_time
509            while any(i >= self.termination_current for i in current_measurement_queue):
510                if latest_v >= self.ov_protection:
511                    self.charger.disable()
512                    raise OverVoltageError(
513                        f"Overvoltage protection triggered at {time.strftime('%x %X')}. "
514                        f"Voltage {latest_v} is higher than {self.ov_protection}."
515                    )
516                if elapsed_time >= self.max_time:
517                    self.charger.disable()
518                    raise TimeoutExceededError(
519                        f"Termination current of {self.termination_current}A was "
520                        f"not reached after {self.max_time} seconds"
521                    )
522                if temp_c >= self.overtemp_c:
523                    self.charger.disable()
524                    raise OverTemperatureError(
525                        f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
526                        f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
527                    )
528
529                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
530
531                t1 = elapsed_time  # previous elapsed time for calculating delta t
532                elapsed_time = round(self.timer.elapsed_time, digits)
533                dt = round(elapsed_time - t1, digits)  # delta t
534
535                latest_v = self.dmm.volts
536
537                i1 = latest_i  # previous current reading
538                latest_i = self.charger.amps
539                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
540                current_measurement_queue.append(latest_i)
541
542                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # milliamp hour calculation
543                temp_c = round(self.temperature, digits)  # grab the temperature
544
545                logger.write_info_to_report(
546                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, "
547                    f"Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}"
548                )
549
550                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
551                time.sleep(self.sample_interval)
552
553        self.remaining_capacity_percentage = 100
554
555    def run_nicd_charge_cycle(self, digits=6):
556        """Run a NiCad charge cycle."""
557
558        if self.nicd_cell_count == 0:
559            raise ValueLogError("Cell Count Invalid")
560
561        if self.max_time == 0:
562            raise ValueLogError("Charge Time Invalid")
563
564        if self.sample_interval == 0:
565            raise ValueLogError("Charge Sample Interval Invalid")
566
567        if self.current == 0:
568            raise ValueLogError("Charge Current Invalid")
569
570        if self.nicd_charge_type is NiCdChargeCycle.UNDEFINED:
571            raise ValueLogError("Charge Type Invalid")
572
573        self.percent_discharge = -1  # For capacity calculations
574        self.ov_protection = min(30.0, self.nicd_cell_count * 1.95)
575
576        logger.write_info_to_report(
577            f"Starting {self.nicd_charge_type.name.lower()} NiCd charge cycle ("
578            f"CV: {self.ov_protection}V, OV: {self.ov_protection}V, CI: {self.current}A, MT: {self.max_time}s, "
579            f"SI: {self.sample_interval}s)"
580        )
581
582        self.charger.set_profile(self.ov_protection, self.current)
583        self.charger.enable()  # Enables the output of the charger
584
585        # initialize charge control variables
586        latest_i = 0
587        amp_hrs = 0
588        latest_v = 0
589        elapsed_time = 0
590        max_v = 0
591        first_cycle = True
592
593        self.timer.reset()  # Keep track of runtime
594
595        if self.nicd_charge_type in (NiCdChargeCycle.STANDARD, NiCdChargeCycle.CUSTOM):
596            while latest_v < self.ov_protection and elapsed_time < self.max_time:
597                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
598                t1 = elapsed_time
599                elapsed_time = round(self.timer.elapsed_time, digits)
600                dt = round(elapsed_time - t1, digits)
601
602                # This section allows for fairly precise sample interval
603                if dt < self.sample_interval and not first_cycle:
604                    while dt < self.sample_interval:
605                        elapsed_time = round(self.timer.elapsed_time, digits)
606                        dt = round(elapsed_time - t1, digits)  # delta t
607                        time.sleep(0.0005)
608
609                latest_v = self.dmm.volts
610
611                # keep track of the highest cell voltage seen
612                max_v = max(max_v, latest_v)
613
614                i1 = latest_i  # previous current reading
615                latest_i = self.charger.amps  # current reading
616
617                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
618                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
619                temp_c = round(self.temperature, digits)  # grab the temperature
620
621                logger.write_info_to_report(
622                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, "
623                    f"Ah: {amp_hrs}, Temp: {temp_c}"
624                )
625                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
626                first_cycle = False
627
628        elif self.nicd_charge_type == NiCdChargeCycle.DV_DT:
629            dv = 0.005 * self.nicd_cell_count
630            while (max_v - latest_v) < dv and latest_v < self.ov_protection and elapsed_time < self.max_time:
631                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
632                t1 = elapsed_time  # previous elapsed time for calculating delta t
633                elapsed_time = round(self.timer.elapsed_time, digits)
634                dt = round(elapsed_time - t1, digits)  # delta t
635
636                # This section allows for fairly precise sample interval
637                if dt < self.sample_interval and not first_cycle:
638                    while dt < self.sample_interval:
639                        elapsed_time = round(self.timer.elapsed_time, digits)
640                        dt = round(elapsed_time - t1, digits)  # delta t
641                        time.sleep(0.0005)
642
643                latest_v = self.dmm.volts
644                max_v = max(max_v, latest_v)  # keep track of highest cell voltage seen
645                i1 = latest_i  # previous current reading
646                latest_i = self.charger.amps  # current reading
647                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
648                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
649                temp_c = round(self.temperature, digits)  # grab the temperature
650
651                logger.write_info_to_report(
652                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, "
653                    f"Ah: {amp_hrs}, Temp: {temp_c}"
654                )
655                first_cycle = False
656
657        self.charger.disable()  # Charging is complete, turn off the charger
658        self.remaining_capacity_percentage = 100
659
660        logger.write_info_to_report(f"Charge Cycle complete, Ah: {amp_hrs}")
661        return amp_hrs
662
663    def run_discharge_step_cycle(self, digits=6):
664        """Run a discharge step cycle."""
665        if self.max_time == 0:
666            raise ValueLogError("Max Time Invalid")
667
668        if self.current == 0:
669            raise ValueLogError("Discharge rate Invalid")
670
671        if self.sample_interval == 0:
672            raise ValueLogError("Sample Interval Invalid")
673
674        if self.uv_protection == 0:
675            raise ValueLogError("Undervoltage Protection Invalid")
676
677        if self.percent_discharge == 0:
678            raise ValueLogError("Percent Discharge Invalid")
679
680        if self.total_ah == 0:
681            raise ValueLogError("Total Ah Invalid")
682
683        delta_ah = self.total_ah * self.percent_discharge
684        # discharge_time = ((self.total_ah * self.percent_discharge) / self.discharge_i) * 3600
685        logger.write_info_to_report(
686            f"Starting discharge step cycle ("
687            f"DA: {delta_ah}Ah, UV: {self.uv_protection}V, DI: {self.current}A, "
688            f"PD: {self.percent_discharge * 100}%, TA: {self.total_ah}Ah, SI: {self.sample_interval}s, "
689            f"RC: {self.remaining_capacity_percentage}, MT: {self.max_time}, DV: {self.discharge_until_undervoltage})"
690        )
691
692        self.load.mode_cc()
693        if self.current > 6:
694            self.load.amps_range = 60  # if current is > 6A set to high range
695        self.load.amps = self.current
696        self.load.enable()
697
698        # Place holder values
699        latest_i = 0
700        amp_hrs = 0  # mA hours
701        latest_v = self.uv_protection + 0.01
702        elapsed_time = 0
703        first_cycle = True
704        temp_c = 0
705        # resistance = 0
706
707        self.timer.reset()  # Keep track of runtime
708
709        # discharge until Ah reached or uv_protection or max_time
710        while abs(amp_hrs) < delta_ah or self.discharge_until_undervoltage:
711            if latest_v <= self.uv_protection:
712                exception = UnderVoltageError(
713                    f"Undervoltage protection triggered at {time.strftime('%x %X')}. "
714                    f"Voltage {latest_v} is lower than {self.uv_protection}"
715                )
716                if self.discharge_until_undervoltage:
717                    logger.write_info_to_report(f"{type(exception).__name__} - Continuing test")
718                    break
719                raise exception
720
721            if elapsed_time >= self.max_time:
722                raise TimeoutExceededError(f"Delta Ah of {delta_ah} was not reached after {self.max_time} seconds")
723
724            if temp_c >= self.overtemp_c:
725                raise OverTemperatureError(
726                    f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
727                    f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
728                )
729
730            t1 = elapsed_time  # previous elapsed time for calculating delta t
731            elapsed_time = round(self.timer.elapsed_time, digits)
732            dt = round(elapsed_time - t1, digits)  # delta t
733
734            # This section allows for fairly precise sample interval
735            if dt < self.sample_interval and not first_cycle:
736                while dt < self.sample_interval:
737                    elapsed_time = round(self.timer.elapsed_time, digits)
738                    dt = round(elapsed_time - t1, digits)  # delta t
739                    time.sleep(0.0005)
740
741            latest_v = self.dmm.volts
742            # load_v = self.load.volts  # for measuring wiring harness resistance
743
744            i1 = latest_i  # previous current reading
745            # load_ohms = self.load.ohms
746            latest_i = round(-1 * self.load.amps, digits)
747
748            # if not first_cycle:  # skip the first cycle so that there are no divide by 0's
749            #     # wire harness resistance
750            #     resistance = round((latest_v - load_v) / (-1 * latest_i) + load_ohms, digits)
751
752            i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
753            amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
754            temp_c = round(self.temperature, digits)  # grab the temperature
755
756            logger.write_info_to_report(
757                f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}"
758            )
759            self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
760            first_cycle = False
761
762        self.load.disable()
763        self.remaining_capacity_percentage = round(
764            self.remaining_capacity_percentage - self.percent_discharge * 100, digits
765        )
766
767    def run_resting_cycle(self):
768        """Run a resting cycle"""
769        # Guard against calls before init
770        assert self.dmm is not None and self.csv is not None
771
772        if self.max_time == 0:
773            raise ValueLogError("Resting time Invalid")
774
775        if self.sample_interval == 0:
776            raise ValueLogError("Resting sample interval Invalid")
777
778        if self.remaining_capacity_percentage < 0:
779            raise ValueLogError("Remaining capacity percentage Invalid")
780
781        # Place holder values
782        self.percent_discharge = 0  # For capacity calculations
783        elapsed_time = 0.0
784        first_cycle = True
785
786        self.timer.reset()  # Keep track of runtime
787
788        logger.write_info_to_report(f"Starting rest cycle (RT: {self.max_time}, SI:{self.sample_interval})")
789
790        while elapsed_time < self.max_time:
791            UserInterrupt.check_user_interrupt()
792
793            t1 = elapsed_time  # previous elapsed time for calculating delta t
794            elapsed_time = self.timer.elapsed_time
795            dt = elapsed_time - t1  # delta t
796
797            # This section allows for fairly precise sample interval
798            if dt < self.sample_interval and not first_cycle:
799                while dt < self.sample_interval:
800                    elapsed_time = self.timer.elapsed_time
801                    dt = elapsed_time - t1  # delta t
802                    time.sleep(0.0005)
803
804            latest_v = self.dmm.volts
805            temp_c = self.temperature  # grab the temperature
806
807            logger.write_info_to_report(f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Temp: {temp_c}")
808            self.csv.cycle.record(elapsed_time)
809            first_cycle = False
810
811    def run_discharge_cycle(self, digits=6):
812        """Run a discharge cycle."""
813
814        if self.max_time == 0:
815            raise ValueLogError("Max Time Invalid")
816
817        if self.sample_interval == 0:
818            raise ValueLogError("Discharge Sample Interval Invalid")
819
820        if self.discharge_type == DischargeType.UNDEFINED:
821            raise ValueLogError("Discharge Mode Invalid")
822
823        if self.voltage == 0:
824            raise ValueLogError("Discharge Voltage Invalid")
825
826        if self.uv_protection == 0:
827            raise ValueLogError("Under Voltage Protection Invalid")
828
829        if self.discharge_type == DischargeType.CONSTANT_CURRENT:
830            # we're in current discharge mode, so make sure there is a valid current
831            if self.current == 0:
832                raise ValueLogError("Discharge Current mode selected, but Current is invalid")
833
834        elif self.discharge_type == DischargeType.CONSTANT_RESISTANCE:
835            # we're in resistance discharge mode so make sure there is a valid resistance.
836            if self.resistance == 0:
837                raise ValueLogError("Discharge Resistance mode selected, but Resistance is invalid")
838
839        logger.write_info_to_report(
840            f"Starting discharge cycle ("
841            f"DV: {self.voltage}V, UV: {self.uv_protection}V, DI: {self.current}A, DR: {self.resistance}Ohms, "
842            f"DM: {self.discharge_type}, MT: {self.max_time}s, SI: {self.sample_interval}s"
843            ")"
844        )
845
846        with self.load(self.current, self.discharge_type, self.resistance):
847
848            self.percent_discharge = 1  # For capacity calculations
849
850            # Place holder values
851            latest_i = 0
852            resistance = 0
853            amp_hrs = 0  # Amp hours
854            latest_v = self.voltage + 0.01
855            elapsed_time = 0
856            first_run = True
857            temp_c = 0
858
859            self.timer.reset()  # Keep track of runtime
860            kill_event = FileEvent("kill_discharge")
861            discharge_event = FileEvent("nicd_discharging")
862            discharge_event.set()
863
864            # discharge until discharge voltage is reached or uv_protection or max_time
865            while not (self.uv_protection < latest_v <= self.voltage or kill_event.is_set()):
866                if latest_v <= self.uv_protection:
867                    self.load.disable()
868                    raise UnderVoltageError(
869                        f"Undervoltage protection triggered at {time.strftime('%x %X')}. "
870                        f"Voltage {latest_v} is lower than {self.uv_protection}."
871                    )
872                if elapsed_time >= self.max_time:
873                    self.load.disable()
874                    raise TimeoutExceededError(
875                        f"Voltage of {self.voltage} was not reached after {self.max_time} seconds"
876                    )
877                if temp_c >= self.overtemp_c:
878                    self.load.disable()
879                    raise OverTemperatureError(
880                        f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
881                        f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
882                    )
883
884                UserInterrupt.check_user_interrupt(self.load)  # Pause if necessary, turning off load relay
885
886                t1 = elapsed_time  # previous elapsed time for calculating delta t
887                elapsed_time = round(self.timer.elapsed_time, digits)
888                dt = round(elapsed_time - t1, digits)  # delta t
889
890                # This section allows for fairly precise sample interval
891                if dt < self.sample_interval and not first_run:
892                    while dt < self.sample_interval:
893                        elapsed_time = round(self.timer.elapsed_time, digits)
894                        dt = round(elapsed_time - t1, digits)  # delta t
895                        time.sleep(0.0005)
896
897                latest_v = self.dmm.volts
898                i1 = latest_i  # previous current reading
899                load_ohms = self.load.ohms
900                latest_i = round(-1 * self.load.amps, digits)
901
902                # skip the first cycle so that there are no divide by 0's
903                if not first_run and self.discharge_type == DischargeType.CONSTANT_RESISTANCE:
904                    load_v = self.load.volts  # for measuring wiring harness resistance
905                    try:
906                        resistance = (latest_v - load_v) / (-1 * latest_i) + load_ohms  # wire harness resistance
907                    except ZeroDivisionError:
908                        logger.write_error_to_report(
909                            "Zero division when calculating resistance. "
910                            f"Load Voltage: {load_v}, Current: {latest_i}, Resistance: {load_ohms}."
911                        )
912
913                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
914                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
915                temp_c = round(self.temperature, digits)  # grab the temperature
916
917                logger.write_info_to_report(
918                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, "
919                    f"Resistance: {resistance}, Temp: {temp_c}"
920                )
921                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs, resistance)
922                first_run = False
923
924        self.remaining_capacity_percentage = 0
925        return abs(amp_hrs)  # Return a positive Ah
926
927    def delay(self, seconds: int | float):
928        """Sleep that counts towards elapsed time."""
929        time.sleep(seconds)
930
931    def pause_test(self, seconds_to_pause: float = 0, *relays: ChargerOrLoad):
932        """Pause the test for some amount of seconds."""
933        for relay in relays:
934            relay.disable()
935        if seconds_to_pause == 0:  # Wait for user input to continue
936            logger.write_info_to_report("Pausing: waiting for user input")
937            statuscode = 0xFFFF
938            while not os.WIFEXITED(statuscode):  # process terminated normally?
939                statuscode = os.system(
940                    '/bin/bash -c \'select CHOICE in "Type CONTINUE to resume test"; do '
941                    'if [ "$REPLY" = "CONTINUE" ]; then break; fi; done\''
942                )
943        else:
944            logger.write_info_to_report(f"Pausing test (S: {seconds_to_pause})")
945            time.sleep(seconds_to_pause)
946        for relay in relays:
947            relay.enable()

The main API for accessing all hardware.

BMSHardware(flags: hitl_tester.modules.bms_types.BMSFlags | None = None)
192    def __init__(self, flags: BMSFlags | None = None):  # TODO(JA): document attributes
193        """Initialize the BMS hardware."""
194        if flags is None:
195            return
196
197        # Battery attributes
198        self.remaining_capacity_percentage = 0.0
199        self.battery_type: BatteryType = BatteryType.UNDEFINED  # Initialize to unknown
200        self.max_time = 0.0
201        self.sample_interval = 0.0
202        self.voltage = 0.0
203        self.current = 0.0
204        self.current_limit = 10.0
205        self.overtemp_c = OVERTEMP_C
206
207        # Charging attributes
208        self.minimum_readings = 0  # How many readings to take before passing (for erroneous readings).
209        self.nicd_charge_type: NiCdChargeCycle = NiCdChargeCycle.UNDEFINED
210        self.nicd_cell_count = 0
211        self.termination_current = 0.0
212        self.ov_protection = 0.0
213
214        # Discharging attributes
215        self.resistance = 0.0
216        self.discharge_type: DischargeType = DischargeType.UNDEFINED
217        self.discharge_until_undervoltage = False
218        self.percent_discharge = 0.0
219        self.total_ah = 0.0
220        self.uv_protection = 0.0
221
222        # Hardware attributes
223        self.charger: Charger | None = None
224        self.load: Load | None = None
225        self.dmm: Dmm | M300Dmm | None = None
226        self.power_dmm: Dmm | None = None
227        self.korads: dict[int, Korad] = {}
228        self.thermal_chamber = pseudo_hardware.ThermalChamber() if flags.dry_run else ThermalChamber()
229        self.plateset = Plateset()
230        self.plateset_id = flags.plateset_id
231        self.thermo_couples: dict[int, ThermoCouple] = {}
232        self.cells: dict[int, Cell | NGICell | ChromaCell] = {}
233        self.cell_chemistry = flags.cell_chemistry
234        self.dry_run = flags.dry_run
235
236        # Track total test time
237        self.timer = StopWatch()
238
239        # Logging / Documentation attributes
240        self.config = flags.config
241        self.doc_generation = flags.doc_generation
242        self.report_filename = flags.report_filename
243        self.csv: CSVRecorders = CSVRecorders(cast(hitl_tester.modules.bms.bms_hw.BMSHardware, self))  # Must occur last
244
245        # Modify globals of caller
246        assert isinstance(current_frame := inspect.currentframe(), FrameType)
247        assert isinstance(caller := current_frame.f_back, FrameType)
248        if flags.properties:
249            for global_var, value in flags.properties.items():
250                caller.f_globals[global_var] = value  # Globals must occur before init is called

Initialize the BMS hardware.

instance: ClassVar[Optional[Self]] = <BMSHardware object>
initialized: bool = False
remaining_capacity_percentage
max_time
sample_interval
voltage
current
current_limit
overtemp_c
minimum_readings
nicd_cell_count
termination_current
ov_protection
resistance
discharge_until_undervoltage
percent_discharge
total_ah
uv_protection
thermal_chamber
plateset
plateset_id
cell_chemistry
dry_run
timer
config
doc_generation
report_filename
def init(self):
252    def init(self):
253        """Initialize the BMSHardware object."""
254        if self.initialized:
255            return
256        self.initialized = True
257
258        def get_embed(header: str, key: str, result_type: type) -> Any:
259            """get embedded value from config (i.e. value in dict in list)."""
260            default_item: list[str | dict[str, Any]] = []
261            items = self.config.get(header, default_item)
262            if isinstance(items, list):
263                for item in items:
264                    if isinstance(item, dict) and (result := item.get(key)):
265                        try:
266                            return result_type(result)
267                        except (ValueError, TypeError) as exc:
268                            raise RuntimeError(
269                                f"Expected {result_type.__name__} type, got {type(result).__name__} type"
270                            ) from exc
271            raise RuntimeError(f"Could not find {key} key in {header}")
272
273        if self.doc_generation:  # Skip if generating docs
274            return
275
276        # Make sure the charger and the load switches are OFF before doing any testing
277        logger.write_info_to_report("Turning off charger and load switch")
278        self.plateset.charger_switch = False
279        self.plateset.load_switch = False
280
281        # setup some signals to a signal handler so that we can gracefully
282        # shutdown in the event that test is somehow interrupted
283        logger.write_debug_to_report("Setting up Safe Shutdown Handlers")
284        for signal_flag in SafeShutdown.SIGNALS:
285            signal.signal(signal_flag, SafeShutdown.handler)
286
287        # Set up signal terminal stop handler (CTRL-Z)
288        UserInterrupt.init()
289        signal.signal(signal.SIGTSTP, UserInterrupt.handler)
290
291        # FIXME(JA): device classes should manage their own configs/initializations
292        # FIXME(JA): don't assume config is correct
293
294        logger.write_info_to_report("Polling hardware")
295        if self.dry_run:
296            if self.config.get("power_supply_id", "") is not None:
297                logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DP711,DP7A252000573,00.01.05")
298                logger.write_info_to_report("Found DP711 Power Supply")
299                self.charger = pseudo_hardware.Charger()
300
301            if self.config.get("load_id", "") is not None:
302                logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DL3031A,DL3D243000300,00.01.05.00.01")
303                logger.write_info_to_report("Found DL3031A Electronic load")
304                self.load = pseudo_hardware.Load()
305
306            if self.config.get("dmm_id", "") is not None:
307                logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00")
308                logger.write_info_to_report("Found DM3068 DMM")
309                self.dmm = pseudo_hardware.Dmm()
310
311            if self.config.get("m300_dmm") is not None:
312                logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00")
313                logger.write_info_to_report("Found DM3068 DMM")
314                self.dmm = pseudo_hardware.M300Dmm(self.config["m300_dmm"])
315
316            for korad in self.config.get("korads", []):
317                logger.write_debug_to_report(f"Resource ID: KORAD, {korad['part_id']}")
318                logger.write_info_to_report("Found Korad")
319                new_korad = pseudo_hardware.Korad(korad["id"])
320                self.korads[new_korad.id] = new_korad
321
322            cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else None
323            if cell_model == "agilent":
324                for cell in self.config.get("cell_simulators", []):
325                    if "type" not in cell:
326                        logger.write_debug_to_report("Resource ID: Cell Sim, 04.08.15.16.23.42")
327                        logger.write_info_to_report("Found Cell Sim")
328                        new_cell = pseudo_hardware.Cell(cell["id"], self.cell_chemistry)
329                        self.cells[new_cell.id] = new_cell
330
331        else:
332            resource_manager = visa.ResourceManager()
333            resources = resource_manager.list_resources()
334            if not resources:
335                raise ResourceNotFoundError("No PyVISA resources found")
336
337            power_supply_id = self.config.get("power_supply_id", "RIGOL TECHNOLOGIES,DP711")
338            load_id = self.config.get("load_id", "RIGOL TECHNOLOGIES,DL3031A")
339            dmm_id = self.config.get("dmm_id", "Rigol Technologies,DM3068")
340            power_dmm_id = self.config.get("power_dmm_id")
341            if "m300_dmm" in self.config:
342                dmm_id = "M300"
343
344            cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else ""
345            if cell_model == "chroma":  # Add chroma to resources
346                address = get_embed("cell_simulators", "address", str)
347                resources = (*resources, (f"TCPIP::{address}::60000::SOCKET", "\n", "\n"))
348            config_cells = [cell for cell in self.config.get("cell_simulators", []) if "type" not in cell]
349
350            for resource_address in resources:
351                safe_resource = SafeResource(resource_address)
352                for baud_rate in (9600, 115200):
353                    safe_resource.baud_rate = baud_rate
354                    if (idn := safe_resource.query("*IDN?")) != safe_resource.default_result:
355                        logger.write_debug_to_report(f"Resource ID: {idn.strip(chr(10))}")
356
357                        if power_supply_id is not None and power_supply_id in idn:  # Power supply
358                            logger.write_info_to_report("Found DP711 Power Supply")
359                            self.charger = Charger(safe_resource)
360
361                        elif load_id is not None and load_id in idn:  # Electronic load
362                            logger.write_info_to_report("Found DL3031A Electronic load")
363                            self.load = Load(safe_resource)
364
365                        elif dmm_id is not None and dmm_id in idn:  # DMM
366                            logger.write_info_to_report("Found DMM")
367                            if "m300_dmm" in self.config:
368                                self.dmm = M300Dmm(safe_resource, self.config["m300_dmm"])
369                            else:
370                                self.dmm = Dmm(safe_resource)
371
372                        elif power_dmm_id is not None and power_dmm_id in idn:  # DMM
373                            logger.write_info_to_report("Found Power DMM")
374                            self.power_dmm = Dmm(safe_resource)
375
376                        elif any((match := korad)["part_id"] in idn for korad in self.config.get("korads", [])):
377                            logger.write_info_to_report("Found KA6003P Power Supply")
378                            new_korad = Korad(match["id"], safe_resource)
379                            self.korads[new_korad.id] = new_korad
380
381                        elif cell_model == "agilent" and any(
382                            (match := cell)["part_id"] in idn for cell in config_cells
383                        ):
384                            logger.write_info_to_report("Found Agilent Cell Sim")
385                            new_cell = Cell(match["id"], safe_resource, self.cell_chemistry)
386                            new_cell.reset()
387                            self.cells[new_cell.id] = new_cell
388
389                        elif cell_model == "ngi" and "NGI,N83624" in idn:
390                            logger.write_info_to_report("Found NGI Cell Sim")
391                            cell_count = get_embed("cell_simulators", "cell_count", int)
392                            for cell_id in range(1, cell_count + 1):
393                                new_cell = NGICell(cell_id, safe_resource, self.cell_chemistry)
394                                self.cells[new_cell.id] = new_cell
395                                if cell_id == cell_count:
396                                    new_cell.reset()
397
398                        elif cell_model == "chroma" and "Chroma,87001" in idn:
399                            logger.write_info_to_report("Found Chroma Cell Sim")
400                            new_cell = None
401                            for cell_id, cell in enumerate(config_cells):
402                                if "cell_slot" in cell:
403                                    new_cell = ChromaCell(
404                                        cell_id, safe_resource, self.cell_chemistry, cell["cell_slot"]
405                                    )
406                                    self.cells[new_cell.id] = new_cell
407                            if new_cell:
408                                new_cell.reset()  # FIXME(JA): does this cause problems for the other side if running?
409
410                        else:
411                            logger.write_warning_to_report("Unknown Resource")
412
413                        break
414
415                    logger.write_warning_to_report(f"Failed to connect to: {resource_address}")
416                    logger.write_debug_to_report("Attempting higher baud rate")
417
418            if power_supply_id is not None and self.charger is None:
419                raise ResourceNotFoundError("Power Supply Not Found")
420
421            if load_id is not None and self.load is None:
422                raise ResourceNotFoundError("Load Simulator Not Found")
423
424            if dmm_id is not None and self.dmm is None:
425                raise ResourceNotFoundError("DMM Not Found")
426
427            if power_dmm_id is not None and self.power_dmm is None:
428                raise ResourceNotFoundError("Power DMM Not Found")
429
430            if len(self.korads) < len(self.config.get("korads", [])):
431                for korad in self.config["korads"]:
432                    if korad["id"] not in self.korads:
433                        raise ResourceNotFoundError(f'Unable to locate Korad {korad["id"]}')
434            self.korads = dict(sorted(self.korads.items()))  # Sort korads by id
435
436            if cell_model == "agilent" and len(self.cells) < len(config_cells):
437                for cell in config_cells:
438                    if cell["id"] not in self.cells:
439                        raise ResourceNotFoundError(f'Unable to locate Cell {cell["id"]} Simulator')
440            if cell_model == "ngi" and len(self.cells) == 0:
441                raise ResourceNotFoundError("Unable to locate NGI Cell Simulator")
442            if cell_model == "chroma" and len(self.cells) == 0:
443                raise ResourceNotFoundError("Unable to locate Chroma Cell Simulator")
444            self.cells = dict(sorted(self.cells.items()))  # Sort cells by id
445
446        # Now go through the ThermoCouples
447        for thermo_couple in self.config.get("thermo_couples", []):
448            new_thermo_couple = ThermoCouple(thermo_couple["id"], thermo_couple["board"], thermo_couple["channel"])
449            self.thermo_couples[new_thermo_couple.thermocouple_id] = new_thermo_couple
450
451        logger.write_debug_to_report("Setting up SIGCHLD Handler")  # Must be set up afterward due to VISA raising it
452        signal.signal(signal.SIGCHLD, SafeShutdown.handler)

Initialize the BMSHardware object.

temperature: float
456    @property
457    def temperature(self) -> float:
458        """Temperature measurement used by run cycles."""
459        return float(self.thermo_couples[1].temperature) if 1 in self.thermo_couples else self.plateset.temperature

Temperature measurement used by run cycles.

def run_li_charge_cycle(self, digits=6):
461    def run_li_charge_cycle(self, digits=6):
462        """Run a Li charge cycle."""
463        if self.voltage == 0:
464            raise ValueLogError("Charging Voltage Invalid")
465
466        if self.ov_protection == 0:
467            raise ValueLogError("Over-Voltage Protection Invalid")
468
469        if self.current == 0:
470            raise ValueLogError("Charge Current Invalid")
471
472        if self.termination_current == 0:
473            raise ValueLogError("Termination Current Invalid")
474
475        if self.max_time == 0:
476            raise ValueLogError("Max Time Invalid")
477
478        if self.sample_interval == 0:
479            raise ValueLogError("Charging Sample Time Invalid")
480
481        if self.minimum_readings == 0:
482            raise ValueLogError("Minimum Readings Invalid")
483
484        self.percent_discharge = -1  # For capacity calculations
485
486        # Create a queue to hold the previous n readings. This is used to avoid erroneous errors.
487        current_measurement_queue = deque(maxlen=self.minimum_readings)
488
489        logger.write_info_to_report(
490            f"Starting charge cycle (CV: {self.voltage}V, OV: {self.ov_protection}V, "
491            f"CI: {self.current}A, TI: {self.termination_current}A, MT: {self.max_time}s, "
492            f"SI: {self.sample_interval}s, MR: {self.minimum_readings}"
493            ")"
494        )
495
496        with self.charger(self.voltage, self.current):
497
498            # Starting values for the while loop
499            latest_i = self.termination_current + 0.1  # placeholder for the latest charging current measurement
500            current_measurement_queue.append(latest_i)
501            amp_hrs = 0  # amp hours
502            latest_v = 0  # placeholder for the latest charging voltage measurement
503            elapsed_time = 0
504            temp_c = 0
505
506            self.timer.reset()  # Keep track of runtime
507
508            # charge until termination current is reached or ov_protection or max_time
509            while any(i >= self.termination_current for i in current_measurement_queue):
510                if latest_v >= self.ov_protection:
511                    self.charger.disable()
512                    raise OverVoltageError(
513                        f"Overvoltage protection triggered at {time.strftime('%x %X')}. "
514                        f"Voltage {latest_v} is higher than {self.ov_protection}."
515                    )
516                if elapsed_time >= self.max_time:
517                    self.charger.disable()
518                    raise TimeoutExceededError(
519                        f"Termination current of {self.termination_current}A was "
520                        f"not reached after {self.max_time} seconds"
521                    )
522                if temp_c >= self.overtemp_c:
523                    self.charger.disable()
524                    raise OverTemperatureError(
525                        f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
526                        f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
527                    )
528
529                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
530
531                t1 = elapsed_time  # previous elapsed time for calculating delta t
532                elapsed_time = round(self.timer.elapsed_time, digits)
533                dt = round(elapsed_time - t1, digits)  # delta t
534
535                latest_v = self.dmm.volts
536
537                i1 = latest_i  # previous current reading
538                latest_i = self.charger.amps
539                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
540                current_measurement_queue.append(latest_i)
541
542                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # milliamp hour calculation
543                temp_c = round(self.temperature, digits)  # grab the temperature
544
545                logger.write_info_to_report(
546                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, "
547                    f"Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}"
548                )
549
550                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
551                time.sleep(self.sample_interval)
552
553        self.remaining_capacity_percentage = 100

Run a Li charge cycle.

def run_nicd_charge_cycle(self, digits=6):
555    def run_nicd_charge_cycle(self, digits=6):
556        """Run a NiCad charge cycle."""
557
558        if self.nicd_cell_count == 0:
559            raise ValueLogError("Cell Count Invalid")
560
561        if self.max_time == 0:
562            raise ValueLogError("Charge Time Invalid")
563
564        if self.sample_interval == 0:
565            raise ValueLogError("Charge Sample Interval Invalid")
566
567        if self.current == 0:
568            raise ValueLogError("Charge Current Invalid")
569
570        if self.nicd_charge_type is NiCdChargeCycle.UNDEFINED:
571            raise ValueLogError("Charge Type Invalid")
572
573        self.percent_discharge = -1  # For capacity calculations
574        self.ov_protection = min(30.0, self.nicd_cell_count * 1.95)
575
576        logger.write_info_to_report(
577            f"Starting {self.nicd_charge_type.name.lower()} NiCd charge cycle ("
578            f"CV: {self.ov_protection}V, OV: {self.ov_protection}V, CI: {self.current}A, MT: {self.max_time}s, "
579            f"SI: {self.sample_interval}s)"
580        )
581
582        self.charger.set_profile(self.ov_protection, self.current)
583        self.charger.enable()  # Enables the output of the charger
584
585        # initialize charge control variables
586        latest_i = 0
587        amp_hrs = 0
588        latest_v = 0
589        elapsed_time = 0
590        max_v = 0
591        first_cycle = True
592
593        self.timer.reset()  # Keep track of runtime
594
595        if self.nicd_charge_type in (NiCdChargeCycle.STANDARD, NiCdChargeCycle.CUSTOM):
596            while latest_v < self.ov_protection and elapsed_time < self.max_time:
597                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
598                t1 = elapsed_time
599                elapsed_time = round(self.timer.elapsed_time, digits)
600                dt = round(elapsed_time - t1, digits)
601
602                # This section allows for fairly precise sample interval
603                if dt < self.sample_interval and not first_cycle:
604                    while dt < self.sample_interval:
605                        elapsed_time = round(self.timer.elapsed_time, digits)
606                        dt = round(elapsed_time - t1, digits)  # delta t
607                        time.sleep(0.0005)
608
609                latest_v = self.dmm.volts
610
611                # keep track of the highest cell voltage seen
612                max_v = max(max_v, latest_v)
613
614                i1 = latest_i  # previous current reading
615                latest_i = self.charger.amps  # current reading
616
617                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
618                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
619                temp_c = round(self.temperature, digits)  # grab the temperature
620
621                logger.write_info_to_report(
622                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, "
623                    f"Ah: {amp_hrs}, Temp: {temp_c}"
624                )
625                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
626                first_cycle = False
627
628        elif self.nicd_charge_type == NiCdChargeCycle.DV_DT:
629            dv = 0.005 * self.nicd_cell_count
630            while (max_v - latest_v) < dv and latest_v < self.ov_protection and elapsed_time < self.max_time:
631                UserInterrupt.check_user_interrupt(self.charger)  # Pause if necessary, turning off charger relay
632                t1 = elapsed_time  # previous elapsed time for calculating delta t
633                elapsed_time = round(self.timer.elapsed_time, digits)
634                dt = round(elapsed_time - t1, digits)  # delta t
635
636                # This section allows for fairly precise sample interval
637                if dt < self.sample_interval and not first_cycle:
638                    while dt < self.sample_interval:
639                        elapsed_time = round(self.timer.elapsed_time, digits)
640                        dt = round(elapsed_time - t1, digits)  # delta t
641                        time.sleep(0.0005)
642
643                latest_v = self.dmm.volts
644                max_v = max(max_v, latest_v)  # keep track of highest cell voltage seen
645                i1 = latest_i  # previous current reading
646                latest_i = self.charger.amps  # current reading
647                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
648                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
649                temp_c = round(self.temperature, digits)  # grab the temperature
650
651                logger.write_info_to_report(
652                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, "
653                    f"Ah: {amp_hrs}, Temp: {temp_c}"
654                )
655                first_cycle = False
656
657        self.charger.disable()  # Charging is complete, turn off the charger
658        self.remaining_capacity_percentage = 100
659
660        logger.write_info_to_report(f"Charge Cycle complete, Ah: {amp_hrs}")
661        return amp_hrs

Run a NiCad charge cycle.

def run_discharge_step_cycle(self, digits=6):
663    def run_discharge_step_cycle(self, digits=6):
664        """Run a discharge step cycle."""
665        if self.max_time == 0:
666            raise ValueLogError("Max Time Invalid")
667
668        if self.current == 0:
669            raise ValueLogError("Discharge rate Invalid")
670
671        if self.sample_interval == 0:
672            raise ValueLogError("Sample Interval Invalid")
673
674        if self.uv_protection == 0:
675            raise ValueLogError("Undervoltage Protection Invalid")
676
677        if self.percent_discharge == 0:
678            raise ValueLogError("Percent Discharge Invalid")
679
680        if self.total_ah == 0:
681            raise ValueLogError("Total Ah Invalid")
682
683        delta_ah = self.total_ah * self.percent_discharge
684        # discharge_time = ((self.total_ah * self.percent_discharge) / self.discharge_i) * 3600
685        logger.write_info_to_report(
686            f"Starting discharge step cycle ("
687            f"DA: {delta_ah}Ah, UV: {self.uv_protection}V, DI: {self.current}A, "
688            f"PD: {self.percent_discharge * 100}%, TA: {self.total_ah}Ah, SI: {self.sample_interval}s, "
689            f"RC: {self.remaining_capacity_percentage}, MT: {self.max_time}, DV: {self.discharge_until_undervoltage})"
690        )
691
692        self.load.mode_cc()
693        if self.current > 6:
694            self.load.amps_range = 60  # if current is > 6A set to high range
695        self.load.amps = self.current
696        self.load.enable()
697
698        # Place holder values
699        latest_i = 0
700        amp_hrs = 0  # mA hours
701        latest_v = self.uv_protection + 0.01
702        elapsed_time = 0
703        first_cycle = True
704        temp_c = 0
705        # resistance = 0
706
707        self.timer.reset()  # Keep track of runtime
708
709        # discharge until Ah reached or uv_protection or max_time
710        while abs(amp_hrs) < delta_ah or self.discharge_until_undervoltage:
711            if latest_v <= self.uv_protection:
712                exception = UnderVoltageError(
713                    f"Undervoltage protection triggered at {time.strftime('%x %X')}. "
714                    f"Voltage {latest_v} is lower than {self.uv_protection}"
715                )
716                if self.discharge_until_undervoltage:
717                    logger.write_info_to_report(f"{type(exception).__name__} - Continuing test")
718                    break
719                raise exception
720
721            if elapsed_time >= self.max_time:
722                raise TimeoutExceededError(f"Delta Ah of {delta_ah} was not reached after {self.max_time} seconds")
723
724            if temp_c >= self.overtemp_c:
725                raise OverTemperatureError(
726                    f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
727                    f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
728                )
729
730            t1 = elapsed_time  # previous elapsed time for calculating delta t
731            elapsed_time = round(self.timer.elapsed_time, digits)
732            dt = round(elapsed_time - t1, digits)  # delta t
733
734            # This section allows for fairly precise sample interval
735            if dt < self.sample_interval and not first_cycle:
736                while dt < self.sample_interval:
737                    elapsed_time = round(self.timer.elapsed_time, digits)
738                    dt = round(elapsed_time - t1, digits)  # delta t
739                    time.sleep(0.0005)
740
741            latest_v = self.dmm.volts
742            # load_v = self.load.volts  # for measuring wiring harness resistance
743
744            i1 = latest_i  # previous current reading
745            # load_ohms = self.load.ohms
746            latest_i = round(-1 * self.load.amps, digits)
747
748            # if not first_cycle:  # skip the first cycle so that there are no divide by 0's
749            #     # wire harness resistance
750            #     resistance = round((latest_v - load_v) / (-1 * latest_i) + load_ohms, digits)
751
752            i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
753            amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
754            temp_c = round(self.temperature, digits)  # grab the temperature
755
756            logger.write_info_to_report(
757                f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}"
758            )
759            self.csv.cycle.record(elapsed_time, latest_i, amp_hrs)
760            first_cycle = False
761
762        self.load.disable()
763        self.remaining_capacity_percentage = round(
764            self.remaining_capacity_percentage - self.percent_discharge * 100, digits
765        )

Run a discharge step cycle.

def run_resting_cycle(self):
767    def run_resting_cycle(self):
768        """Run a resting cycle"""
769        # Guard against calls before init
770        assert self.dmm is not None and self.csv is not None
771
772        if self.max_time == 0:
773            raise ValueLogError("Resting time Invalid")
774
775        if self.sample_interval == 0:
776            raise ValueLogError("Resting sample interval Invalid")
777
778        if self.remaining_capacity_percentage < 0:
779            raise ValueLogError("Remaining capacity percentage Invalid")
780
781        # Place holder values
782        self.percent_discharge = 0  # For capacity calculations
783        elapsed_time = 0.0
784        first_cycle = True
785
786        self.timer.reset()  # Keep track of runtime
787
788        logger.write_info_to_report(f"Starting rest cycle (RT: {self.max_time}, SI:{self.sample_interval})")
789
790        while elapsed_time < self.max_time:
791            UserInterrupt.check_user_interrupt()
792
793            t1 = elapsed_time  # previous elapsed time for calculating delta t
794            elapsed_time = self.timer.elapsed_time
795            dt = elapsed_time - t1  # delta t
796
797            # This section allows for fairly precise sample interval
798            if dt < self.sample_interval and not first_cycle:
799                while dt < self.sample_interval:
800                    elapsed_time = self.timer.elapsed_time
801                    dt = elapsed_time - t1  # delta t
802                    time.sleep(0.0005)
803
804            latest_v = self.dmm.volts
805            temp_c = self.temperature  # grab the temperature
806
807            logger.write_info_to_report(f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Temp: {temp_c}")
808            self.csv.cycle.record(elapsed_time)
809            first_cycle = False

Run a resting cycle.

def run_discharge_cycle(self, digits=6):
811    def run_discharge_cycle(self, digits=6):
812        """Run a discharge cycle."""
813
814        if self.max_time == 0:
815            raise ValueLogError("Max Time Invalid")
816
817        if self.sample_interval == 0:
818            raise ValueLogError("Discharge Sample Interval Invalid")
819
820        if self.discharge_type == DischargeType.UNDEFINED:
821            raise ValueLogError("Discharge Mode Invalid")
822
823        if self.voltage == 0:
824            raise ValueLogError("Discharge Voltage Invalid")
825
826        if self.uv_protection == 0:
827            raise ValueLogError("Under Voltage Protection Invalid")
828
829        if self.discharge_type == DischargeType.CONSTANT_CURRENT:
830            # we're in current discharge mode, so make sure there is a valid current
831            if self.current == 0:
832                raise ValueLogError("Discharge Current mode selected, but Current is invalid")
833
834        elif self.discharge_type == DischargeType.CONSTANT_RESISTANCE:
835            # we're in resistance discharge mode so make sure there is a valid resistance.
836            if self.resistance == 0:
837                raise ValueLogError("Discharge Resistance mode selected, but Resistance is invalid")
838
839        logger.write_info_to_report(
840            f"Starting discharge cycle ("
841            f"DV: {self.voltage}V, UV: {self.uv_protection}V, DI: {self.current}A, DR: {self.resistance}Ohms, "
842            f"DM: {self.discharge_type}, MT: {self.max_time}s, SI: {self.sample_interval}s"
843            ")"
844        )
845
846        with self.load(self.current, self.discharge_type, self.resistance):
847
848            self.percent_discharge = 1  # For capacity calculations
849
850            # Place holder values
851            latest_i = 0
852            resistance = 0
853            amp_hrs = 0  # Amp hours
854            latest_v = self.voltage + 0.01
855            elapsed_time = 0
856            first_run = True
857            temp_c = 0
858
859            self.timer.reset()  # Keep track of runtime
860            kill_event = FileEvent("kill_discharge")
861            discharge_event = FileEvent("nicd_discharging")
862            discharge_event.set()
863
864            # discharge until discharge voltage is reached or uv_protection or max_time
865            while not (self.uv_protection < latest_v <= self.voltage or kill_event.is_set()):
866                if latest_v <= self.uv_protection:
867                    self.load.disable()
868                    raise UnderVoltageError(
869                        f"Undervoltage protection triggered at {time.strftime('%x %X')}. "
870                        f"Voltage {latest_v} is lower than {self.uv_protection}."
871                    )
872                if elapsed_time >= self.max_time:
873                    self.load.disable()
874                    raise TimeoutExceededError(
875                        f"Voltage of {self.voltage} was not reached after {self.max_time} seconds"
876                    )
877                if temp_c >= self.overtemp_c:
878                    self.load.disable()
879                    raise OverTemperatureError(
880                        f"Overtemperature protection triggered at {time.strftime('%x %X')}. "
881                        f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C."
882                    )
883
884                UserInterrupt.check_user_interrupt(self.load)  # Pause if necessary, turning off load relay
885
886                t1 = elapsed_time  # previous elapsed time for calculating delta t
887                elapsed_time = round(self.timer.elapsed_time, digits)
888                dt = round(elapsed_time - t1, digits)  # delta t
889
890                # This section allows for fairly precise sample interval
891                if dt < self.sample_interval and not first_run:
892                    while dt < self.sample_interval:
893                        elapsed_time = round(self.timer.elapsed_time, digits)
894                        dt = round(elapsed_time - t1, digits)  # delta t
895                        time.sleep(0.0005)
896
897                latest_v = self.dmm.volts
898                i1 = latest_i  # previous current reading
899                load_ohms = self.load.ohms
900                latest_i = round(-1 * self.load.amps, digits)
901
902                # skip the first cycle so that there are no divide by 0's
903                if not first_run and self.discharge_type == DischargeType.CONSTANT_RESISTANCE:
904                    load_v = self.load.volts  # for measuring wiring harness resistance
905                    try:
906                        resistance = (latest_v - load_v) / (-1 * latest_i) + load_ohms  # wire harness resistance
907                    except ZeroDivisionError:
908                        logger.write_error_to_report(
909                            "Zero division when calculating resistance. "
910                            f"Load Voltage: {load_v}, Current: {latest_i}, Resistance: {load_ohms}."
911                        )
912
913                i_avg = round((i1 + latest_i) / 2, digits)  # average current of the last 2 readings
914                amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits)  # amp hour calculation
915                temp_c = round(self.temperature, digits)  # grab the temperature
916
917                logger.write_info_to_report(
918                    f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, "
919                    f"Resistance: {resistance}, Temp: {temp_c}"
920                )
921                self.csv.cycle.record(elapsed_time, latest_i, amp_hrs, resistance)
922                first_run = False
923
924        self.remaining_capacity_percentage = 0
925        return abs(amp_hrs)  # Return a positive Ah

Run a discharge cycle.

def delay(self, seconds: int | float):
927    def delay(self, seconds: int | float):
928        """Sleep that counts towards elapsed time."""
929        time.sleep(seconds)

Sleep that counts towards elapsed time.

def pause_test( self, seconds_to_pause: float = 0, *relays: hitl_tester.modules.bms_types.ChargerOrLoad):
931    def pause_test(self, seconds_to_pause: float = 0, *relays: ChargerOrLoad):
932        """Pause the test for some amount of seconds."""
933        for relay in relays:
934            relay.disable()
935        if seconds_to_pause == 0:  # Wait for user input to continue
936            logger.write_info_to_report("Pausing: waiting for user input")
937            statuscode = 0xFFFF
938            while not os.WIFEXITED(statuscode):  # process terminated normally?
939                statuscode = os.system(
940                    '/bin/bash -c \'select CHOICE in "Type CONTINUE to resume test"; do '
941                    'if [ "$REPLY" = "CONTINUE" ]; then break; fi; done\''
942                )
943        else:
944            logger.write_info_to_report(f"Pausing test (S: {seconds_to_pause})")
945            time.sleep(seconds_to_pause)
946        for relay in relays:
947            relay.enable()

Pause the test for some amount of seconds.