hitl_tester.modules.bms.cell
Provides controls for the Agilent 66321 Battery Simulator.
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
1""" 2Provides controls for the Agilent 66321 Battery Simulator. 3 4# (c) 2020-2024 TurnAround Factor, Inc. 5# 6# CUI DISTRIBUTION CONTROL 7# Controlled by: DLA J68 R&D SBIP 8# CUI Category: Small Business Research and Technology 9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15# Fort Belvoir, VA 22060-6221 16# 17# SBIR DATA RIGHTS 18# Contract No.:SP4701-23-C-0083 19# Contractor Name: TurnAround Factor, Inc. 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21# Expiration of SBIR Data Rights Period: September 24, 2029 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27# the markings. 28""" 29 30from __future__ import annotations 31 32import atexit 33import bisect 34import concurrent.futures 35import json 36import signal 37import threading 38import time 39from pathlib import Path 40 41from hitl_tester.modules.bms.plateset import Plateset 42from hitl_tester.modules.bms_types import OverVoltageError, UnderVoltageError, CellCompMode, StopWatch, SafeResource 43from hitl_tester.modules.logger import logger 44 45CELL_DATA_FILE = Path(__file__).resolve().parent / "cell_data.json" # Path relative to this file 46 47CELL_PARALLEL_COUNT = 3 48MAX_ATTEMPTS = 60 49 50plateset = Plateset() 51 52 53class CellData: 54 """A datastructure for all cell properties.""" 55 56 def __init__(self, filename: Path, cell_chemistry: str): 57 with open(filename, encoding="utf-8") as json_file: 58 self._raw_data = json.load(json_file)[cell_chemistry or "ZERO"] 59 self.capacity: float = self._raw_data["capacity"] * CELL_PARALLEL_COUNT 60 self.ov_protection: float = self._raw_data["overvoltage_protection"] 61 self.uv_protection: float = self._raw_data["undervoltage_protection"] 62 self.soc: list[float] = self._raw_data["soc"] # State of charge (header for voltage / impedance) 63 self.impedance: dict[float, list[float]] = { # Convert keys to float (not valid in json) 64 float(temperature): [impedance / 1000 / CELL_PARALLEL_COUNT for impedance in impedances] 65 for temperature, impedances in self._raw_data["impedance"].items() 66 } 67 self.volts: dict[float, list[float]] = { # Convert keys to float (not valid in json) 68 float(temperature): voltage for temperature, voltage in self._raw_data["voltage"].items() 69 } 70 71 72class Cell: 73 """Agilent 66321 Battery Simulator command wrapper.""" 74 75 def __init__(self, cell_id: int, resource: SafeResource, cell_chemistry: str): 76 """ 77 Initialize the 66321 wrapper with a specific PyVISA resource. 78 """ 79 self.id: int = cell_id 80 self._resource = resource 81 self._resource.timeout = float("+inf") 82 self._volts: float = 0.0 83 self._resistance: float = 0.0 84 self._timer = StopWatch() 85 self._thread_exit_flag = threading.Event() 86 self._thread_pause_flag = threading.Event() 87 self._lock = threading.RLock() # A reentrant lock can acquire multiple times for the same thread 88 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) 89 self.disengage_safety_protocols: bool = False 90 self.data = CellData(CELL_DATA_FILE, cell_chemistry) 91 92 if cell_chemistry: 93 # Using concurrent.futures allows us to catch all exceptions raised by the thread 94 self._executor.submit(self._advanced_cell_simulator).add_done_callback(self._reraise_thread_exceptions) 95 # In Python 3.9+, concurrent.futures waits for all threads to end before running any atexit 96 # functions, which we use as destructors. This means we're unable to tell a thread to end, 97 # causing the program to hang. We could register our destructor with threading, but this is a private 98 # method, so instead we'll modify join in the thread itself. 99 # threading._register_atexit(self.disable) # type: ignore[attr-defined] # Kill thread to prevent hanging 100 101 @atexit.register 102 def __atexit__(): 103 """Configure a safe shut down for when the class instance is destroyed.""" 104 logger.write_info_to_report(f"Disabling cell {self.id}") 105 if not self._thread_exit_flag.is_set(): 106 logger.write_debug_to_report("Killing thread") 107 self._thread_exit_flag.set() 108 self._executor.shutdown(wait=True) 109 self.disable() 110 111 def _reraise_thread_exceptions(self, future: concurrent.futures.Future): # type: ignore[type-arg] # Future 112 if future.exception(): 113 # _thread.interrupt_main(signal.SIGCHLD) # Python 3.10+ only 114 logger.write_critical_to_report("Thread encountered an exception", exc_info=future.exception()) 115 signal.raise_signal(signal.SIGCHLD) # Child process stopped or terminated. Kill main thread 116 117 @property 118 def measured_volts(self) -> float: 119 """Measures actual cell voltage.""" 120 with self._lock: 121 result = float(self._resource.query(":MEAS:VOLT?")) 122 return result if result != 9.91e37 else 0.0 # Convert NaN to 0 123 124 @property 125 def exact_volts(self) -> float: 126 """Gets the last target voltage.""" 127 return self.volts 128 129 @exact_volts.setter 130 def exact_volts(self, target_v): 131 """ 132 The cell sim isn't always accurate in the voltage that it actually sets, so we need to adjust it until 133 it's correct. 134 """ 135 error = 0.01 136 with self._lock: 137 self.volts = target_v 138 measured = self.measured_volts 139 if not target_v - error <= measured <= target_v + error: # Might be a stale reading 140 time.sleep(5) 141 measured = self.measured_volts 142 if not target_v - error <= measured <= target_v + error: 143 logger.write_debug_to_report(f"{measured} should be {target_v}") 144 self.volts -= measured - target_v 145 logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}") 146 while not target_v - error <= measured <= target_v + error: 147 logger.write_debug_to_report(f"{measured} should be {target_v}") 148 if target_v < measured: 149 self.volts -= 0.001 150 else: 151 self.volts += 0.001 152 measured = self.measured_volts 153 logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}") 154 155 @property 156 def volts(self) -> float: 157 """Gets the last target voltage.""" 158 with self._lock: 159 return self._volts 160 161 @volts.setter 162 def volts(self, new_voltage: float): 163 """Sets the output voltage.""" 164 with self._lock: 165 if not self.disengage_safety_protocols and new_voltage <= self.data.uv_protection: 166 raise UnderVoltageError( 167 f"Undervoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. " 168 f"Voltage {new_voltage} is lower than {self.data.uv_protection}." 169 ) 170 if not self.disengage_safety_protocols and new_voltage >= self.data.ov_protection: 171 raise OverVoltageError( 172 f"Overvoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. " 173 f"Voltage {new_voltage} is higher than {self.data.ov_protection}." 174 ) 175 176 self._resource.write(f":VOLT {new_voltage}") 177 self._volts = new_voltage 178 self.enable() # FIXME(JA): explicitly enable cells 179 180 @property 181 def ohms(self) -> float: 182 """Measures internal resistance of the cell""" 183 with self._lock: 184 return self._resistance 185 186 @ohms.setter 187 def ohms(self, new_ohms: float): 188 """Sets the internal resistance of the cell.""" 189 with self._lock: 190 self._resistance = new_ohms 191 self._resource.write(f":RES {new_ohms}") 192 193 @property 194 def amps(self) -> float: 195 """Measures cell current.""" 196 with self._lock: 197 result = float(self._resource.query(":MEAS:CURR?")) 198 return result if result != 9.91e37 else 0.0 # Convert NaN to 0 199 200 @property 201 def compensation_mode(self) -> CellCompMode: 202 """Sets compensation mode""" 203 return CellCompMode[self._resource.query(":OUTP:COMP:MODE?")] 204 205 @compensation_mode.setter 206 def compensation_mode(self, mode: CellCompMode): 207 """Sets compensation mode""" 208 self._resource.write(f":OUTP:COMP:MODE {mode.name}") 209 210 def enable(self): 211 """Enables the cell.""" 212 self._resource.write(":OUTP ON") 213 self._thread_pause_flag.set() 214 215 def disable(self): 216 """Disables the cell.""" 217 self._thread_pause_flag.clear() 218 self._resource.write(":OUTP OFF") 219 220 def reset(self): 221 """Resets the instrument""" 222 self._resource.write("*RST") 223 self.compensation_mode = CellCompMode.LLOCAL 224 225 def _bisect_and_clamp(self, array: list[float], element: float) -> tuple[int, float]: 226 """ 227 Clamp an element to be in the valid range of an array, then return it, and its index. 228 Bisect returns an insertion point, which may be an invalid index. Thus, we clamp it. 229 """ 230 element = min(max(array), max(min(array), element)) 231 insertion_point_clamped = min(len(array) - 1, bisect.bisect(array, element)) 232 return insertion_point_clamped, element 233 234 def _basic_cell_simulator(self): 235 """A simplified simulator to confirm our code is working.""" 236 237 # Patch join to set the kill flag before waiting 238 old_join_method = threading.current_thread().join 239 threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()] 240 241 volts_list = [0.0, 2.55, 2.83, 3.109, 3.42, 3.74, 4.2] 242 rate_list = [0.0, 0.000877, 0.000357, 0.000148, 8.63e-05, 8.88e-05, 0.000127] 243 244 self.ohms = 0.35 245 self.volts = volts_list[-1] 246 247 self._timer.reset() 248 while not self._thread_exit_flag.is_set(): 249 delta_t = self._timer.elapsed_time 250 self._thread_pause_flag.wait() 251 self._timer.reset() 252 253 with self._lock: # Update voltage atomically 254 rate = rate_list[self._bisect_and_clamp(volts_list, self.volts)[0]] 255 delta_v = rate * self.amps * delta_t # dV/mAS times the rate divided by millivolts 256 self.volts -= delta_v 257 258 self._thread_exit_flag.wait(2) # Sleep for a little bit 259 260 logger.write_info_to_report(f"Cell {self.id} thread died") 261 262 def _advanced_cell_simulator(self): 263 """The full cell simulator.""" 264 265 # Patch join to set the kill flag before waiting 266 old_join_method = threading.current_thread().join 267 threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()] 268 269 # Wait until voltage is set 270 while not self._thread_exit_flag.is_set() and self.volts == 0: 271 self._thread_exit_flag.wait(0.5) # Sleep for a little bit 272 273 # Interpolate impedance with voltage and temperature 274 temp_low, temperature, temp_high = self._temperature_data() 275 ohm_lower = self._impedance_from_voltage(temp_low, self.volts) 276 ohm_higher = self._impedance_from_voltage(temp_high, self.volts) 277 self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature) 278 279 # Start the main loop 280 self._timer.reset() 281 while not self._thread_exit_flag.is_set(): 282 delta_t = self._timer.elapsed_time 283 self._thread_pause_flag.wait() 284 self._timer.reset() 285 286 temp_low, temperature, temp_high = self._temperature_data() 287 with self._lock: # Update voltage and impedance atomically 288 # Calculate new voltage 289 rate_low = self._rate_from_voltage(temp_low, self.volts) 290 rate_high = self._rate_from_voltage(temp_high, self.volts) 291 rate = self._interpolate_value(rate_low, rate_high, temp_low, temp_high, temperature) 292 delta_v = rate * self.amps * delta_t / 3600 # dV/dQ * mAh 293 self.volts -= delta_v 294 295 # Calculate impedance from new voltage 296 ohm_lower = self._impedance_from_voltage(temp_low, self.volts) 297 ohm_higher = self._impedance_from_voltage(temp_high, self.volts) 298 self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature) 299 300 self._thread_exit_flag.wait(2) # Sleep for a little bit 301 302 logger.write_debug_to_report(f"Cell {self.id} thread died") 303 304 def _interpolate_value(self, low_a: float, high_a: float, low_b: float, high_b: float, b: float) -> float: 305 """Interpolate A in range A based on where B lies in range B.""" 306 return low_a + (b - low_b) * (high_a - low_a) / (high_b - low_b) 307 308 def _temperature_data(self) -> tuple[float, float, float]: 309 """Get the temperature, and the two boundaries between which the given temperature lies.""" 310 temp_list = list(self.data.volts) # Get the complete temperature range (keys in volts dict) 311 temperature_reading = (plateset.thermistor1 + plateset.thermistor2) / 2 312 temp_index, temperature_reading = self._bisect_and_clamp(temp_list, temperature_reading) 313 return temp_list[temp_index - 1], temperature_reading, temp_list[temp_index] 314 315 def _impedance_from_voltage(self, temperature_key: float, voltage: float) -> float: 316 """Calculate the impedance from a temperature and voltage.""" 317 ohms_list = self.data.impedance[temperature_key] 318 volts_list = self.data.volts[temperature_key] 319 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data point indexes 320 return self._interpolate_value( 321 ohms_list[volts_index - 1], 322 ohms_list[volts_index], 323 volts_list[volts_index - 1], 324 volts_list[volts_index], 325 voltage, 326 ) 327 328 def volts_to_soc(self, voltage: float | None) -> float | None: 329 """Calculate the state of charge from a voltage.""" 330 logger.write_debug_to_report(f"Got voltage {voltage}") 331 if voltage is None: 332 return None 333 334 temp_low, temperature, temp_high = self._temperature_data() 335 336 volts_list = self.data.volts[temp_low] 337 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data points 338 temp_low_soc = self._interpolate_value( 339 self.data.soc[volts_index - 1], 340 self.data.soc[volts_index], 341 volts_list[volts_index - 1], 342 volts_list[volts_index], 343 voltage, 344 ) 345 346 volts_list = self.data.volts[temp_high] 347 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data points 348 temp_high_soc = self._interpolate_value( 349 self.data.soc[volts_index - 1], 350 self.data.soc[volts_index], 351 volts_list[volts_index - 1], 352 volts_list[volts_index], 353 voltage, 354 ) 355 356 result = self._interpolate_value(temp_low_soc, temp_high_soc, temp_low, temp_high, temperature) 357 logger.write_debug_to_report( 358 f"Got result {[result, temp_low_soc, temp_high_soc, temp_low, temp_high, temperature]}" 359 ) 360 return result 361 362 @property 363 def state_of_charge(self) -> float: 364 """Calculate the state of charge of the simulator.""" 365 366 return self.volts_to_soc(self.volts) or 0.0 367 368 @state_of_charge.setter 369 def state_of_charge(self, new_soc: float): 370 """Set the voltage of the simulator based on the state of charge.""" 371 temp_low, temperature, temp_high = self._temperature_data() 372 373 volts_list = self.data.volts[temp_low] 374 soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc) 375 temp_low_volts = self._interpolate_value( 376 volts_list[soc_index - 1], 377 volts_list[soc_index], 378 self.data.soc[soc_index - 1], 379 self.data.soc[soc_index], 380 soc, 381 ) 382 383 volts_list = self.data.volts[temp_high] 384 soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc) 385 temp_high_volts = self._interpolate_value( 386 volts_list[soc_index - 1], 387 volts_list[soc_index], 388 self.data.soc[soc_index - 1], 389 self.data.soc[soc_index], 390 soc, 391 ) 392 393 self.volts = self._interpolate_value(temp_low_volts, temp_high_volts, temp_low, temp_high, temperature) 394 395 def _rate_from_voltage(self, temperature_key: float, voltage: float) -> float: 396 """Calculate the change rate from a temperature and voltage.""" 397 volts_list = self.data.volts[temperature_key] 398 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) 399 400 delta_soc = self.data.soc[volts_index] - self.data.soc[volts_index - 1] 401 delta_v = volts_list[volts_index] - volts_list[volts_index - 1] 402 delta_q = delta_soc * self.data.capacity 403 return float(delta_v / delta_q)
CELL_DATA_FILE =
PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/bms/cell_data.json')
CELL_PARALLEL_COUNT =
3
MAX_ATTEMPTS =
60
plateset =
<hitl_tester.modules.bms.plateset.Plateset object>
class
CellData:
54class CellData: 55 """A datastructure for all cell properties.""" 56 57 def __init__(self, filename: Path, cell_chemistry: str): 58 with open(filename, encoding="utf-8") as json_file: 59 self._raw_data = json.load(json_file)[cell_chemistry or "ZERO"] 60 self.capacity: float = self._raw_data["capacity"] * CELL_PARALLEL_COUNT 61 self.ov_protection: float = self._raw_data["overvoltage_protection"] 62 self.uv_protection: float = self._raw_data["undervoltage_protection"] 63 self.soc: list[float] = self._raw_data["soc"] # State of charge (header for voltage / impedance) 64 self.impedance: dict[float, list[float]] = { # Convert keys to float (not valid in json) 65 float(temperature): [impedance / 1000 / CELL_PARALLEL_COUNT for impedance in impedances] 66 for temperature, impedances in self._raw_data["impedance"].items() 67 } 68 self.volts: dict[float, list[float]] = { # Convert keys to float (not valid in json) 69 float(temperature): voltage for temperature, voltage in self._raw_data["voltage"].items() 70 }
A datastructure for all cell properties.
CellData(filename: pathlib.Path, cell_chemistry: str)
57 def __init__(self, filename: Path, cell_chemistry: str): 58 with open(filename, encoding="utf-8") as json_file: 59 self._raw_data = json.load(json_file)[cell_chemistry or "ZERO"] 60 self.capacity: float = self._raw_data["capacity"] * CELL_PARALLEL_COUNT 61 self.ov_protection: float = self._raw_data["overvoltage_protection"] 62 self.uv_protection: float = self._raw_data["undervoltage_protection"] 63 self.soc: list[float] = self._raw_data["soc"] # State of charge (header for voltage / impedance) 64 self.impedance: dict[float, list[float]] = { # Convert keys to float (not valid in json) 65 float(temperature): [impedance / 1000 / CELL_PARALLEL_COUNT for impedance in impedances] 66 for temperature, impedances in self._raw_data["impedance"].items() 67 } 68 self.volts: dict[float, list[float]] = { # Convert keys to float (not valid in json) 69 float(temperature): voltage for temperature, voltage in self._raw_data["voltage"].items() 70 }
class
Cell:
73class Cell: 74 """Agilent 66321 Battery Simulator command wrapper.""" 75 76 def __init__(self, cell_id: int, resource: SafeResource, cell_chemistry: str): 77 """ 78 Initialize the 66321 wrapper with a specific PyVISA resource. 79 """ 80 self.id: int = cell_id 81 self._resource = resource 82 self._resource.timeout = float("+inf") 83 self._volts: float = 0.0 84 self._resistance: float = 0.0 85 self._timer = StopWatch() 86 self._thread_exit_flag = threading.Event() 87 self._thread_pause_flag = threading.Event() 88 self._lock = threading.RLock() # A reentrant lock can acquire multiple times for the same thread 89 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) 90 self.disengage_safety_protocols: bool = False 91 self.data = CellData(CELL_DATA_FILE, cell_chemistry) 92 93 if cell_chemistry: 94 # Using concurrent.futures allows us to catch all exceptions raised by the thread 95 self._executor.submit(self._advanced_cell_simulator).add_done_callback(self._reraise_thread_exceptions) 96 # In Python 3.9+, concurrent.futures waits for all threads to end before running any atexit 97 # functions, which we use as destructors. This means we're unable to tell a thread to end, 98 # causing the program to hang. We could register our destructor with threading, but this is a private 99 # method, so instead we'll modify join in the thread itself. 100 # threading._register_atexit(self.disable) # type: ignore[attr-defined] # Kill thread to prevent hanging 101 102 @atexit.register 103 def __atexit__(): 104 """Configure a safe shut down for when the class instance is destroyed.""" 105 logger.write_info_to_report(f"Disabling cell {self.id}") 106 if not self._thread_exit_flag.is_set(): 107 logger.write_debug_to_report("Killing thread") 108 self._thread_exit_flag.set() 109 self._executor.shutdown(wait=True) 110 self.disable() 111 112 def _reraise_thread_exceptions(self, future: concurrent.futures.Future): # type: ignore[type-arg] # Future 113 if future.exception(): 114 # _thread.interrupt_main(signal.SIGCHLD) # Python 3.10+ only 115 logger.write_critical_to_report("Thread encountered an exception", exc_info=future.exception()) 116 signal.raise_signal(signal.SIGCHLD) # Child process stopped or terminated. Kill main thread 117 118 @property 119 def measured_volts(self) -> float: 120 """Measures actual cell voltage.""" 121 with self._lock: 122 result = float(self._resource.query(":MEAS:VOLT?")) 123 return result if result != 9.91e37 else 0.0 # Convert NaN to 0 124 125 @property 126 def exact_volts(self) -> float: 127 """Gets the last target voltage.""" 128 return self.volts 129 130 @exact_volts.setter 131 def exact_volts(self, target_v): 132 """ 133 The cell sim isn't always accurate in the voltage that it actually sets, so we need to adjust it until 134 it's correct. 135 """ 136 error = 0.01 137 with self._lock: 138 self.volts = target_v 139 measured = self.measured_volts 140 if not target_v - error <= measured <= target_v + error: # Might be a stale reading 141 time.sleep(5) 142 measured = self.measured_volts 143 if not target_v - error <= measured <= target_v + error: 144 logger.write_debug_to_report(f"{measured} should be {target_v}") 145 self.volts -= measured - target_v 146 logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}") 147 while not target_v - error <= measured <= target_v + error: 148 logger.write_debug_to_report(f"{measured} should be {target_v}") 149 if target_v < measured: 150 self.volts -= 0.001 151 else: 152 self.volts += 0.001 153 measured = self.measured_volts 154 logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}") 155 156 @property 157 def volts(self) -> float: 158 """Gets the last target voltage.""" 159 with self._lock: 160 return self._volts 161 162 @volts.setter 163 def volts(self, new_voltage: float): 164 """Sets the output voltage.""" 165 with self._lock: 166 if not self.disengage_safety_protocols and new_voltage <= self.data.uv_protection: 167 raise UnderVoltageError( 168 f"Undervoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. " 169 f"Voltage {new_voltage} is lower than {self.data.uv_protection}." 170 ) 171 if not self.disengage_safety_protocols and new_voltage >= self.data.ov_protection: 172 raise OverVoltageError( 173 f"Overvoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. " 174 f"Voltage {new_voltage} is higher than {self.data.ov_protection}." 175 ) 176 177 self._resource.write(f":VOLT {new_voltage}") 178 self._volts = new_voltage 179 self.enable() # FIXME(JA): explicitly enable cells 180 181 @property 182 def ohms(self) -> float: 183 """Measures internal resistance of the cell""" 184 with self._lock: 185 return self._resistance 186 187 @ohms.setter 188 def ohms(self, new_ohms: float): 189 """Sets the internal resistance of the cell.""" 190 with self._lock: 191 self._resistance = new_ohms 192 self._resource.write(f":RES {new_ohms}") 193 194 @property 195 def amps(self) -> float: 196 """Measures cell current.""" 197 with self._lock: 198 result = float(self._resource.query(":MEAS:CURR?")) 199 return result if result != 9.91e37 else 0.0 # Convert NaN to 0 200 201 @property 202 def compensation_mode(self) -> CellCompMode: 203 """Sets compensation mode""" 204 return CellCompMode[self._resource.query(":OUTP:COMP:MODE?")] 205 206 @compensation_mode.setter 207 def compensation_mode(self, mode: CellCompMode): 208 """Sets compensation mode""" 209 self._resource.write(f":OUTP:COMP:MODE {mode.name}") 210 211 def enable(self): 212 """Enables the cell.""" 213 self._resource.write(":OUTP ON") 214 self._thread_pause_flag.set() 215 216 def disable(self): 217 """Disables the cell.""" 218 self._thread_pause_flag.clear() 219 self._resource.write(":OUTP OFF") 220 221 def reset(self): 222 """Resets the instrument""" 223 self._resource.write("*RST") 224 self.compensation_mode = CellCompMode.LLOCAL 225 226 def _bisect_and_clamp(self, array: list[float], element: float) -> tuple[int, float]: 227 """ 228 Clamp an element to be in the valid range of an array, then return it, and its index. 229 Bisect returns an insertion point, which may be an invalid index. Thus, we clamp it. 230 """ 231 element = min(max(array), max(min(array), element)) 232 insertion_point_clamped = min(len(array) - 1, bisect.bisect(array, element)) 233 return insertion_point_clamped, element 234 235 def _basic_cell_simulator(self): 236 """A simplified simulator to confirm our code is working.""" 237 238 # Patch join to set the kill flag before waiting 239 old_join_method = threading.current_thread().join 240 threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()] 241 242 volts_list = [0.0, 2.55, 2.83, 3.109, 3.42, 3.74, 4.2] 243 rate_list = [0.0, 0.000877, 0.000357, 0.000148, 8.63e-05, 8.88e-05, 0.000127] 244 245 self.ohms = 0.35 246 self.volts = volts_list[-1] 247 248 self._timer.reset() 249 while not self._thread_exit_flag.is_set(): 250 delta_t = self._timer.elapsed_time 251 self._thread_pause_flag.wait() 252 self._timer.reset() 253 254 with self._lock: # Update voltage atomically 255 rate = rate_list[self._bisect_and_clamp(volts_list, self.volts)[0]] 256 delta_v = rate * self.amps * delta_t # dV/mAS times the rate divided by millivolts 257 self.volts -= delta_v 258 259 self._thread_exit_flag.wait(2) # Sleep for a little bit 260 261 logger.write_info_to_report(f"Cell {self.id} thread died") 262 263 def _advanced_cell_simulator(self): 264 """The full cell simulator.""" 265 266 # Patch join to set the kill flag before waiting 267 old_join_method = threading.current_thread().join 268 threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()] 269 270 # Wait until voltage is set 271 while not self._thread_exit_flag.is_set() and self.volts == 0: 272 self._thread_exit_flag.wait(0.5) # Sleep for a little bit 273 274 # Interpolate impedance with voltage and temperature 275 temp_low, temperature, temp_high = self._temperature_data() 276 ohm_lower = self._impedance_from_voltage(temp_low, self.volts) 277 ohm_higher = self._impedance_from_voltage(temp_high, self.volts) 278 self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature) 279 280 # Start the main loop 281 self._timer.reset() 282 while not self._thread_exit_flag.is_set(): 283 delta_t = self._timer.elapsed_time 284 self._thread_pause_flag.wait() 285 self._timer.reset() 286 287 temp_low, temperature, temp_high = self._temperature_data() 288 with self._lock: # Update voltage and impedance atomically 289 # Calculate new voltage 290 rate_low = self._rate_from_voltage(temp_low, self.volts) 291 rate_high = self._rate_from_voltage(temp_high, self.volts) 292 rate = self._interpolate_value(rate_low, rate_high, temp_low, temp_high, temperature) 293 delta_v = rate * self.amps * delta_t / 3600 # dV/dQ * mAh 294 self.volts -= delta_v 295 296 # Calculate impedance from new voltage 297 ohm_lower = self._impedance_from_voltage(temp_low, self.volts) 298 ohm_higher = self._impedance_from_voltage(temp_high, self.volts) 299 self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature) 300 301 self._thread_exit_flag.wait(2) # Sleep for a little bit 302 303 logger.write_debug_to_report(f"Cell {self.id} thread died") 304 305 def _interpolate_value(self, low_a: float, high_a: float, low_b: float, high_b: float, b: float) -> float: 306 """Interpolate A in range A based on where B lies in range B.""" 307 return low_a + (b - low_b) * (high_a - low_a) / (high_b - low_b) 308 309 def _temperature_data(self) -> tuple[float, float, float]: 310 """Get the temperature, and the two boundaries between which the given temperature lies.""" 311 temp_list = list(self.data.volts) # Get the complete temperature range (keys in volts dict) 312 temperature_reading = (plateset.thermistor1 + plateset.thermistor2) / 2 313 temp_index, temperature_reading = self._bisect_and_clamp(temp_list, temperature_reading) 314 return temp_list[temp_index - 1], temperature_reading, temp_list[temp_index] 315 316 def _impedance_from_voltage(self, temperature_key: float, voltage: float) -> float: 317 """Calculate the impedance from a temperature and voltage.""" 318 ohms_list = self.data.impedance[temperature_key] 319 volts_list = self.data.volts[temperature_key] 320 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data point indexes 321 return self._interpolate_value( 322 ohms_list[volts_index - 1], 323 ohms_list[volts_index], 324 volts_list[volts_index - 1], 325 volts_list[volts_index], 326 voltage, 327 ) 328 329 def volts_to_soc(self, voltage: float | None) -> float | None: 330 """Calculate the state of charge from a voltage.""" 331 logger.write_debug_to_report(f"Got voltage {voltage}") 332 if voltage is None: 333 return None 334 335 temp_low, temperature, temp_high = self._temperature_data() 336 337 volts_list = self.data.volts[temp_low] 338 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data points 339 temp_low_soc = self._interpolate_value( 340 self.data.soc[volts_index - 1], 341 self.data.soc[volts_index], 342 volts_list[volts_index - 1], 343 volts_list[volts_index], 344 voltage, 345 ) 346 347 volts_list = self.data.volts[temp_high] 348 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data points 349 temp_high_soc = self._interpolate_value( 350 self.data.soc[volts_index - 1], 351 self.data.soc[volts_index], 352 volts_list[volts_index - 1], 353 volts_list[volts_index], 354 voltage, 355 ) 356 357 result = self._interpolate_value(temp_low_soc, temp_high_soc, temp_low, temp_high, temperature) 358 logger.write_debug_to_report( 359 f"Got result {[result, temp_low_soc, temp_high_soc, temp_low, temp_high, temperature]}" 360 ) 361 return result 362 363 @property 364 def state_of_charge(self) -> float: 365 """Calculate the state of charge of the simulator.""" 366 367 return self.volts_to_soc(self.volts) or 0.0 368 369 @state_of_charge.setter 370 def state_of_charge(self, new_soc: float): 371 """Set the voltage of the simulator based on the state of charge.""" 372 temp_low, temperature, temp_high = self._temperature_data() 373 374 volts_list = self.data.volts[temp_low] 375 soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc) 376 temp_low_volts = self._interpolate_value( 377 volts_list[soc_index - 1], 378 volts_list[soc_index], 379 self.data.soc[soc_index - 1], 380 self.data.soc[soc_index], 381 soc, 382 ) 383 384 volts_list = self.data.volts[temp_high] 385 soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc) 386 temp_high_volts = self._interpolate_value( 387 volts_list[soc_index - 1], 388 volts_list[soc_index], 389 self.data.soc[soc_index - 1], 390 self.data.soc[soc_index], 391 soc, 392 ) 393 394 self.volts = self._interpolate_value(temp_low_volts, temp_high_volts, temp_low, temp_high, temperature) 395 396 def _rate_from_voltage(self, temperature_key: float, voltage: float) -> float: 397 """Calculate the change rate from a temperature and voltage.""" 398 volts_list = self.data.volts[temperature_key] 399 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) 400 401 delta_soc = self.data.soc[volts_index] - self.data.soc[volts_index - 1] 402 delta_v = volts_list[volts_index] - volts_list[volts_index - 1] 403 delta_q = delta_soc * self.data.capacity 404 return float(delta_v / delta_q)
Agilent 66321 Battery Simulator command wrapper.
Cell( cell_id: int, resource: hitl_tester.modules.bms_types.SafeResource, cell_chemistry: str)
76 def __init__(self, cell_id: int, resource: SafeResource, cell_chemistry: str): 77 """ 78 Initialize the 66321 wrapper with a specific PyVISA resource. 79 """ 80 self.id: int = cell_id 81 self._resource = resource 82 self._resource.timeout = float("+inf") 83 self._volts: float = 0.0 84 self._resistance: float = 0.0 85 self._timer = StopWatch() 86 self._thread_exit_flag = threading.Event() 87 self._thread_pause_flag = threading.Event() 88 self._lock = threading.RLock() # A reentrant lock can acquire multiple times for the same thread 89 self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1) 90 self.disengage_safety_protocols: bool = False 91 self.data = CellData(CELL_DATA_FILE, cell_chemistry) 92 93 if cell_chemistry: 94 # Using concurrent.futures allows us to catch all exceptions raised by the thread 95 self._executor.submit(self._advanced_cell_simulator).add_done_callback(self._reraise_thread_exceptions) 96 # In Python 3.9+, concurrent.futures waits for all threads to end before running any atexit 97 # functions, which we use as destructors. This means we're unable to tell a thread to end, 98 # causing the program to hang. We could register our destructor with threading, but this is a private 99 # method, so instead we'll modify join in the thread itself. 100 # threading._register_atexit(self.disable) # type: ignore[attr-defined] # Kill thread to prevent hanging 101 102 @atexit.register 103 def __atexit__(): 104 """Configure a safe shut down for when the class instance is destroyed.""" 105 logger.write_info_to_report(f"Disabling cell {self.id}") 106 if not self._thread_exit_flag.is_set(): 107 logger.write_debug_to_report("Killing thread") 108 self._thread_exit_flag.set() 109 self._executor.shutdown(wait=True) 110 self.disable()
Initialize the 66321 wrapper with a specific PyVISA resource.
measured_volts: float
118 @property 119 def measured_volts(self) -> float: 120 """Measures actual cell voltage.""" 121 with self._lock: 122 result = float(self._resource.query(":MEAS:VOLT?")) 123 return result if result != 9.91e37 else 0.0 # Convert NaN to 0
Measures actual cell voltage.
exact_volts: float
125 @property 126 def exact_volts(self) -> float: 127 """Gets the last target voltage.""" 128 return self.volts
Gets the last target voltage.
volts: float
156 @property 157 def volts(self) -> float: 158 """Gets the last target voltage.""" 159 with self._lock: 160 return self._volts
Gets the last target voltage.
ohms: float
181 @property 182 def ohms(self) -> float: 183 """Measures internal resistance of the cell""" 184 with self._lock: 185 return self._resistance
Measures internal resistance of the cell
amps: float
194 @property 195 def amps(self) -> float: 196 """Measures cell current.""" 197 with self._lock: 198 result = float(self._resource.query(":MEAS:CURR?")) 199 return result if result != 9.91e37 else 0.0 # Convert NaN to 0
Measures cell current.
compensation_mode: hitl_tester.modules.bms_types.CellCompMode
201 @property 202 def compensation_mode(self) -> CellCompMode: 203 """Sets compensation mode""" 204 return CellCompMode[self._resource.query(":OUTP:COMP:MODE?")]
Sets compensation mode
def
enable(self):
211 def enable(self): 212 """Enables the cell.""" 213 self._resource.write(":OUTP ON") 214 self._thread_pause_flag.set()
Enables the cell.
def
disable(self):
216 def disable(self): 217 """Disables the cell.""" 218 self._thread_pause_flag.clear() 219 self._resource.write(":OUTP OFF")
Disables the cell.
def
reset(self):
221 def reset(self): 222 """Resets the instrument""" 223 self._resource.write("*RST") 224 self.compensation_mode = CellCompMode.LLOCAL
Resets the instrument
def
volts_to_soc(self, voltage: float | None) -> float | None:
329 def volts_to_soc(self, voltage: float | None) -> float | None: 330 """Calculate the state of charge from a voltage.""" 331 logger.write_debug_to_report(f"Got voltage {voltage}") 332 if voltage is None: 333 return None 334 335 temp_low, temperature, temp_high = self._temperature_data() 336 337 volts_list = self.data.volts[temp_low] 338 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data points 339 temp_low_soc = self._interpolate_value( 340 self.data.soc[volts_index - 1], 341 self.data.soc[volts_index], 342 volts_list[volts_index - 1], 343 volts_list[volts_index], 344 voltage, 345 ) 346 347 volts_list = self.data.volts[temp_high] 348 volts_index, voltage = self._bisect_and_clamp(volts_list, voltage) # Determine two closest data points 349 temp_high_soc = self._interpolate_value( 350 self.data.soc[volts_index - 1], 351 self.data.soc[volts_index], 352 volts_list[volts_index - 1], 353 volts_list[volts_index], 354 voltage, 355 ) 356 357 result = self._interpolate_value(temp_low_soc, temp_high_soc, temp_low, temp_high, temperature) 358 logger.write_debug_to_report( 359 f"Got result {[result, temp_low_soc, temp_high_soc, temp_low, temp_high, temperature]}" 360 ) 361 return result
Calculate the state of charge from a voltage.