hitl_tester.modules.bms.i2c_driver

Captures I2C traffic to a CSV file.

 1"""Captures i2c traffic to a csv file."""
 2
 3import atexit
 4import csv
 5import threading
 6import time
 7from pathlib import Path
 8from typing import ClassVar
 9
10import pytest
11from i2cdriver import I2CDriver
12from typing_extensions import Self
13
14from hitl_tester.modules.bms_types import BMSFlags
15from hitl_tester.modules.logger import logger
16
17
18class I2CFileSniffer:
19    """Log i2c traffic to a CSV."""
20
21    instance: ClassVar[Self | None] = None
22
23    def __new__(cls, packet_buffer_size: int = 4096):
24        """Make this class a singleton."""
25        if cls.instance is None:
26            cls.instance = super().__new__(cls)
27        return cls.instance
28
29    def __init__(self):
30
31        self.daemon = None
32        if (
33            hasattr(pytest, "flags")
34            and isinstance(pytest.flags, BMSFlags)
35            and not (pytest.flags.doc_generation or pytest.flags.dry_run)
36            and "i2c_sniffer_id" in pytest.flags.config
37        ):
38            # Set up logging
39            self.report_path = Path(f"{pytest.flags.report_filename}_i2c_traffic.csv")
40
41            # Start i2c monitor thread
42            self.i2c_port = I2CDriver(f"/dev/serial/by-id/{pytest.flags.config['i2c_sniffer_id']}")
43            self.daemon = threading.Thread(target=self.runtime, daemon=True)
44            self.exit_event = threading.Event()
45            self.start_time = 0
46
47        @atexit.register
48        def __atexit__():
49            """Configure a safe shut down for when the class instance is destroyed."""
50            logger.write_debug_to_report("Killing i2cdriver thread...")
51            self.stop()
52
53    def start(self, start_time: float = 0):
54        """Start the thread."""
55        self.start_time = start_time or time.perf_counter()
56        if self.daemon and not self.daemon.is_alive():
57            self.exit_event.clear()
58            self.daemon.start()
59
60    def stop(self):
61        """Stop the thread."""
62        if self.daemon and self.daemon.is_alive():
63            self.exit_event.set()
64            time.sleep(1)
65
66    def runtime(self):
67        """Fetch tokens as quickly as possible."""
68        capture_generator = self.i2c_port.capture_start()
69
70        # Save to file
71        with open(self.report_path, "a", encoding="UTF-8") as csvfile:
72            writer = csv.writer(csvfile)
73            writer.writerow(("Type", "RW", "Data", "Ack", "Elapsed Time (s)"))
74            for token in capture_generator():
75                writer.writerow((None, None, None, None, str(time.perf_counter() - self.start_time)))
76                token.dump(writer, "csv")  # Write to file
77                csvfile.flush()  # Save row immediately
78                if self.exit_event.is_set():
79                    break
class I2CFileSniffer:
19class I2CFileSniffer:
20    """Log i2c traffic to a CSV."""
21
22    instance: ClassVar[Self | None] = None
23
24    def __new__(cls, packet_buffer_size: int = 4096):
25        """Make this class a singleton."""
26        if cls.instance is None:
27            cls.instance = super().__new__(cls)
28        return cls.instance
29
30    def __init__(self):
31
32        self.daemon = None
33        if (
34            hasattr(pytest, "flags")
35            and isinstance(pytest.flags, BMSFlags)
36            and not (pytest.flags.doc_generation or pytest.flags.dry_run)
37            and "i2c_sniffer_id" in pytest.flags.config
38        ):
39            # Set up logging
40            self.report_path = Path(f"{pytest.flags.report_filename}_i2c_traffic.csv")
41
42            # Start i2c monitor thread
43            self.i2c_port = I2CDriver(f"/dev/serial/by-id/{pytest.flags.config['i2c_sniffer_id']}")
44            self.daemon = threading.Thread(target=self.runtime, daemon=True)
45            self.exit_event = threading.Event()
46            self.start_time = 0
47
48        @atexit.register
49        def __atexit__():
50            """Configure a safe shut down for when the class instance is destroyed."""
51            logger.write_debug_to_report("Killing i2cdriver thread...")
52            self.stop()
53
54    def start(self, start_time: float = 0):
55        """Start the thread."""
56        self.start_time = start_time or time.perf_counter()
57        if self.daemon and not self.daemon.is_alive():
58            self.exit_event.clear()
59            self.daemon.start()
60
61    def stop(self):
62        """Stop the thread."""
63        if self.daemon and self.daemon.is_alive():
64            self.exit_event.set()
65            time.sleep(1)
66
67    def runtime(self):
68        """Fetch tokens as quickly as possible."""
69        capture_generator = self.i2c_port.capture_start()
70
71        # Save to file
72        with open(self.report_path, "a", encoding="UTF-8") as csvfile:
73            writer = csv.writer(csvfile)
74            writer.writerow(("Type", "RW", "Data", "Ack", "Elapsed Time (s)"))
75            for token in capture_generator():
76                writer.writerow((None, None, None, None, str(time.perf_counter() - self.start_time)))
77                token.dump(writer, "csv")  # Write to file
78                csvfile.flush()  # Save row immediately
79                if self.exit_event.is_set():
80                    break

Log i2c traffic to a CSV.

I2CFileSniffer(packet_buffer_size: int = 4096)
24    def __new__(cls, packet_buffer_size: int = 4096):
25        """Make this class a singleton."""
26        if cls.instance is None:
27            cls.instance = super().__new__(cls)
28        return cls.instance

Make this class a singleton.

instance: ClassVar[Optional[Self]] = <I2CFileSniffer object>
daemon
def start(self, start_time: float = 0):
54    def start(self, start_time: float = 0):
55        """Start the thread."""
56        self.start_time = start_time or time.perf_counter()
57        if self.daemon and not self.daemon.is_alive():
58            self.exit_event.clear()
59            self.daemon.start()

Start the thread.

def stop(self):
61    def stop(self):
62        """Stop the thread."""
63        if self.daemon and self.daemon.is_alive():
64            self.exit_event.set()
65            time.sleep(1)

Stop the thread.

def runtime(self):
67    def runtime(self):
68        """Fetch tokens as quickly as possible."""
69        capture_generator = self.i2c_port.capture_start()
70
71        # Save to file
72        with open(self.report_path, "a", encoding="UTF-8") as csvfile:
73            writer = csv.writer(csvfile)
74            writer.writerow(("Type", "RW", "Data", "Ack", "Elapsed Time (s)"))
75            for token in capture_generator():
76                writer.writerow((None, None, None, None, str(time.perf_counter() - self.start_time)))
77                token.dump(writer, "csv")  # Write to file
78                csvfile.flush()  # Save row immediately
79                if self.exit_event.is_set():
80                    break

Fetch tokens as quickly as possible.