hitl_tester.modules.bms.cell

Provides controls for the Agilent 66321 Battery Simulator.

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

  1"""
  2Provides controls for the Agilent 66321 Battery Simulator.
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28"""
 29
 30from __future__ import annotations
 31
 32import atexit
 33import bisect
 34import concurrent.futures
 35import json
 36import signal
 37import threading
 38import time
 39from pathlib import Path
 40
 41from hitl_tester.modules.bms.plateset import Plateset
 42from hitl_tester.modules.bms_types import OverVoltageError, UnderVoltageError, CellCompMode, StopWatch, SafeResource
 43from hitl_tester.modules.logger import logger
 44
 45CELL_DATA_FILE = Path(__file__).resolve().parent / "cell_data.json"  # Path relative to this file
 46
 47CELL_PARALLEL_COUNT = 3
 48MAX_ATTEMPTS = 60
 49
 50plateset = Plateset()
 51
 52
 53class CellData:
 54    """A datastructure for all cell properties."""
 55
 56    def __init__(self, filename: Path, cell_chemistry: str):
 57        with open(filename, encoding="utf-8") as json_file:
 58            self._raw_data = json.load(json_file)[cell_chemistry or "ZERO"]
 59            self.capacity: float = self._raw_data["capacity"] * CELL_PARALLEL_COUNT
 60            self.ov_protection: float = self._raw_data["overvoltage_protection"]
 61            self.uv_protection: float = self._raw_data["undervoltage_protection"]
 62            self.soc: list[float] = self._raw_data["soc"]  # State of charge (header for voltage / impedance)
 63            self.impedance: dict[float, list[float]] = {  # Convert keys to float (not valid in json)
 64                float(temperature): [impedance / 1000 / CELL_PARALLEL_COUNT for impedance in impedances]
 65                for temperature, impedances in self._raw_data["impedance"].items()
 66            }
 67            self.volts: dict[float, list[float]] = {  # Convert keys to float (not valid in json)
 68                float(temperature): voltage for temperature, voltage in self._raw_data["voltage"].items()
 69            }
 70
 71
 72class Cell:
 73    """Agilent 66321 Battery Simulator command wrapper."""
 74
 75    def __init__(self, cell_id: int, resource: SafeResource, cell_chemistry: str):
 76        """
 77        Initialize the 66321 wrapper with a specific PyVISA resource.
 78        """
 79        self.id: int = cell_id
 80        self._resource = resource
 81        self._resource.timeout = float("+inf")
 82        self._volts: float = 0.0
 83        self._resistance: float = 0.0
 84        self._timer = StopWatch()
 85        self._thread_exit_flag = threading.Event()
 86        self._thread_pause_flag = threading.Event()
 87        self._lock = threading.RLock()  # A reentrant lock can acquire multiple times for the same thread
 88        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 89        self.disengage_safety_protocols: bool = False
 90        self.data = CellData(CELL_DATA_FILE, cell_chemistry)
 91
 92        if cell_chemistry:
 93            # Using concurrent.futures allows us to catch all exceptions raised by the thread
 94            self._executor.submit(self._advanced_cell_simulator).add_done_callback(self._reraise_thread_exceptions)
 95            # In Python 3.9+, concurrent.futures waits for all threads to end before running any atexit
 96            # functions, which we use as destructors. This means we're unable to tell a thread to end,
 97            # causing the program to hang. We could register our destructor with threading, but this is a private
 98            # method, so instead we'll modify join in the thread itself.
 99            # threading._register_atexit(self.disable)  # type: ignore[attr-defined]  # Kill thread to prevent hanging
