hitl_tester.modules.cyber_6t.canbus

Functions for processing J1939 messages.

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

  1"""
  2Functions for processing J1939 messages.
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28"""
 29
 30from __future__ import annotations
 31
 32import queue
 33import shutil
 34import textwrap
 35from dataclasses import dataclass
 36from math import ceil
 37
 38import can
 39from colorama import init, Fore
 40from hitl_tester.modules.cyber_6t.j1939da import PGNType, PGN
 41from hitl_tester.modules.cyber_6t.spn_types import Address, addresses
 42
 43from hitl_tester.modules.logger import logger
 44
 45init(autoreset=True)
 46
 47
 48@dataclass
 49class Bits:
 50    """Simplifies working with bits."""
 51
 52    data: int = 0
 53
 54    def peek(self, bits: int) -> int:
 55        """Get bits from data."""
 56        return self.data & (1 << bits) - 1
 57
 58    def consume(self, bits: int) -> int:
 59        """Get bits and shift."""
 60        result = self.peek(bits)
 61        self.data >>= bits
 62        return result
 63
 64    def produce(self, bits: int, data: int):
 65        """Append bits from data."""
 66        self.data = (self.data << bits) | (data & (1 << bits) - 1)
 67
 68
 69class CANFrame:
 70    """A complete CAN frame."""
 71
 72    def __init__(
 73        self,
 74        pgn: PGNType,
 75        data: list[float],
 76        source_address: int = Address.DIAGNOSTIC_TOOL_1,
 77        destination_address: int = Address.GLOBAL,
 78        priority: int = 6,
 79    ):
 80        self.source_address = source_address
 81        self.destination_address = destination_address
 82        self.priority = priority
 83        self.data = data
 84        self.pgn = PGN[pgn.name if pgn.id == -1 else pgn.id, self.data]
 85        if len(data) != len(self.pgn.data_field):
 86            raise TypeError(f"Expected {len(self.pgn.data_field)} arguments for data, got {len(data)}.")
 87
 88    @classmethod
 89    def decode(cls, can_id: int, data: int | bytes) -> CANFrame:
 90        """Decode frame from raw can ID and data."""
 91
 92        # Decode CAN ID
 93        can_id_bits = Bits(can_id)
 94        source_address = can_id_bits.consume(8)
 95        pdu_specific = can_id_bits.consume(8)
 96        pdu_format = can_id_bits.consume(8)
 97        data_page = can_id_bits.consume(2)  # Always 0 or 1 for J1939 frames
 98        priority = can_id_bits.consume(3)
 99        if pdu_format < 240:  # Addressable
