hitl_tester.modules.cyber_6t.canbus
Functions for processing J1939 messages.
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
1""" 2Functions for processing J1939 messages. 3 4# (c) 2020-2024 TurnAround Factor, Inc. 5# 6# CUI DISTRIBUTION CONTROL 7# Controlled by: DLA J68 R&D SBIP 8# CUI Category: Small Business Research and Technology 9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15# Fort Belvoir, VA 22060-6221 16# 17# SBIR DATA RIGHTS 18# Contract No.:SP4701-23-C-0083 19# Contractor Name: TurnAround Factor, Inc. 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21# Expiration of SBIR Data Rights Period: September 24, 2029 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27# the markings. 
28""" 29 30from __future__ import annotations 31 32import queue 33import shutil 34import textwrap 35from dataclasses import dataclass 36from math import ceil 37 38import can 39from colorama import init, Fore 40from hitl_tester.modules.cyber_6t.j1939da import PGNType, PGN 41from hitl_tester.modules.cyber_6t.spn_types import Address, addresses 42 43from hitl_tester.modules.logger import logger 44 45init(autoreset=True) 46 47 48@dataclass 49class Bits: 50 """Simplifies working with bits.""" 51 52 data: int = 0 53 54 def peek(self, bits: int) -> int: 55 """Get bits from data.""" 56 return self.data & (1 << bits) - 1 57 58 def consume(self, bits: int) -> int: 59 """Get bits and shift.""" 60 result = self.peek(bits) 61 self.data >>= bits 62 return result 63 64 def produce(self, bits: int, data: int): 65 """Append bits from data.""" 66 self.data = (self.data << bits) | (data & (1 << bits) - 1) 67 68 69class CANFrame: 70 """A complete CAN frame.""" 71 72 def __init__( 73 self, 74 pgn: PGNType, 75 data: list[float], 76 source_address: int = Address.DIAGNOSTIC_TOOL_1, 77 destination_address: int = Address.GLOBAL, 78 priority: int = 6, 79 ): 80 self.source_address = source_address 81 self.destination_address = destination_address 82 self.priority = priority 83 self.data = data 84 self.pgn = PGN[pgn.name if pgn.id == -1 else pgn.id, self.data] 85 if len(data) != len(self.pgn.data_field): 86 raise TypeError(f"Expected {len(self.pgn.data_field)} arguments for data, got {len(data)}.") 87 88 @classmethod 89 def decode(cls, can_id: int, data: int | bytes) -> CANFrame: 90 """Decode frame from raw can ID and data.""" 91 92 # Decode CAN ID 93 can_id_bits = Bits(can_id) 94 source_address = can_id_bits.consume(8) 95 pdu_specific = can_id_bits.consume(8) 96 pdu_format = can_id_bits.consume(8) 97 data_page = can_id_bits.consume(2) # Always 0 or 1 for J1939 frames 98 priority = can_id_bits.consume(3) 99 if pdu_format < 240: # Addressable 100 destination_address = pdu_specific 101 pgn = 
PGN[(data_page << 16) | (pdu_format << 8)] 102 else: # Broadcast 103 destination_address = Address.GLOBAL 104 pgn = PGN[(data_page << 16) | (pdu_format << 8 | pdu_specific)] 105 106 # Decode data 107 if isinstance(data, int): 108 byte_length = ceil(sum(spn.length for spn in pgn.data_field) / 8) 109 data = data.to_bytes(byte_length, byteorder="big") 110 data = int.from_bytes(data, byteorder="little") 111 data_bits = Bits(data) 112 partitioned_data: list[float] = [] 113 pgn = PGN[pgn.id, [data_bits.peek(8)]] # FIXME(JA): for now we'll assume 8 bits 114 for spn in pgn.data_field: 115 partitioned_data.append(spn.decode(data_bits.consume(spn.length))) 116 return cls(pgn, partitioned_data, source_address, destination_address, priority) 117 118 def message(self) -> can.Message: 119 """Get this frame as a message object.""" 120 return can.Message(arbitration_id=self.can_id, is_extended_id=True, data=self.packed_data) 121 122 @property 123 def can_id(self) -> int: 124 """Encode a raw can ID.""" 125 bits = Bits() 126 bits.produce(3, self.priority) 127 bits.produce(2, self.pgn.data_page) 128 pdu_format = self.pgn.id >> 8 129 bits.produce(8, self.pgn.id >> 8) 130 bits.produce(8, self.destination_address if pdu_format < 240 else self.pgn.id) 131 bits.produce(8, self.source_address) 132 return int(bits.data) 133 134 @property 135 def packed_data(self) -> bytes: 136 """Encode spn data to packed bytes.""" 137 bits = Bits() 138 total_bits = 0 139 self.pgn = PGN[self.pgn.id, self.data] 140 for spn, data in zip(self.pgn.data_field[::-1], self.data[::-1]): 141 bits.produce(spn.length, spn.encode(data)) 142 total_bits += spn.length 143 length = max(ceil(total_bits / 8), 1) 144 return bits.data.to_bytes(length, byteorder="little") 145 146 def __repr__(self) -> str: 147 return f"{self.can_id:02X}#{self.packed_data.hex().upper()}" 148 149 def __str__(self): 150 width = shutil.get_terminal_size((80, 20)).columns 151 lines = [] 152 title = f"[FRAME {self!r}]" 153 
lines.append(f"{Fore.GREEN}{title:=^{width}}{Fore.RESET}") 154 lines.append(f"Source: {self.source_address} ({addresses(self.source_address)})") 155 lines.append(f"Destination: {self.destination_address} ({addresses(self.destination_address)})") 156 lines.append(f"Priority: {self.priority}") 157 lines.append(f"PGN:\n{textwrap.indent(str(self.pgn), ' ')}") 158 159 lines.append("Data Fields:") 160 self.pgn = PGN[self.pgn.id, self.data] 161 for spn, data in zip(self.pgn.data_field, self.data): 162 if spn.name == "Reserved": 163 continue 164 description = f"({desc})" if (desc := spn.data_type(data)) else "" 165 lines.append(textwrap.indent(f"{spn.name}: {data} {description}", " ")) 166 167 lines.append(f"{Fore.GREEN}{'=' * width}{Fore.RESET}") 168 return "\n".join(lines) 169 170 171class CANBus: 172 """Manage a CAN bus.""" 173 174 def __init__(self, channel: str = "can2"): 175 self.bus = can.Bus(channel=channel, interface="socketcan") 176 self.buffer = can.BufferedReader() 177 self.notifier = can.Notifier(self.bus, [self.buffer]) 178 179 def __enter__(self): 180 """Return this object for use in the context.""" 181 return self 182 183 def __exit__(self, exc_type, exc_value, exc_traceback): 184 """Configure a safe shut down for when the class instance is destroyed.""" 185 self.notifier.stop(timeout=5) 186 self.bus.shutdown() 187 188 def send_message(self, message: can.Message) -> bool: 189 """Send a CAN message.""" 190 try: 191 self.bus.send(message) 192 return True 193 except can.CanError: 194 print("Message not sent.") 195 return False 196 197 def read_input(self, timeout: float = 10) -> can.Message | None: 198 """Get any messages received.""" 199 try: 200 return self.buffer.get_message(timeout) # FIXME(JA): filter for messages 201 except queue.Empty: 202 logger.write_error_to_report(f"No response after {timeout} seconds. 
Is the 6T charged and on?") 203 return None 204 205 def send_frame(self, frame: CANFrame) -> bool: 206 """Send a CAN frame.""" 207 return self.send_message(frame.message()) 208 209 def read_frame(self, timeout: float | None = None) -> CANFrame | None: 210 """Return a decoded frame.""" 211 received_message = self.read_input(timeout) if timeout else self.read_input() 212 if received_message: 213 return CANFrame.decode(received_message.arbitration_id, received_message.data) 214 return None 215 216 def process_call(self, message_frame: CANFrame, timeout: float | None = None) -> CANFrame | None: 217 """Send a message and read a response.""" 218 if self.send_message(message_frame.message()): 219 return self.read_frame(timeout) 220 return None 221 222 223if __name__ == "__main__": 224 frame_get_all_addresses = CANFrame( 225 source_address=Address.DIAGNOSTIC_TOOL_1, 226 destination_address=Address.GLOBAL, 227 pgn=PGN["Request"], # Example showing how to get PGNs via a full name 228 data=[PGN["Address Claimed"].id], # Abbreviation, and ID are also valid (e.g. PGN["AC"], PGN[0xEE00]) 229 ) 230 231 # print(CANFrame.decode(0x1CD8FAE8, 0x0019090100060000)) # Example to manually decode frame 232 233 try: 234 with CANBus("can0") as bus: 235 response = bus.process_call(frame_get_all_addresses) 236 if response: 237 print(CANFrame) 238 else: 239 print("No response :(") 240 except OSError: 241 print("Cannot connect to CAN bus.")
@dataclass
class
Bits:
@dataclass
class Bits:
    """Treat an integer as a queue of bits.

    Bits are read from, and appended at, the least significant end of ``data``.
    """

    data: int = 0

    def peek(self, bits: int) -> int:
        """Return the lowest ``bits`` bits of the data without removing them."""
        low_mask = (1 << bits) - 1
        return self.data & low_mask

    def consume(self, bits: int) -> int:
        """Return the lowest ``bits`` bits and shift them out of the data."""
        low_bits = self.peek(bits)
        self.data = self.data >> bits
        return low_bits

    def produce(self, bits: int, data: int):
        """Shift the stored value left by ``bits`` and append the lowest ``bits`` bits of ``data``."""
        low_mask = (1 << bits) - 1
        shifted = self.data << bits
        self.data = shifted | (data & low_mask)
Simplifies working with bits.
def
peek(self, bits: int) -> int:
def peek(self, bits: int) -> int:
    """Return the lowest ``bits`` bits of ``self.data`` without modifying it."""
    low_mask = (1 << bits) - 1
    return self.data & low_mask
Get bits from data.
class
CANFrame:
class CANFrame:
    """A complete CAN frame."""

    def __init__(
        self,
        pgn: PGNType,
        data: list[float],
        source_address: int = Address.DIAGNOSTIC_TOOL_1,
        destination_address: int = Address.GLOBAL,
        priority: int = 6,
    ):
        """
        Store the frame fields and resolve the PGN definition.

        :param pgn: PGN to send; looked up in ``PGN`` by name when its id is -1, otherwise by id.
        :param data: one value per entry of the resolved PGN's ``data_field``.
        :param source_address: address placed in the lowest byte of the CAN ID.
        :param destination_address: address used as PDU specific when the PDU format is below 240.
        :param priority: 3-bit priority placed at the top of the CAN ID.
        :raises TypeError: if ``data`` does not have one value per data field.
        """
        self.source_address = source_address
        self.destination_address = destination_address
        self.priority = priority
        self.data = data
        self.pgn = PGN[pgn.name if pgn.id == -1 else pgn.id, self.data]
        if len(data) != len(self.pgn.data_field):
            raise TypeError(f"Expected {len(self.pgn.data_field)} arguments for data, got {len(data)}.")

    @classmethod
    def decode(cls, can_id: int, data: int | bytes) -> CANFrame:
        """
        Decode frame from raw CAN ID and data.

        :param can_id: raw identifier; read from the lowest bits upward as source address (8),
            PDU specific (8), PDU format (8), data page (2) and priority (3).
        :param data: payload bytes, or an int that is first converted to big-endian bytes sized from
            the PGN's SPN lengths.
        :return: a frame holding the decoded SPN values.
        """

        # Decode CAN ID
        can_id_bits = Bits(can_id)
        source_address = can_id_bits.consume(8)
        pdu_specific = can_id_bits.consume(8)
        pdu_format = can_id_bits.consume(8)
        data_page = can_id_bits.consume(2)  # Always 0 or 1 for J1939 frames
        priority = can_id_bits.consume(3)
        if pdu_format < 240:  # Addressable
            destination_address = pdu_specific
            pgn = PGN[(data_page << 16) | (pdu_format << 8)]
        else:  # Broadcast
            destination_address = Address.GLOBAL
            pgn = PGN[(data_page << 16) | (pdu_format << 8 | pdu_specific)]

        # Decode data
        if isinstance(data, int):
            byte_length = ceil(sum(spn.length for spn in pgn.data_field) / 8)
            data = data.to_bytes(byte_length, byteorder="big")
        # The payload is consumed from its first byte, so load it as a little-endian integer
        data = int.from_bytes(data, byteorder="little")
        data_bits = Bits(data)
        partitioned_data: list[float] = []
        pgn = PGN[pgn.id, [data_bits.peek(8)]]  # FIXME(JA): for now we'll assume 8 bits
        for spn in pgn.data_field:
            partitioned_data.append(spn.decode(data_bits.consume(spn.length)))
        return cls(pgn, partitioned_data, source_address, destination_address, priority)

    def message(self) -> can.Message:
        """Get this frame as a message object."""
        return can.Message(arbitration_id=self.can_id, is_extended_id=True, data=self.packed_data)

    @property
    def can_id(self) -> int:
        """Encode a raw CAN ID from priority, data page, PDU format, PDU specific and source address."""
        bits = Bits()
        bits.produce(3, self.priority)
        bits.produce(2, self.pgn.data_page)
        # decode() builds PGN keys with the data page above bit 16, so keep only the PDU format byte
        # before deciding between addressable and broadcast.
        pdu_format = (self.pgn.id >> 8) & 0xFF
        bits.produce(8, pdu_format)
        bits.produce(8, self.destination_address if pdu_format < 240 else self.pgn.id)
        bits.produce(8, self.source_address)
        return int(bits.data)

    @property
    def packed_data(self) -> bytes:
        """Encode SPN data to packed bytes (at least one byte, little-endian)."""
        bits = Bits()
        total_bits = 0
        self.pgn = PGN[self.pgn.id, self.data]
        # Fields are appended last-to-first so that the first field ends up in the lowest bits
        for spn, data in zip(self.pgn.data_field[::-1], self.data[::-1]):
            bits.produce(spn.length, spn.encode(data))
            total_bits += spn.length
        length = max(ceil(total_bits / 8), 1)
        return bits.data.to_bytes(length, byteorder="little")

    def __repr__(self) -> str:
        return f"{self.can_id:02X}#{self.packed_data.hex().upper()}"

    def __str__(self):
        width = shutil.get_terminal_size((80, 20)).columns
        lines = []
        title = f"[FRAME {self!r}]"
        lines.append(f"{Fore.GREEN}{title:=^{width}}{Fore.RESET}")
        lines.append(f"Source: {self.source_address} ({addresses(self.source_address)})")
        lines.append(f"Destination: {self.destination_address} ({addresses(self.destination_address)})")
        lines.append(f"Priority: {self.priority}")
        lines.append(f"PGN:\n{textwrap.indent(str(self.pgn), ' ')}")

        lines.append("Data Fields:")
        self.pgn = PGN[self.pgn.id, self.data]
        for spn, data in zip(self.pgn.data_field, self.data):
            if spn.name == "Reserved":
                continue
            description = f"({desc})" if (desc := spn.data_type(data)) else ""
            lines.append(textwrap.indent(f"{spn.name}: {data} {description}", " "))

        lines.append(f"{Fore.GREEN}{'=' * width}{Fore.RESET}")
        return "\n".join(lines)
A complete CAN frame.
CANFrame( pgn: hitl_tester.modules.cyber_6t.j1939da.PGNType, data: list[float], source_address: int = <Address.DIAGNOSTIC_TOOL_1: 250>, destination_address: int = <Address.GLOBAL: 255>, priority: int = 6)
def __init__(
    self,
    pgn: PGNType,
    data: list[float],
    source_address: int = Address.DIAGNOSTIC_TOOL_1,
    destination_address: int = Address.GLOBAL,
    priority: int = 6,
):
    """
    Store the frame fields and resolve the PGN definition.

    :param pgn: PGN to send; looked up in ``PGN`` by name when its id is -1, otherwise by id.
    :param data: one value per entry of the resolved PGN's ``data_field``.
    :param source_address: sender address for the CAN ID.
    :param destination_address: receiver address for the CAN ID.
    :param priority: frame priority for the CAN ID.
    :raises TypeError: if ``data`` does not have one value per data field.
    """
    self.priority = priority
    self.destination_address = destination_address
    self.source_address = source_address
    self.data = data

    # A PGN id of -1 means the PGN has to be looked up by its name instead
    lookup_key = pgn.id if pgn.id != -1 else pgn.name
    self.pgn = PGN[lookup_key, self.data]

    if len(self.pgn.data_field) == len(data):
        return
    raise TypeError(f"Expected {len(self.pgn.data_field)} arguments for data, got {len(data)}.")
@classmethod
def decode(cls, can_id: int, data: int | bytes) -> CANFrame:
    """
    Build a frame from a raw CAN ID and its payload.

    :param can_id: raw identifier; read from the lowest bits upward as source address (8),
        PDU specific (8), PDU format (8), data page (2) and priority (3).
    :param data: payload bytes, or an int that is first converted to big-endian bytes sized from
        the PGN's SPN lengths.
    :return: a frame holding the decoded SPN values.
    """
    # Split the CAN ID into its fields, starting from the least significant bits
    id_fields = Bits(can_id)
    source_address = id_fields.consume(8)
    pdu_specific = id_fields.consume(8)
    pdu_format = id_fields.consume(8)
    data_page = id_fields.consume(2)  # Always 0 or 1 for J1939 frames
    priority = id_fields.consume(3)

    page_and_format = (data_page << 16) | (pdu_format << 8)
    if pdu_format >= 240:  # Broadcast
        destination_address = Address.GLOBAL
        pgn = PGN[page_and_format | pdu_specific]
    else:  # Addressable
        destination_address = pdu_specific
        pgn = PGN[page_and_format]

    # Normalise an integer payload to bytes before reading it
    if isinstance(data, int):
        total_bits = sum(spn.length for spn in pgn.data_field)
        data = data.to_bytes(ceil(total_bits / 8), byteorder="big")
    # The payload is consumed from its first byte, so load it as a little-endian integer
    payload_bits = Bits(int.from_bytes(data, byteorder="little"))

    pgn = PGN[pgn.id, [payload_bits.peek(8)]]  # FIXME(JA): for now we'll assume 8 bits
    values: list[float] = [spn.decode(payload_bits.consume(spn.length)) for spn in pgn.data_field]
    return cls(pgn, values, source_address, destination_address, priority)
Decode frame from raw can ID and data.
def
message(self) -> can.message.Message:
def message(self) -> can.Message:
    """Wrap this frame's CAN ID and packed payload in an extended-ID ``can.Message``."""
    arbitration_id = self.can_id
    payload = self.packed_data
    return can.Message(arbitration_id=arbitration_id, is_extended_id=True, data=payload)