100
101        @atexit.register
102        def __atexit__():
103            """Configure a safe shut down for when the class instance is destroyed."""
104            logger.write_info_to_report(f"Disabling cell {self.id}")
105            if not self._thread_exit_flag.is_set():
106                logger.write_debug_to_report("Killing thread")
107                self._thread_exit_flag.set()
108                self._executor.shutdown(wait=True)
109            self.disable()
110
111    def _reraise_thread_exceptions(self, future: concurrent.futures.Future):  # type: ignore[type-arg]  # Future
112        if future.exception():
113            # _thread.interrupt_main(signal.SIGCHLD)  # Python 3.10+ only
114            logger.write_critical_to_report("Thread encountered an exception", exc_info=future.exception())
115            signal.raise_signal(signal.SIGCHLD)  # Child process stopped or terminated. Kill main thread
116
117    @property
118    def measured_volts(self) -> float:
119        """Measures actual cell voltage."""
120        with self._lock:
121            result = float(self._resource.query(":MEAS:VOLT?"))
122            return result if result != 9.91e37 else 0.0  # Convert NaN to 0
123
124    @property
125    def exact_volts(self) -> float:
126        """Gets the last target voltage."""
127        return self.volts
128
129    @exact_volts.setter
130    def exact_volts(self, target_v):
131        """
132        The cell sim isn't always accurate in the voltage that it actually sets, so we need to adjust it until
133        it's correct.
134        """
135        error = 0.01
136        with self._lock:
137            self.volts = target_v
138            measured = self.measured_volts
139            if not target_v - error <= measured <= target_v + error:  # Might be a stale reading
140                time.sleep(5)
141                measured = self.measured_volts
142            if not target_v - error <= measured <= target_v + error:
143                logger.write_debug_to_report(f"{measured} should be {target_v}")
144                self.volts -= measured - target_v
145                logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}")
146            while not target_v - error <= measured <= target_v + error:
147                logger.write_debug_to_report(f"{measured} should be {target_v}")
148                if target_v < measured:
149                    self.volts -= 0.001
150                else:
151                    self.volts += 0.001
152                measured = self.measured_volts
153            logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}")
154
155    @property
156    def volts(self) -> float:
157        """Gets the last target voltage."""
158        with self._lock:
159            return self._volts
160
161    @volts.setter
162    def volts(self, new_voltage: float):
163        """Sets the output voltage."""
164        with self._lock:
165            if not self.disengage_safety_protocols and new_voltage <= self.data.uv_protection:
166                raise UnderVoltageError(
167                    f"Undervoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. "
168                    f"Voltage {new_voltage} is lower than {self.data.uv_protection}."
169                )
170            if not self.disengage_safety_protocols and new_voltage >= self.data.ov_protection:
171                raise OverVoltageError(
172                    f"Overvoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. "
173                    f"Voltage {new_voltage} is higher than {self.data.ov_protection}."
174                )
175
176            self._resource.write(f":VOLT {new_voltage}")
177            self._volts = new_voltage
178            self.enable()  # FIXME(JA): explicitly enable cells
179
180    @property
181    def ohms(self) -> float:
182        """Measures internal resistance of the cell"""
183        with self._lock:
184            return self._resistance
185
186    @ohms.setter
187    def ohms(self, new_ohms: float):
188        """Sets the internal resistance of the cell."""
189        with self._lock:
190            self._resistance = new_ohms
191            self._resource.write(f":RES {new_ohms}")
192
193    @property
194    def amps(self) -> float:
195        """Measures cell current."""
196        with self._lock:
197            result = float(self._resource.query(":MEAS:CURR?"))
198            return result if result != 9.91e37 else 0.0  # Convert NaN to 0
199
200    @property
201    def compensation_mode(self) -> CellCompMode:
202        """Sets compensation mode"""
203        return CellCompMode[self._resource.query(":OUTP:COMP:MODE?")]
204
205    @compensation_mode.setter
206    def compensation_mode(self, mode: CellCompMode):
207        """Sets compensation mode"""
208        self._resource.write(f":OUTP:COMP:MODE {mode.name}")
209
210    def enable(self):
211        """Enables the cell."""
212        self._resource.write(":OUTP ON")
213        self._thread_pause_flag.set()
214
215    def disable(self):
216        """Disables the cell."""
217        self._thread_pause_flag.clear()
218        self._resource.write(":OUTP OFF")
219
220    def reset(self):
221        """Resets the instrument"""
222        self._resource.write("*RST")
223        self.compensation_mode = CellCompMode.LLOCAL
224
225    def _bisect_and_clamp(self, array: list[float], element: float) -> tuple[int, float]:
226        """
227        Clamp an element to be in the valid range of an array, then return it, and its index.
228        Bisect returns an insertion point, which may be an invalid index. Thus, we clamp it.
229        """
230        element = min(max(array), max(min(array), element))
231        insertion_point_clamped = min(len(array) - 1, bisect.bisect(array, element))
232        return insertion_point_clamped, element
233
234    def _basic_cell_simulator(self):
235        """A simplified simulator to confirm our code is working."""
236
237        # Patch join to set the kill flag before waiting
238        old_join_method = threading.current_thread().join
239        threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()]
240
241        volts_list = [0.0, 2.55, 2.83, 3.109, 3.42, 3.74, 4.2]
242        rate_list = [0.0, 0.000877, 0.000357, 0.000148, 8.63e-05, 8.88e-05, 0.000127]
243
244        self.ohms = 0.35
245        self.volts = volts_list[-1]
246
247        self._timer.reset()
248        while not self._thread_exit_flag.is_set():
249            delta_t = self._timer.elapsed_time
250            self._thread_pause_flag.wait()
251            self._timer.reset()
252
253            with self._lock:  # Update voltage atomically
254                rate = rate_list[self._bisect_and_clamp(volts_list, self.volts)[0]]
255                delta_v = rate * self.amps * delta_t  # dV/mAS times the rate divided by millivolts
256                self.volts -= delta_v
257
258            self._thread_exit_flag.wait(2)  # Sleep for a little bit
259
260        logger.write_info_to_report(f"Cell {self.id} thread died")
261
262    def _advanced_cell_simulator(self):
263        """The full cell simulator."""
264
265        # Patch join to set the kill flag before waiting
266        old_join_method = threading.current_thread().join
267        threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()]
268
269        # Wait until voltage is set
270        while not self._thread_exit_flag.is_set() and self.volts == 0:
271            self._thread_exit_flag.wait(0.5)  # Sleep for a little bit
272
273        # Interpolate impedance with voltage and temperature
274        temp_low, temperature, temp_high = self._temperature_data()
275        ohm_lower = self._impedance_from_voltage(temp_low, self.volts)
276        ohm_higher = self._impedance_from_voltage(temp_high, self.volts)
277        self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature)
278
279        # Start the main loop
280        self._timer.reset()
281        while not self._thread_exit_flag.is_set():
282            delta_t = self._timer.elapsed_time
283            self._thread_pause_flag.wait()
284            self._timer.reset()
285
286            temp_low, temperature, temp_high = self._temperature_data()
287            with self._lock:  # Update voltage and impedance atomically
288                # Calculate new voltage
289                rate_low = self._rate_from_voltage(temp_low, self.volts)
290                rate_high = self._rate_from_voltage(temp_high, self.volts)
291                rate = self._interpolate_value(rate_low, rate_high, temp_low, temp_high, temperature)
292                delta_v = rate * self.amps * delta_t / 3600  # dV/dQ * mAh
293                self.volts -= delta_v
294
295                # Calculate impedance from new voltage
296                ohm_lower = self._impedance_from_voltage(temp_low, self.volts)
297                ohm_higher = self._impedance_from_voltage(temp_high, self.volts)
298                self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature)
299
300            self._thread_exit_flag.wait(2)  # Sleep for a little bit
301
302        logger.write_debug_to_report(f"Cell {self.id} thread died")
303
304    def _interpolate_value(self, low_a: float, high_a: float, low_b: float, high_b: float, b: float) -> float:
305        """Interpolate A in range A based on where B lies in range B."""
306        return low_a + (b - low_b) * (high_a - low_a) / (high_b - low_b)
307
308    def _temperature_data(self) -> tuple[float, float, float]:
309        """Get the temperature, and the two boundaries between which the given temperature lies."""
310        temp_list = list(self.data.volts)  # Get the complete temperature range (keys in volts dict)
311        temperature_reading = (plateset.thermistor1 + plateset.thermistor2) / 2
312        temp_index, temperature_reading = self._bisect_and_clamp(temp_list, temperature_reading)
313        return temp_list[temp_index - 1], temperature_reading, temp_list[temp_index]
314
315    def _impedance_from_voltage(self, temperature_key: float, voltage: float) -> float:
316        """Calculate the impedance from a temperature and voltage."""
317        ohms_list = self.data.impedance[temperature_key]
318        volts_list = self.data.volts[temperature_key]
319        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data point indexes
320        return self._interpolate_value(
321            ohms_list[volts_index - 1],
322            ohms_list[volts_index],
323            volts_list[volts_index - 1],
324            volts_list[volts_index],
325            voltage,
326        )
327
328    def volts_to_soc(self, voltage: float | None) -> float | None:
329        """Calculate the state of charge from a voltage."""
330        logger.write_debug_to_report(f"Got voltage {voltage}")
331        if voltage is None:
332            return None
333
334        temp_low, temperature, temp_high = self._temperature_data()
335
336        volts_list = self.data.volts[temp_low]
337        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data points
338        temp_low_soc = self._interpolate_value(
339            self.data.soc[volts_index - 1],
340            self.data.soc[volts_index],
341            volts_list[volts_index - 1],
342            volts_list[volts_index],
343            voltage,
344        )
345
346        volts_list = self.data.volts[temp_high]
347        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data points
348        temp_high_soc = self._interpolate_value(
349            self.data.soc[volts_index - 1],
350            self.data.soc[volts_index],
351            volts_list[volts_index - 1],
352            volts_list[volts_index],
353            voltage,
354        )
355
356        result = self._interpolate_value(temp_low_soc, temp_high_soc, temp_low, temp_high, temperature)
357        logger.write_debug_to_report(
358            f"Got result {[result, temp_low_soc, temp_high_soc, temp_low, temp_high, temperature]}"
359        )
360        return result
361
362    @property
363    def state_of_charge(self) -> float:
364        """Calculate the state of charge of the simulator."""
365
366        return self.volts_to_soc(self.volts) or 0.0
367
368    @state_of_charge.setter
369    def state_of_charge(self, new_soc: float):
370        """Set the voltage of the simulator based on the state of charge."""
371        temp_low, temperature, temp_high = self._temperature_data()
372
373        volts_list = self.data.volts[temp_low]
374        soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc)
375        temp_low_volts = self._interpolate_value(
376            volts_list[soc_index - 1],
377            volts_list[soc_index],
378            self.data.soc[soc_index - 1],
379            self.data.soc[soc_index],
380            soc,
381        )
382
383        volts_list = self.data.volts[temp_high]
384        soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc)
385        temp_high_volts = self._interpolate_value(
386            volts_list[soc_index - 1],
387            volts_list[soc_index],
388            self.data.soc[soc_index - 1],
389            self.data.soc[soc_index],
390            soc,
391        )
392
393        self.volts = self._interpolate_value(temp_low_volts, temp_high_volts, temp_low, temp_high, temperature)
394
395    def _rate_from_voltage(self, temperature_key: float, voltage: float) -> float:
396        """Calculate the change rate from a temperature and voltage."""
397        volts_list = self.data.volts[temperature_key]
398        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)
399
400        delta_soc = self.data.soc[volts_index] - self.data.soc[volts_index - 1]
401        delta_v = volts_list[volts_index] - volts_list[volts_index - 1]
402        delta_q = delta_soc * self.data.capacity
403        return float(delta_v / delta_q)
CELL_DATA_FILE = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/bms/cell_data.json')
CELL_PARALLEL_COUNT = 3
MAX_ATTEMPTS = 60
class CellData:
54class CellData:
55    """A datastructure for all cell properties."""
56
57    def __init__(self, filename: Path, cell_chemistry: str):
58        with open(filename, encoding="utf-8") as json_file:
59            self._raw_data = json.load(json_file)[cell_chemistry or "ZERO"]
60            self.capacity: float = self._raw_data["capacity"] * CELL_PARALLEL_COUNT
61            self.ov_protection: float = self._raw_data["overvoltage_protection"]
62            self.uv_protection: float = self._raw_data["undervoltage_protection"]
63            self.soc: list[float] = self._raw_data["soc"]  # State of charge (header for voltage / impedance)
64            self.impedance: dict[float, list[float]] = {  # Convert keys to float (not valid in json)
65                float(temperature): [impedance / 1000 / CELL_PARALLEL_COUNT for impedance in impedances]
66                for temperature, impedances in self._raw_data["impedance"].items()
67            }
68            self.volts: dict[float, list[float]] = {  # Convert keys to float (not valid in json)
69                float(temperature): voltage for temperature, voltage in self._raw_data["voltage"].items()
70            }

