hitl_tester.modules.bms.i2c_driver
Captures i2c traffic to a csv file.
"""Captures i2c traffic to a csv file."""

import atexit
import csv
import threading
import time
from pathlib import Path
from typing import ClassVar

import pytest
from i2cdriver import I2CDriver
from typing_extensions import Self

from hitl_tester.modules.bms_types import BMSFlags
from hitl_tester.modules.logger import logger


class I2CFileSniffer:
    """Log i2c traffic to a CSV.

    The class is a singleton: every construction returns the same object.
    Capture only becomes active when ``pytest.flags`` is a ``BMSFlags`` that is
    not in doc-generation or dry-run mode and whose config contains
    ``"i2c_sniffer_id"``; otherwise ``daemon`` stays ``None`` and ``start`` /
    ``stop`` do no thread work.
    """

    instance: ClassVar[Self | None] = None

    def __new__(cls, packet_buffer_size: int = 4096):
        """Make this class a singleton."""
        if cls.instance is None:
            cls.instance = super().__new__(cls)
        return cls.instance

    def __init__(self, packet_buffer_size: int = 4096):
        """Configure the report path, serial port and capture thread.

        Args:
            packet_buffer_size: Accepted so the signature matches ``__new__``
                (otherwise passing it raises ``TypeError``); it is not used by
                this class.
        """
        # The singleton re-enters __init__ on every construction. Re-initialising
        # while a capture is live would orphan the running thread (stop() could
        # no longer reach it), so keep the current state in that case.
        running = getattr(self, "daemon", None)
        if running is not None and running.is_alive():
            return

        self.daemon = None
        if (
            hasattr(pytest, "flags")
            and isinstance(pytest.flags, BMSFlags)
            and not (pytest.flags.doc_generation or pytest.flags.dry_run)
            and "i2c_sniffer_id" in pytest.flags.config
        ):
            # Set up logging
            self.report_path = Path(f"{pytest.flags.report_filename}_i2c_traffic.csv")

            # Prepare the i2c monitor thread (started later by start())
            self.i2c_port = I2CDriver(f"/dev/serial/by-id/{pytest.flags.config['i2c_sniffer_id']}")
            self.daemon = threading.Thread(target=self.runtime, daemon=True)
            self.exit_event = threading.Event()
            self.start_time = 0

        # Register the shutdown hook only once, however often __init__ re-runs.
        if not getattr(self, "_atexit_registered", False):
            self._atexit_registered = True

            @atexit.register
            def __atexit__():
                """Configure a safe shut down for when the class instance is destroyed."""
                logger.write_debug_to_report("Killing i2cdriver thread...")
                self.stop()

    def start(self, start_time: float = 0):
        """Start the capture thread.

        Args:
            start_time: Reference instant subtracted from ``time.perf_counter()``
                for the elapsed-time column. A falsy value (the default ``0``)
                means "now".
        """
        self.start_time = start_time or time.perf_counter()
        if self.daemon and not self.daemon.is_alive():
            if self.daemon.ident is not None:
                # A Thread can be started only once; after a previous run has
                # finished, build a new one instead of raising RuntimeError.
                self.daemon = threading.Thread(target=self.runtime, daemon=True)
            self.exit_event.clear()
            self.daemon.start()

    def stop(self):
        """Ask the capture thread to exit and wait up to one second for it."""
        if self.daemon and self.daemon.is_alive():
            self.exit_event.set()
            # runtime() only checks the event after it has handled a token, so
            # the wait is bounded; join returns as soon as the thread ends.
            self.daemon.join(timeout=1)

    def runtime(self):
        """Fetch tokens as quickly as possible and append them to the CSV report.

        A header row is written each time this runs. Every token produces a row
        holding only the elapsed time, followed by whatever ``token.dump`` writes
        through the csv writer. The loop ends after the first token handled once
        ``exit_event`` is set.
        """
        capture_generator = self.i2c_port.capture_start()

        # newline="" leaves line endings to the csv module, as its docs require.
        with open(self.report_path, "a", encoding="UTF-8", newline="") as csvfile:
            writer = csv.writer(csvfile)
            writer.writerow(("Type", "RW", "Data", "Ack", "Elapsed Time (s)"))
            for token in capture_generator():
                elapsed = time.perf_counter() - self.start_time
                writer.writerow((None, None, None, None, str(elapsed)))
                token.dump(writer, "csv")  # Write to file
                csvfile.flush()  # Save row immediately
                if self.exit_event.is_set():
                    break