100            destination_address = pdu_specific
101            pgn = PGN[(data_page << 16) | (pdu_format << 8)]
102        else:  # Broadcast
103            destination_address = Address.GLOBAL
104            pgn = PGN[(data_page << 16) | (pdu_format << 8 | pdu_specific)]
105
106        # Decode data
107        if isinstance(data, int):
108            byte_length = ceil(sum(spn.length for spn in pgn.data_field) / 8)
109            data = data.to_bytes(byte_length, byteorder="big")
110        data = int.from_bytes(data, byteorder="little")
111        data_bits = Bits(data)
112        partitioned_data: list[float] = []
113        pgn = PGN[pgn.id, [data_bits.peek(8)]]  # FIXME(JA): for now we'll assume 8 bits
114        for spn in pgn.data_field:
115            partitioned_data.append(spn.decode(data_bits.consume(spn.length)))
116        return cls(pgn, partitioned_data, source_address, destination_address, priority)
117
118    def message(self) -> can.Message:
119        """Get this frame as a message object."""
120        return can.Message(arbitration_id=self.can_id, is_extended_id=True, data=self.packed_data)
121
122    @property
123    def can_id(self) -> int:
124        """Encode a raw can ID."""
125        bits = Bits()
126        bits.produce(3, self.priority)
127        bits.produce(2, self.pgn.data_page)
128        pdu_format = self.pgn.id >> 8
129        bits.produce(8, self.pgn.id >> 8)
130        bits.produce(8, self.destination_address if pdu_format < 240 else self.pgn.id)
131        bits.produce(8, self.source_address)
132        return int(bits.data)
133
134    @property
135    def packed_data(self) -> bytes:
136        """Encode spn data to packed bytes."""
137        bits = Bits()
138        total_bits = 0
139        self.pgn = PGN[self.pgn.id, self.data]
140        for spn, data in zip(self.pgn.data_field[::-1], self.data[::-1]):
141            bits.produce(spn.length, spn.encode(data))
142            total_bits += spn.length
143        length = max(ceil(total_bits / 8), 1)
144        return bits.data.to_bytes(length, byteorder="little")
145
146    def __repr__(self) -> str:
147        return f"{self.can_id:02X}#{self.packed_data.hex().upper()}"
148
149    def __str__(self):
150        width = shutil.get_terminal_size((80, 20)).columns
151        lines = []
152        title = f"[FRAME {self!r}]"
153        lines.append(f"{Fore.GREEN}{title:=^{width}}{Fore.RESET}")
154        lines.append(f"Source: {self.source_address} ({addresses(self.source_address)})")
155        lines.append(f"Destination: {self.destination_address} ({addresses(self.destination_address)})")
156        lines.append(f"Priority: {self.priority}")
157        lines.append(f"PGN:\n{textwrap.indent(str(self.pgn), '   ')}")
158
159        lines.append("Data Fields:")
160        self.pgn = PGN[self.pgn.id, self.data]
161        for spn, data in zip(self.pgn.data_field, self.data):
162            if spn.name == "Reserved":
163                continue
164            description = f"({desc})" if (desc := spn.data_type(data)) else ""
165            lines.append(textwrap.indent(f"{spn.name}: {data} {description}", "   "))
166
167        lines.append(f"{Fore.GREEN}{'=' * width}{Fore.RESET}")
168        return "\n".join(lines)
169
170
171class CANBus:
172    """Manage a CAN bus."""
173
174    def __init__(self, channel: str = "can2"):
175        self.bus = can.Bus(channel=channel, interface="socketcan")
176        self.buffer = can.BufferedReader()
177        self.notifier = can.Notifier(self.bus, [self.buffer])
178
179    def __enter__(self):
180        """Return this object for use in the context."""
181        return self
182
183    def __exit__(self, exc_type, exc_value, exc_traceback):
184        """Configure a safe shut down for when the class instance is destroyed."""
185        self.notifier.stop(timeout=5)
186        self.bus.shutdown()
187
188    def send_message(self, message: can.Message) -> bool:
189        """Send a CAN message."""
190        try:
191            self.bus.send(message)
192            return True
193        except can.CanError:
194            print("Message not sent.")
195            return False
196
197    def read_input(self, timeout: float = 10) -> can.Message | None:
198        """Get any messages received."""
199        try:
200            return self.buffer.get_message(timeout)  # FIXME(JA): filter for messages
201        except queue.Empty:
202            logger.write_error_to_report(f"No response after {timeout} seconds. Is the 6T charged and on?")
203            return None
204
205    def send_frame(self, frame: CANFrame) -> bool:
206        """Send a CAN frame."""
207        return self.send_message(frame.message())
208
209    def read_frame(self, timeout: float | None = None) -> CANFrame | None:
210        """Return a decoded frame."""
211        received_message = self.read_input(timeout) if timeout else self.read_input()
212        if received_message:
213            return CANFrame.decode(received_message.arbitration_id, received_message.data)
214        return None
215
216    def process_call(self, message_frame: CANFrame, timeout: float | None = None) -> CANFrame | None:
217        """Send a message and read a response."""
218        if self.send_message(message_frame.message()):
219            return self.read_frame(timeout)
220        return None
221
222
223if __name__ == "__main__":
224    frame_get_all_addresses = CANFrame(
225        source_address=Address.DIAGNOSTIC_TOOL_1,
226        destination_address=Address.GLOBAL,
227        pgn=PGN["Request"],  # Example showing how to get PGNs via a full name
228        data=[PGN["Address Claimed"].id],  # Abbreviation, and ID are also valid (e.g. PGN["AC"], PGN[0xEE00])
229    )
230
231    # print(CANFrame.decode(0x1CD8FAE8, 0x0019090100060000))  # Example to manually decode frame
232
233    try:
234        with CANBus("can0") as bus:
235            response = bus.process_call(frame_get_all_addresses)
236            if response:
237                print(CANFrame)
238            else:
239                print("No response :(")
240    except OSError:
241        print("Cannot connect to CAN bus.")
@dataclass
class Bits:
49@dataclass
50class Bits:
51    """Simplifies working with bits."""
52
53    data: int = 0
54
55    def peek(self, bits: int) -> int:
56        """Get bits from data."""
57        return self.data & (1 << bits) - 1
58
59    def consume(self, bits: int) -> int:
60        """Get bits and shift."""
61        result = self.peek(bits)
62        self.data >>= bits
63        return result
64
65    def produce(self, bits: int, data: int):
66        """Append bits from data."""
67        self.data = (self.data << bits) | (data & (1 << bits) - 1)