Get this frame as a message object.
can_id: int
@property
def can_id(self) -> int:
    """Encode a raw CAN ID from priority, data page, PDU format, PDU specific and source address."""
    bits = Bits()
    bits.produce(3, self.priority)
    bits.produce(2, self.pgn.data_page)
    # The PGN id can carry the data page above the PDU format byte (decode() builds its lookup key
    # as data_page << 16 | pdu_format << 8), so mask to 8 bits before the addressable/broadcast test.
    pdu_format = (self.pgn.id >> 8) & 0xFF
    bits.produce(8, pdu_format)
    # Addressable PGNs carry the destination; broadcast PGNs carry the low byte of the PGN id
    bits.produce(8, self.destination_address if pdu_format < 240 else self.pgn.id)
    bits.produce(8, self.source_address)
    return int(bits.data)
Encode a raw can ID.
packed_data: bytes
@property
def packed_data(self) -> bytes:
    """Pack the encoded SPN values into little-endian bytes (always at least one byte)."""
    self.pgn = PGN[self.pgn.id, self.data]
    packer = Bits()
    bit_count = 0
    # Fields are appended last-to-first so that the first field ends up in the lowest bits
    reversed_fields = self.pgn.data_field[::-1]
    reversed_values = self.data[::-1]
    for field, value in zip(reversed_fields, reversed_values):
        encoded = field.encode(value)
        packer.produce(field.length, encoded)
        bit_count += field.length
    byte_count = ceil(bit_count / 8)
    if byte_count < 1:
        byte_count = 1
    return packer.data.to_bytes(byte_count, byteorder="little")