class
I2CFileSniffer:
19class I2CFileSniffer: 20 """Log i2c traffic to a CSV.""" 21 22 instance: ClassVar[Self | None] = None 23 24 def __new__(cls, packet_buffer_size: int = 4096): 25 """Make this class a singleton.""" 26 if cls.instance is None: 27 cls.instance = super().__new__(cls) 28 return cls.instance 29 30 def __init__(self): 31 32 self.daemon = None 33 if ( 34 hasattr(pytest, "flags") 35 and isinstance(pytest.flags, BMSFlags) 36 and not (pytest.flags.doc_generation or pytest.flags.dry_run) 37 and "i2c_sniffer_id" in pytest.flags.config 38 ): 39 # Set up logging 40 self.report_path = Path(f"{pytest.flags.report_filename}_i2c_traffic.csv") 41 42 # Start i2c monitor thread 43 self.i2c_port = I2CDriver(f"/dev/serial/by-id/{pytest.flags.config['i2c_sniffer_id']}") 44 self.daemon = threading.Thread(target=self.runtime, daemon=True) 45 self.exit_event = threading.Event() 46 self.start_time = 0 47 48 @atexit.register 49 def __atexit__(): 50 """Configure a safe shut down for when the class instance is destroyed.""" 51 logger.write_debug_to_report("Killing i2cdriver thread...") 52 self.stop() 53 54 def start(self, start_time: float = 0): 55 """Start the thread.""" 56 self.start_time = start_time or time.perf_counter() 57 if self.daemon and not self.daemon.is_alive(): 58 self.exit_event.clear() 59 self.daemon.start() 60 61 def stop(self): 62 """Stop the thread.""" 63 if self.daemon and self.daemon.is_alive(): 64 self.exit_event.set() 65 time.sleep(1) 66 67 def runtime(self): 68 """Fetch tokens as quickly as possible.""" 69 capture_generator = self.i2c_port.capture_start() 70 71 # Save to file 72 with open(self.report_path, "a", encoding="UTF-8") as csvfile: 73 writer = csv.writer(csvfile) 74 writer.writerow(("Type", "RW", "Data", "Ack", "Elapsed Time (s)")) 75 for token in capture_generator(): 76 writer.writerow((None, None, None, None, str(time.perf_counter() - self.start_time))) 77 token.dump(writer, "csv") # Write to file 78 csvfile.flush() # Save row immediately 79 if 
self.exit_event.is_set(): 80 break
Log i2c traffic to a CSV.
I2CFileSniffer(packet_buffer_size: int = 4096)
def __new__(cls, packet_buffer_size: int = 4096):
    """Return the one shared instance, creating it on the first call."""
    shared = cls.instance
    if shared is None:
        # First construction: allocate the object and remember it on the class.
        shared = super().__new__(cls)
        cls.instance = shared
    return shared
Make this class a singleton.
def
start(self, start_time: float = 0):
def start(self, start_time: float = 0):
    """Start the capture thread.

    Args:
        start_time: Reference instant subtracted from ``time.perf_counter()``
            for the elapsed-time column. A falsy value (the default ``0``)
            means "now".
    """
    self.start_time = start_time or time.perf_counter()
    if self.daemon and not self.daemon.is_alive():
        if self.daemon.ident is not None:
            # A Thread can be started only once; after a previous run has
            # finished, build a new one instead of raising RuntimeError.
            self.daemon = threading.Thread(target=self.runtime, daemon=True)
        self.exit_event.clear()
        self.daemon.start()
Start the thread.
def
stop(self):
def stop(self):
    """Ask the capture thread to exit and wait up to one second for it."""
    if self.daemon and self.daemon.is_alive():
        self.exit_event.set()
        # Wait for the thread itself rather than sleeping a fixed second:
        # join returns as soon as the thread ends, and the timeout still
        # bounds the wait if the thread does not notice the event in time.
        self.daemon.join(timeout=1)
Stop the thread.
def
runtime(self):
def runtime(self):
    """Fetch tokens as quickly as possible and append them to the CSV report.

    A header row is written each time this runs. Every token produces a row
    holding only the elapsed time since ``self.start_time``, followed by
    whatever ``token.dump`` writes through the csv writer. The loop ends after
    the first token handled once ``exit_event`` is set.
    """
    capture_generator = self.i2c_port.capture_start()

    # newline="" leaves line endings to the csv module, as its docs require.
    with open(self.report_path, "a", encoding="UTF-8", newline="") as csvfile:
        writer = csv.writer(csvfile)
        writer.writerow(("Type", "RW", "Data", "Ack", "Elapsed Time (s)"))
        for token in capture_generator():
            elapsed = time.perf_counter() - self.start_time
            writer.writerow((None, None, None, None, str(elapsed)))
            token.dump(writer, "csv")  # Write to file
            csvfile.flush()  # Save row immediately
            if self.exit_event.is_set():
                break
Fetch tokens as quickly as possible.