Simplifies working with bits.

Bits(data: int = 0)
data: int = 0
def peek(self, bits: int) -> int:
55    def peek(self, bits: int) -> int:
56        """Get bits from data."""
57        return self.data & (1 << bits) - 1

Get bits from data.

def consume(self, bits: int) -> int:
59    def consume(self, bits: int) -> int:
60        """Get bits and shift."""
61        result = self.peek(bits)
62        self.data >>= bits
63        return result

Get bits and shift.

def produce(self, bits: int, data: int):
65    def produce(self, bits: int, data: int):
66        """Append bits from data."""
67        self.data = (self.data << bits) | (data & (1 << bits) - 1)

Append bits from data.

class CANFrame:
 70class CANFrame:
 71    """A complete CAN frame."""
 72
 73    def __init__(
 74        self,
 75        pgn: PGNType,
 76        data: list[float],
 77        source_address: int = Address.DIAGNOSTIC_TOOL_1,
 78        destination_address: int = Address.GLOBAL,
 79        priority: int = 6,
 80    ):
 81        self.source_address = source_address
 82        self.destination_address = destination_address
 83        self.priority = priority
 84        self.data = data
 85        self.pgn = PGN[pgn.name if pgn.id == -1 else pgn.id, self.data]
 86        if len(data) != len(self.pgn.data_field):
 87            raise TypeError(f"Expected {len(self.pgn.data_field)} arguments for data, got {len(data)}.")
 88
 89    @classmethod
 90    def decode(cls, can_id: int, data: int | bytes) -> CANFrame:
 91        """Decode frame from raw can ID and data."""
 92
 93        # Decode CAN ID
 94        can_id_bits = Bits(can_id)
 95        source_address = can_id_bits.consume(8)
 96        pdu_specific = can_id_bits.consume(8)
 97        pdu_format = can_id_bits.consume(8)
 98        data_page = can_id_bits.consume(2)  # Always 0 or 1 for J1939 frames
 99        priority = can_id_bits.consume(3)
100        if pdu_format < 240:  # Addressable
101            destination_address = pdu_specific
102            pgn = PGN[(data_page << 16) | (pdu_format << 8)]
103        else:  # Broadcast
104            destination_address = Address.GLOBAL
105            pgn = PGN[(data_page << 16) | (pdu_format << 8 | pdu_specific)]
106
107        # Decode data
108        if isinstance(data, int):
109            byte_length = ceil(sum(spn.length for spn in pgn.data_field) / 8)
110            data = data.to_bytes(byte_length, byteorder="big")
111        data = int.from_bytes(data, byteorder="little")
112        data_bits = Bits(data)
113        partitioned_data: list[float] = []
114        pgn = PGN[pgn.id, [data_bits.peek(8)]]  # FIXME(JA): for now we'll assume 8 bits
115        for spn in pgn.data_field:
116            partitioned_data.append(spn.decode(data_bits.consume(spn.length)))
117        return cls(pgn, partitioned_data, source_address, destination_address, priority)
118
119    def message(self) -> can.Message:
120        """Get this frame as a message object."""
121        return can.Message(arbitration_id=self.can_id, is_extended_id=True, data=self.packed_data)
122
123    @property
124    def can_id(self) -> int:
125        """Encode a raw can ID."""
126        bits = Bits()
127        bits.produce(3, self.priority)
128        bits.produce(2, self.pgn.data_page)
129        pdu_format = self.pgn.id >> 8
130        bits.produce(8, self.pgn.id >> 8)
131        bits.produce(8, self.destination_address if pdu_format < 240 else self.pgn.id)
132        bits.produce(8, self.source_address)
133        return int(bits.data)
134
135    @property
136    def packed_data(self) -> bytes:
137        """Encode spn data to packed bytes."""
138        bits = Bits()
139        total_bits = 0
140        self.pgn = PGN[self.pgn.id, self.data]
141        for spn, data in zip(self.pgn.data_field[::-1], self.data[::-1]):
142            bits.produce(spn.length, spn.encode(data))
143            total_bits += spn.length
144        length = max(ceil(total_bits / 8), 1)
145        return bits.data.to_bytes(length, byteorder="little")
146
147    def __repr__(self) -> str:
148        return f"{self.can_id:02X}#{self.packed_data.hex().upper()}"
149
150    def __str__(self):
151        width = shutil.get_terminal_size((80, 20)).columns
152        lines = []
153        title = f"[FRAME {self!r}]"
154        lines.append(f"{Fore.GREEN}{title:=^{width}}{Fore.RESET}")
155        lines.append(f"Source: {self.source_address} ({addresses(self.source_address)})")
156        lines.append(f"Destination: {self.destination_address} ({addresses(self.destination_address)})")
157        lines.append(f"Priority: {self.priority}")
158        lines.append(f"PGN:\n{textwrap.indent(str(self.pgn), '   ')}")
159
160        lines.append("Data Fields:")
161        self.pgn = PGN[self.pgn.id, self.data]
162        for spn, data in zip(self.pgn.data_field, self.data):
163            if spn.name == "Reserved":
164                continue
165            description = f"({desc})" if (desc := spn.data_type(data)) else ""
166            lines.append(textwrap.indent(f"{spn.name}: {data} {description}", "   "))
167
168        lines.append(f"{Fore.GREEN}{'=' * width}{Fore.RESET}")
169        return "\n".join(lines)