A datastructure for all cell properties.

CellData(filename: pathlib.Path, cell_chemistry: str)
57    def __init__(self, filename: Path, cell_chemistry: str):
58        with open(filename, encoding="utf-8") as json_file:
59            self._raw_data = json.load(json_file)[cell_chemistry or "ZERO"]
60            self.capacity: float = self._raw_data["capacity"] * CELL_PARALLEL_COUNT
61            self.ov_protection: float = self._raw_data["overvoltage_protection"]
62            self.uv_protection: float = self._raw_data["undervoltage_protection"]
63            self.soc: list[float] = self._raw_data["soc"]  # State of charge (header for voltage / impedance)
64            self.impedance: dict[float, list[float]] = {  # Convert keys to float (not valid in json)
65                float(temperature): [impedance / 1000 / CELL_PARALLEL_COUNT for impedance in impedances]
66                for temperature, impedances in self._raw_data["impedance"].items()
67            }
68            self.volts: dict[float, list[float]] = {  # Convert keys to float (not valid in json)
69                float(temperature): voltage for temperature, voltage in self._raw_data["voltage"].items()
70            }
class Cell:
 73class Cell:
 74    """Agilent 66321 Battery Simulator command wrapper."""
 75
 76    def __init__(self, cell_id: int, resource: SafeResource, cell_chemistry: str):
 77        """
 78        Initialize the 66321 wrapper with a specific PyVISA resource.
 79        """
 80        self.id: int = cell_id
 81        self._resource = resource
 82        self._resource.timeout = float("+inf")
 83        self._volts: float = 0.0
 84        self._resistance: float = 0.0
 85        self._timer = StopWatch()
 86        self._thread_exit_flag = threading.Event()
 87        self._thread_pause_flag = threading.Event()
 88        self._lock = threading.RLock()  # A reentrant lock can acquire multiple times for the same thread
 89        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 90        self.disengage_safety_protocols: bool = False
 91        self.data = CellData(CELL_DATA_FILE, cell_chemistry)
 92
 93        if cell_chemistry:
 94            # Using concurrent.futures allows us to catch all exceptions raised by the thread
 95            self._executor.submit(self._advanced_cell_simulator).add_done_callback(self._reraise_thread_exceptions)
 96            # In Python 3.9+, concurrent.futures waits for all threads to end before running any atexit
 97            # functions, which we use as destructors. This means we're unable to tell a thread to end,
 98            # causing the program to hang. We could register our destructor with threading, but this is a private
 99            # method, so instead we'll modify join in the thread itself.
