hitl_tester.modules.bms.bms_hw
The main API for accessing all hardware.
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
flowchart LR classDef highlighted stroke:#f00, stroke-width:3px HITL_TESTER[hitl_tester.py] --> PLAN(Test Plan) PLAN -->|Args| CASE1[Test Case 1.py] PLAN -->|Args| CASE2[Test Case 2.py] PLAN -->|Args| CASE3[Test Case 3.py] CASE1 --> BMS_HW[bms_hw.py]:::highlighted CASE2 --> BMS_HW CASE3 --> BMS_HW CONFIG(Configuration File) --> BMS_HW BMS_HW --> HW(Hardware) & LOG(Logging) click HITL_TESTER "../hitl_tester.html" click CASE1 "../test_cases.html" click CASE2 "../test_cases.html" click CASE3 "../test_cases.html" click BMS_HW "bms_hw.html"
1""" 2The main API for accessing all hardware. 3 4# (c) 2020-2024 TurnAround Factor, Inc. 5# 6# CUI DISTRIBUTION CONTROL 7# Controlled by: DLA J68 R&D SBIP 8# CUI Category: Small Business Research and Technology 9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15# Fort Belvoir, VA 22060-6221 16# 17# SBIR DATA RIGHTS 18# Contract No.:SP4701-23-C-0083 19# Contractor Name: TurnAround Factor, Inc. 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21# Expiration of SBIR Data Rights Period: September 24, 2029 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27# the markings. 28 29```mermaid 30flowchart LR 31 32classDef highlighted stroke:#f00, stroke-width:3px 33 34HITL_TESTER[hitl_tester.py] --> PLAN(Test Plan) 35PLAN -->|Args| CASE1[Test Case 1.py] 36PLAN -->|Args| CASE2[Test Case 2.py] 37PLAN -->|Args| CASE3[Test Case 3.py] 38 39CASE1 --> BMS_HW[bms_hw.py]:::highlighted 40CASE2 --> BMS_HW 41CASE3 --> BMS_HW 42 43CONFIG(Configuration File) --> BMS_HW 44 45BMS_HW --> HW(Hardware) & LOG(Logging) 46 47click HITL_TESTER "../hitl_tester.html" 48click CASE1 "../test_cases.html" 49click CASE2 "../test_cases.html" 50click CASE3 "../test_cases.html" 51click BMS_HW "bms_hw.html" 52``` 53""" 54 55from __future__ import annotations 56 57import inspect 58import os 59import pathlib 60import signal 61import time 62from collections import deque 63from threading import Event 64from types import FrameType 65from typing import ClassVar, Any, cast 66 67import pytest 68import pyvisa as visa 69from typing_extensions import Self 70 71import hitl_tester.modules.bms.bms_hw 72from hitl_tester.modules.bms import pseudo_hardware 73from hitl_tester.modules.bms.cell import Cell 74from hitl_tester.modules.bms.charger import Charger 75from hitl_tester.modules.bms.chroma_sim import ChromaCell 76from hitl_tester.modules.bms.dmm import Dmm 77from hitl_tester.modules.bms.korad import Korad 78from hitl_tester.modules.bms.load import Load 79from hitl_tester.modules.bms.m300_dmm import M300Dmm 80from hitl_tester.modules.bms.ngi_sim import NGICell 81from hitl_tester.modules.bms.plateset import Plateset 82from hitl_tester.modules.bms.thermal_chamber_modbus import ThermalChamber 83from hitl_tester.modules.bms.thermoplate import ThermoCouple 84from hitl_tester.modules.bms_types import ( 85 OverVoltageError, 86 UnderVoltageError, 87 BMSFlags, 88 TimeoutExceededError, 89 ValueLogError, 90 ChargerOrLoad, 91 ResourceNotFoundError, 92 SafeResource, 93 StopWatch, 94 BatteryType, 95 NiCdChargeCycle, 96 DischargeType, 97 OverTemperatureError, 98) 99from hitl_tester.modules.bms.csv_tables import CSVRecorders 100from hitl_tester.modules.file_lock import FileEvent 101from hitl_tester.modules.logger import logger 102 103# Program Constants 104ABS_PATH = pathlib.Path(__file__).parent.resolve() 105HITL_CONFIG = ABS_PATH / ".." / "hitl_config.yml" 106TEST_CASES_PATH = ABS_PATH / ".." / "test_cases" 107TEST_PLANS_PATH = ABS_PATH / ".." / "test_plans" 108OVERTEMP_C = 65 109 110 111class SafeShutdown: 112 """Manage shutting down safely.""" 113 114 SIGNALS = [signal.SIGABRT, signal.SIGHUP, signal.SIGINT, signal.SIGTERM] 115 116 @staticmethod 117 def handler(_signo, _stack_frame): 118 """ 119 This handler will be called during normal / abnormal termination. 120 i.e. ssh connection drops out. In this event, we're going to 121 ensure that the test setup is transitioned to a safe state. 122 """ 123 # pylint: disable=comparison-with-callable # We want to check handler function 124 # Failsafe: CTRL-Z sends kill 125 signal.signal(signal.SIGTSTP, lambda _signo, _stack_frame: signal.raise_signal(signal.SIGKILL)) 126 if _signo: 127 for signal_no in signal.valid_signals(): # Ignore all future signals if already shutting down 128 if signal.getsignal(signal_no) == SafeShutdown.handler and signal_no != signal.SIGINT: 129 logger.write_info_to_report(f"Ignoring {signal.Signals(signal_no).name}") 130 signal.signal(signal_no, signal.SIG_IGN) 131 logger.write_critical_to_report(f"Caught {signal.Signals(_signo).name}, Powering OFF the BMS!!!") 132 pytest.exit("BMS died, exiting test suite early", 1) # Exit gracefully 133 134 135class UserInterrupt: 136 """Suspend / Continue the program on CTRL-Z events.""" 137 138 interrupt_requested = Event() # Suspend by default 139 140 @staticmethod 141 def init(): 142 """Do not suspend by default.""" 143 UserInterrupt.interrupt_requested.set() 144 145 @staticmethod 146 def handler(_signo, _stack_frame): 147 """Toggle program suspension.""" 148 if UserInterrupt.interrupt_requested.is_set(): 149 logger.write_info_to_report("User requested test suspension!") 150 UserInterrupt.interrupt_requested.clear() 151 else: 152 logger.write_info_to_report("User requested test continuation!") 153 UserInterrupt.interrupt_requested.set() 154 155 @staticmethod 156 def force_pause(): 157 """Pause the program.""" 158 logger.write_info_to_report("Program forced test suspension!") 159 UserInterrupt.interrupt_requested.clear() 160 UserInterrupt.check_user_interrupt() 161 162 @staticmethod 163 def check_user_interrupt(*relays: ChargerOrLoad): 164 """Cooperatively check if a user interrupt was received.""" 165 assert BMSHardware.instance is not None 166 167 if not UserInterrupt.interrupt_requested.is_set(): 168 logger.write_info_to_report("Suspending test") 169 for relay in relays: 170 relay.disable() 171 BMSHardware.instance.timer.stop() 172 UserInterrupt.interrupt_requested.wait() 173 BMSHardware.instance.timer.start() 174 logger.write_info_to_report("Continuing test") 175 for relay in relays: 176 relay.enable() 177 178 179class BMSHardware: 180 """The main API for accessing all hardware.""" 181 182 instance: ClassVar[Self | None] = None 183 initialized: bool = False 184 185 def __new__(cls, flags: BMSFlags | None = None): 186 """Make BMS hardware a singleton.""" 187 if cls.instance is None: 188 cls.instance = super().__new__(cls) 189 return cls.instance 190 191 def __init__(self, flags: BMSFlags | None = None): # TODO(JA): document attributes 192 """Initialize the BMS hardware.""" 193 if flags is None: 194 return 195 196 # Battery attributes 197 self.remaining_capacity_percentage = 0.0 198 self.battery_type: BatteryType = BatteryType.UNDEFINED # Initialize to unknown 199 self.max_time = 0.0 200 self.sample_interval = 0.0 201 self.voltage = 0.0 202 self.current = 0.0 203 self.current_limit = 10.0 204 self.overtemp_c = OVERTEMP_C 205 206 # Charging attributes 207 self.minimum_readings = 0 # How many readings to take before passing (for erroneous readings). 208 self.nicd_charge_type: NiCdChargeCycle = NiCdChargeCycle.UNDEFINED 209 self.nicd_cell_count = 0 210 self.termination_current = 0.0 211 self.ov_protection = 0.0 212 213 # Discharging attributes 214 self.resistance = 0.0 215 self.discharge_type: DischargeType = DischargeType.UNDEFINED 216 self.discharge_until_undervoltage = False 217 self.percent_discharge = 0.0 218 self.total_ah = 0.0 219 self.uv_protection = 0.0 220 221 # Hardware attributes 222 self.charger: Charger | None = None 223 self.load: Load | None = None 224 self.dmm: Dmm | M300Dmm | None = None 225 self.power_dmm: Dmm | None = None 226 self.korads: dict[int, Korad] = {} 227 self.thermal_chamber = pseudo_hardware.ThermalChamber() if flags.dry_run else ThermalChamber() 228 self.plateset = Plateset() 229 self.plateset_id = flags.plateset_id 230 self.thermo_couples: dict[int, ThermoCouple] = {} 231 self.cells: dict[int, Cell | NGICell | ChromaCell] = {} 232 self.cell_chemistry = flags.cell_chemistry 233 self.dry_run = flags.dry_run 234 235 # Track total test time 236 self.timer = StopWatch() 237 238 # Logging / Documentation attributes 239 self.config = flags.config 240 self.doc_generation = flags.doc_generation 241 self.report_filename = flags.report_filename 242 self.csv: CSVRecorders = CSVRecorders(cast(hitl_tester.modules.bms.bms_hw.BMSHardware, self)) # Must occur last 243 244 # Modify globals of caller 245 assert isinstance(current_frame := inspect.currentframe(), FrameType) 246 assert isinstance(caller := current_frame.f_back, FrameType) 247 if flags.properties: 248 for global_var, value in flags.properties.items(): 249 caller.f_globals[global_var] = value # Globals must occur before init is called 250 251 def init(self): 252 """Initialize the BMSHardware object.""" 253 if self.initialized: 254 return 255 self.initialized = True 256 257 def get_embed(header: str, key: str, result_type: type) -> Any: 258 """get embedded value from config (i.e. value in dict in list).""" 259 default_item: list[str | dict[str, Any]] = [] 260 items = self.config.get(header, default_item) 261 if isinstance(items, list): 262 for item in items: 263 if isinstance(item, dict) and (result := item.get(key)): 264 try: 265 return result_type(result) 266 except (ValueError, TypeError) as exc: 267 raise RuntimeError( 268 f"Expected {result_type.__name__} type, got {type(result).__name__} type" 269 ) from exc 270 raise RuntimeError(f"Could not find {key} key in {header}") 271 272 if self.doc_generation: # Skip if generating docs 273 return 274 275 # Make sure the charger and the load switches are OFF before doing any testing 276 logger.write_info_to_report("Turning off charger and load switch") 277 self.plateset.charger_switch = False 278 self.plateset.load_switch = False 279 280 # setup some signals to a signal handler so that we can gracefully 281 # shutdown in the event that test is somehow interrupted 282 logger.write_debug_to_report("Setting up Safe Shutdown Handlers") 283 for signal_flag in SafeShutdown.SIGNALS: 284 signal.signal(signal_flag, SafeShutdown.handler) 285 286 # Set up signal terminal stop handler (CTRL-Z) 287 UserInterrupt.init() 288 signal.signal(signal.SIGTSTP, UserInterrupt.handler) 289 290 # FIXME(JA): device classes should manage their own configs/initializations 291 # FIXME(JA): don't assume config is correct 292 293 logger.write_info_to_report("Polling hardware") 294 if self.dry_run: 295 if self.config.get("power_supply_id", "") is not None: 296 logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DP711,DP7A252000573,00.01.05") 297 logger.write_info_to_report("Found DP711 Power Supply") 298 self.charger = pseudo_hardware.Charger() 299 300 if self.config.get("load_id", "") is not None: 301 logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DL3031A,DL3D243000300,00.01.05.00.01") 302 logger.write_info_to_report("Found DL3031A Electronic load") 303 self.load = pseudo_hardware.Load() 304 305 if self.config.get("dmm_id", "") is not None: 306 logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00") 307 logger.write_info_to_report("Found DM3068 DMM") 308 self.dmm = pseudo_hardware.Dmm() 309 310 if self.config.get("m300_dmm") is not None: 311 logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00") 312 logger.write_info_to_report("Found DM3068 DMM") 313 self.dmm = pseudo_hardware.M300Dmm(self.config["m300_dmm"]) 314 315 for korad in self.config.get("korads", []): 316 logger.write_debug_to_report(f"Resource ID: KORAD, {korad['part_id']}") 317 logger.write_info_to_report("Found Korad") 318 new_korad = pseudo_hardware.Korad(korad["id"]) 319 self.korads[new_korad.id] = new_korad 320 321 cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else None 322 if cell_model == "agilent": 323 for cell in self.config.get("cell_simulators", []): 324 if "type" not in cell: 325 logger.write_debug_to_report("Resource ID: Cell Sim, 04.08.15.16.23.42") 326 logger.write_info_to_report("Found Cell Sim") 327 new_cell = pseudo_hardware.Cell(cell["id"], self.cell_chemistry) 328 self.cells[new_cell.id] = new_cell 329 330 else: 331 resource_manager = visa.ResourceManager() 332 resources = resource_manager.list_resources() 333 if not resources: 334 raise ResourceNotFoundError("No PyVISA resources found") 335 336 power_supply_id = self.config.get("power_supply_id", "RIGOL TECHNOLOGIES,DP711") 337 load_id = self.config.get("load_id", "RIGOL TECHNOLOGIES,DL3031A") 338 dmm_id = self.config.get("dmm_id", "Rigol Technologies,DM3068") 339 power_dmm_id = self.config.get("power_dmm_id") 340 if "m300_dmm" in self.config: 341 dmm_id = "M300" 342 343 cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else "" 344 if cell_model == "chroma": # Add chroma to resources 345 address = get_embed("cell_simulators", "address", str) 346 resources = (*resources, (f"TCPIP::{address}::60000::SOCKET", "\n", "\n")) 347 config_cells = [cell for cell in self.config.get("cell_simulators", []) if "type" not in cell] 348 349 for resource_address in resources: 350 safe_resource = SafeResource(resource_address) 351 for baud_rate in (9600, 115200): 352 safe_resource.baud_rate = baud_rate 353 if (idn := safe_resource.query("*IDN?")) != safe_resource.default_result: 354 logger.write_debug_to_report(f"Resource ID: {idn.strip(chr(10))}") 355 356 if power_supply_id is not None and power_supply_id in idn: # Power supply 357 logger.write_info_to_report("Found DP711 Power Supply") 358 self.charger = Charger(safe_resource) 359 360 elif load_id is not None and load_id in idn: # Electronic load 361 logger.write_info_to_report("Found DL3031A Electronic load") 362 self.load = Load(safe_resource) 363 364 elif dmm_id is not None and dmm_id in idn: # DMM 365 logger.write_info_to_report("Found DMM") 366 if "m300_dmm" in self.config: 367 self.dmm = M300Dmm(safe_resource, self.config["m300_dmm"]) 368 else: 369 self.dmm = Dmm(safe_resource) 370 371 elif power_dmm_id is not None and power_dmm_id in idn: # DMM 372 logger.write_info_to_report("Found Power DMM") 373 self.power_dmm = Dmm(safe_resource) 374 375 elif any((match := korad)["part_id"] in idn for korad in self.config.get("korads", [])): 376 logger.write_info_to_report("Found KA6003P Power Supply") 377 new_korad = Korad(match["id"], safe_resource) 378 self.korads[new_korad.id] = new_korad 379 380 elif cell_model == "agilent" and any( 381 (match := cell)["part_id"] in idn for cell in config_cells 382 ): 383 logger.write_info_to_report("Found Agilent Cell Sim") 384 new_cell = Cell(match["id"], safe_resource, self.cell_chemistry) 385 new_cell.reset() 386 self.cells[new_cell.id] = new_cell 387 388 elif cell_model == "ngi" and "NGI,N83624" in idn: 389 logger.write_info_to_report("Found NGI Cell Sim") 390 cell_count = get_embed("cell_simulators", "cell_count", int) 391 for cell_id in range(1, cell_count + 1): 392 new_cell = NGICell(cell_id, safe_resource, self.cell_chemistry) 393 self.cells[new_cell.id] = new_cell 394 if cell_id == cell_count: 395 new_cell.reset() 396 397 elif cell_model == "chroma" and "Chroma,87001" in idn: 398 logger.write_info_to_report("Found Chroma Cell Sim") 399 new_cell = None 400 for cell_id, cell in enumerate(config_cells): 401 if "cell_slot" in cell: 402 new_cell = ChromaCell( 403 cell_id, safe_resource, self.cell_chemistry, cell["cell_slot"] 404 ) 405 self.cells[new_cell.id] = new_cell 406 if new_cell: 407 new_cell.reset() # FIXME(JA): does this cause problems for the other side if running? 408 409 else: 410 logger.write_warning_to_report("Unknown Resource") 411 412 break 413 414 logger.write_warning_to_report(f"Failed to connect to: {resource_address}") 415 logger.write_debug_to_report("Attempting higher baud rate") 416 417 if power_supply_id is not None and self.charger is None: 418 raise ResourceNotFoundError("Power Supply Not Found") 419 420 if load_id is not None and self.load is None: 421 raise ResourceNotFoundError("Load Simulator Not Found") 422 423 if dmm_id is not None and self.dmm is None: 424 raise ResourceNotFoundError("DMM Not Found") 425 426 if power_dmm_id is not None and self.power_dmm is None: 427 raise ResourceNotFoundError("Power DMM Not Found") 428 429 if len(self.korads) < len(self.config.get("korads", [])): 430 for korad in self.config["korads"]: 431 if korad["id"] not in self.korads: 432 raise ResourceNotFoundError(f'Unable to locate Korad {korad["id"]}') 433 self.korads = dict(sorted(self.korads.items())) # Sort korads by id 434 435 if cell_model == "agilent" and len(self.cells) < len(config_cells): 436 for cell in config_cells: 437 if cell["id"] not in self.cells: 438 raise ResourceNotFoundError(f'Unable to locate Cell {cell["id"]} Simulator') 439 if cell_model == "ngi" and len(self.cells) == 0: 440 raise ResourceNotFoundError("Unable to locate NGI Cell Simulator") 441 if cell_model == "chroma" and len(self.cells) == 0: 442 raise ResourceNotFoundError("Unable to locate Chroma Cell Simulator") 443 self.cells = dict(sorted(self.cells.items())) # Sort cells by id 444 445 # Now go through the ThermoCouples 446 for thermo_couple in self.config.get("thermo_couples", []): 447 new_thermo_couple = ThermoCouple(thermo_couple["id"], thermo_couple["board"], thermo_couple["channel"]) 448 self.thermo_couples[new_thermo_couple.thermocouple_id] = new_thermo_couple 449 450 logger.write_debug_to_report("Setting up SIGCHLD Handler") # Must be set up afterward due to VISA raising it 451 signal.signal(signal.SIGCHLD, SafeShutdown.handler) 452 453 # FIXME(JA): move into its own module, use exceptions 454 455 @property 456 def temperature(self) -> float: 457 """Temperature measurement used by run cycles.""" 458 return float(self.thermo_couples[1].temperature) if 1 in self.thermo_couples else self.plateset.temperature 459 460 def run_li_charge_cycle(self, digits=6): 461 """Run a Li charge cycle.""" 462 if self.voltage == 0: 463 raise ValueLogError("Charging Voltage Invalid") 464 465 if self.ov_protection == 0: 466 raise ValueLogError("Over-Voltage Protection Invalid") 467 468 if self.current == 0: 469 raise ValueLogError("Charge Current Invalid") 470 471 if self.termination_current == 0: 472 raise ValueLogError("Termination Current Invalid") 473 474 if self.max_time == 0: 475 raise ValueLogError("Max Time Invalid") 476 477 if self.sample_interval == 0: 478 raise ValueLogError("Charging Sample Time Invalid") 479 480 if self.minimum_readings == 0: 481 raise ValueLogError("Minimum Readings Invalid") 482 483 self.percent_discharge = -1 # For capacity calculations 484 485 # Create a queue to hold the previous n readings. This is used to avoid erroneous errors. 486 current_measurement_queue = deque(maxlen=self.minimum_readings) 487 488 logger.write_info_to_report( 489 f"Starting charge cycle (CV: {self.voltage}V, OV: {self.ov_protection}V, " 490 f"CI: {self.current}A, TI: {self.termination_current}A, MT: {self.max_time}s, " 491 f"SI: {self.sample_interval}s, MR: {self.minimum_readings}" 492 ")" 493 ) 494 495 with self.charger(self.voltage, self.current): 496 497 # Starting values for the while loop 498 latest_i = self.termination_current + 0.1 # placeholder for the latest charging current measurement 499 current_measurement_queue.append(latest_i) 500 amp_hrs = 0 # amp hours 501 latest_v = 0 # placeholder for the latest charging voltage measurement 502 elapsed_time = 0 503 temp_c = 0 504 505 self.timer.reset() # Keep track of runtime 506 507 # charge until termination current is reached or ov_protection or max_time 508 while any(i >= self.termination_current for i in current_measurement_queue): 509 if latest_v >= self.ov_protection: 510 self.charger.disable() 511 raise OverVoltageError( 512 f"Overvoltage protection triggered at {time.strftime('%x %X')}. " 513 f"Voltage {latest_v} is higher than {self.ov_protection}." 514 ) 515 if elapsed_time >= self.max_time: 516 self.charger.disable() 517 raise TimeoutExceededError( 518 f"Termination current of {self.termination_current}A was " 519 f"not reached after {self.max_time} seconds" 520 ) 521 if temp_c >= self.overtemp_c: 522 self.charger.disable() 523 raise OverTemperatureError( 524 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 525 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 526 ) 527 528 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 529 530 t1 = elapsed_time # previous elapsed time for calculating delta t 531 elapsed_time = round(self.timer.elapsed_time, digits) 532 dt = round(elapsed_time - t1, digits) # delta t 533 534 latest_v = self.dmm.volts 535 536 i1 = latest_i # previous current reading 537 latest_i = self.charger.amps 538 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 539 current_measurement_queue.append(latest_i) 540 541 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # milliamp hour calculation 542 temp_c = round(self.temperature, digits) # grab the temperature 543 544 logger.write_info_to_report( 545 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, " 546 f"Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}" 547 ) 548 549 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 550 time.sleep(self.sample_interval) 551 552 self.remaining_capacity_percentage = 100 553 554 def run_nicd_charge_cycle(self, digits=6): 555 """Run a NiCad charge cycle.""" 556 557 if self.nicd_cell_count == 0: 558 raise ValueLogError("Cell Count Invalid") 559 560 if self.max_time == 0: 561 raise ValueLogError("Charge Time Invalid") 562 563 if self.sample_interval == 0: 564 raise ValueLogError("Charge Sample Interval Invalid") 565 566 if self.current == 0: 567 raise ValueLogError("Charge Current Invalid") 568 569 if self.nicd_charge_type is NiCdChargeCycle.UNDEFINED: 570 raise ValueLogError("Charge Type Invalid") 571 572 self.percent_discharge = -1 # For capacity calculations 573 self.ov_protection = min(30.0, self.nicd_cell_count * 1.95) 574 575 logger.write_info_to_report( 576 f"Starting {self.nicd_charge_type.name.lower()} NiCd charge cycle (" 577 f"CV: {self.ov_protection}V, OV: {self.ov_protection}V, CI: {self.current}A, MT: {self.max_time}s, " 578 f"SI: {self.sample_interval}s)" 579 ) 580 581 self.charger.set_profile(self.ov_protection, self.current) 582 self.charger.enable() # Enables the output of the charger 583 584 # initialize charge control variables 585 latest_i = 0 586 amp_hrs = 0 587 latest_v = 0 588 elapsed_time = 0 589 max_v = 0 590 first_cycle = True 591 592 self.timer.reset() # Keep track of runtime 593 594 if self.nicd_charge_type in (NiCdChargeCycle.STANDARD, NiCdChargeCycle.CUSTOM): 595 while latest_v < self.ov_protection and elapsed_time < self.max_time: 596 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 597 t1 = elapsed_time 598 elapsed_time = round(self.timer.elapsed_time, digits) 599 dt = round(elapsed_time - t1, digits) 600 601 # This section allows for fairly precise sample interval 602 if dt < self.sample_interval and not first_cycle: 603 while dt < self.sample_interval: 604 elapsed_time = round(self.timer.elapsed_time, digits) 605 dt = round(elapsed_time - t1, digits) # delta t 606 time.sleep(0.0005) 607 608 latest_v = self.dmm.volts 609 610 # keep track of the highest cell voltage seen 611 max_v = max(max_v, latest_v) 612 613 i1 = latest_i # previous current reading 614 latest_i = self.charger.amps # current reading 615 616 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 617 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 618 temp_c = round(self.temperature, digits) # grab the temperature 619 620 logger.write_info_to_report( 621 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, " 622 f"Ah: {amp_hrs}, Temp: {temp_c}" 623 ) 624 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 625 first_cycle = False 626 627 elif self.nicd_charge_type == NiCdChargeCycle.DV_DT: 628 dv = 0.005 * self.nicd_cell_count 629 while (max_v - latest_v) < dv and latest_v < self.ov_protection and elapsed_time < self.max_time: 630 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 631 t1 = elapsed_time # previous elapsed time for calculating delta t 632 elapsed_time = round(self.timer.elapsed_time, digits) 633 dt = round(elapsed_time - t1, digits) # delta t 634 635 # This section allows for fairly precise sample interval 636 if dt < self.sample_interval and not first_cycle: 637 while dt < self.sample_interval: 638 elapsed_time = round(self.timer.elapsed_time, digits) 639 dt = round(elapsed_time - t1, digits) # delta t 640 time.sleep(0.0005) 641 642 latest_v = self.dmm.volts 643 max_v = max(max_v, latest_v) # keep track of highest cell voltage seen 644 i1 = latest_i # previous current reading 645 latest_i = self.charger.amps # current reading 646 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 647 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 648 temp_c = round(self.temperature, digits) # grab the temperature 649 650 logger.write_info_to_report( 651 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, " 652 f"Ah: {amp_hrs}, Temp: {temp_c}" 653 ) 654 first_cycle = False 655 656 self.charger.disable() # Charging is complete, turn off the charger 657 self.remaining_capacity_percentage = 100 658 659 logger.write_info_to_report(f"Charge Cycle complete, Ah: {amp_hrs}") 660 return amp_hrs 661 662 def run_discharge_step_cycle(self, digits=6): 663 """Run a discharge step cycle.""" 664 if self.max_time == 0: 665 raise ValueLogError("Max Time Invalid") 666 667 if self.current == 0: 668 raise ValueLogError("Discharge rate Invalid") 669 670 if self.sample_interval == 0: 671 raise ValueLogError("Sample Interval Invalid") 672 673 if self.uv_protection == 0: 674 raise ValueLogError("Undervoltage Protection Invalid") 675 676 if self.percent_discharge == 0: 677 raise ValueLogError("Percent Discharge Invalid") 678 679 if self.total_ah == 0: 680 raise ValueLogError("Total Ah Invalid") 681 682 delta_ah = self.total_ah * self.percent_discharge 683 # discharge_time = ((self.total_ah * self.percent_discharge) / self.discharge_i) * 3600 684 logger.write_info_to_report( 685 f"Starting discharge step cycle (" 686 f"DA: {delta_ah}Ah, UV: {self.uv_protection}V, DI: {self.current}A, " 687 f"PD: {self.percent_discharge * 100}%, TA: {self.total_ah}Ah, SI: {self.sample_interval}s, " 688 f"RC: {self.remaining_capacity_percentage}, MT: {self.max_time}, DV: {self.discharge_until_undervoltage})" 689 ) 690 691 self.load.mode_cc() 692 if self.current > 6: 693 self.load.amps_range = 60 # if current is > 6A set to high range 694 self.load.amps = self.current 695 self.load.enable() 696 697 # Place holder values 698 latest_i = 0 699 amp_hrs = 0 # mA hours 700 latest_v = self.uv_protection + 0.01 701 elapsed_time = 0 702 first_cycle = True 703 temp_c = 0 704 # resistance = 0 705 706 self.timer.reset() # Keep track of runtime 707 708 # discharge until Ah reached or uv_protection or max_time 709 while abs(amp_hrs) < delta_ah or self.discharge_until_undervoltage: 710 if latest_v <= self.uv_protection: 711 exception = UnderVoltageError( 712 f"Undervoltage protection triggered at {time.strftime('%x %X')}. " 713 f"Voltage {latest_v} is lower than {self.uv_protection}" 714 ) 715 if self.discharge_until_undervoltage: 716 logger.write_info_to_report(f"{type(exception).__name__} - Continuing test") 717 break 718 raise exception 719 720 if elapsed_time >= self.max_time: 721 raise TimeoutExceededError(f"Delta Ah of {delta_ah} was not reached after {self.max_time} seconds") 722 723 if temp_c >= self.overtemp_c: 724 raise OverTemperatureError( 725 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 726 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 727 ) 728 729 t1 = elapsed_time # previous elapsed time for calculating delta t 730 elapsed_time = round(self.timer.elapsed_time, digits) 731 dt = round(elapsed_time - t1, digits) # delta t 732 733 # This section allows for fairly precise sample interval 734 if dt < self.sample_interval and not first_cycle: 735 while dt < self.sample_interval: 736 elapsed_time = round(self.timer.elapsed_time, digits) 737 dt = round(elapsed_time - t1, digits) # delta t 738 time.sleep(0.0005) 739 740 latest_v = self.dmm.volts 741 # load_v = self.load.volts # for measuring wiring harness resistance 742 743 i1 = latest_i # previous current reading 744 # load_ohms = self.load.ohms 745 latest_i = round(-1 * self.load.amps, digits) 746 747 # if not first_cycle: # skip the first cycle so that there are no divide by 0's 748 # # wire harness resistance 749 # resistance = round((latest_v - load_v) / (-1 * latest_i) + load_ohms, digits) 750 751 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 752 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 753 temp_c = round(self.temperature, digits) # grab the temperature 754 755 logger.write_info_to_report( 756 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}" 757 ) 758 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 759 first_cycle = False 760 761 self.load.disable() 762 self.remaining_capacity_percentage = round( 763 self.remaining_capacity_percentage - self.percent_discharge * 100, digits 764 ) 765 766 def run_resting_cycle(self): 767 """Run a resting cycle""" 768 # Guard against calls before init 769 assert self.dmm is not None and self.csv is not None 770 771 if self.max_time == 0: 772 raise ValueLogError("Resting time Invalid") 773 774 if self.sample_interval == 0: 775 raise ValueLogError("Resting sample interval Invalid") 776 777 if self.remaining_capacity_percentage < 0: 778 raise ValueLogError("Remaining capacity percentage Invalid") 779 780 # Place holder values 781 self.percent_discharge = 0 # For capacity calculations 782 elapsed_time = 0.0 783 first_cycle = True 784 785 self.timer.reset() # Keep track of runtime 786 787 logger.write_info_to_report(f"Starting rest cycle (RT: {self.max_time}, SI:{self.sample_interval})") 788 789 while elapsed_time < self.max_time: 790 UserInterrupt.check_user_interrupt() 791 792 t1 = elapsed_time # previous elapsed time for calculating delta t 793 elapsed_time = self.timer.elapsed_time 794 dt = elapsed_time - t1 # delta t 795 796 # This section allows for fairly precise sample interval 797 if dt < self.sample_interval and not first_cycle: 798 while dt < self.sample_interval: 799 elapsed_time = self.timer.elapsed_time 800 dt = elapsed_time - t1 # delta t 801 time.sleep(0.0005) 802 803 latest_v = self.dmm.volts 804 temp_c = self.temperature # grab the temperature 805 806 logger.write_info_to_report(f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Temp: {temp_c}") 807 self.csv.cycle.record(elapsed_time) 808 first_cycle = False 809 810 def run_discharge_cycle(self, digits=6): 811 """Run a discharge cycle.""" 812 813 if self.max_time == 0: 814 raise ValueLogError("Max Time Invalid") 815 816 if self.sample_interval == 0: 817 raise ValueLogError("Discharge Sample Interval Invalid") 818 819 if self.discharge_type == DischargeType.UNDEFINED: 820 raise ValueLogError("Discharge Mode Invalid") 821 822 if self.voltage == 0: 823 raise ValueLogError("Discharge Voltage Invalid") 824 825 if self.uv_protection == 0: 826 raise ValueLogError("Under Voltage Protection Invalid") 827 828 if self.discharge_type == DischargeType.CONSTANT_CURRENT: 829 # we're in current discharge mode, so make sure there is a valid current 830 if self.current == 0: 831 raise ValueLogError("Discharge Current mode selected, but Current is invalid") 832 833 elif self.discharge_type == DischargeType.CONSTANT_RESISTANCE: 834 # we're in resistance discharge mode so make sure there is a valid resistance. 835 if self.resistance == 0: 836 raise ValueLogError("Discharge Resistance mode selected, but Resistance is invalid") 837 838 logger.write_info_to_report( 839 f"Starting discharge cycle (" 840 f"DV: {self.voltage}V, UV: {self.uv_protection}V, DI: {self.current}A, DR: {self.resistance}Ohms, " 841 f"DM: {self.discharge_type}, MT: {self.max_time}s, SI: {self.sample_interval}s" 842 ")" 843 ) 844 845 with self.load(self.current, self.discharge_type, self.resistance): 846 847 self.percent_discharge = 1 # For capacity calculations 848 849 # Place holder values 850 latest_i = 0 851 resistance = 0 852 amp_hrs = 0 # Amp hours 853 latest_v = self.voltage + 0.01 854 elapsed_time = 0 855 first_run = True 856 temp_c = 0 857 858 self.timer.reset() # Keep track of runtime 859 kill_event = FileEvent("kill_discharge") 860 discharge_event = FileEvent("nicd_discharging") 861 discharge_event.set() 862 863 # discharge until discharge voltage is reached or uv_protection or max_time 864 while not (self.uv_protection < latest_v <= self.voltage or kill_event.is_set()): 865 if latest_v <= self.uv_protection: 866 self.load.disable() 867 raise UnderVoltageError( 868 f"Undervoltage protection triggered at {time.strftime('%x %X')}. " 869 f"Voltage {latest_v} is lower than {self.uv_protection}." 870 ) 871 if elapsed_time >= self.max_time: 872 self.load.disable() 873 raise TimeoutExceededError( 874 f"Voltage of {self.voltage} was not reached after {self.max_time} seconds" 875 ) 876 if temp_c >= self.overtemp_c: 877 self.load.disable() 878 raise OverTemperatureError( 879 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 880 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 881 ) 882 883 UserInterrupt.check_user_interrupt(self.load) # Pause if necessary, turning off load relay 884 885 t1 = elapsed_time # previous elapsed time for calculating delta t 886 elapsed_time = round(self.timer.elapsed_time, digits) 887 dt = round(elapsed_time - t1, digits) # delta t 888 889 # This section allows for fairly precise sample interval 890 if dt < self.sample_interval and not first_run: 891 while dt < self.sample_interval: 892 elapsed_time = round(self.timer.elapsed_time, digits) 893 dt = round(elapsed_time - t1, digits) # delta t 894 time.sleep(0.0005) 895 896 latest_v = self.dmm.volts 897 i1 = latest_i # previous current reading 898 load_ohms = self.load.ohms 899 latest_i = round(-1 * self.load.amps, digits) 900 901 # skip the first cycle so that there are no divide by 0's 902 if not first_run and self.discharge_type == DischargeType.CONSTANT_RESISTANCE: 903 load_v = self.load.volts # for measuring wiring harness resistance 904 try: 905 resistance = (latest_v - load_v) / (-1 * latest_i) + load_ohms # wire harness resistance 906 except ZeroDivisionError: 907 logger.write_error_to_report( 908 "Zero division when calculating resistance. " 909 f"Load Voltage: {load_v}, Current: {latest_i}, Resistance: {load_ohms}." 910 ) 911 912 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 913 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 914 temp_c = round(self.temperature, digits) # grab the temperature 915 916 logger.write_info_to_report( 917 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, " 918 f"Resistance: {resistance}, Temp: {temp_c}" 919 ) 920 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs, resistance) 921 first_run = False 922 923 self.remaining_capacity_percentage = 0 924 return abs(amp_hrs) # Return a positive Ah 925 926 def delay(self, seconds: int | float): 927 """Sleep that counts towards elapsed time.""" 928 time.sleep(seconds) 929 930 def pause_test(self, seconds_to_pause: float = 0, *relays: ChargerOrLoad): 931 """Pause the test for some amount of seconds.""" 932 for relay in relays: 933 relay.disable() 934 if seconds_to_pause == 0: # Wait for user input to continue 935 logger.write_info_to_report("Pausing: waiting for user input") 936 statuscode = 0xFFFF 937 while not os.WIFEXITED(statuscode): # process terminated normally? 938 statuscode = os.system( 939 '/bin/bash -c \'select CHOICE in "Type CONTINUE to resume test"; do ' 940 'if [ "$REPLY" = "CONTINUE" ]; then break; fi; done\'' 941 ) 942 else: 943 logger.write_info_to_report(f"Pausing test (S: {seconds_to_pause})") 944 time.sleep(seconds_to_pause) 945 for relay in relays: 946 relay.enable()
112class SafeShutdown: 113 """Manage shutting down safely.""" 114 115 SIGNALS = [signal.SIGABRT, signal.SIGHUP, signal.SIGINT, signal.SIGTERM] 116 117 @staticmethod 118 def handler(_signo, _stack_frame): 119 """ 120 This handler will be called during normal / abnormal termination. 121 i.e. ssh connection drops out. In this event, we're going to 122 ensure that the test setup is transitioned to a safe state. 123 """ 124 # pylint: disable=comparison-with-callable # We want to check handler function 125 # Failsafe: CTRL-Z sends kill 126 signal.signal(signal.SIGTSTP, lambda _signo, _stack_frame: signal.raise_signal(signal.SIGKILL)) 127 if _signo: 128 for signal_no in signal.valid_signals(): # Ignore all future signals if already shutting down 129 if signal.getsignal(signal_no) == SafeShutdown.handler and signal_no != signal.SIGINT: 130 logger.write_info_to_report(f"Ignoring {signal.Signals(signal_no).name}") 131 signal.signal(signal_no, signal.SIG_IGN) 132 logger.write_critical_to_report(f"Caught {signal.Signals(_signo).name}, Powering OFF the BMS!!!") 133 pytest.exit("BMS died, exiting test suite early", 1) # Exit gracefully
Manage shutting down safely.
117 @staticmethod 118 def handler(_signo, _stack_frame): 119 """ 120 This handler will be called during normal / abnormal termination. 121 i.e. ssh connection drops out. In this event, we're going to 122 ensure that the test setup is transitioned to a safe state. 123 """ 124 # pylint: disable=comparison-with-callable # We want to check handler function 125 # Failsafe: CTRL-Z sends kill 126 signal.signal(signal.SIGTSTP, lambda _signo, _stack_frame: signal.raise_signal(signal.SIGKILL)) 127 if _signo: 128 for signal_no in signal.valid_signals(): # Ignore all future signals if already shutting down 129 if signal.getsignal(signal_no) == SafeShutdown.handler and signal_no != signal.SIGINT: 130 logger.write_info_to_report(f"Ignoring {signal.Signals(signal_no).name}") 131 signal.signal(signal_no, signal.SIG_IGN) 132 logger.write_critical_to_report(f"Caught {signal.Signals(_signo).name}, Powering OFF the BMS!!!") 133 pytest.exit("BMS died, exiting test suite early", 1) # Exit gracefully
This handler will be called during normal / abnormal termination. i.e. ssh connection drops out. In this event, we're going to ensure that the test setup is transitioned to a safe state.
136class UserInterrupt: 137 """Suspend / Continue the program on CTRL-Z events.""" 138 139 interrupt_requested = Event() # Suspend by default 140 141 @staticmethod 142 def init(): 143 """Do not suspend by default.""" 144 UserInterrupt.interrupt_requested.set() 145 146 @staticmethod 147 def handler(_signo, _stack_frame): 148 """Toggle program suspension.""" 149 if UserInterrupt.interrupt_requested.is_set(): 150 logger.write_info_to_report("User requested test suspension!") 151 UserInterrupt.interrupt_requested.clear() 152 else: 153 logger.write_info_to_report("User requested test continuation!") 154 UserInterrupt.interrupt_requested.set() 155 156 @staticmethod 157 def force_pause(): 158 """Pause the program.""" 159 logger.write_info_to_report("Program forced test suspension!") 160 UserInterrupt.interrupt_requested.clear() 161 UserInterrupt.check_user_interrupt() 162 163 @staticmethod 164 def check_user_interrupt(*relays: ChargerOrLoad): 165 """Cooperatively check if a user interrupt was received.""" 166 assert BMSHardware.instance is not None 167 168 if not UserInterrupt.interrupt_requested.is_set(): 169 logger.write_info_to_report("Suspending test") 170 for relay in relays: 171 relay.disable() 172 BMSHardware.instance.timer.stop() 173 UserInterrupt.interrupt_requested.wait() 174 BMSHardware.instance.timer.start() 175 logger.write_info_to_report("Continuing test") 176 for relay in relays: 177 relay.enable()
Suspend / Continue the program on CTRL-Z events.
141 @staticmethod 142 def init(): 143 """Do not suspend by default.""" 144 UserInterrupt.interrupt_requested.set()
Do not suspend by default.
146 @staticmethod 147 def handler(_signo, _stack_frame): 148 """Toggle program suspension.""" 149 if UserInterrupt.interrupt_requested.is_set(): 150 logger.write_info_to_report("User requested test suspension!") 151 UserInterrupt.interrupt_requested.clear() 152 else: 153 logger.write_info_to_report("User requested test continuation!") 154 UserInterrupt.interrupt_requested.set()
Toggle program suspension.
156 @staticmethod 157 def force_pause(): 158 """Pause the program.""" 159 logger.write_info_to_report("Program forced test suspension!") 160 UserInterrupt.interrupt_requested.clear() 161 UserInterrupt.check_user_interrupt()
Pause the program.
163 @staticmethod 164 def check_user_interrupt(*relays: ChargerOrLoad): 165 """Cooperatively check if a user interrupt was received.""" 166 assert BMSHardware.instance is not None 167 168 if not UserInterrupt.interrupt_requested.is_set(): 169 logger.write_info_to_report("Suspending test") 170 for relay in relays: 171 relay.disable() 172 BMSHardware.instance.timer.stop() 173 UserInterrupt.interrupt_requested.wait() 174 BMSHardware.instance.timer.start() 175 logger.write_info_to_report("Continuing test") 176 for relay in relays: 177 relay.enable()
Cooperatively check if a user interrupt was received.
180class BMSHardware: 181 """The main API for accessing all hardware.""" 182 183 instance: ClassVar[Self | None] = None 184 initialized: bool = False 185 186 def __new__(cls, flags: BMSFlags | None = None): 187 """Make BMS hardware a singleton.""" 188 if cls.instance is None: 189 cls.instance = super().__new__(cls) 190 return cls.instance 191 192 def __init__(self, flags: BMSFlags | None = None): # TODO(JA): document attributes 193 """Initialize the BMS hardware.""" 194 if flags is None: 195 return 196 197 # Battery attributes 198 self.remaining_capacity_percentage = 0.0 199 self.battery_type: BatteryType = BatteryType.UNDEFINED # Initialize to unknown 200 self.max_time = 0.0 201 self.sample_interval = 0.0 202 self.voltage = 0.0 203 self.current = 0.0 204 self.current_limit = 10.0 205 self.overtemp_c = OVERTEMP_C 206 207 # Charging attributes 208 self.minimum_readings = 0 # How many readings to take before passing (for erroneous readings). 209 self.nicd_charge_type: NiCdChargeCycle = NiCdChargeCycle.UNDEFINED 210 self.nicd_cell_count = 0 211 self.termination_current = 0.0 212 self.ov_protection = 0.0 213 214 # Discharging attributes 215 self.resistance = 0.0 216 self.discharge_type: DischargeType = DischargeType.UNDEFINED 217 self.discharge_until_undervoltage = False 218 self.percent_discharge = 0.0 219 self.total_ah = 0.0 220 self.uv_protection = 0.0 221 222 # Hardware attributes 223 self.charger: Charger | None = None 224 self.load: Load | None = None 225 self.dmm: Dmm | M300Dmm | None = None 226 self.power_dmm: Dmm | None = None 227 self.korads: dict[int, Korad] = {} 228 self.thermal_chamber = pseudo_hardware.ThermalChamber() if flags.dry_run else ThermalChamber() 229 self.plateset = Plateset() 230 self.plateset_id = flags.plateset_id 231 self.thermo_couples: dict[int, ThermoCouple] = {} 232 self.cells: dict[int, Cell | NGICell | ChromaCell] = {} 233 self.cell_chemistry = flags.cell_chemistry 234 self.dry_run = flags.dry_run 235 236 # Track total test time 237 self.timer = StopWatch() 238 239 # Logging / Documentation attributes 240 self.config = flags.config 241 self.doc_generation = flags.doc_generation 242 self.report_filename = flags.report_filename 243 self.csv: CSVRecorders = CSVRecorders(cast(hitl_tester.modules.bms.bms_hw.BMSHardware, self)) # Must occur last 244 245 # Modify globals of caller 246 assert isinstance(current_frame := inspect.currentframe(), FrameType) 247 assert isinstance(caller := current_frame.f_back, FrameType) 248 if flags.properties: 249 for global_var, value in flags.properties.items(): 250 caller.f_globals[global_var] = value # Globals must occur before init is called 251 252 def init(self): 253 """Initialize the BMSHardware object.""" 254 if self.initialized: 255 return 256 self.initialized = True 257 258 def get_embed(header: str, key: str, result_type: type) -> Any: 259 """get embedded value from config (i.e. value in dict in list).""" 260 default_item: list[str | dict[str, Any]] = [] 261 items = self.config.get(header, default_item) 262 if isinstance(items, list): 263 for item in items: 264 if isinstance(item, dict) and (result := item.get(key)): 265 try: 266 return result_type(result) 267 except (ValueError, TypeError) as exc: 268 raise RuntimeError( 269 f"Expected {result_type.__name__} type, got {type(result).__name__} type" 270 ) from exc 271 raise RuntimeError(f"Could not find {key} key in {header}") 272 273 if self.doc_generation: # Skip if generating docs 274 return 275 276 # Make sure the charger and the load switches are OFF before doing any testing 277 logger.write_info_to_report("Turning off charger and load switch") 278 self.plateset.charger_switch = False 279 self.plateset.load_switch = False 280 281 # setup some signals to a signal handler so that we can gracefully 282 # shutdown in the event that test is somehow interrupted 283 logger.write_debug_to_report("Setting up Safe Shutdown Handlers") 284 for signal_flag in SafeShutdown.SIGNALS: 285 signal.signal(signal_flag, SafeShutdown.handler) 286 287 # Set up signal terminal stop handler (CTRL-Z) 288 UserInterrupt.init() 289 signal.signal(signal.SIGTSTP, UserInterrupt.handler) 290 291 # FIXME(JA): device classes should manage their own configs/initializations 292 # FIXME(JA): don't assume config is correct 293 294 logger.write_info_to_report("Polling hardware") 295 if self.dry_run: 296 if self.config.get("power_supply_id", "") is not None: 297 logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DP711,DP7A252000573,00.01.05") 298 logger.write_info_to_report("Found DP711 Power Supply") 299 self.charger = pseudo_hardware.Charger() 300 301 if self.config.get("load_id", "") is not None: 302 logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DL3031A,DL3D243000300,00.01.05.00.01") 303 logger.write_info_to_report("Found DL3031A Electronic load") 304 self.load = pseudo_hardware.Load() 305 306 if self.config.get("dmm_id", "") is not None: 307 logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00") 308 logger.write_info_to_report("Found DM3068 DMM") 309 self.dmm = pseudo_hardware.Dmm() 310 311 if self.config.get("m300_dmm") is not None: 312 logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00") 313 logger.write_info_to_report("Found DM3068 DMM") 314 self.dmm = pseudo_hardware.M300Dmm(self.config["m300_dmm"]) 315 316 for korad in self.config.get("korads", []): 317 logger.write_debug_to_report(f"Resource ID: KORAD, {korad['part_id']}") 318 logger.write_info_to_report("Found Korad") 319 new_korad = pseudo_hardware.Korad(korad["id"]) 320 self.korads[new_korad.id] = new_korad 321 322 cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else None 323 if cell_model == "agilent": 324 for cell in self.config.get("cell_simulators", []): 325 if "type" not in cell: 326 logger.write_debug_to_report("Resource ID: Cell Sim, 04.08.15.16.23.42") 327 logger.write_info_to_report("Found Cell Sim") 328 new_cell = pseudo_hardware.Cell(cell["id"], self.cell_chemistry) 329 self.cells[new_cell.id] = new_cell 330 331 else: 332 resource_manager = visa.ResourceManager() 333 resources = resource_manager.list_resources() 334 if not resources: 335 raise ResourceNotFoundError("No PyVISA resources found") 336 337 power_supply_id = self.config.get("power_supply_id", "RIGOL TECHNOLOGIES,DP711") 338 load_id = self.config.get("load_id", "RIGOL TECHNOLOGIES,DL3031A") 339 dmm_id = self.config.get("dmm_id", "Rigol Technologies,DM3068") 340 power_dmm_id = self.config.get("power_dmm_id") 341 if "m300_dmm" in self.config: 342 dmm_id = "M300" 343 344 cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else "" 345 if cell_model == "chroma": # Add chroma to resources 346 address = get_embed("cell_simulators", "address", str) 347 resources = (*resources, (f"TCPIP::{address}::60000::SOCKET", "\n", "\n")) 348 config_cells = [cell for cell in self.config.get("cell_simulators", []) if "type" not in cell] 349 350 for resource_address in resources: 351 safe_resource = SafeResource(resource_address) 352 for baud_rate in (9600, 115200): 353 safe_resource.baud_rate = baud_rate 354 if (idn := safe_resource.query("*IDN?")) != safe_resource.default_result: 355 logger.write_debug_to_report(f"Resource ID: {idn.strip(chr(10))}") 356 357 if power_supply_id is not None and power_supply_id in idn: # Power supply 358 logger.write_info_to_report("Found DP711 Power Supply") 359 self.charger = Charger(safe_resource) 360 361 elif load_id is not None and load_id in idn: # Electronic load 362 logger.write_info_to_report("Found DL3031A Electronic load") 363 self.load = Load(safe_resource) 364 365 elif dmm_id is not None and dmm_id in idn: # DMM 366 logger.write_info_to_report("Found DMM") 367 if "m300_dmm" in self.config: 368 self.dmm = M300Dmm(safe_resource, self.config["m300_dmm"]) 369 else: 370 self.dmm = Dmm(safe_resource) 371 372 elif power_dmm_id is not None and power_dmm_id in idn: # DMM 373 logger.write_info_to_report("Found Power DMM") 374 self.power_dmm = Dmm(safe_resource) 375 376 elif any((match := korad)["part_id"] in idn for korad in self.config.get("korads", [])): 377 logger.write_info_to_report("Found KA6003P Power Supply") 378 new_korad = Korad(match["id"], safe_resource) 379 self.korads[new_korad.id] = new_korad 380 381 elif cell_model == "agilent" and any( 382 (match := cell)["part_id"] in idn for cell in config_cells 383 ): 384 logger.write_info_to_report("Found Agilent Cell Sim") 385 new_cell = Cell(match["id"], safe_resource, self.cell_chemistry) 386 new_cell.reset() 387 self.cells[new_cell.id] = new_cell 388 389 elif cell_model == "ngi" and "NGI,N83624" in idn: 390 logger.write_info_to_report("Found NGI Cell Sim") 391 cell_count = get_embed("cell_simulators", "cell_count", int) 392 for cell_id in range(1, cell_count + 1): 393 new_cell = NGICell(cell_id, safe_resource, self.cell_chemistry) 394 self.cells[new_cell.id] = new_cell 395 if cell_id == cell_count: 396 new_cell.reset() 397 398 elif cell_model == "chroma" and "Chroma,87001" in idn: 399 logger.write_info_to_report("Found Chroma Cell Sim") 400 new_cell = None 401 for cell_id, cell in enumerate(config_cells): 402 if "cell_slot" in cell: 403 new_cell = ChromaCell( 404 cell_id, safe_resource, self.cell_chemistry, cell["cell_slot"] 405 ) 406 self.cells[new_cell.id] = new_cell 407 if new_cell: 408 new_cell.reset() # FIXME(JA): does this cause problems for the other side if running? 409 410 else: 411 logger.write_warning_to_report("Unknown Resource") 412 413 break 414 415 logger.write_warning_to_report(f"Failed to connect to: {resource_address}") 416 logger.write_debug_to_report("Attempting higher baud rate") 417 418 if power_supply_id is not None and self.charger is None: 419 raise ResourceNotFoundError("Power Supply Not Found") 420 421 if load_id is not None and self.load is None: 422 raise ResourceNotFoundError("Load Simulator Not Found") 423 424 if dmm_id is not None and self.dmm is None: 425 raise ResourceNotFoundError("DMM Not Found") 426 427 if power_dmm_id is not None and self.power_dmm is None: 428 raise ResourceNotFoundError("Power DMM Not Found") 429 430 if len(self.korads) < len(self.config.get("korads", [])): 431 for korad in self.config["korads"]: 432 if korad["id"] not in self.korads: 433 raise ResourceNotFoundError(f'Unable to locate Korad {korad["id"]}') 434 self.korads = dict(sorted(self.korads.items())) # Sort korads by id 435 436 if cell_model == "agilent" and len(self.cells) < len(config_cells): 437 for cell in config_cells: 438 if cell["id"] not in self.cells: 439 raise ResourceNotFoundError(f'Unable to locate Cell {cell["id"]} Simulator') 440 if cell_model == "ngi" and len(self.cells) == 0: 441 raise ResourceNotFoundError("Unable to locate NGI Cell Simulator") 442 if cell_model == "chroma" and len(self.cells) == 0: 443 raise ResourceNotFoundError("Unable to locate Chroma Cell Simulator") 444 self.cells = dict(sorted(self.cells.items())) # Sort cells by id 445 446 # Now go through the ThermoCouples 447 for thermo_couple in self.config.get("thermo_couples", []): 448 new_thermo_couple = ThermoCouple(thermo_couple["id"], thermo_couple["board"], thermo_couple["channel"]) 449 self.thermo_couples[new_thermo_couple.thermocouple_id] = new_thermo_couple 450 451 logger.write_debug_to_report("Setting up SIGCHLD Handler") # Must be set up afterward due to VISA raising it 452 signal.signal(signal.SIGCHLD, SafeShutdown.handler) 453 454 # FIXME(JA): move into its own module, use exceptions 455 456 @property 457 def temperature(self) -> float: 458 """Temperature measurement used by run cycles.""" 459 return float(self.thermo_couples[1].temperature) if 1 in self.thermo_couples else self.plateset.temperature 460 461 def run_li_charge_cycle(self, digits=6): 462 """Run a Li charge cycle.""" 463 if self.voltage == 0: 464 raise ValueLogError("Charging Voltage Invalid") 465 466 if self.ov_protection == 0: 467 raise ValueLogError("Over-Voltage Protection Invalid") 468 469 if self.current == 0: 470 raise ValueLogError("Charge Current Invalid") 471 472 if self.termination_current == 0: 473 raise ValueLogError("Termination Current Invalid") 474 475 if self.max_time == 0: 476 raise ValueLogError("Max Time Invalid") 477 478 if self.sample_interval == 0: 479 raise ValueLogError("Charging Sample Time Invalid") 480 481 if self.minimum_readings == 0: 482 raise ValueLogError("Minimum Readings Invalid") 483 484 self.percent_discharge = -1 # For capacity calculations 485 486 # Create a queue to hold the previous n readings. This is used to avoid erroneous errors. 487 current_measurement_queue = deque(maxlen=self.minimum_readings) 488 489 logger.write_info_to_report( 490 f"Starting charge cycle (CV: {self.voltage}V, OV: {self.ov_protection}V, " 491 f"CI: {self.current}A, TI: {self.termination_current}A, MT: {self.max_time}s, " 492 f"SI: {self.sample_interval}s, MR: {self.minimum_readings}" 493 ")" 494 ) 495 496 with self.charger(self.voltage, self.current): 497 498 # Starting values for the while loop 499 latest_i = self.termination_current + 0.1 # placeholder for the latest charging current measurement 500 current_measurement_queue.append(latest_i) 501 amp_hrs = 0 # amp hours 502 latest_v = 0 # placeholder for the latest charging voltage measurement 503 elapsed_time = 0 504 temp_c = 0 505 506 self.timer.reset() # Keep track of runtime 507 508 # charge until termination current is reached or ov_protection or max_time 509 while any(i >= self.termination_current for i in current_measurement_queue): 510 if latest_v >= self.ov_protection: 511 self.charger.disable() 512 raise OverVoltageError( 513 f"Overvoltage protection triggered at {time.strftime('%x %X')}. " 514 f"Voltage {latest_v} is higher than {self.ov_protection}." 515 ) 516 if elapsed_time >= self.max_time: 517 self.charger.disable() 518 raise TimeoutExceededError( 519 f"Termination current of {self.termination_current}A was " 520 f"not reached after {self.max_time} seconds" 521 ) 522 if temp_c >= self.overtemp_c: 523 self.charger.disable() 524 raise OverTemperatureError( 525 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 526 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 527 ) 528 529 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 530 531 t1 = elapsed_time # previous elapsed time for calculating delta t 532 elapsed_time = round(self.timer.elapsed_time, digits) 533 dt = round(elapsed_time - t1, digits) # delta t 534 535 latest_v = self.dmm.volts 536 537 i1 = latest_i # previous current reading 538 latest_i = self.charger.amps 539 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 540 current_measurement_queue.append(latest_i) 541 542 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # milliamp hour calculation 543 temp_c = round(self.temperature, digits) # grab the temperature 544 545 logger.write_info_to_report( 546 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, " 547 f"Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}" 548 ) 549 550 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 551 time.sleep(self.sample_interval) 552 553 self.remaining_capacity_percentage = 100 554 555 def run_nicd_charge_cycle(self, digits=6): 556 """Run a NiCad charge cycle.""" 557 558 if self.nicd_cell_count == 0: 559 raise ValueLogError("Cell Count Invalid") 560 561 if self.max_time == 0: 562 raise ValueLogError("Charge Time Invalid") 563 564 if self.sample_interval == 0: 565 raise ValueLogError("Charge Sample Interval Invalid") 566 567 if self.current == 0: 568 raise ValueLogError("Charge Current Invalid") 569 570 if self.nicd_charge_type is NiCdChargeCycle.UNDEFINED: 571 raise ValueLogError("Charge Type Invalid") 572 573 self.percent_discharge = -1 # For capacity calculations 574 self.ov_protection = min(30.0, self.nicd_cell_count * 1.95) 575 576 logger.write_info_to_report( 577 f"Starting {self.nicd_charge_type.name.lower()} NiCd charge cycle (" 578 f"CV: {self.ov_protection}V, OV: {self.ov_protection}V, CI: {self.current}A, MT: {self.max_time}s, " 579 f"SI: {self.sample_interval}s)" 580 ) 581 582 self.charger.set_profile(self.ov_protection, self.current) 583 self.charger.enable() # Enables the output of the charger 584 585 # initialize charge control variables 586 latest_i = 0 587 amp_hrs = 0 588 latest_v = 0 589 elapsed_time = 0 590 max_v = 0 591 first_cycle = True 592 593 self.timer.reset() # Keep track of runtime 594 595 if self.nicd_charge_type in (NiCdChargeCycle.STANDARD, NiCdChargeCycle.CUSTOM): 596 while latest_v < self.ov_protection and elapsed_time < self.max_time: 597 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 598 t1 = elapsed_time 599 elapsed_time = round(self.timer.elapsed_time, digits) 600 dt = round(elapsed_time - t1, digits) 601 602 # This section allows for fairly precise sample interval 603 if dt < self.sample_interval and not first_cycle: 604 while dt < self.sample_interval: 605 elapsed_time = round(self.timer.elapsed_time, digits) 606 dt = round(elapsed_time - t1, digits) # delta t 607 time.sleep(0.0005) 608 609 latest_v = self.dmm.volts 610 611 # keep track of the highest cell voltage seen 612 max_v = max(max_v, latest_v) 613 614 i1 = latest_i # previous current reading 615 latest_i = self.charger.amps # current reading 616 617 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 618 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 619 temp_c = round(self.temperature, digits) # grab the temperature 620 621 logger.write_info_to_report( 622 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, " 623 f"Ah: {amp_hrs}, Temp: {temp_c}" 624 ) 625 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 626 first_cycle = False 627 628 elif self.nicd_charge_type == NiCdChargeCycle.DV_DT: 629 dv = 0.005 * self.nicd_cell_count 630 while (max_v - latest_v) < dv and latest_v < self.ov_protection and elapsed_time < self.max_time: 631 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 632 t1 = elapsed_time # previous elapsed time for calculating delta t 633 elapsed_time = round(self.timer.elapsed_time, digits) 634 dt = round(elapsed_time - t1, digits) # delta t 635 636 # This section allows for fairly precise sample interval 637 if dt < self.sample_interval and not first_cycle: 638 while dt < self.sample_interval: 639 elapsed_time = round(self.timer.elapsed_time, digits) 640 dt = round(elapsed_time - t1, digits) # delta t 641 time.sleep(0.0005) 642 643 latest_v = self.dmm.volts 644 max_v = max(max_v, latest_v) # keep track of highest cell voltage seen 645 i1 = latest_i # previous current reading 646 latest_i = self.charger.amps # current reading 647 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 648 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 649 temp_c = round(self.temperature, digits) # grab the temperature 650 651 logger.write_info_to_report( 652 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, " 653 f"Ah: {amp_hrs}, Temp: {temp_c}" 654 ) 655 first_cycle = False 656 657 self.charger.disable() # Charging is complete, turn off the charger 658 self.remaining_capacity_percentage = 100 659 660 logger.write_info_to_report(f"Charge Cycle complete, Ah: {amp_hrs}") 661 return amp_hrs 662 663 def run_discharge_step_cycle(self, digits=6): 664 """Run a discharge step cycle.""" 665 if self.max_time == 0: 666 raise ValueLogError("Max Time Invalid") 667 668 if self.current == 0: 669 raise ValueLogError("Discharge rate Invalid") 670 671 if self.sample_interval == 0: 672 raise ValueLogError("Sample Interval Invalid") 673 674 if self.uv_protection == 0: 675 raise ValueLogError("Undervoltage Protection Invalid") 676 677 if self.percent_discharge == 0: 678 raise ValueLogError("Percent Discharge Invalid") 679 680 if self.total_ah == 0: 681 raise ValueLogError("Total Ah Invalid") 682 683 delta_ah = self.total_ah * self.percent_discharge 684 # discharge_time = ((self.total_ah * self.percent_discharge) / self.discharge_i) * 3600 685 logger.write_info_to_report( 686 f"Starting discharge step cycle (" 687 f"DA: {delta_ah}Ah, UV: {self.uv_protection}V, DI: {self.current}A, " 688 f"PD: {self.percent_discharge * 100}%, TA: {self.total_ah}Ah, SI: {self.sample_interval}s, " 689 f"RC: {self.remaining_capacity_percentage}, MT: {self.max_time}, DV: {self.discharge_until_undervoltage})" 690 ) 691 692 self.load.mode_cc() 693 if self.current > 6: 694 self.load.amps_range = 60 # if current is > 6A set to high range 695 self.load.amps = self.current 696 self.load.enable() 697 698 # Place holder values 699 latest_i = 0 700 amp_hrs = 0 # mA hours 701 latest_v = self.uv_protection + 0.01 702 elapsed_time = 0 703 first_cycle = True 704 temp_c = 0 705 # resistance = 0 706 707 self.timer.reset() # Keep track of runtime 708 709 # discharge until Ah reached or uv_protection or max_time 710 while abs(amp_hrs) < delta_ah or self.discharge_until_undervoltage: 711 if latest_v <= self.uv_protection: 712 exception = UnderVoltageError( 713 f"Undervoltage protection triggered at {time.strftime('%x %X')}. " 714 f"Voltage {latest_v} is lower than {self.uv_protection}" 715 ) 716 if self.discharge_until_undervoltage: 717 logger.write_info_to_report(f"{type(exception).__name__} - Continuing test") 718 break 719 raise exception 720 721 if elapsed_time >= self.max_time: 722 raise TimeoutExceededError(f"Delta Ah of {delta_ah} was not reached after {self.max_time} seconds") 723 724 if temp_c >= self.overtemp_c: 725 raise OverTemperatureError( 726 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 727 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 728 ) 729 730 t1 = elapsed_time # previous elapsed time for calculating delta t 731 elapsed_time = round(self.timer.elapsed_time, digits) 732 dt = round(elapsed_time - t1, digits) # delta t 733 734 # This section allows for fairly precise sample interval 735 if dt < self.sample_interval and not first_cycle: 736 while dt < self.sample_interval: 737 elapsed_time = round(self.timer.elapsed_time, digits) 738 dt = round(elapsed_time - t1, digits) # delta t 739 time.sleep(0.0005) 740 741 latest_v = self.dmm.volts 742 # load_v = self.load.volts # for measuring wiring harness resistance 743 744 i1 = latest_i # previous current reading 745 # load_ohms = self.load.ohms 746 latest_i = round(-1 * self.load.amps, digits) 747 748 # if not first_cycle: # skip the first cycle so that there are no divide by 0's 749 # # wire harness resistance 750 # resistance = round((latest_v - load_v) / (-1 * latest_i) + load_ohms, digits) 751 752 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 753 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 754 temp_c = round(self.temperature, digits) # grab the temperature 755 756 logger.write_info_to_report( 757 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}" 758 ) 759 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 760 first_cycle = False 761 762 self.load.disable() 763 self.remaining_capacity_percentage = round( 764 self.remaining_capacity_percentage - self.percent_discharge * 100, digits 765 ) 766 767 def run_resting_cycle(self): 768 """Run a resting cycle""" 769 # Guard against calls before init 770 assert self.dmm is not None and self.csv is not None 771 772 if self.max_time == 0: 773 raise ValueLogError("Resting time Invalid") 774 775 if self.sample_interval == 0: 776 raise ValueLogError("Resting sample interval Invalid") 777 778 if self.remaining_capacity_percentage < 0: 779 raise ValueLogError("Remaining capacity percentage Invalid") 780 781 # Place holder values 782 self.percent_discharge = 0 # For capacity calculations 783 elapsed_time = 0.0 784 first_cycle = True 785 786 self.timer.reset() # Keep track of runtime 787 788 logger.write_info_to_report(f"Starting rest cycle (RT: {self.max_time}, SI:{self.sample_interval})") 789 790 while elapsed_time < self.max_time: 791 UserInterrupt.check_user_interrupt() 792 793 t1 = elapsed_time # previous elapsed time for calculating delta t 794 elapsed_time = self.timer.elapsed_time 795 dt = elapsed_time - t1 # delta t 796 797 # This section allows for fairly precise sample interval 798 if dt < self.sample_interval and not first_cycle: 799 while dt < self.sample_interval: 800 elapsed_time = self.timer.elapsed_time 801 dt = elapsed_time - t1 # delta t 802 time.sleep(0.0005) 803 804 latest_v = self.dmm.volts 805 temp_c = self.temperature # grab the temperature 806 807 logger.write_info_to_report(f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Temp: {temp_c}") 808 self.csv.cycle.record(elapsed_time) 809 first_cycle = False 810 811 def run_discharge_cycle(self, digits=6): 812 """Run a discharge cycle.""" 813 814 if self.max_time == 0: 815 raise ValueLogError("Max Time Invalid") 816 817 if self.sample_interval == 0: 818 raise ValueLogError("Discharge Sample Interval Invalid") 819 820 if self.discharge_type == DischargeType.UNDEFINED: 821 raise ValueLogError("Discharge Mode Invalid") 822 823 if self.voltage == 0: 824 raise ValueLogError("Discharge Voltage Invalid") 825 826 if self.uv_protection == 0: 827 raise ValueLogError("Under Voltage Protection Invalid") 828 829 if self.discharge_type == DischargeType.CONSTANT_CURRENT: 830 # we're in current discharge mode, so make sure there is a valid current 831 if self.current == 0: 832 raise ValueLogError("Discharge Current mode selected, but Current is invalid") 833 834 elif self.discharge_type == DischargeType.CONSTANT_RESISTANCE: 835 # we're in resistance discharge mode so make sure there is a valid resistance. 836 if self.resistance == 0: 837 raise ValueLogError("Discharge Resistance mode selected, but Resistance is invalid") 838 839 logger.write_info_to_report( 840 f"Starting discharge cycle (" 841 f"DV: {self.voltage}V, UV: {self.uv_protection}V, DI: {self.current}A, DR: {self.resistance}Ohms, " 842 f"DM: {self.discharge_type}, MT: {self.max_time}s, SI: {self.sample_interval}s" 843 ")" 844 ) 845 846 with self.load(self.current, self.discharge_type, self.resistance): 847 848 self.percent_discharge = 1 # For capacity calculations 849 850 # Place holder values 851 latest_i = 0 852 resistance = 0 853 amp_hrs = 0 # Amp hours 854 latest_v = self.voltage + 0.01 855 elapsed_time = 0 856 first_run = True 857 temp_c = 0 858 859 self.timer.reset() # Keep track of runtime 860 kill_event = FileEvent("kill_discharge") 861 discharge_event = FileEvent("nicd_discharging") 862 discharge_event.set() 863 864 # discharge until discharge voltage is reached or uv_protection or max_time 865 while not (self.uv_protection < latest_v <= self.voltage or kill_event.is_set()): 866 if latest_v <= self.uv_protection: 867 self.load.disable() 868 raise UnderVoltageError( 869 f"Undervoltage protection triggered at {time.strftime('%x %X')}. " 870 f"Voltage {latest_v} is lower than {self.uv_protection}." 871 ) 872 if elapsed_time >= self.max_time: 873 self.load.disable() 874 raise TimeoutExceededError( 875 f"Voltage of {self.voltage} was not reached after {self.max_time} seconds" 876 ) 877 if temp_c >= self.overtemp_c: 878 self.load.disable() 879 raise OverTemperatureError( 880 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 881 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 882 ) 883 884 UserInterrupt.check_user_interrupt(self.load) # Pause if necessary, turning off load relay 885 886 t1 = elapsed_time # previous elapsed time for calculating delta t 887 elapsed_time = round(self.timer.elapsed_time, digits) 888 dt = round(elapsed_time - t1, digits) # delta t 889 890 # This section allows for fairly precise sample interval 891 if dt < self.sample_interval and not first_run: 892 while dt < self.sample_interval: 893 elapsed_time = round(self.timer.elapsed_time, digits) 894 dt = round(elapsed_time - t1, digits) # delta t 895 time.sleep(0.0005) 896 897 latest_v = self.dmm.volts 898 i1 = latest_i # previous current reading 899 load_ohms = self.load.ohms 900 latest_i = round(-1 * self.load.amps, digits) 901 902 # skip the first cycle so that there are no divide by 0's 903 if not first_run and self.discharge_type == DischargeType.CONSTANT_RESISTANCE: 904 load_v = self.load.volts # for measuring wiring harness resistance 905 try: 906 resistance = (latest_v - load_v) / (-1 * latest_i) + load_ohms # wire harness resistance 907 except ZeroDivisionError: 908 logger.write_error_to_report( 909 "Zero division when calculating resistance. " 910 f"Load Voltage: {load_v}, Current: {latest_i}, Resistance: {load_ohms}." 911 ) 912 913 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 914 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 915 temp_c = round(self.temperature, digits) # grab the temperature 916 917 logger.write_info_to_report( 918 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, " 919 f"Resistance: {resistance}, Temp: {temp_c}" 920 ) 921 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs, resistance) 922 first_run = False 923 924 self.remaining_capacity_percentage = 0 925 return abs(amp_hrs) # Return a positive Ah 926 927 def delay(self, seconds: int | float): 928 """Sleep that counts towards elapsed time.""" 929 time.sleep(seconds) 930 931 def pause_test(self, seconds_to_pause: float = 0, *relays: ChargerOrLoad): 932 """Pause the test for some amount of seconds.""" 933 for relay in relays: 934 relay.disable() 935 if seconds_to_pause == 0: # Wait for user input to continue 936 logger.write_info_to_report("Pausing: waiting for user input") 937 statuscode = 0xFFFF 938 while not os.WIFEXITED(statuscode): # process terminated normally? 939 statuscode = os.system( 940 '/bin/bash -c \'select CHOICE in "Type CONTINUE to resume test"; do ' 941 'if [ "$REPLY" = "CONTINUE" ]; then break; fi; done\'' 942 ) 943 else: 944 logger.write_info_to_report(f"Pausing test (S: {seconds_to_pause})") 945 time.sleep(seconds_to_pause) 946 for relay in relays: 947 relay.enable()
The main API for accessing all hardware.
192 def __init__(self, flags: BMSFlags | None = None): # TODO(JA): document attributes 193 """Initialize the BMS hardware.""" 194 if flags is None: 195 return 196 197 # Battery attributes 198 self.remaining_capacity_percentage = 0.0 199 self.battery_type: BatteryType = BatteryType.UNDEFINED # Initialize to unknown 200 self.max_time = 0.0 201 self.sample_interval = 0.0 202 self.voltage = 0.0 203 self.current = 0.0 204 self.current_limit = 10.0 205 self.overtemp_c = OVERTEMP_C 206 207 # Charging attributes 208 self.minimum_readings = 0 # How many readings to take before passing (for erroneous readings). 209 self.nicd_charge_type: NiCdChargeCycle = NiCdChargeCycle.UNDEFINED 210 self.nicd_cell_count = 0 211 self.termination_current = 0.0 212 self.ov_protection = 0.0 213 214 # Discharging attributes 215 self.resistance = 0.0 216 self.discharge_type: DischargeType = DischargeType.UNDEFINED 217 self.discharge_until_undervoltage = False 218 self.percent_discharge = 0.0 219 self.total_ah = 0.0 220 self.uv_protection = 0.0 221 222 # Hardware attributes 223 self.charger: Charger | None = None 224 self.load: Load | None = None 225 self.dmm: Dmm | M300Dmm | None = None 226 self.power_dmm: Dmm | None = None 227 self.korads: dict[int, Korad] = {} 228 self.thermal_chamber = pseudo_hardware.ThermalChamber() if flags.dry_run else ThermalChamber() 229 self.plateset = Plateset() 230 self.plateset_id = flags.plateset_id 231 self.thermo_couples: dict[int, ThermoCouple] = {} 232 self.cells: dict[int, Cell | NGICell | ChromaCell] = {} 233 self.cell_chemistry = flags.cell_chemistry 234 self.dry_run = flags.dry_run 235 236 # Track total test time 237 self.timer = StopWatch() 238 239 # Logging / Documentation attributes 240 self.config = flags.config 241 self.doc_generation = flags.doc_generation 242 self.report_filename = flags.report_filename 243 self.csv: CSVRecorders = CSVRecorders(cast(hitl_tester.modules.bms.bms_hw.BMSHardware, self)) # Must occur last 244 245 # Modify globals of caller 246 assert isinstance(current_frame := inspect.currentframe(), FrameType) 247 assert isinstance(caller := current_frame.f_back, FrameType) 248 if flags.properties: 249 for global_var, value in flags.properties.items(): 250 caller.f_globals[global_var] = value # Globals must occur before init is called
Initialize the BMS hardware.
252 def init(self): 253 """Initialize the BMSHardware object.""" 254 if self.initialized: 255 return 256 self.initialized = True 257 258 def get_embed(header: str, key: str, result_type: type) -> Any: 259 """get embedded value from config (i.e. value in dict in list).""" 260 default_item: list[str | dict[str, Any]] = [] 261 items = self.config.get(header, default_item) 262 if isinstance(items, list): 263 for item in items: 264 if isinstance(item, dict) and (result := item.get(key)): 265 try: 266 return result_type(result) 267 except (ValueError, TypeError) as exc: 268 raise RuntimeError( 269 f"Expected {result_type.__name__} type, got {type(result).__name__} type" 270 ) from exc 271 raise RuntimeError(f"Could not find {key} key in {header}") 272 273 if self.doc_generation: # Skip if generating docs 274 return 275 276 # Make sure the charger and the load switches are OFF before doing any testing 277 logger.write_info_to_report("Turning off charger and load switch") 278 self.plateset.charger_switch = False 279 self.plateset.load_switch = False 280 281 # setup some signals to a signal handler so that we can gracefully 282 # shutdown in the event that test is somehow interrupted 283 logger.write_debug_to_report("Setting up Safe Shutdown Handlers") 284 for signal_flag in SafeShutdown.SIGNALS: 285 signal.signal(signal_flag, SafeShutdown.handler) 286 287 # Set up signal terminal stop handler (CTRL-Z) 288 UserInterrupt.init() 289 signal.signal(signal.SIGTSTP, UserInterrupt.handler) 290 291 # FIXME(JA): device classes should manage their own configs/initializations 292 # FIXME(JA): don't assume config is correct 293 294 logger.write_info_to_report("Polling hardware") 295 if self.dry_run: 296 if self.config.get("power_supply_id", "") is not None: 297 logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DP711,DP7A252000573,00.01.05") 298 logger.write_info_to_report("Found DP711 Power Supply") 299 self.charger = pseudo_hardware.Charger() 300 301 if self.config.get("load_id", "") is not None: 302 logger.write_debug_to_report("Resource ID: RIGOL TECHNOLOGIES,DL3031A,DL3D243000300,00.01.05.00.01") 303 logger.write_info_to_report("Found DL3031A Electronic load") 304 self.load = pseudo_hardware.Load() 305 306 if self.config.get("dmm_id", "") is not None: 307 logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00") 308 logger.write_info_to_report("Found DM3068 DMM") 309 self.dmm = pseudo_hardware.Dmm() 310 311 if self.config.get("m300_dmm") is not None: 312 logger.write_debug_to_report("Resource ID: Rigol Technologies,DM3068,DM3O252000861,01.01.00.01.11.00") 313 logger.write_info_to_report("Found DM3068 DMM") 314 self.dmm = pseudo_hardware.M300Dmm(self.config["m300_dmm"]) 315 316 for korad in self.config.get("korads", []): 317 logger.write_debug_to_report(f"Resource ID: KORAD, {korad['part_id']}") 318 logger.write_info_to_report("Found Korad") 319 new_korad = pseudo_hardware.Korad(korad["id"]) 320 self.korads[new_korad.id] = new_korad 321 322 cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else None 323 if cell_model == "agilent": 324 for cell in self.config.get("cell_simulators", []): 325 if "type" not in cell: 326 logger.write_debug_to_report("Resource ID: Cell Sim, 04.08.15.16.23.42") 327 logger.write_info_to_report("Found Cell Sim") 328 new_cell = pseudo_hardware.Cell(cell["id"], self.cell_chemistry) 329 self.cells[new_cell.id] = new_cell 330 331 else: 332 resource_manager = visa.ResourceManager() 333 resources = resource_manager.list_resources() 334 if not resources: 335 raise ResourceNotFoundError("No PyVISA resources found") 336 337 power_supply_id = self.config.get("power_supply_id", "RIGOL TECHNOLOGIES,DP711") 338 load_id = self.config.get("load_id", "RIGOL TECHNOLOGIES,DL3031A") 339 dmm_id = self.config.get("dmm_id", "Rigol Technologies,DM3068") 340 power_dmm_id = self.config.get("power_dmm_id") 341 if "m300_dmm" in self.config: 342 dmm_id = "M300" 343 344 cell_model = get_embed("cell_simulators", "type", str) if self.config.get("cell_simulators") else "" 345 if cell_model == "chroma": # Add chroma to resources 346 address = get_embed("cell_simulators", "address", str) 347 resources = (*resources, (f"TCPIP::{address}::60000::SOCKET", "\n", "\n")) 348 config_cells = [cell for cell in self.config.get("cell_simulators", []) if "type" not in cell] 349 350 for resource_address in resources: 351 safe_resource = SafeResource(resource_address) 352 for baud_rate in (9600, 115200): 353 safe_resource.baud_rate = baud_rate 354 if (idn := safe_resource.query("*IDN?")) != safe_resource.default_result: 355 logger.write_debug_to_report(f"Resource ID: {idn.strip(chr(10))}") 356 357 if power_supply_id is not None and power_supply_id in idn: # Power supply 358 logger.write_info_to_report("Found DP711 Power Supply") 359 self.charger = Charger(safe_resource) 360 361 elif load_id is not None and load_id in idn: # Electronic load 362 logger.write_info_to_report("Found DL3031A Electronic load") 363 self.load = Load(safe_resource) 364 365 elif dmm_id is not None and dmm_id in idn: # DMM 366 logger.write_info_to_report("Found DMM") 367 if "m300_dmm" in self.config: 368 self.dmm = M300Dmm(safe_resource, self.config["m300_dmm"]) 369 else: 370 self.dmm = Dmm(safe_resource) 371 372 elif power_dmm_id is not None and power_dmm_id in idn: # DMM 373 logger.write_info_to_report("Found Power DMM") 374 self.power_dmm = Dmm(safe_resource) 375 376 elif any((match := korad)["part_id"] in idn for korad in self.config.get("korads", [])): 377 logger.write_info_to_report("Found KA6003P Power Supply") 378 new_korad = Korad(match["id"], safe_resource) 379 self.korads[new_korad.id] = new_korad 380 381 elif cell_model == "agilent" and any( 382 (match := cell)["part_id"] in idn for cell in config_cells 383 ): 384 logger.write_info_to_report("Found Agilent Cell Sim") 385 new_cell = Cell(match["id"], safe_resource, self.cell_chemistry) 386 new_cell.reset() 387 self.cells[new_cell.id] = new_cell 388 389 elif cell_model == "ngi" and "NGI,N83624" in idn: 390 logger.write_info_to_report("Found NGI Cell Sim") 391 cell_count = get_embed("cell_simulators", "cell_count", int) 392 for cell_id in range(1, cell_count + 1): 393 new_cell = NGICell(cell_id, safe_resource, self.cell_chemistry) 394 self.cells[new_cell.id] = new_cell 395 if cell_id == cell_count: 396 new_cell.reset() 397 398 elif cell_model == "chroma" and "Chroma,87001" in idn: 399 logger.write_info_to_report("Found Chroma Cell Sim") 400 new_cell = None 401 for cell_id, cell in enumerate(config_cells): 402 if "cell_slot" in cell: 403 new_cell = ChromaCell( 404 cell_id, safe_resource, self.cell_chemistry, cell["cell_slot"] 405 ) 406 self.cells[new_cell.id] = new_cell 407 if new_cell: 408 new_cell.reset() # FIXME(JA): does this cause problems for the other side if running? 409 410 else: 411 logger.write_warning_to_report("Unknown Resource") 412 413 break 414 415 logger.write_warning_to_report(f"Failed to connect to: {resource_address}") 416 logger.write_debug_to_report("Attempting higher baud rate") 417 418 if power_supply_id is not None and self.charger is None: 419 raise ResourceNotFoundError("Power Supply Not Found") 420 421 if load_id is not None and self.load is None: 422 raise ResourceNotFoundError("Load Simulator Not Found") 423 424 if dmm_id is not None and self.dmm is None: 425 raise ResourceNotFoundError("DMM Not Found") 426 427 if power_dmm_id is not None and self.power_dmm is None: 428 raise ResourceNotFoundError("Power DMM Not Found") 429 430 if len(self.korads) < len(self.config.get("korads", [])): 431 for korad in self.config["korads"]: 432 if korad["id"] not in self.korads: 433 raise ResourceNotFoundError(f'Unable to locate Korad {korad["id"]}') 434 self.korads = dict(sorted(self.korads.items())) # Sort korads by id 435 436 if cell_model == "agilent" and len(self.cells) < len(config_cells): 437 for cell in config_cells: 438 if cell["id"] not in self.cells: 439 raise ResourceNotFoundError(f'Unable to locate Cell {cell["id"]} Simulator') 440 if cell_model == "ngi" and len(self.cells) == 0: 441 raise ResourceNotFoundError("Unable to locate NGI Cell Simulator") 442 if cell_model == "chroma" and len(self.cells) == 0: 443 raise ResourceNotFoundError("Unable to locate Chroma Cell Simulator") 444 self.cells = dict(sorted(self.cells.items())) # Sort cells by id 445 446 # Now go through the ThermoCouples 447 for thermo_couple in self.config.get("thermo_couples", []): 448 new_thermo_couple = ThermoCouple(thermo_couple["id"], thermo_couple["board"], thermo_couple["channel"]) 449 self.thermo_couples[new_thermo_couple.thermocouple_id] = new_thermo_couple 450 451 logger.write_debug_to_report("Setting up SIGCHLD Handler") # Must be set up afterward due to VISA raising it 452 signal.signal(signal.SIGCHLD, SafeShutdown.handler)
Initialize the BMSHardware object.
456 @property 457 def temperature(self) -> float: 458 """Temperature measurement used by run cycles.""" 459 return float(self.thermo_couples[1].temperature) if 1 in self.thermo_couples else self.plateset.temperature
Temperature measurement used by run cycles.
461 def run_li_charge_cycle(self, digits=6): 462 """Run a Li charge cycle.""" 463 if self.voltage == 0: 464 raise ValueLogError("Charging Voltage Invalid") 465 466 if self.ov_protection == 0: 467 raise ValueLogError("Over-Voltage Protection Invalid") 468 469 if self.current == 0: 470 raise ValueLogError("Charge Current Invalid") 471 472 if self.termination_current == 0: 473 raise ValueLogError("Termination Current Invalid") 474 475 if self.max_time == 0: 476 raise ValueLogError("Max Time Invalid") 477 478 if self.sample_interval == 0: 479 raise ValueLogError("Charging Sample Time Invalid") 480 481 if self.minimum_readings == 0: 482 raise ValueLogError("Minimum Readings Invalid") 483 484 self.percent_discharge = -1 # For capacity calculations 485 486 # Create a queue to hold the previous n readings. This is used to avoid erroneous errors. 487 current_measurement_queue = deque(maxlen=self.minimum_readings) 488 489 logger.write_info_to_report( 490 f"Starting charge cycle (CV: {self.voltage}V, OV: {self.ov_protection}V, " 491 f"CI: {self.current}A, TI: {self.termination_current}A, MT: {self.max_time}s, " 492 f"SI: {self.sample_interval}s, MR: {self.minimum_readings}" 493 ")" 494 ) 495 496 with self.charger(self.voltage, self.current): 497 498 # Starting values for the while loop 499 latest_i = self.termination_current + 0.1 # placeholder for the latest charging current measurement 500 current_measurement_queue.append(latest_i) 501 amp_hrs = 0 # amp hours 502 latest_v = 0 # placeholder for the latest charging voltage measurement 503 elapsed_time = 0 504 temp_c = 0 505 506 self.timer.reset() # Keep track of runtime 507 508 # charge until termination current is reached or ov_protection or max_time 509 while any(i >= self.termination_current for i in current_measurement_queue): 510 if latest_v >= self.ov_protection: 511 self.charger.disable() 512 raise OverVoltageError( 513 f"Overvoltage protection triggered at {time.strftime('%x %X')}. " 514 f"Voltage {latest_v} is higher than {self.ov_protection}." 515 ) 516 if elapsed_time >= self.max_time: 517 self.charger.disable() 518 raise TimeoutExceededError( 519 f"Termination current of {self.termination_current}A was " 520 f"not reached after {self.max_time} seconds" 521 ) 522 if temp_c >= self.overtemp_c: 523 self.charger.disable() 524 raise OverTemperatureError( 525 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 526 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 527 ) 528 529 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 530 531 t1 = elapsed_time # previous elapsed time for calculating delta t 532 elapsed_time = round(self.timer.elapsed_time, digits) 533 dt = round(elapsed_time - t1, digits) # delta t 534 535 latest_v = self.dmm.volts 536 537 i1 = latest_i # previous current reading 538 latest_i = self.charger.amps 539 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 540 current_measurement_queue.append(latest_i) 541 542 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # milliamp hour calculation 543 temp_c = round(self.temperature, digits) # grab the temperature 544 545 logger.write_info_to_report( 546 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, " 547 f"Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}" 548 ) 549 550 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 551 time.sleep(self.sample_interval) 552 553 self.remaining_capacity_percentage = 100
Run a Li charge cycle.
555 def run_nicd_charge_cycle(self, digits=6): 556 """Run a NiCad charge cycle.""" 557 558 if self.nicd_cell_count == 0: 559 raise ValueLogError("Cell Count Invalid") 560 561 if self.max_time == 0: 562 raise ValueLogError("Charge Time Invalid") 563 564 if self.sample_interval == 0: 565 raise ValueLogError("Charge Sample Interval Invalid") 566 567 if self.current == 0: 568 raise ValueLogError("Charge Current Invalid") 569 570 if self.nicd_charge_type is NiCdChargeCycle.UNDEFINED: 571 raise ValueLogError("Charge Type Invalid") 572 573 self.percent_discharge = -1 # For capacity calculations 574 self.ov_protection = min(30.0, self.nicd_cell_count * 1.95) 575 576 logger.write_info_to_report( 577 f"Starting {self.nicd_charge_type.name.lower()} NiCd charge cycle (" 578 f"CV: {self.ov_protection}V, OV: {self.ov_protection}V, CI: {self.current}A, MT: {self.max_time}s, " 579 f"SI: {self.sample_interval}s)" 580 ) 581 582 self.charger.set_profile(self.ov_protection, self.current) 583 self.charger.enable() # Enables the output of the charger 584 585 # initialize charge control variables 586 latest_i = 0 587 amp_hrs = 0 588 latest_v = 0 589 elapsed_time = 0 590 max_v = 0 591 first_cycle = True 592 593 self.timer.reset() # Keep track of runtime 594 595 if self.nicd_charge_type in (NiCdChargeCycle.STANDARD, NiCdChargeCycle.CUSTOM): 596 while latest_v < self.ov_protection and elapsed_time < self.max_time: 597 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 598 t1 = elapsed_time 599 elapsed_time = round(self.timer.elapsed_time, digits) 600 dt = round(elapsed_time - t1, digits) 601 602 # This section allows for fairly precise sample interval 603 if dt < self.sample_interval and not first_cycle: 604 while dt < self.sample_interval: 605 elapsed_time = round(self.timer.elapsed_time, digits) 606 dt = round(elapsed_time - t1, digits) # delta t 607 time.sleep(0.0005) 608 609 latest_v = self.dmm.volts 610 611 # keep track of the highest cell voltage seen 612 max_v = max(max_v, latest_v) 613 614 i1 = latest_i # previous current reading 615 latest_i = self.charger.amps # current reading 616 617 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 618 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 619 temp_c = round(self.temperature, digits) # grab the temperature 620 621 logger.write_info_to_report( 622 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, " 623 f"Ah: {amp_hrs}, Temp: {temp_c}" 624 ) 625 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 626 first_cycle = False 627 628 elif self.nicd_charge_type == NiCdChargeCycle.DV_DT: 629 dv = 0.005 * self.nicd_cell_count 630 while (max_v - latest_v) < dv and latest_v < self.ov_protection and elapsed_time < self.max_time: 631 UserInterrupt.check_user_interrupt(self.charger) # Pause if necessary, turning off charger relay 632 t1 = elapsed_time # previous elapsed time for calculating delta t 633 elapsed_time = round(self.timer.elapsed_time, digits) 634 dt = round(elapsed_time - t1, digits) # delta t 635 636 # This section allows for fairly precise sample interval 637 if dt < self.sample_interval and not first_cycle: 638 while dt < self.sample_interval: 639 elapsed_time = round(self.timer.elapsed_time, digits) 640 dt = round(elapsed_time - t1, digits) # delta t 641 time.sleep(0.0005) 642 643 latest_v = self.dmm.volts 644 max_v = max(max_v, latest_v) # keep track of highest cell voltage seen 645 i1 = latest_i # previous current reading 646 latest_i = self.charger.amps # current reading 647 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 648 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 649 temp_c = round(self.temperature, digits) # grab the temperature 650 651 logger.write_info_to_report( 652 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, " 653 f"Ah: {amp_hrs}, Temp: {temp_c}" 654 ) 655 first_cycle = False 656 657 self.charger.disable() # Charging is complete, turn off the charger 658 self.remaining_capacity_percentage = 100 659 660 logger.write_info_to_report(f"Charge Cycle complete, Ah: {amp_hrs}") 661 return amp_hrs
Run a NiCad charge cycle.
663 def run_discharge_step_cycle(self, digits=6): 664 """Run a discharge step cycle.""" 665 if self.max_time == 0: 666 raise ValueLogError("Max Time Invalid") 667 668 if self.current == 0: 669 raise ValueLogError("Discharge rate Invalid") 670 671 if self.sample_interval == 0: 672 raise ValueLogError("Sample Interval Invalid") 673 674 if self.uv_protection == 0: 675 raise ValueLogError("Undervoltage Protection Invalid") 676 677 if self.percent_discharge == 0: 678 raise ValueLogError("Percent Discharge Invalid") 679 680 if self.total_ah == 0: 681 raise ValueLogError("Total Ah Invalid") 682 683 delta_ah = self.total_ah * self.percent_discharge 684 # discharge_time = ((self.total_ah * self.percent_discharge) / self.discharge_i) * 3600 685 logger.write_info_to_report( 686 f"Starting discharge step cycle (" 687 f"DA: {delta_ah}Ah, UV: {self.uv_protection}V, DI: {self.current}A, " 688 f"PD: {self.percent_discharge * 100}%, TA: {self.total_ah}Ah, SI: {self.sample_interval}s, " 689 f"RC: {self.remaining_capacity_percentage}, MT: {self.max_time}, DV: {self.discharge_until_undervoltage})" 690 ) 691 692 self.load.mode_cc() 693 if self.current > 6: 694 self.load.amps_range = 60 # if current is > 6A set to high range 695 self.load.amps = self.current 696 self.load.enable() 697 698 # Place holder values 699 latest_i = 0 700 amp_hrs = 0 # mA hours 701 latest_v = self.uv_protection + 0.01 702 elapsed_time = 0 703 first_cycle = True 704 temp_c = 0 705 # resistance = 0 706 707 self.timer.reset() # Keep track of runtime 708 709 # discharge until Ah reached or uv_protection or max_time 710 while abs(amp_hrs) < delta_ah or self.discharge_until_undervoltage: 711 if latest_v <= self.uv_protection: 712 exception = UnderVoltageError( 713 f"Undervoltage protection triggered at {time.strftime('%x %X')}. " 714 f"Voltage {latest_v} is lower than {self.uv_protection}" 715 ) 716 if self.discharge_until_undervoltage: 717 logger.write_info_to_report(f"{type(exception).__name__} - Continuing test") 718 break 719 raise exception 720 721 if elapsed_time >= self.max_time: 722 raise TimeoutExceededError(f"Delta Ah of {delta_ah} was not reached after {self.max_time} seconds") 723 724 if temp_c >= self.overtemp_c: 725 raise OverTemperatureError( 726 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 727 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 728 ) 729 730 t1 = elapsed_time # previous elapsed time for calculating delta t 731 elapsed_time = round(self.timer.elapsed_time, digits) 732 dt = round(elapsed_time - t1, digits) # delta t 733 734 # This section allows for fairly precise sample interval 735 if dt < self.sample_interval and not first_cycle: 736 while dt < self.sample_interval: 737 elapsed_time = round(self.timer.elapsed_time, digits) 738 dt = round(elapsed_time - t1, digits) # delta t 739 time.sleep(0.0005) 740 741 latest_v = self.dmm.volts 742 # load_v = self.load.volts # for measuring wiring harness resistance 743 744 i1 = latest_i # previous current reading 745 # load_ohms = self.load.ohms 746 latest_i = round(-1 * self.load.amps, digits) 747 748 # if not first_cycle: # skip the first cycle so that there are no divide by 0's 749 # # wire harness resistance 750 # resistance = round((latest_v - load_v) / (-1 * latest_i) + load_ohms, digits) 751 752 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 753 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 754 temp_c = round(self.temperature, digits) # grab the temperature 755 756 logger.write_info_to_report( 757 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, Temp: {temp_c}" 758 ) 759 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs) 760 first_cycle = False 761 762 self.load.disable() 763 self.remaining_capacity_percentage = round( 764 self.remaining_capacity_percentage - self.percent_discharge * 100, digits 765 )
Run a discharge step cycle.
767 def run_resting_cycle(self): 768 """Run a resting cycle""" 769 # Guard against calls before init 770 assert self.dmm is not None and self.csv is not None 771 772 if self.max_time == 0: 773 raise ValueLogError("Resting time Invalid") 774 775 if self.sample_interval == 0: 776 raise ValueLogError("Resting sample interval Invalid") 777 778 if self.remaining_capacity_percentage < 0: 779 raise ValueLogError("Remaining capacity percentage Invalid") 780 781 # Place holder values 782 self.percent_discharge = 0 # For capacity calculations 783 elapsed_time = 0.0 784 first_cycle = True 785 786 self.timer.reset() # Keep track of runtime 787 788 logger.write_info_to_report(f"Starting rest cycle (RT: {self.max_time}, SI:{self.sample_interval})") 789 790 while elapsed_time < self.max_time: 791 UserInterrupt.check_user_interrupt() 792 793 t1 = elapsed_time # previous elapsed time for calculating delta t 794 elapsed_time = self.timer.elapsed_time 795 dt = elapsed_time - t1 # delta t 796 797 # This section allows for fairly precise sample interval 798 if dt < self.sample_interval and not first_cycle: 799 while dt < self.sample_interval: 800 elapsed_time = self.timer.elapsed_time 801 dt = elapsed_time - t1 # delta t 802 time.sleep(0.0005) 803 804 latest_v = self.dmm.volts 805 temp_c = self.temperature # grab the temperature 806 807 logger.write_info_to_report(f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Temp: {temp_c}") 808 self.csv.cycle.record(elapsed_time) 809 first_cycle = False
Run a resting cycle
811 def run_discharge_cycle(self, digits=6): 812 """Run a discharge cycle.""" 813 814 if self.max_time == 0: 815 raise ValueLogError("Max Time Invalid") 816 817 if self.sample_interval == 0: 818 raise ValueLogError("Discharge Sample Interval Invalid") 819 820 if self.discharge_type == DischargeType.UNDEFINED: 821 raise ValueLogError("Discharge Mode Invalid") 822 823 if self.voltage == 0: 824 raise ValueLogError("Discharge Voltage Invalid") 825 826 if self.uv_protection == 0: 827 raise ValueLogError("Under Voltage Protection Invalid") 828 829 if self.discharge_type == DischargeType.CONSTANT_CURRENT: 830 # we're in current discharge mode, so make sure there is a valid current 831 if self.current == 0: 832 raise ValueLogError("Discharge Current mode selected, but Current is invalid") 833 834 elif self.discharge_type == DischargeType.CONSTANT_RESISTANCE: 835 # we're in resistance discharge mode so make sure there is a valid resistance. 836 if self.resistance == 0: 837 raise ValueLogError("Discharge Resistance mode selected, but Resistance is invalid") 838 839 logger.write_info_to_report( 840 f"Starting discharge cycle (" 841 f"DV: {self.voltage}V, UV: {self.uv_protection}V, DI: {self.current}A, DR: {self.resistance}Ohms, " 842 f"DM: {self.discharge_type}, MT: {self.max_time}s, SI: {self.sample_interval}s" 843 ")" 844 ) 845 846 with self.load(self.current, self.discharge_type, self.resistance): 847 848 self.percent_discharge = 1 # For capacity calculations 849 850 # Place holder values 851 latest_i = 0 852 resistance = 0 853 amp_hrs = 0 # Amp hours 854 latest_v = self.voltage + 0.01 855 elapsed_time = 0 856 first_run = True 857 temp_c = 0 858 859 self.timer.reset() # Keep track of runtime 860 kill_event = FileEvent("kill_discharge") 861 discharge_event = FileEvent("nicd_discharging") 862 discharge_event.set() 863 864 # discharge until discharge voltage is reached or uv_protection or max_time 865 while not (self.uv_protection < latest_v <= self.voltage or kill_event.is_set()): 866 if latest_v <= self.uv_protection: 867 self.load.disable() 868 raise UnderVoltageError( 869 f"Undervoltage protection triggered at {time.strftime('%x %X')}. " 870 f"Voltage {latest_v} is lower than {self.uv_protection}." 871 ) 872 if elapsed_time >= self.max_time: 873 self.load.disable() 874 raise TimeoutExceededError( 875 f"Voltage of {self.voltage} was not reached after {self.max_time} seconds" 876 ) 877 if temp_c >= self.overtemp_c: 878 self.load.disable() 879 raise OverTemperatureError( 880 f"Overtemperature protection triggered at {time.strftime('%x %X')}. " 881 f"Temperature {temp_c}°C is higher than {self.overtemp_c}°C." 882 ) 883 884 UserInterrupt.check_user_interrupt(self.load) # Pause if necessary, turning off load relay 885 886 t1 = elapsed_time # previous elapsed time for calculating delta t 887 elapsed_time = round(self.timer.elapsed_time, digits) 888 dt = round(elapsed_time - t1, digits) # delta t 889 890 # This section allows for fairly precise sample interval 891 if dt < self.sample_interval and not first_run: 892 while dt < self.sample_interval: 893 elapsed_time = round(self.timer.elapsed_time, digits) 894 dt = round(elapsed_time - t1, digits) # delta t 895 time.sleep(0.0005) 896 897 latest_v = self.dmm.volts 898 i1 = latest_i # previous current reading 899 load_ohms = self.load.ohms 900 latest_i = round(-1 * self.load.amps, digits) 901 902 # skip the first cycle so that there are no divide by 0's 903 if not first_run and self.discharge_type == DischargeType.CONSTANT_RESISTANCE: 904 load_v = self.load.volts # for measuring wiring harness resistance 905 try: 906 resistance = (latest_v - load_v) / (-1 * latest_i) + load_ohms # wire harness resistance 907 except ZeroDivisionError: 908 logger.write_error_to_report( 909 "Zero division when calculating resistance. " 910 f"Load Voltage: {load_v}, Current: {latest_i}, Resistance: {load_ohms}." 911 ) 912 913 i_avg = round((i1 + latest_i) / 2, digits) # average current of the last 2 readings 914 amp_hrs = round(amp_hrs + (i_avg * dt) / 3600, digits) # amp hour calculation 915 temp_c = round(self.temperature, digits) # grab the temperature 916 917 logger.write_info_to_report( 918 f"Elapsed Time: {elapsed_time}, Voltage: {latest_v}, Current: {latest_i}, Ah: {amp_hrs}, " 919 f"Resistance: {resistance}, Temp: {temp_c}" 920 ) 921 self.csv.cycle.record(elapsed_time, latest_i, amp_hrs, resistance) 922 first_run = False 923 924 self.remaining_capacity_percentage = 0 925 return abs(amp_hrs) # Return a positive Ah
Run a discharge cycle.
927 def delay(self, seconds: int | float): 928 """Sleep that counts towards elapsed time.""" 929 time.sleep(seconds)
Sleep that counts towards elapsed time.
931 def pause_test(self, seconds_to_pause: float = 0, *relays: ChargerOrLoad): 932 """Pause the test for some amount of seconds.""" 933 for relay in relays: 934 relay.disable() 935 if seconds_to_pause == 0: # Wait for user input to continue 936 logger.write_info_to_report("Pausing: waiting for user input") 937 statuscode = 0xFFFF 938 while not os.WIFEXITED(statuscode): # process terminated normally? 939 statuscode = os.system( 940 '/bin/bash -c \'select CHOICE in "Type CONTINUE to resume test"; do ' 941 'if [ "$REPLY" = "CONTINUE" ]; then break; fi; done\'' 942 ) 943 else: 944 logger.write_info_to_report(f"Pausing test (S: {seconds_to_pause})") 945 time.sleep(seconds_to_pause) 946 for relay in relays: 947 relay.enable()
Pause the test for some amount of seconds.