Encode spn data to packed bytes.
class
CANBus:
class CANBus:
    """Manage a CAN bus."""

    def __init__(self, channel: str = "can2"):
        """Open ``channel`` over socketcan and start buffering received messages."""
        self.bus = can.Bus(channel=channel, interface="socketcan")
        self.buffer = can.BufferedReader()
        self.notifier = can.Notifier(self.bus, [self.buffer])

    def __enter__(self):
        """Return this object for use in the context."""
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Stop the notifier and shut down the bus when the context exits."""
        try:
            self.notifier.stop(timeout=5)
        finally:
            # Release the bus even if stopping the notifier fails
            self.bus.shutdown()

    def send_message(self, message: can.Message) -> bool:
        """Send a CAN message. Returns False if the bus reported an error."""
        try:
            self.bus.send(message)
            return True
        except can.CanError:
            print("Message not sent.")
            return False

    def read_input(self, timeout: float = 10) -> can.Message | None:
        """Get the next buffered message, or None (after logging an error) if none arrives in time."""
        try:
            message = self.buffer.get_message(timeout)  # FIXME(JA): filter for messages
        except queue.Empty:
            message = None
        # BufferedReader.get_message reports a timeout by returning None, so check the value too
        if message is None:
            logger.write_error_to_report(f"No response after {timeout} seconds. Is the 6T charged and on?")
        return message

    def send_frame(self, frame: CANFrame) -> bool:
        """Send a CAN frame."""
        return self.send_message(frame.message())

    def read_frame(self, timeout: float | None = None) -> CANFrame | None:
        """Return a decoded frame, or None if nothing was received.

        ``timeout=None`` uses the default timeout of ``read_input``; any other value, including 0,
        is passed through.
        """
        received_message = self.read_input() if timeout is None else self.read_input(timeout)
        if received_message is None:
            return None
        return CANFrame.decode(received_message.arbitration_id, received_message.data)

    def process_call(self, message_frame: CANFrame, timeout: float | None = None) -> CANFrame | None:
        """Send a message and read a response. Returns None if sending failed or nothing was received."""
        if self.send_message(message_frame.message()):
            return self.read_frame(timeout)
        return None
Manage a CAN bus.
def
send_message(self, message: can.message.Message) -> bool:
def send_message(self, message: can.Message) -> bool:
    """Hand ``message`` to the bus; report whether it was sent without a CAN error."""
    try:
        self.bus.send(message)
    except can.CanError:
        print("Message not sent.")
        return False
    else:
        return True
Send a CAN message.
def
read_input(self, timeout: float = 10) -> can.message.Message | None:
def read_input(self, timeout: float = 10) -> can.Message | None:
    """Get the next buffered message.

    :param timeout: seconds to wait for a message.
    :return: the received message, or None if nothing arrived; in that case an error is written
        to the report.
    """
    try:
        message = self.buffer.get_message(timeout)  # FIXME(JA): filter for messages
    except queue.Empty:
        message = None
    # BufferedReader.get_message reports a timeout by returning None rather than raising, so the
    # missing response has to be detected from the return value as well.
    if message is None:
        logger.write_error_to_report(f"No response after {timeout} seconds. Is the 6T charged and on?")
    return message
Get any messages received.
def send_frame(self, frame: CANFrame) -> bool:
    """Convert ``frame`` to a message and send it; returns the result of ``send_message``."""
    outgoing = frame.message()
    return self.send_message(outgoing)
Send a CAN frame.
def read_frame(self, timeout: float | None = None) -> CANFrame | None:
    """Return a decoded frame, or None if nothing was received.

    :param timeout: seconds to wait; None uses the default timeout of ``read_input``. Any other
        value, including 0, is passed through unchanged.
    """
    # Compare against None so that an explicit timeout of 0 is not replaced by the default
    received_message = self.read_input() if timeout is None else self.read_input(timeout)
    if received_message is None:
        return None
    return CANFrame.decode(received_message.arbitration_id, received_message.data)
Return a decoded frame.