100            # threading._register_atexit(self.disable)  # type: ignore[attr-defined]  # Kill thread to prevent hanging
101
102        @atexit.register
103        def __atexit__():
104            """Configure a safe shut down for when the class instance is destroyed."""
105            logger.write_info_to_report(f"Disabling cell {self.id}")
106            if not self._thread_exit_flag.is_set():
107                logger.write_debug_to_report("Killing thread")
108                self._thread_exit_flag.set()
109                self._executor.shutdown(wait=True)
110            self.disable()
111
112    def _reraise_thread_exceptions(self, future: concurrent.futures.Future):  # type: ignore[type-arg]  # Future
113        if future.exception():
114            # _thread.interrupt_main(signal.SIGCHLD)  # Python 3.10+ only
115            logger.write_critical_to_report("Thread encountered an exception", exc_info=future.exception())
116            signal.raise_signal(signal.SIGCHLD)  # Child process stopped or terminated. Kill main thread
117
118    @property
119    def measured_volts(self) -> float:
120        """Measures actual cell voltage."""
121        with self._lock:
122            result = float(self._resource.query(":MEAS:VOLT?"))
123            return result if result != 9.91e37 else 0.0  # Convert NaN to 0
124
125    @property
126    def exact_volts(self) -> float:
127        """Gets the last target voltage."""
128        return self.volts
129
130    @exact_volts.setter
131    def exact_volts(self, target_v):
132        """
133        The cell sim isn't always accurate in the voltage that it actually sets, so we need to adjust it until
134        it's correct.
135        """
136        error = 0.01
137        with self._lock:
138            self.volts = target_v
139            measured = self.measured_volts
140            if not target_v - error <= measured <= target_v + error:  # Might be a stale reading
141                time.sleep(5)
142                measured = self.measured_volts
143            if not target_v - error <= measured <= target_v + error:
144                logger.write_debug_to_report(f"{measured} should be {target_v}")
145                self.volts -= measured - target_v
146                logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}")
147            while not target_v - error <= measured <= target_v + error:
148                logger.write_debug_to_report(f"{measured} should be {target_v}")
149                if target_v < measured:
150                    self.volts -= 0.001
151                else:
152                    self.volts += 0.001
153                measured = self.measured_volts
154            logger.write_debug_to_report(f"-> {self.measured_volts=} {self.volts=}")
155
156    @property
157    def volts(self) -> float:
158        """Gets the last target voltage."""
159        with self._lock:
160            return self._volts
161
162    @volts.setter
163    def volts(self, new_voltage: float):
164        """Sets the output voltage."""
165        with self._lock:
166            if not self.disengage_safety_protocols and new_voltage <= self.data.uv_protection:
167                raise UnderVoltageError(
168                    f"Undervoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. "
169                    f"Voltage {new_voltage} is lower than {self.data.uv_protection}."
170                )
171            if not self.disengage_safety_protocols and new_voltage >= self.data.ov_protection:
172                raise OverVoltageError(
173                    f"Overvoltage protection triggered at {time.strftime('%x %X')} on cell {self.id}. "
174                    f"Voltage {new_voltage} is higher than {self.data.ov_protection}."
175                )
176
177            self._resource.write(f":VOLT {new_voltage}")
178            self._volts = new_voltage
179            self.enable()  # FIXME(JA): explicitly enable cells
180
181    @property
182    def ohms(self) -> float:
183        """Measures internal resistance of the cell"""
184        with self._lock:
185            return self._resistance
186
187    @ohms.setter
188    def ohms(self, new_ohms: float):
189        """Sets the internal resistance of the cell."""
190        with self._lock:
191            self._resistance = new_ohms
192            self._resource.write(f":RES {new_ohms}")
193
194    @property
195    def amps(self) -> float:
196        """Measures cell current."""
197        with self._lock:
198            result = float(self._resource.query(":MEAS:CURR?"))
199            return result if result != 9.91e37 else 0.0  # Convert NaN to 0
200
201    @property
202    def compensation_mode(self) -> CellCompMode:
203        """Sets compensation mode"""
204        return CellCompMode[self._resource.query(":OUTP:COMP:MODE?")]
205
206    @compensation_mode.setter
207    def compensation_mode(self, mode: CellCompMode):
208        """Sets compensation mode"""
209        self._resource.write(f":OUTP:COMP:MODE {mode.name}")
210
211    def enable(self):
212        """Enables the cell."""
213        self._resource.write(":OUTP ON")
214        self._thread_pause_flag.set()
215
216    def disable(self):
217        """Disables the cell."""
218        self._thread_pause_flag.clear()
219        self._resource.write(":OUTP OFF")
220
221    def reset(self):
222        """Resets the instrument"""
223        self._resource.write("*RST")
224        self.compensation_mode = CellCompMode.LLOCAL
225
226    def _bisect_and_clamp(self, array: list[float], element: float) -> tuple[int, float]:
227        """
228        Clamp an element to be in the valid range of an array, then return it, and its index.
229        Bisect returns an insertion point, which may be an invalid index. Thus, we clamp it.
230        """
231        element = min(max(array), max(min(array), element))
232        insertion_point_clamped = min(len(array) - 1, bisect.bisect(array, element))
233        return insertion_point_clamped, element
234
235    def _basic_cell_simulator(self):
236        """A simplified simulator to confirm our code is working."""
237
238        # Patch join to set the kill flag before waiting
239        old_join_method = threading.current_thread().join
240        threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()]
241
242        volts_list = [0.0, 2.55, 2.83, 3.109, 3.42, 3.74, 4.2]
243        rate_list = [0.0, 0.000877, 0.000357, 0.000148, 8.63e-05, 8.88e-05, 0.000127]
244
245        self.ohms = 0.35
246        self.volts = volts_list[-1]
247
248        self._timer.reset()
249        while not self._thread_exit_flag.is_set():
250            delta_t = self._timer.elapsed_time
251            self._thread_pause_flag.wait()
252            self._timer.reset()
253
254            with self._lock:  # Update voltage atomically
255                rate = rate_list[self._bisect_and_clamp(volts_list, self.volts)[0]]
256                delta_v = rate * self.amps * delta_t  # dV/mAS times the rate divided by millivolts
257                self.volts -= delta_v
258
259            self._thread_exit_flag.wait(2)  # Sleep for a little bit
260
261        logger.write_info_to_report(f"Cell {self.id} thread died")
262
263    def _advanced_cell_simulator(self):
264        """The full cell simulator."""
265
266        # Patch join to set the kill flag before waiting
267        old_join_method = threading.current_thread().join
268        threading.current_thread().join = lambda: [self._thread_exit_flag.set(), old_join_method()]
269
270        # Wait until voltage is set
271        while not self._thread_exit_flag.is_set() and self.volts == 0:
272            self._thread_exit_flag.wait(0.5)  # Sleep for a little bit
273
274        # Interpolate impedance with voltage and temperature
275        temp_low, temperature, temp_high = self._temperature_data()
276        ohm_lower = self._impedance_from_voltage(temp_low, self.volts)
277        ohm_higher = self._impedance_from_voltage(temp_high, self.volts)
278        self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature)
279
280        # Start the main loop
281        self._timer.reset()
282        while not self._thread_exit_flag.is_set():
283            delta_t = self._timer.elapsed_time
284            self._thread_pause_flag.wait()
285            self._timer.reset()
286
287            temp_low, temperature, temp_high = self._temperature_data()
288            with self._lock:  # Update voltage and impedance atomically
289                # Calculate new voltage
290                rate_low = self._rate_from_voltage(temp_low, self.volts)
291                rate_high = self._rate_from_voltage(temp_high, self.volts)
292                rate = self._interpolate_value(rate_low, rate_high, temp_low, temp_high, temperature)
293                delta_v = rate * self.amps * delta_t / 3600  # dV/dQ * mAh
294                self.volts -= delta_v
295
296                # Calculate impedance from new voltage
297                ohm_lower = self._impedance_from_voltage(temp_low, self.volts)
298                ohm_higher = self._impedance_from_voltage(temp_high, self.volts)
299                self.ohms = self._interpolate_value(ohm_lower, ohm_higher, temp_low, temp_high, temperature)
300
301            self._thread_exit_flag.wait(2)  # Sleep for a little bit
302
303        logger.write_debug_to_report(f"Cell {self.id} thread died")
304
305    def _interpolate_value(self, low_a: float, high_a: float, low_b: float, high_b: float, b: float) -> float:
306        """Interpolate A in range A based on where B lies in range B."""
307        return low_a + (b - low_b) * (high_a - low_a) / (high_b - low_b)
308
309    def _temperature_data(self) -> tuple[float, float, float]:
310        """Get the temperature, and the two boundaries between which the given temperature lies."""
311        temp_list = list(self.data.volts)  # Get the complete temperature range (keys in volts dict)
312        temperature_reading = (plateset.thermistor1 + plateset.thermistor2) / 2
313        temp_index, temperature_reading = self._bisect_and_clamp(temp_list, temperature_reading)
314        return temp_list[temp_index - 1], temperature_reading, temp_list[temp_index]
315
316    def _impedance_from_voltage(self, temperature_key: float, voltage: float) -> float:
317        """Calculate the impedance from a temperature and voltage."""
318        ohms_list = self.data.impedance[temperature_key]
319        volts_list = self.data.volts[temperature_key]
320        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data point indexes
321        return self._interpolate_value(
322            ohms_list[volts_index - 1],
323            ohms_list[volts_index],
324            volts_list[volts_index - 1],
325            volts_list[volts_index],
326            voltage,
327        )
328
329    def volts_to_soc(self, voltage: float | None) -> float | None:
330        """Calculate the state of charge from a voltage."""
331        logger.write_debug_to_report(f"Got voltage {voltage}")
332        if voltage is None:
333            return None
334
335        temp_low, temperature, temp_high = self._temperature_data()
336
337        volts_list = self.data.volts[temp_low]
338        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data points
339        temp_low_soc = self._interpolate_value(
340            self.data.soc[volts_index - 1],
341            self.data.soc[volts_index],
342            volts_list[volts_index - 1],
343            volts_list[volts_index],
344            voltage,
345        )
346
347        volts_list = self.data.volts[temp_high]
348        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data points
349        temp_high_soc = self._interpolate_value(
350            self.data.soc[volts_index - 1],
351            self.data.soc[volts_index],
352            volts_list[volts_index - 1],
353            volts_list[volts_index],
354            voltage,
355        )
356
357        result = self._interpolate_value(temp_low_soc, temp_high_soc, temp_low, temp_high, temperature)
358        logger.write_debug_to_report(
359            f"Got result {[result, temp_low_soc, temp_high_soc, temp_low, temp_high, temperature]}"
360        )
361        return result
362
363    @property
364    def state_of_charge(self) -> float:
365        """Calculate the state of charge of the simulator."""
366
367        return self.volts_to_soc(self.volts) or 0.0
368
369    @state_of_charge.setter
370    def state_of_charge(self, new_soc: float):
371        """Set the voltage of the simulator based on the state of charge."""
372        temp_low, temperature, temp_high = self._temperature_data()
373
374        volts_list = self.data.volts[temp_low]
375        soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc)
376        temp_low_volts = self._interpolate_value(
377            volts_list[soc_index - 1],
378            volts_list[soc_index],
379            self.data.soc[soc_index - 1],
380            self.data.soc[soc_index],
381            soc,
382        )
383
384        volts_list = self.data.volts[temp_high]
385        soc_index, soc = self._bisect_and_clamp(self.data.soc, new_soc)
386        temp_high_volts = self._interpolate_value(
387            volts_list[soc_index - 1],
388            volts_list[soc_index],
389            self.data.soc[soc_index - 1],
390            self.data.soc[soc_index],
391            soc,
392        )
393
394        self.volts = self._interpolate_value(temp_low_volts, temp_high_volts, temp_low, temp_high, temperature)
395
396    def _rate_from_voltage(self, temperature_key: float, voltage: float) -> float:
397        """Calculate the change rate from a temperature and voltage."""
398        volts_list = self.data.volts[temperature_key]
399        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)
400
401        delta_soc = self.data.soc[volts_index] - self.data.soc[volts_index - 1]
402        delta_v = volts_list[volts_index] - volts_list[volts_index - 1]
403        delta_q = delta_soc * self.data.capacity
404        return float(delta_v / delta_q)

