hitl_tester.test_cases.cyber_6t.vulnerability
A temporary file for quick development. Some of these should be broken out into test cases later on.
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
Used in these test plans:
- vulnerability ⠀⠀⠀(cyber_6t/vulnerability.plan)
Example Command (warning: test plan may run other test cases):
./hitl_tester.py vulnerability -DBATTERY_CHANNEL=can0 -DHARD_RESET=False -DBATTERY_ADDRESS=0 -DBATTERY_INFO=<BatteryInfo object at 0x7f34793687a0>
"""
A temporary file for quick development. Some of these should be broken out into test cases later on.

# (c) 2020-2024 TurnAround Factor, Inc.
#
# CUI DISTRIBUTION CONTROL
# Controlled by: DLA J68 R&D SBIP
# CUI Category: Small Business Research and Technology
# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
# Fort Belvoir, VA 22060-6221
#
# SBIR DATA RIGHTS
# Contract No.:SP4701-23-C-0083
# Contractor Name: TurnAround Factor, Inc.
# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
# Expiration of SBIR Data Rights Period: September 24, 2029
# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
# the markings.
"""

import itertools
import time
from contextlib import suppress
from enum import IntEnum
from typing import Literal

import pytest

from hitl_tester.modules.cyber_6t import spn_types
from hitl_tester.modules.cyber_6t.canbus import CANBus, CANFrame
from hitl_tester.modules.cyber_6t.j1939da import PGN

from hitl_tester.modules import properties
from hitl_tester.modules.logger import logger

BATTERY_CHANNEL = "can0"
"""Channel identification. Expected type is backend dependent."""

HARD_RESET = False
"""Whether to use a soft or hard reset before each test."""

properties.apply()  # Allow modifying the above globals


class ManufacturerID(IntEnum):
    """Enum to hold Manufacturer IDs."""

    SAFT = 269  # Compared against the detected manufacturer code in the proprietary scan
    BRENTRONICS = 822


BATTERY_ADDRESS = 0


class BatteryInfo:
    """Key/value store for information learned about the battery."""

    def __init__(self):
        self.data = {}

    def get_item(self, key):
        """Return the value stored under ``key``; raises ``KeyError`` if it was never set."""
        return self.data[key]

    def set_item(self, key, value):
        """Store ``value`` under ``key``, replacing any earlier value."""
        self.data[key] = value


BATTERY_INFO = BatteryInfo()


def cmp(
    a: float | str,
    sign: Literal["<", "<=", ">", ">=", "=="],
    b: float | str,
    unit_a: str = "",
    unit_b: str = "",
    form: str = "",
) -> str:
    """
    Generate a formatted string based on a comparison.

    The result reads ``"<a><unit_a> <symbol> <b><unit_b>"``. The symbol is the requested sign when the
    comparison holds and its negated form (for example ``≮``) when it does not. If either operand is a
    string, only equality is evaluated and ``sign`` is ignored. ``unit_b`` falls back to ``unit_a`` when
    empty, and ``form`` is applied to both operands as a format spec.
    """
    if not unit_b:
        unit_b = unit_a

    if isinstance(a, str) or isinstance(b, str):
        return f"{a:{form}}{unit_a} {('≠', '=')[a == b]} {b:{form}}{unit_b}"

    sign_str = {
        "<": ("≮", "<")[a < b],
        "<=": ("≰", "≤")[a <= b],
        ">": ("≯", ">")[a > b],
        ">=": ("≱", "≥")[a >= b],
        "==": ("≠", "=")[a == b],
    }

    return f"{a:{form}}{unit_a} {sign_str[sign]} {b:{form}}{unit_b}"


@pytest.fixture(scope="class", autouse=True)
def reset_test_environment():
    """Before each test class, reset the 6T."""

    if not BATTERY_ADDRESS:  # Battery has not yet been found
        return

    power_cycle_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["PropA"],
        data=[0, 0, 1, 1, 1, not HARD_RESET, 1, 3, 0, -1],
    )
    maintenance_mode_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["PropA"],
        data=[0, 0, 1, 1, 1, 3, 1, 3, 0, -1],
    )
    factory_reset_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["PropA"],
        data=[1, 0, 0, -1, 3, 0xF, 3, 0, 0x1F, -1],
    )
    with CANBus(BATTERY_CHANNEL) as bus:
        logger.write_info_to_report("Power-Cycling 6T")
        bus.process_call(power_cycle_frame)
        time.sleep(10)
        logger.write_info_to_report("Entering maintenance mode")
        bus.process_call(maintenance_mode_frame)
        logger.write_info_to_report("Factory resetting 6T")
        bus.process_call(factory_reset_frame)
        time.sleep(10)


class TestLocate6T:
    """Scan for 6T battery."""

    MAX_ATTEMPTS = 12  # Roughly 2 minutes worth of attempts (10 seconds between attempts)

    # NOTE(review): pytest does not collect test classes that define __init__, so test_profile only runs
    # when another test calls TestLocate6T().test_profile() explicitly - confirm that this is intended.
    def __init__(self):
        self.id = 0
        self.manufacturer_code = 0
        self.manufacturer_name = ""

    def test_profile(self) -> None:
        """
        | Description          | Scan the bus for devices                                         |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Request the names of everyone on the bus                 </br>\
                                 2. Use the response data for all communication                   |
        | Estimated Duration   | 1 second                                                         |
        """
        global BATTERY_ADDRESS

        name_request = CANFrame(pgn=PGN["Request"], data=[PGN["Address Claimed"].id])
        with CANBus(BATTERY_CHANNEL) as bus:
            for attempt in range(1, self.MAX_ATTEMPTS + 1):
                with suppress(IndexError):  # A response with too few fields counts as a failed attempt
                    if name_frame := bus.process_call(name_request):
                        # Save responses to profile
                        self.id = int(name_frame.data[0])
                        self.manufacturer_code = name_frame.data[1]
                        BATTERY_INFO.set_item("manufacturer_code", name_frame.data[1])
                        if (mfg_name := spn_types.manufacturer_id(name_frame.data[1])) == "Reserved":
                            mfg_name = f"Unknown ({name_frame.data[1]})"
                        self.manufacturer_name = mfg_name
                        BATTERY_ADDRESS = name_frame.source_address
                        break
                if attempt < self.MAX_ATTEMPTS:  # Nothing is retried after the final attempt
                    logger.write_warning_to_html_report(f"Failed to locate 6T. Retrying, attempt {attempt}")
                    time.sleep(10)
            else:
                message = "Could not locate 6T"
                logger.write_warning_to_html_report(message)
                pytest.exit(message)

        # Log results
        printable_id = "".join(chr(i) if chr(i).isprintable() else "." for i in self.id.to_bytes(3, byteorder="big"))
        logger.write_result_to_html_report(
            f"Found {self.manufacturer_name} 6T at address {BATTERY_ADDRESS} "
            f"(ID: {self.id:06X}, ASCII ID: {printable_id})"
        )


class TestScanForProprietary:
    """Scan all 512 proprietary commands for any responses."""

    def test_scan_for_proprietary(self):
        """
        | Description          | Scan all 512 proprietary commands for any responses              |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Scan addresses $EFxx and $FFxx                           </br>\
                                 2. Log any successful responses                             </br>\
                                 3. Check if responded commands are not documented                |
        | Estimated Duration   | 85 seconds                                                       |
        """
        TestLocate6T().test_profile()
        proprietary_request = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[0])
        ack_responses: dict[int, CANFrame] = {}  # type: ignore  # address: response
        with CANBus(BATTERY_CHANNEL) as bus:
            for address in itertools.chain(range(0xEF00, 0xF000), range(0xFF00, 0x10000)):
                proprietary_request.data = [address]
                if response_frame := bus.process_call(proprietary_request):  # Log if response is not NACK
                    if not (response_frame.pgn.id == PGN["Acknowledgement"].id and response_frame.data[0] == 1):
                        ack_responses[address] = response_frame
                        logger.write_info_to_report(f"Got response at address {address:04X}:\n{response_frame}")

        milprf_commands = [0xFF00, 0xFF01, 0xFF02, 0xFF03, 0xFF04, 0xFF05, 0xFF06, 0xFF07, 0xFF08]
        j1939_transmit_commands = [0xE800, 0xEE00, 0xFEE6, 0xFCB6, 0xFECA, 0xFECB, 0xFE50, 0xFDC5, 0xFEDA, 0xD800]
        documented_commands = set(milprf_commands) | set(j1939_transmit_commands)

        if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.SAFT:
            # NOTE(review): 0xFFE1 was listed twice originally (possibly a typo for another command), and
            # 0x1EF00 lies outside both scanned ranges so it can never match - confirm against the SAFT docs.
            documented_commands |= {
                0xFFD2,
                0xFFD4,
                0xFFD5,
                0xFFD6,
                0xFFD7,
                0xFFD8,
                0xFFDC,
                0xFFDD,
                0xFFDE,
                0xFFE0,
                0xFFE1,
                0xFFE4,
                0xFFE9,
                0x1EF00,
            }

        # Sorted hex output matches the per-address log above and keeps the report deterministic
        if unknown_proprietary_commands := sorted(set(ack_responses) - documented_commands):
            plural = "s" if len(unknown_proprietary_commands) > 1 else ""
            message = (
                f"Unknown Proprietary Command{plural}: "
                f"{', '.join(f'{command:04X}' for command in unknown_proprietary_commands)}"
            )
            logger.write_warning_to_html_report(message)
            message = f"{len(unknown_proprietary_commands)} command{plural} missing known documentation"
            logger.write_failure_to_html_report(message)
            pytest.fail(message)

        logger.write_result_to_html_report("All responded commands are known and documented")


def request_data(end_of_message: bool = True) -> bool:
    """
    Request the ECUID PGN from the battery and either acknowledge or abandon the resulting session.

    :param end_of_message: when True, answer the RTS with an End of Message Acknowledge and return True.
        When False, send nothing, wait 10 seconds and check whether the battery aborts the connection.
    :return: True if the session was acknowledged or a connection abort was received; False if the
        battery never answered the request or did not abort the abandoned connection.
    """
    cts_pgn = PGN["ECUID"].id
    end_acknowledge_frame = CANFrame(  # Memory access only works in maintenance mode
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["TP.CM"],
        # Control byte 19 is End of Message Acknowledge (17 would be a CTS), as in request_data_cts_test
        data=[19, 0, 0, 0xFF, cts_pgn],  # End of Message Acknowledge, bytes, packets, reserved, PGN
    )
    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[cts_pgn])

    with CANBus(BATTERY_CHANNEL) as bus:
        # Get request to send
        logger.write_info_to_report("Sending RTS")
        rts_frame = bus.process_call(cts_r)
        logger.write_info_to_report(f"Got response:\n{rts_frame}")
        if not rts_frame:  # Without an RTS there is no connection to acknowledge or time out
            logger.write_info_to_report("No RTS received, connection was not established")
            return False
        logger.write_info_to_report("Not sending CTS. Purposefully waiting")

        # Acknowledge packets
        if end_of_message:
            end_acknowledge_frame.data[1] = int(rts_frame.data[1])  # expected bytes
            end_acknowledge_frame.data[2] = int(rts_frame.data[2])  # expected packets
            end_acknowledge_frame.data[-1] = int(rts_frame.data[-1])  # PGN announced by the RTS
            op_frame = bus.process_call(end_acknowledge_frame)
            logger.write_info_to_report(f"End of message response:\n{op_frame}")
            return True

        time.sleep(10)
        response_frame = bus.read_frame()
        if not response_frame:
            logger.write_info_to_report("No response after waiting...")
            return False

        if response_frame.data[0] == 255:  # Connection abort
            if response_frame.data[1] == 3:  # Abort reason logged as a timeout
                logger.write_info_to_report(f"Got timeout response:\n{response_frame}")
            else:
                logger.write_info_to_report(f"Received different connection abort message:\n{response_frame}")
            return True

        logger.write_info_to_report(f"May not have received a timeout response:\n{response_frame}")
        return False


def request_data_cts_test(size: int | None = None, sequence_id: int | None = None, end_of_message: bool = True) -> bool:
    """
    Answer the battery's RTS with a (possibly malformed) CTS and check how many packets come back.

    :param size: number of packets to allow in the CTS; None uses the packet count announced by the RTS.
    :param sequence_id: next packet number to request in the CTS; None requests packet 1.
    :param end_of_message: when True, finish with an End of Message Acknowledge; otherwise wait 10 seconds
        and log whatever the battery sends.
    :return: False if the battery never answered the request or a check failed. For Brentronics
        batteries a failed check is still reported as True (warnings are logged instead).
    """
    test_passed = True
    cts_pgn = PGN["ECUID"].id
    cts_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN[0xEC00],
        data=[17, 0x0203, 0xFF, 0xFF, cts_pgn],  # Packets that can be sent, next packet#, reserved, PGN
    )
    end_acknowledge_frame = CANFrame(  # Memory access only works in maintenance mode
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["TP.CM"],
        data=[19, 0, 0, 0xFF, 0],  # End of Message Acknowledge, bytes, packets, reserved, PGN
    )
    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["Request"], data=[cts_pgn])

    with CANBus(BATTERY_CHANNEL) as bus:
        # Get request to send
        logger.write_info_to_report("Sending RTS")
        rts_frame = bus.process_call(cts_r)
        logger.write_info_to_report(f"Got response:\n{rts_frame}")
        if not rts_frame:  # Nothing to answer with a CTS
            logger.write_warning_to_html_report("No RTS received, cannot send CTS")
            return False

        # Send malicious CTS
        expected_bytes = int(rts_frame.data[1])
        expected_packets = int(rts_frame.data[2])
        rts_pgn_id = int(rts_frame.data[-1])
        cts_frame.data[1] = expected_packets if size is None else size
        cts_frame.data[2] = 1 if sequence_id is None else sequence_id
        cts_frame.data[-1] = rts_pgn_id

        logger.write_info_to_report("Sending CTS")
        bus.send_frame(cts_frame)
        packet_count = 0
        while response_frame := bus.read_frame():
            logger.write_info_to_report(f"Got response:\n{response_frame}")

            if response_frame.pgn.id == PGN["TP.CM", [32]].id:  # Not counted as a data packet
                continue

            packet_count += 1

            if sequence_id is not None:
                if expected_packets >= sequence_id > response_frame.data[0]:
                    cmp_text = cmp(response_frame.data[0], ">=", sequence_id)

                    logger.write_warning_to_html_report(
                        f"Received sequence ID {response_frame.data[0]}, "
                        f"which is before the requested sequence ID: {cmp_text}"
                    )
                    # Pass BT for now with warning, special case
                    if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
                        continue
                    test_passed = False

        # Acknowledge packets
        if end_of_message:
            end_acknowledge_frame.data[1] = expected_bytes
            end_acknowledge_frame.data[2] = expected_packets
            end_acknowledge_frame.data[-1] = rts_pgn_id
            op_frame = bus.process_call(end_acknowledge_frame)
            logger.write_info_to_report(f"End of message response:\n{op_frame}")
        else:
            time.sleep(10)
            if response_frame := bus.read_frame():
                logger.write_info_to_report(f"Got possible timeout response:\n{response_frame}")
            else:
                logger.write_info_to_report("No response after waiting...")

        if size is not None:
            cmp_text = cmp(packet_count, "==", expected_packets)
            if size == 0:
                if packet_count != 0:
                    logger.write_warning_to_html_report(
                        f"Did not receive expected packets with invalid size: {cmp(packet_count, '==', 0)}"
                    )
                    test_passed = False
                else:
                    logger.write_result_to_html_report(f"Received expected packets: {cmp(packet_count, '==', 0)}")
            elif size > expected_packets:
                if packet_count != expected_packets:
                    logger.write_warning_to_html_report(f"Did not receive expected packets: {cmp_text}")
                    test_passed = False
                else:
                    logger.write_result_to_html_report(f"Received expected packets: {cmp_text}")
            elif size != packet_count:
                # The expectation in this branch is the requested size, so report against it
                logger.write_warning_to_html_report(
                    f"Did not receive expected packets: {cmp(packet_count, '==', size)}"
                )
                test_passed = False
            else:
                logger.write_result_to_html_report(f"Received expected packets: {cmp(packet_count, '==', size)}")

        if sequence_id is not None:
            if sequence_id == 0 or sequence_id > expected_packets:
                cmp_text = cmp(packet_count, "==", 0)
                if packet_count != 0:
                    logger.write_warning_to_html_report(
                        f"Did not receive expected number of packets with invalid request: {cmp_text}"
                    )
                    test_passed = False
                else:
                    logger.write_result_to_html_report(
                        f"Received expected packets with invalid sequence request: {cmp_text}"
                    )
            else:
                # 1 <= sequence_id <= expected_packets here, so the difference is always positive
                overall_expected_packets = expected_packets - sequence_id + 1
                cmp_text = cmp(packet_count, "==", overall_expected_packets)
                if packet_count != overall_expected_packets:
                    # NOTE(review): this mismatch only warns and does not fail the test - confirm intended
                    logger.write_warning_to_html_report(f"Did not receive expected number of packets: {cmp_text}")
                else:
                    logger.write_result_to_html_report(f"Received expected number of packets: {cmp_text}")

        if sequence_id is None and size is None:
            if expected_packets == packet_count:
                logger.write_result_to_html_report(
                    f"Received expected number of packets: {cmp(packet_count, '==', expected_packets)}"
                )
            else:
                logger.write_warning_to_html_report(
                    f"Did not receive expected number of packets: {cmp(packet_count, '==', expected_packets)}"
                )

    if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS and test_passed is False:
        return True

    return test_passed


class TestMaliciousCTS:
    """Send a CTS with bad sequence ID or size."""

    def test_malicious_cts(self):
        """
        | Description          | Send a CTS with bad sequence ID or size                          |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Send a normal CTS                                        </br>\
                                 2. Send a CTS with sequence ID of zero                      </br>\
                                 3. Send a CTS with sequence ID too high                     </br>\
                                 4. Send a CTS with zero size                                </br>\
                                 5. Send a CTS with small size                               </br>\
                                 6. Send a CTS with size too big                                  |
        | Estimated Duration   | 85 seconds                                                       |
        """
        tests_failed = []
        TestLocate6T().test_profile()

        cts_tests = [
            {"sequence_id": None, "size": None, "log_text": "Sending normal CTS"},
            {"sequence_id": 0, "size": None, "log_text": "Sending CTS with sequence ID of zero"},
            {"sequence_id": 3, "size": None, "log_text": "Sending CTS with sequence ID of three"},
            {"sequence_id": 200, "size": None, "log_text": "Sending CTS with sequence ID too high"},
            {"sequence_id": None, "size": 0, "log_text": "Sending CTS with zero size"},
            {"sequence_id": None, "size": 1, "log_text": "Sending CTS with small size"},
            {"sequence_id": None, "size": 200, "log_text": "Sending CTS with size too big"},
        ]

        for cts_test in cts_tests:
            logger.write_result_to_html_report(cts_test["log_text"])
            did_test_pass = request_data_cts_test(cts_test["size"], cts_test["sequence_id"])

            if did_test_pass is False:
                logger.write_failure_to_html_report(f"Test failed when {cts_test['log_text']}.")
                tests_failed.append(cts_test["log_text"])

        if tests_failed:
            pytest.fail("One or more tests failed Malicious CTS vulnerability scan")
        elif BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
            logger.write_warning_to_html_report("See warnings for possible discrepancies with J1939 expectations")
        else:
            logger.write_result_to_html_report("Malicious CTS vulnerability scans met expectations")


class TestConnectionExhaustion:
    """Start a connection, but don't close it."""

    def test_connection_exhaustion(self):
        """
        | Description          | Start a connection, but don't close it                           |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Open a connection                                        </br>\
                                 2. Refuse to close it and check for timeouts                     |
        | Estimated Duration   | 85 seconds                                                       |
        """
        TestLocate6T().test_profile()

        logger.write_info_to_report("Establishing connection...")
        test_passed = request_data(end_of_message=False)

        if test_passed is False:
            logger.write_failure_to_html_report("No Connection Abort message was received after waiting 10 seconds")
            pytest.fail("No Connection Abort message was received after waiting 10 seconds")
        else:
            logger.write_result_to_html_report("Connection Abort message was received after waiting 10 seconds")
BATTERY_CHANNEL =
'can0'
Channel identification. Expected type is backend dependent.
HARD_RESET =
False
Whether to use a soft or hard reset before each test.
class
ManufacturerID(enum.IntEnum):
class ManufacturerID(IntEnum):
    """Enum to hold Manufacturer IDs."""

    # Both members are compared against the manufacturer code stored in BATTERY_INFO by the tests.
    SAFT = 269  # Selects the additional documented commands in the proprietary command scan
    BRENTRONICS = 822
Enum to hold Manufacturer IDs.
SAFT =
<ManufacturerID.SAFT: 269>
BRENTRONICS =
<ManufacturerID.BRENTRONICS: 822>
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
BATTERY_ADDRESS =
0
class
BatteryInfo:
class BatteryInfo:
    """Key/value store for information learned about the battery."""

    def __init__(self):
        self.data = dict()

    def get_item(self, key):
        """Return the value stored under ``key``; raises ``KeyError`` if it was never set."""
        stored = self.data
        return stored[key]

    def set_item(self, key, value):
        """Store ``value`` under ``key``, replacing any earlier value."""
        self.data.update({key: value})
Class to store battery information.
BATTERY_INFO =
<BatteryInfo object>
def
cmp( a: float | str, sign: Literal['<', '<=', '>', '>=', '=='], b: float | str, unit_a: str = '', unit_b: str = '', form: str = '') -> str:
83def cmp( 84 a: float | str, 85 sign: Literal["<", "<=", ">", ">=", "=="], 86 b: float | str, 87 unit_a: str = "", 88 unit_b: str = "", 89 form: str = "", 90) -> str: 91 """Generate a formatted string based on a comparison.""" 92 if not unit_b: 93 unit_b = unit_a 94 95 if isinstance(a, str) or isinstance(b, str): 96 return f"{a:{form}}{unit_a} {('≠', '=')[a == b]} {b:{form}}{unit_b}" 97 98 sign_str = { 99 "<": ("≮", "<")[a < b], 100 "<=": ("≰", "≤")[a <= b], 101 ">": ("≯", ">")[a > b], 102 ">=": ("≱", "≥")[a >= b], 103 "==": ("≠", "=")[a == b], 104 } 105 106 return f"{a:{form}}{unit_a} {sign_str[sign]} {b:{form}}{unit_b}"
Generate a formatted string based on a comparison.
@pytest.fixture(scope='class', autouse=True)
def
reset_test_environment():
@pytest.fixture(scope="class", autouse=True)
def reset_test_environment():
    """Reset the 6T ahead of every test class (skipped until a battery address is known)."""

    if not BATTERY_ADDRESS:  # Battery has not yet been found
        return

    def _prop_a(payload):
        """Build a PropA frame addressed to the battery."""
        return CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["PropA"], data=payload)

    # (log message, frame to send, seconds to wait afterwards); all frames are built before the bus opens
    steps = (
        ("Power-Cycling 6T", _prop_a([0, 0, 1, 1, 1, not HARD_RESET, 1, 3, 0, -1]), 10),
        ("Entering maintenance mode", _prop_a([0, 0, 1, 1, 1, 3, 1, 3, 0, -1]), 0),
        ("Factory resetting 6T", _prop_a([1, 0, 0, -1, 3, 0xF, 3, 0, 0x1F, -1]), 10),
    )

    with CANBus(BATTERY_CHANNEL) as bus:
        for announcement, frame, settle_seconds in steps:
            logger.write_info_to_report(announcement)
            bus.process_call(frame)
            if settle_seconds:
                time.sleep(settle_seconds)
Before each test class, reset the 6T.
class
TestLocate6T:
class TestLocate6T:
    """Scan for 6T battery."""

    MAX_ATTEMPTS = 12  # Roughly 2 minutes worth of attempts (10 seconds between attempts)

    # NOTE(review): pytest does not collect test classes that define __init__, so test_profile only runs
    # when another test calls TestLocate6T().test_profile() explicitly - confirm that this is intended.
    def __init__(self):
        self.id = 0
        self.manufacturer_code = 0
        self.manufacturer_name = ""

    def test_profile(self) -> None:
        """
        | Description          | Scan the bus for devices                                         |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Request the names of everyone on the bus                 </br>\
                                 2. Use the response data for all communication                   |
        | Estimated Duration   | 1 second                                                         |
        """
        global BATTERY_ADDRESS

        name_request = CANFrame(pgn=PGN["Request"], data=[PGN["Address Claimed"].id])
        with CANBus(BATTERY_CHANNEL) as bus:
            for attempt in range(1, self.MAX_ATTEMPTS + 1):
                with suppress(IndexError):  # A response with too few fields counts as a failed attempt
                    if name_frame := bus.process_call(name_request):
                        # Save responses to profile
                        self.id = int(name_frame.data[0])
                        self.manufacturer_code = name_frame.data[1]
                        BATTERY_INFO.set_item("manufacturer_code", name_frame.data[1])
                        if (mfg_name := spn_types.manufacturer_id(name_frame.data[1])) == "Reserved":
                            mfg_name = f"Unknown ({name_frame.data[1]})"
                        self.manufacturer_name = mfg_name
                        BATTERY_ADDRESS = name_frame.source_address
                        break
                if attempt < self.MAX_ATTEMPTS:  # Nothing is retried after the final attempt
                    logger.write_warning_to_html_report(f"Failed to locate 6T. Retrying, attempt {attempt}")
                    time.sleep(10)
            else:
                # Every attempt failed: stop the whole run, the other tests need the battery address
                message = "Could not locate 6T"
                logger.write_warning_to_html_report(message)
                pytest.exit(message)

        # Log results
        printable_id = "".join(chr(i) if chr(i).isprintable() else "." for i in self.id.to_bytes(3, byteorder="big"))
        logger.write_result_to_html_report(
            f"Found {self.manufacturer_name} 6T at address {BATTERY_ADDRESS} "
            f"(ID: {self.id:06X}, ASCII ID: {printable_id})"
        )
Scan for 6T battery.
def
test_profile(self) -> None:
def test_profile(self) -> None:
    """
    | Description          | Scan the bus for devices                                         |
    | :------------------- | :--------------------------------------------------------------- |
    | Instructions         | 1. Request the names of everyone on the bus                 </br>\
                             2. Use the response data for all communication                   |
    | Estimated Duration   | 1 second                                                         |
    """
    global BATTERY_ADDRESS

    name_request = CANFrame(pgn=PGN["Request"], data=[PGN["Address Claimed"].id])
    with CANBus(BATTERY_CHANNEL) as bus:
        for attempt in range(1, self.MAX_ATTEMPTS + 1):
            with suppress(IndexError):  # A response with too few fields counts as a failed attempt
                if name_frame := bus.process_call(name_request):
                    # Save responses to profile
                    self.id = int(name_frame.data[0])
                    self.manufacturer_code = name_frame.data[1]
                    BATTERY_INFO.set_item("manufacturer_code", name_frame.data[1])
                    if (mfg_name := spn_types.manufacturer_id(name_frame.data[1])) == "Reserved":
                        mfg_name = f"Unknown ({name_frame.data[1]})"
                    self.manufacturer_name = mfg_name
                    BATTERY_ADDRESS = name_frame.source_address
                    break
            if attempt < self.MAX_ATTEMPTS:  # Nothing is retried after the final attempt
                logger.write_warning_to_html_report(f"Failed to locate 6T. Retrying, attempt {attempt}")
                time.sleep(10)
        else:
            # Every attempt failed: stop the whole run, the other tests need the battery address
            message = "Could not locate 6T"
            logger.write_warning_to_html_report(message)
            pytest.exit(message)

    # Log results
    printable_id = "".join(chr(i) if chr(i).isprintable() else "." for i in self.id.to_bytes(3, byteorder="big"))
    logger.write_result_to_html_report(
        f"Found {self.manufacturer_name} 6T at address {BATTERY_ADDRESS} "
        f"(ID: {self.id:06X}, ASCII ID: {printable_id})"
    )
| Description | Scan the bus for devices |
|---|---|
| Instructions | 1. Request the names of everyone on the bus 2. Use the response data for all communication |
| Estimated Duration | 1 second |
class
TestScanForProprietary:
class TestScanForProprietary:
    """Scan all 512 proprietary commands for any responses."""

    def test_scan_for_proprietary(self):
        """
        | Description          | Scan all 512 proprietary commands for any responses              |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Scan addresses $EFxx and $FFxx                           </br>\
                                 2. Log any successful responses                             </br>\
                                 3. Check if responded commands are not documented                |
        | Estimated Duration   | 85 seconds                                                       |
        """
        TestLocate6T().test_profile()
        proprietary_request = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[0])
        ack_responses: dict[int, CANFrame] = {}  # type: ignore  # address: response
        with CANBus(BATTERY_CHANNEL) as bus:
            for address in itertools.chain(range(0xEF00, 0xF000), range(0xFF00, 0x10000)):
                proprietary_request.data = [address]
                if response_frame := bus.process_call(proprietary_request):  # Log if response is not NACK
                    if not (response_frame.pgn.id == PGN["Acknowledgement"].id and response_frame.data[0] == 1):
                        ack_responses[address] = response_frame
                        logger.write_info_to_report(f"Got response at address {address:04X}:\n{response_frame}")

        milprf_commands = [0xFF00, 0xFF01, 0xFF02, 0xFF03, 0xFF04, 0xFF05, 0xFF06, 0xFF07, 0xFF08]
        j1939_transmit_commands = [0xE800, 0xEE00, 0xFEE6, 0xFCB6, 0xFECA, 0xFECB, 0xFE50, 0xFDC5, 0xFEDA, 0xD800]
        documented_commands = set(milprf_commands) | set(j1939_transmit_commands)

        if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.SAFT:
            # NOTE(review): 0xFFE1 was listed twice originally (possibly a typo for another command), and
            # 0x1EF00 lies outside both scanned ranges so it can never match - confirm against the SAFT docs.
            documented_commands |= {
                0xFFD2,
                0xFFD4,
                0xFFD5,
                0xFFD6,
                0xFFD7,
                0xFFD8,
                0xFFDC,
                0xFFDD,
                0xFFDE,
                0xFFE0,
                0xFFE1,
                0xFFE4,
                0xFFE9,
                0x1EF00,
            }

        # Sorted hex output matches the per-address log above and keeps the report deterministic
        if unknown_proprietary_commands := sorted(set(ack_responses) - documented_commands):
            plural = "s" if len(unknown_proprietary_commands) > 1 else ""
            message = (
                f"Unknown Proprietary Command{plural}: "
                f"{', '.join(f'{command:04X}' for command in unknown_proprietary_commands)}"
            )
            logger.write_warning_to_html_report(message)
            message = f"{len(unknown_proprietary_commands)} command{plural} missing known documentation"
            logger.write_failure_to_html_report(message)
            pytest.fail(message)

        logger.write_result_to_html_report("All responded commands are known and documented")
Scan all 512 proprietary commands for any responses.
def
test_scan_for_proprietary(self):
def test_scan_for_proprietary(self):
    """
    | Description          | Scan all 512 proprietary commands for any responses              |
    | :------------------- | :--------------------------------------------------------------- |
    | Instructions         | 1. Scan addresses $EFxx and $FFxx                           </br>\
                             2. Log any successful responses                             </br>\
                             3. Check if responded commands are not documented                |
    | Estimated Duration   | 85 seconds                                                       |
    """
    TestLocate6T().test_profile()
    proprietary_request = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[0])
    ack_responses: dict[int, CANFrame] = {}  # type: ignore  # address: response
    with CANBus(BATTERY_CHANNEL) as bus:
        for address in itertools.chain(range(0xEF00, 0xF000), range(0xFF00, 0x10000)):
            proprietary_request.data = [address]
            if response_frame := bus.process_call(proprietary_request):  # Log if response is not NACK
                if not (response_frame.pgn.id == PGN["Acknowledgement"].id and response_frame.data[0] == 1):
                    ack_responses[address] = response_frame
                    logger.write_info_to_report(f"Got response at address {address:04X}:\n{response_frame}")

    milprf_commands = [0xFF00, 0xFF01, 0xFF02, 0xFF03, 0xFF04, 0xFF05, 0xFF06, 0xFF07, 0xFF08]
    j1939_transmit_commands = [0xE800, 0xEE00, 0xFEE6, 0xFCB6, 0xFECA, 0xFECB, 0xFE50, 0xFDC5, 0xFEDA, 0xD800]
    documented_commands = set(milprf_commands) | set(j1939_transmit_commands)

    if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.SAFT:
        # NOTE(review): 0xFFE1 was listed twice originally (possibly a typo for another command), and
        # 0x1EF00 lies outside both scanned ranges so it can never match - confirm against the SAFT docs.
        documented_commands |= {
            0xFFD2,
            0xFFD4,
            0xFFD5,
            0xFFD6,
            0xFFD7,
            0xFFD8,
            0xFFDC,
            0xFFDD,
            0xFFDE,
            0xFFE0,
            0xFFE1,
            0xFFE4,
            0xFFE9,
            0x1EF00,
        }

    # Sorted hex output matches the per-address log above and keeps the report deterministic
    if unknown_proprietary_commands := sorted(set(ack_responses) - documented_commands):
        plural = "s" if len(unknown_proprietary_commands) > 1 else ""
        message = (
            f"Unknown Proprietary Command{plural}: "
            f"{', '.join(f'{command:04X}' for command in unknown_proprietary_commands)}"
        )
        logger.write_warning_to_html_report(message)
        message = f"{len(unknown_proprietary_commands)} command{plural} missing known documentation"
        logger.write_failure_to_html_report(message)
        pytest.fail(message)

    logger.write_result_to_html_report("All responded commands are known and documented")
| Description | Scan all 512 proprietary commands for any responses |
|---|---|
| Instructions | 1. Scan addresses $EFxx and $FFxx 2. Log any successful responses 3. Check if responded commands are not documented |
| Estimated Duration | 85 seconds |
def
request_data(end_of_message: bool = True) -> bool:
def request_data(end_of_message: bool = True) -> bool:
    """
    Request multi-packet data, never send a CTS, then either acknowledge or wait for a timeout.

    A request for the ECUID PGN is sent and the reply is treated as the RTS frame.

    :param end_of_message: When True, immediately reply with an End of Message Acknowledge built from the
        RTS fields. When False, wait 10 seconds and read one frame, looking for a connection abort.
    :return: True if the acknowledge was sent, or if an abort frame (first data field 255) was read after
        waiting. False if the RTS never arrived, nothing was read after waiting, or the frame read was
        not an abort.
    """
    cts_pgn = PGN["ECUID"].id
    end_acknowledge_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["TP.CM"],
        # Control byte 19 is End of Message Acknowledge (17 is CTS); matches request_data_cts_test.
        data=[19, 0, 0, 0xFF, 0],  # End of Message Acknowledge, bytes, packets, reserved, PGN
    )
    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[cts_pgn])

    with CANBus(BATTERY_CHANNEL) as bus:
        # Get request to send
        logger.write_info_to_report("Sending RTS")
        rts_frame = bus.process_call(cts_r)
        logger.write_info_to_report(f"Got response:\n{rts_frame}")
        if not rts_frame:
            # Without an RTS there are no fields to read and no connection to exhaust.
            logger.write_info_to_report("No RTS was received, connection could not be established")
            return False
        logger.write_info_to_report("Not sending CTS. Purposefully waiting")

        # Prep for End of Message
        expected_bytes = int(rts_frame.data[1])
        expected_packets = int(rts_frame.data[2])
        rts_pgn_id = int(rts_frame.data[-1])

        # Acknowledge packets
        if end_of_message:
            end_acknowledge_frame.data[1] = expected_bytes
            end_acknowledge_frame.data[2] = expected_packets
            end_acknowledge_frame.data[-1] = rts_pgn_id
            op_frame = bus.process_call(end_acknowledge_frame)
            logger.write_info_to_report(f"End of message response:\n{op_frame}")
            return True

        time.sleep(10)
        response_frame = bus.read_frame()
        if not response_frame:
            logger.write_info_to_report("No response after waiting...")
            return False

        if response_frame.data[0] != 255:
            # A frame arrived but it is not a connection abort.
            logger.write_info_to_report(f"May not have received a timeout response:\n{response_frame}")
            return False

        if response_frame.data[1] == 3:
            logger.write_info_to_report(f"Got timeout response:\n{response_frame}")
        else:
            logger.write_info_to_report(f"Received different connection abort message:\n{response_frame}")
        return True
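Request multi-packet data without sending a CTS, then either acknowledge the end of message or wait for a connection timeout.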
def
request_data_cts_test( size: int | None = None, sequence_id: int | None = None, end_of_message: bool = True) -> bool:
def request_data_cts_test(size: int | None = None, sequence_id: int | None = None, end_of_message: bool = True) -> bool:
    """
    Answer an RTS with a (possibly malformed) CTS and check how many data packets come back.

    A request for the ECUID PGN is sent and the reply is treated as the RTS frame.

    :param size: Number of packets to advertise in the CTS. None uses the packet count from the RTS.
    :param sequence_id: Next packet number to ask for in the CTS. None asks for packet 1.
    :param end_of_message: When True, send an End of Message Acknowledge after reading the packets.
        When False, wait 10 seconds and log whatever frame is read next.
    :return: False if the RTS never arrived. Otherwise True when every pass/fail check passed, and also
        True for Brentronics batteries even when a check failed (failures are reported as warnings).
    """
    test_passed = True
    cts_pgn = PGN["ECUID"].id
    cts_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN[0xEC00],
        data=[17, 0x0203, 0xFF, 0xFF, cts_pgn],  # Packets that can be sent, next packet#, reserved, PGN
    )
    end_acknowledge_frame = CANFrame(
        destination_address=BATTERY_ADDRESS,
        pgn=PGN["TP.CM"],
        data=[19, 0, 0, 0xFF, 0],  # End of Message Acknowledge, bytes, packets, reserved, PGN
    )
    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["Request"], data=[cts_pgn])

    with CANBus(BATTERY_CHANNEL) as bus:
        # Get request to send
        logger.write_info_to_report("Sending RTS")
        rts_frame = bus.process_call(cts_r)
        logger.write_info_to_report(f"Got response:\n{rts_frame}")
        if not rts_frame:
            # Without an RTS there are no fields to build the CTS from.
            logger.write_warning_to_html_report("No RTS was received, CTS could not be sent")
            return False

        # Send malicious CTS
        expected_bytes = int(rts_frame.data[1])
        expected_packets = int(rts_frame.data[2])
        rts_pgn_id = int(rts_frame.data[-1])
        cts_frame.data[1] = expected_packets if size is None else size
        cts_frame.data[2] = 1 if sequence_id is None else sequence_id
        cts_frame.data[-1] = rts_pgn_id

        logger.write_info_to_report("Sending CTS")
        bus.send_frame(cts_frame)
        packet_count = 0
        while response_frame := bus.read_frame():
            logger.write_info_to_report(f"Got response:\n{response_frame}")

            # Frames matching PGN["TP.CM", [32]] are not counted as data packets.
            if response_frame.pgn.id == PGN["TP.CM", [32]].id:
                continue

            packet_count += 1

            if sequence_id is not None and expected_packets >= sequence_id > response_frame.data[0]:
                cmp_text = cmp(response_frame.data[0], ">=", sequence_id)
                logger.write_warning_to_html_report(
                    f"Received sequence ID {response_frame.data[0]}, "
                    f"which is before the requested sequence ID: {cmp_text}"
                )
                # Pass BT for now with warning, special case
                if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
                    continue
                test_passed = False

        # Acknowledge packets
        if end_of_message:
            end_acknowledge_frame.data[1] = expected_bytes
            end_acknowledge_frame.data[2] = expected_packets
            end_acknowledge_frame.data[-1] = rts_pgn_id
            op_frame = bus.process_call(end_acknowledge_frame)
            logger.write_info_to_report(f"End of message response:\n{op_frame}")
        else:
            time.sleep(10)
            if response_frame := bus.read_frame():
                logger.write_info_to_report(f"Got possible timeout response:\n{response_frame}")
            else:
                logger.write_info_to_report("No response after waiting...")

    if size is not None:
        # Zero size expects nothing, an oversized request is capped at what the RTS announced,
        # and anything else expects exactly the requested number of packets.
        if size == 0:
            wanted_packets = 0
            failure_prefix = "Did not receive expected packets with invalid size"
        elif size > expected_packets:
            wanted_packets = expected_packets
            failure_prefix = "Did not receive expected packets"
        else:
            wanted_packets = size
            failure_prefix = "Did not receive expected packets"

        # The reported comparison uses the same value as the check itself.
        cmp_text = cmp(packet_count, "==", wanted_packets)
        if packet_count != wanted_packets:
            logger.write_warning_to_html_report(f"{failure_prefix}: {cmp_text}")
            test_passed = False
        else:
            logger.write_result_to_html_report(f"Received expected packets: {cmp_text}")

    if sequence_id is not None:
        if sequence_id == 0 or sequence_id > expected_packets:
            cmp_text = cmp(packet_count, "==", 0)
            if packet_count != 0:
                logger.write_warning_to_html_report(
                    f"Did not receive expected number of packets with invalid request: {cmp_text}"
                )
                test_passed = False
            else:
                logger.write_result_to_html_report(
                    f"Received expected packets with invalid sequence request: {cmp_text}"
                )
        else:
            # sequence_id <= expected_packets here, so this count is always at least 1.
            overall_expected_packets = expected_packets - sequence_id + 1
            cmp_text = cmp(packet_count, "==", overall_expected_packets)
            # NOTE(review): a mismatch here only warns and does not fail the test, unlike the
            # branches above. Confirm this is intended.
            if packet_count != overall_expected_packets:
                logger.write_warning_to_html_report(f"Did not receive expected number of packets: {cmp_text}")
            else:
                logger.write_result_to_html_report(f"Received expected number of packets: {cmp_text}")

    if sequence_id is None and size is None:
        cmp_text = cmp(packet_count, "==", expected_packets)
        # NOTE(review): a mismatch on the normal CTS only warns as well. Confirm this is intended.
        if expected_packets == packet_count:
            logger.write_result_to_html_report(f"Received expected number of packets: {cmp_text}")
        else:
            logger.write_warning_to_html_report(f"Did not receive expected number of packets: {cmp_text}")

    # Brentronics failures are reported as warnings only, so the caller sees a pass.
    if not test_passed and BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
        return True

    return test_passed
Send a CTS.
class
TestMaliciousCTS:
class TestMaliciousCTS:
    """Send a CTS with bad sequence ID or size."""

    def test_malicious_cts(self):
        """
        | Description          | Send a CTS with bad sequence ID or size                          |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Send a normal CTS                                        </br>\
                                 2. Send a CTS with sequence ID of zero                      </br>\
                                 3. Send a CTS with sequence ID of three                     </br>\
                                 4. Send a CTS with sequence ID too high                     </br>\
                                 5. Send a CTS with zero size                                </br>\
                                 6. Send a CTS with small size                               </br>\
                                 7. Send a CTS with size too big                                  |
        | Estimated Duration   | 85 seconds                                                       |
        """
        tests_failed = []
        TestLocate6T().test_profile()

        # Each entry overrides at most one CTS field; None leaves that field at its normal value.
        cts_tests = [
            {"sequence_id": None, "size": None, "log_text": "Sending normal CTS"},
            {"sequence_id": 0, "size": None, "log_text": "Sending CTS with sequence ID of zero"},
            {"sequence_id": 3, "size": None, "log_text": "Sending CTS with sequence ID of three"},
            {"sequence_id": 200, "size": None, "log_text": "Sending CTS with sequence ID too high"},
            {"sequence_id": None, "size": 0, "log_text": "Sending CTS with zero size"},
            {"sequence_id": None, "size": 1, "log_text": "Sending CTS with small size"},
            {"sequence_id": None, "size": 200, "log_text": "Sending CTS with size too big"},
        ]

        for cts_test in cts_tests:
            logger.write_result_to_html_report(cts_test["log_text"])
            did_test_pass = request_data_cts_test(cts_test["size"], cts_test["sequence_id"])

            if not did_test_pass:
                logger.write_failure_to_html_report(f"Test failed when {cts_test['log_text']}.")
                tests_failed.append(cts_test["log_text"])

        if tests_failed:
            pytest.fail("One or more tests failed Malicious CTS vulnerability scan")
        elif BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
            logger.write_warning_to_html_report("See warnings for possible discrepancies with J1939 expectations")
        else:
            logger.write_result_to_html_report("Malicious CTS vulnerability scans met expectations")
Send a CTS with bad sequence ID or size.
def
test_malicious_cts(self):
def test_malicious_cts(self):
    """
    | Description          | Send a CTS with bad sequence ID or size                          |
    | :------------------- | :--------------------------------------------------------------- |
    | Instructions         | 1. Send a normal CTS                                        </br>\
                             2. Send a CTS with sequence ID of zero                      </br>\
                             3. Send a CTS with sequence ID of three                     </br>\
                             4. Send a CTS with sequence ID too high                     </br>\
                             5. Send a CTS with zero size                                </br>\
                             6. Send a CTS with small size                               </br>\
                             7. Send a CTS with size too big                                  |
    | Estimated Duration   | 85 seconds                                                       |
    """
    tests_failed = []
    TestLocate6T().test_profile()

    # Each entry overrides at most one CTS field; None leaves that field at its normal value.
    cts_tests = [
        {"sequence_id": None, "size": None, "log_text": "Sending normal CTS"},
        {"sequence_id": 0, "size": None, "log_text": "Sending CTS with sequence ID of zero"},
        {"sequence_id": 3, "size": None, "log_text": "Sending CTS with sequence ID of three"},
        {"sequence_id": 200, "size": None, "log_text": "Sending CTS with sequence ID too high"},
        {"sequence_id": None, "size": 0, "log_text": "Sending CTS with zero size"},
        {"sequence_id": None, "size": 1, "log_text": "Sending CTS with small size"},
        {"sequence_id": None, "size": 200, "log_text": "Sending CTS with size too big"},
    ]

    for cts_test in cts_tests:
        logger.write_result_to_html_report(cts_test["log_text"])
        did_test_pass = request_data_cts_test(cts_test["size"], cts_test["sequence_id"])

        if not did_test_pass:
            logger.write_failure_to_html_report(f"Test failed when {cts_test['log_text']}.")
            tests_failed.append(cts_test["log_text"])

    if tests_failed:
        pytest.fail("One or more tests failed Malicious CTS vulnerability scan")
    elif BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
        logger.write_warning_to_html_report("See warnings for possible discrepancies with J1939 expectations")
    else:
        logger.write_result_to_html_report("Malicious CTS vulnerability scans met expectations")
| Description | Send a CTS with bad sequence ID or size |
|---|---|
| Instructions | 1. Send a normal CTS 2. Send a CTS with sequence ID of zero 3. Send a CTS with sequence ID of three 4. Send a CTS with sequence ID too high 5. Send a CTS with zero size 6. Send a CTS with small size 7. Send a CTS with size too big |
| Estimated Duration | 85 seconds |
class
TestConnectionExhaustion:
class TestConnectionExhaustion:
    """Start a connection, but don't close it."""

    def test_connection_exhaustion(self):
        """
        | Description          | Start a connection, but don't close it                           |
        | :------------------- | :--------------------------------------------------------------- |
        | Instructions         | 1. Open a connection                                        </br>\
                                 2. Refuse to close it and check for timeouts                     |
        | Estimated Duration   | 85 seconds                                                       |
        """
        TestLocate6T().test_profile()

        logger.write_info_to_report("Establishing connection...")
        # With end_of_message=False the helper leaves the connection open and waits for an abort.
        abort_received = request_data(end_of_message=False)

        if abort_received is not False:
            logger.write_result_to_html_report("Connection Abort message was received after waiting 10 seconds")
            return

        # Only an explicit False from the helper counts as a failure.
        failure_text = "No Connection Abort message was received after waiting 10 seconds"
        logger.write_failure_to_html_report(failure_text)
        pytest.fail(failure_text)
Start a connection, but don't close it.
def
test_connection_exhaustion(self):
def test_connection_exhaustion(self):
    """
    | Description          | Start a connection, but don't close it                           |
    | :------------------- | :--------------------------------------------------------------- |
    | Instructions         | 1. Open a connection                                        </br>\
                             2. Refuse to close it and check for timeouts                     |
    | Estimated Duration   | 85 seconds                                                       |
    """
    TestLocate6T().test_profile()

    logger.write_info_to_report("Establishing connection...")
    # With end_of_message=False the helper leaves the connection open and waits for an abort.
    abort_received = request_data(end_of_message=False)

    if abort_received is not False:
        logger.write_result_to_html_report("Connection Abort message was received after waiting 10 seconds")
        return

    # Only an explicit False from the helper counts as a failure.
    failure_text = "No Connection Abort message was received after waiting 10 seconds"
    logger.write_failure_to_html_report(failure_text)
    pytest.fail(failure_text)
| Description | Start a connection, but don't close it |
|---|---|
| Instructions | 1. Open a connection 2. Refuse to close it and check for timeouts |
| Estimated Duration | 85 seconds |