A complete CAN frame.

CANFrame( pgn: hitl_tester.modules.cyber_6t.j1939da.PGNType, data: list[float], source_address: int = <Address.DIAGNOSTIC_TOOL_1: 250>, destination_address: int = <Address.GLOBAL: 255>, priority: int = 6)
73    def __init__(
74        self,
75        pgn: PGNType,
76        data: list[float],
77        source_address: int = Address.DIAGNOSTIC_TOOL_1,
78        destination_address: int = Address.GLOBAL,
79        priority: int = 6,
80    ):
81        self.source_address = source_address
82        self.destination_address = destination_address
83        self.priority = priority
84        self.data = data
85        self.pgn = PGN[pgn.name if pgn.id == -1 else pgn.id, self.data]
86        if len(data) != len(self.pgn.data_field):
87            raise TypeError(f"Expected {len(self.pgn.data_field)} arguments for data, got {len(data)}.")
source_address
destination_address
priority
data
pgn
@classmethod
def decode( cls, can_id: int, data: int | bytes) -> CANFrame:
 89    @classmethod
 90    def decode(cls, can_id: int, data: int | bytes) -> CANFrame:
 91        """Decode frame from raw can ID and data."""
 92
 93        # Decode CAN ID
 94        can_id_bits = Bits(can_id)
 95        source_address = can_id_bits.consume(8)
 96        pdu_specific = can_id_bits.consume(8)
 97        pdu_format = can_id_bits.consume(8)
 98        data_page = can_id_bits.consume(2)  # Always 0 or 1 for J1939 frames
 99        priority = can_id_bits.consume(3)
100        if pdu_format < 240:  # Addressable
101            destination_address = pdu_specific
102            pgn = PGN[(data_page << 16) | (pdu_format << 8)]
103        else:  # Broadcast
104            destination_address = Address.GLOBAL
105            pgn = PGN[(data_page << 16) | (pdu_format << 8 | pdu_specific)]
106
107        # Decode data
108        if isinstance(data, int):
109            byte_length = ceil(sum(spn.length for spn in pgn.data_field) / 8)
110            data = data.to_bytes(byte_length, byteorder="big")
111        data = int.from_bytes(data, byteorder="little")
112        data_bits = Bits(data)
113        partitioned_data: list[float] = []
114        pgn = PGN[pgn.id, [data_bits.peek(8)]]  # FIXME(JA): for now we'll assume 8 bits
115        for spn in pgn.data_field:
116            partitioned_data.append(spn.decode(data_bits.consume(spn.length)))
117        return cls(pgn, partitioned_data, source_address, destination_address, priority)

Decode frame from raw can ID and data.

def message(self) -> can.message.Message:
119    def message(self) -> can.Message:
120        """Get this frame as a message object."""
121        return can.Message(arbitration_id=self.can_id, is_extended_id=True, data=self.packed_data)

Get this frame as a message object.

can_id: int
123    @property
124    def can_id(self) -> int:
125        """Encode a raw can ID."""
126        bits = Bits()
127        bits.produce(3, self.priority)
128        bits.produce(2, self.pgn.data_page)
129        pdu_format = self.pgn.id >> 8
130        bits.produce(8, self.pgn.id >> 8)
131        bits.produce(8, self.destination_address if pdu_format < 240 else self.pgn.id)
132        bits.produce(8, self.source_address)
133        return int(bits.data)