Agilent 66321 Battery Simulator command wrapper.

Cell( cell_id: int, resource: hitl_tester.modules.bms_types.SafeResource, cell_chemistry: str)
 76    def __init__(self, cell_id: int, resource: SafeResource, cell_chemistry: str):
 77        """
 78        Initialize the 66321 wrapper with a specific PyVISA resource.
 79        """
 80        self.id: int = cell_id
 81        self._resource = resource
 82        self._resource.timeout = float("+inf")
 83        self._volts: float = 0.0
 84        self._resistance: float = 0.0
 85        self._timer = StopWatch()
 86        self._thread_exit_flag = threading.Event()
 87        self._thread_pause_flag = threading.Event()
 88        self._lock = threading.RLock()  # A reentrant lock can acquire multiple times for the same thread
 89        self._executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
 90        self.disengage_safety_protocols: bool = False
 91        self.data = CellData(CELL_DATA_FILE, cell_chemistry)
 92
 93        if cell_chemistry:
 94            # Using concurrent.futures allows us to catch all exceptions raised by the thread
 95            self._executor.submit(self._advanced_cell_simulator).add_done_callback(self._reraise_thread_exceptions)
 96            # In Python 3.9+, concurrent.futures waits for all threads to end before running any atexit
 97            # functions, which we use as destructors. This means we're unable to tell a thread to end,
 98            # causing the program to hang. We could register our destructor with threading, but this is a private
 99            # method, so instead we'll modify join in the thread itself.