Encode a raw can ID.

packed_data: bytes
135    @property
136    def packed_data(self) -> bytes:
137        """Encode spn data to packed bytes."""
138        bits = Bits()
139        total_bits = 0
140        self.pgn = PGN[self.pgn.id, self.data]
141        for spn, data in zip(self.pgn.data_field[::-1], self.data[::-1]):
142            bits.produce(spn.length, spn.encode(data))
143            total_bits += spn.length
144        length = max(ceil(total_bits / 8), 1)
145        return bits.data.to_bytes(length, byteorder="little")

Encode spn data to packed bytes.

class CANBus:
172class CANBus:
173    """Manage a CAN bus."""
174
175    def __init__(self, channel: str = "can2"):
176        self.bus = can.Bus(channel=channel, interface="socketcan")
177        self.buffer = can.BufferedReader()
178        self.notifier = can.Notifier(self.bus, [self.buffer])
179
180    def __enter__(self):
181        """Return this object for use in the context."""
182        return self
183
184    def __exit__(self, exc_type, exc_value, exc_traceback):
185        """Configure a safe shut down for when the class instance is destroyed."""
186        self.notifier.stop(timeout=5)
187        self.bus.shutdown()
188
189    def send_message(self, message: can.Message) -> bool:
190        """Send a CAN message."""
191        try:
192            self.bus.send(message)
193            return True
194        except can.CanError:
195            print("Message not sent.")
196            return False
197
198    def read_input(self, timeout: float = 10) -> can.Message | None:
199        """Get any messages received."""
200        try:
201            return self.buffer.get_message(timeout)  # FIXME(JA): filter for messages
202        except queue.Empty:
203            logger.write_error_to_report(f"No response after {timeout} seconds. Is the 6T charged and on?")
204            return None
205
206    def send_frame(self, frame: CANFrame) -> bool:
207        """Send a CAN frame."""
208        return self.send_message(frame.message())
209
210    def read_frame(self, timeout: float | None = None) -> CANFrame | None:
211        """Return a decoded frame."""
212        received_message = self.read_input(timeout) if timeout else self.read_input()
213        if received_message:
214            return CANFrame.decode(received_message.arbitration_id, received_message.data)
215        return None
216
217    def process_call(self, message_frame: CANFrame, timeout: float | None = None) -> CANFrame | None:
218        """Send a message and read a response."""
219        if self.send_message(message_frame.message()):
220            return self.read_frame(timeout)
221        return None

Manage a CAN bus.

CANBus(channel: str = 'can2')
175    def __init__(self, channel: str = "can2"):
176        self.bus = can.Bus(channel=channel, interface="socketcan")
177        self.buffer = can.BufferedReader()
178        self.notifier = can.Notifier(self.bus, [self.buffer])
bus
buffer
notifier
def send_message(self, message: can.message.Message) -> bool:
189    def send_message(self, message: can.Message) -> bool:
190        """Send a CAN message."""
191        try:
192            self.bus.send(message)
193            return True
194        except can.CanError:
195            print("Message not sent.")
196            return False

Send a CAN message.

def read_input(self, timeout: float = 10) -> can.message.Message | None:
198    def read_input(self, timeout: float = 10) -> can.Message | None:
199        """Get any messages received."""
200        try:
201            return self.buffer.get_message(timeout)  # FIXME(JA): filter for messages
202        except queue.Empty:
203            logger.write_error_to_report(f"No response after {timeout} seconds. Is the 6T charged and on?")
204            return None

Get any messages received.

def send_frame(self, frame: CANFrame) -> bool:
206    def send_frame(self, frame: CANFrame) -> bool:
207        """Send a CAN frame."""
208        return self.send_message(frame.message())

Send a CAN frame.

def read_frame( self, timeout: float | None = None) -> CANFrame | None:
210    def read_frame(self, timeout: float | None = None) -> CANFrame | None:
211        """Return a decoded frame."""
212        received_message = self.read_input(timeout) if timeout else self.read_input()
213        if received_message:
214            return CANFrame.decode(received_message.arbitration_id, received_message.data)
215        return None

Return a decoded frame.

def process_call( self, message_frame: CANFrame, timeout: float | None = None) -> CANFrame | None:
217    def process_call(self, message_frame: CANFrame, timeout: float | None = None) -> CANFrame | None:
218        """Send a message and read a response."""
219        if self.send_message(message_frame.message()):
220            return self.read_frame(timeout)
221        return None

Send a message and read a response.