100            # threading._register_atexit(self.disable)  # type: ignore[attr-defined]  # Kill thread to prevent hanging
101
102        @atexit.register
103        def __atexit__():
104            """Configure a safe shut down for when the class instance is destroyed."""
105            logger.write_info_to_report(f"Disabling cell {self.id}")
106            if not self._thread_exit_flag.is_set():
107                logger.write_debug_to_report("Killing thread")
108                self._thread_exit_flag.set()
109                self._executor.shutdown(wait=True)
110            self.disable()

Initialize the 66321 wrapper with a specific PyVISA resource.

id: int
disengage_safety_protocols: bool
data
measured_volts: float
118    @property
119    def measured_volts(self) -> float:
120        """Measures actual cell voltage."""
121        with self._lock:
122            result = float(self._resource.query(":MEAS:VOLT?"))
123            return result if result != 9.91e37 else 0.0  # Convert NaN to 0

Measures actual cell voltage.

exact_volts: float
125    @property
126    def exact_volts(self) -> float:
127        """Gets the last target voltage."""
128        return self.volts

Gets the last target voltage.

volts: float
156    @property
157    def volts(self) -> float:
158        """Gets the last target voltage."""
159        with self._lock:
160            return self._volts

Gets the last target voltage.

ohms: float
181    @property
182    def ohms(self) -> float:
183        """Measures internal resistance of the cell"""
184        with self._lock:
185            return self._resistance

Measures internal resistance of the cell

amps: float
194    @property
195    def amps(self) -> float:
196        """Measures cell current."""
197        with self._lock:
198            result = float(self._resource.query(":MEAS:CURR?"))
199            return result if result != 9.91e37 else 0.0  # Convert NaN to 0

Measures cell current.

compensation_mode: hitl_tester.modules.bms_types.CellCompMode
201    @property
202    def compensation_mode(self) -> CellCompMode:
203        """Sets compensation mode"""
204        return CellCompMode[self._resource.query(":OUTP:COMP:MODE?")]

Sets compensation mode

def enable(self):
211    def enable(self):
212        """Enables the cell."""
213        self._resource.write(":OUTP ON")
214        self._thread_pause_flag.set()

Enables the cell.

def disable(self):
216    def disable(self):
217        """Disables the cell."""
218        self._thread_pause_flag.clear()
219        self._resource.write(":OUTP OFF")

Disables the cell.

def reset(self):
221    def reset(self):
222        """Resets the instrument"""
223        self._resource.write("*RST")
224        self.compensation_mode = CellCompMode.LLOCAL

Resets the instrument

def volts_to_soc(self, voltage: float | None) -> float | None:
329    def volts_to_soc(self, voltage: float | None) -> float | None:
330        """Calculate the state of charge from a voltage."""
331        logger.write_debug_to_report(f"Got voltage {voltage}")
332        if voltage is None:
333            return None
334
335        temp_low, temperature, temp_high = self._temperature_data()
336
337        volts_list = self.data.volts[temp_low]
338        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data points
339        temp_low_soc = self._interpolate_value(
340            self.data.soc[volts_index - 1],
341            self.data.soc[volts_index],
342            volts_list[volts_index - 1],
343            volts_list[volts_index],
344            voltage,
345        )
346
347        volts_list = self.data.volts[temp_high]
348        volts_index, voltage = self._bisect_and_clamp(volts_list, voltage)  # Determine two closest data points
349        temp_high_soc = self._interpolate_value(
350            self.data.soc[volts_index - 1],
351            self.data.soc[volts_index],
352            volts_list[volts_index - 1],
353            volts_list[volts_index],
354            voltage,
355        )
356
357        result = self._interpolate_value(temp_low_soc, temp_high_soc, temp_low, temp_high, temperature)
358        logger.write_debug_to_report(
359            f"Got result {[result, temp_low_soc, temp_high_soc, temp_low, temp_high, temperature]}"
360        )
361        return result

Calculate the state of charge from a voltage.

state_of_charge: float
363    @property
364    def state_of_charge(self) -> float:
365        """Calculate the state of charge of the simulator."""
366
367        return self.volts_to_soc(self.volts) or 0.0

Calculate the state of charge of the simulator.