hitl_tester.test_cases.cyber_6t.vulnerability

A temporary file for quick development. Some of these should be broken out into test cases later on.

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

Used in these test plans:

  • vulnerability ⠀⠀⠀(cyber_6t/vulnerability.plan)

Example Command (warning: test plan may run other test cases):

  • ./hitl_tester.py vulnerability -DBATTERY_CHANNEL=can0 -DHARD_RESET=False -DBATTERY_ADDRESS=0 -DBATTERY_INFO=<BatteryInfo object at 0x7f34793687a0>
  1"""
  2A temporary file for quick development. Some of these should be broken out into test cases later on.
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28"""
 29
 30import itertools
 31import time
 32from contextlib import suppress
 33from enum import IntEnum
 34from typing import Literal
 35
 36import pytest
 37
 38from hitl_tester.modules.cyber_6t import spn_types
 39from hitl_tester.modules.cyber_6t.canbus import CANBus, CANFrame
 40from hitl_tester.modules.cyber_6t.j1939da import PGN
 41
 42from hitl_tester.modules import properties
 43from hitl_tester.modules.logger import logger
 44
 45BATTERY_CHANNEL = "can0"
 46"""Channel identification. Expected type is backend dependent."""
 47
 48HARD_RESET = False
 49"""Whether to use a soft or hard reset before each test."""
 50
 51properties.apply()  # Allow modifying the above globals
 52
 53
 54class ManufacturerID(IntEnum):
 55    """Enum to hold Manufacturer IDs."""
 56
 57    SAFT = 269  # NOTE: unused
 58    BRENTRONICS = 822
 59
 60
 61BATTERY_ADDRESS = 0
 62
 63
 64class BatteryInfo:
 65    """class to store Battery Information storage"""
 66
 67    def __init__(self):
 68        self.data = {}
 69
 70    def get_item(self, key):
 71        """Gets a specific item from data"""
 72        return self.data[key]
 73
 74    def set_item(self, key, value):
 75        """Sets a specific item from data"""
 76        self.data[key] = value
 77
 78
 79BATTERY_INFO = BatteryInfo()
 80
 81
 82def cmp(
 83    a: float | str,
 84    sign: Literal["<", "<=", ">", ">=", "=="],
 85    b: float | str,
 86    unit_a: str = "",
 87    unit_b: str = "",
 88    form: str = "",
 89) -> str:
 90    """Generate a formatted string based on a comparison."""
 91    if not unit_b:
 92        unit_b = unit_a
 93
 94    if isinstance(a, str) or isinstance(b, str):
 95        return f"{a:{form}}{unit_a} {('≠', '=')[a == b]} {b:{form}}{unit_b}"
 96
 97    sign_str = {
 98        "<": ("≮", "<")[a < b],
 99        "<=": ("≰", "≤")[a <= b],
100        ">": ("≯", ">")[a > b],
101        ">=": ("≱", "≥")[a >= b],
102        "==": ("≠", "=")[a == b],
103    }
104
105    return f"{a:{form}}{unit_a} {sign_str[sign]} {b:{form}}{unit_b}"
106
107
108@pytest.fixture(scope="class", autouse=True)
109def reset_test_environment():
110    """Before each test class, reset the 6T."""
111
112    if BATTERY_ADDRESS:
113        power_cycle_frame = CANFrame(
114            destination_address=BATTERY_ADDRESS,
115            pgn=PGN["PropA"],
116            data=[0, 0, 1, 1, 1, not HARD_RESET, 1, 3, 0, -1],
117        )
118        maintenance_mode_frame = CANFrame(
119            destination_address=BATTERY_ADDRESS,
120            pgn=PGN["PropA"],
121            data=[0, 0, 1, 1, 1, 3, 1, 3, 0, -1],
122        )
123        factory_reset_frame = CANFrame(
124            destination_address=BATTERY_ADDRESS,
125            pgn=PGN["PropA"],
126            data=[1, 0, 0, -1, 3, 0xF, 3, 0, 0x1F, -1],
127        )
128        with CANBus(BATTERY_CHANNEL) as bus:
129            logger.write_info_to_report("Power-Cycling 6T")
130            bus.process_call(power_cycle_frame)
131            time.sleep(10)
132            logger.write_info_to_report("Entering maintenance mode")
133            bus.process_call(maintenance_mode_frame)
134            logger.write_info_to_report("Factory resetting 6T")
135            bus.process_call(factory_reset_frame)
136            time.sleep(10)
137    else:  # Battery has not yet been found
138        return
139
140
141class TestLocate6T:
142    """Scan for 6T battery."""
143
144    MAX_ATTEMPTS = 12  # 2 minutes worth of attempts
145
146    def __init__(self):
147        self.id = 0
148        self.manufacturer_code = 0
149        self.manufacturer_name = ""
150
151    def test_profile(self) -> None:
152        """
153        | Description          | Scan the bus for devices                                         |
154        | :------------------- | :--------------------------------------------------------------- |
155        | Instructions         | 1. Request the names of everyone on the bus                 </br>\
156                                 2. Use the response data for all communication                   |
157        | Estimated Duration   | 1 second                                                         |
158        """
159        global BATTERY_ADDRESS
160
161        name_request = CANFrame(pgn=PGN["Request"], data=[PGN["Address Claimed"].id])
162        with CANBus(BATTERY_CHANNEL) as bus:
163            for attempt in range(self.MAX_ATTEMPTS):
164                with suppress(IndexError):
165                    if name_frame := bus.process_call(name_request):
166                        # Save responses to profile
167                        self.id = int(name_frame.data[0])
168                        self.manufacturer_code = name_frame.data[1]
169                        BATTERY_INFO.set_item("manufacturer_code", name_frame.data[1])
170                        if (mfg_name := spn_types.manufacturer_id(name_frame.data[1])) == "Reserved":
171                            mfg_name = f"Unknown ({name_frame.data[1]})"
172                        self.manufacturer_name = mfg_name
173                        BATTERY_ADDRESS = name_frame.source_address
174                        break
175                logger.write_warning_to_html_report(f"Failed to locate 6T. Retrying, attempt {attempt + 1}")
176                time.sleep(10)
177            else:
178                message = "Could not locate 6T"
179                logger.write_warning_to_html_report(message)
180                pytest.exit(message)
181
182        # Log results
183        printable_id = "".join(chr(i) if chr(i).isprintable() else "." for i in self.id.to_bytes(3, byteorder="big"))
184        logger.write_result_to_html_report(
185            f"Found {self.manufacturer_name} 6T at address {BATTERY_ADDRESS} "
186            f"(ID: {self.id:06X}, ASCII ID: {printable_id})"
187        )
188
189
190class TestScanForProprietary:
191    """Scan all 512 proprietary commands for any responses."""
192
193    def test_scan_for_proprietary(self):
194        """
195        | Description          | Scan all 512 proprietary commands for any responses              |
196        | :------------------- | :--------------------------------------------------------------- |
197        | Instructions         | 1. Scan addresses $EFxx and $FFxx                           </br>\
198                                 2. Log any successful responses                             </br>\
199                                 3. Check if responded commands are not documented                |
200        | Estimated Duration   | 85 seconds                                                       |
201        """
202        TestLocate6T().test_profile()
203        proprietary_request = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[0])
204        ack_responses: dict[int, CANFrame] = {}  # type: ignore # address: response
205        with CANBus(BATTERY_CHANNEL) as bus:
206            for address in itertools.chain(range(0xEF00, 0xF000), range(0xFF00, 0x10000)):
207                proprietary_request.data = [address]
208                if response_frame := bus.process_call(proprietary_request):  # Log if response is not NACK
209                    if not (response_frame.pgn.id == PGN["Acknowledgement"].id and response_frame.data[0] == 1):
210                        ack_responses[address] = response_frame
211                        logger.write_info_to_report(f"Got response at address {address:04X}:\n{response_frame}")
212
213            milprf_commands = [0xFF00, 0xFF01, 0xFF02, 0xFF03, 0xFF04, 0xFF05, 0xFF06, 0xFF07, 0xFF08]
214            j1939_transmit_commands = [0xE800, 0xEE00, 0xFEE6, 0xFCB6, 0xFECA, 0xFECB, 0xFE50, 0xFDC5, 0xFEDA, 0xD800]
215
216            if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.SAFT:
217                saft_commands = [
218                    0xFFD2,
219                    0xFFD4,
220                    0xFFD5,
221                    0xFFD6,
222                    0xFFD7,
223                    0xFFD8,
224                    0xFFDC,
225                    0xFFDD,
226                    0xFFDE,
227                    0xFFE0,
228                    0xFFE1,
229                    0xFFE1,
230                    0xFFE4,
231                    0xFFE9,
232                    0x1EF00,
233                ]
234                documented_commands = list(dict.fromkeys(milprf_commands + j1939_transmit_commands + saft_commands))
235            else:
236                documented_commands = milprf_commands + j1939_transmit_commands
237
238            if unknown_proprietary_commands := set(ack_responses) - set(documented_commands):
239                message = (
240                    f"Unknown Proprietary Command{'s' if len(unknown_proprietary_commands) > 1 else ''}: "
241                    f"{', '.join(map(str, unknown_proprietary_commands))}"
242                )
243                logger.write_warning_to_html_report(message)
244                message = (
245                    f"{len(unknown_proprietary_commands)} command"
246                    f"{'s' if len(unknown_proprietary_commands) > 1 else ''} missing known documentation"
247                )
248                logger.write_failure_to_html_report(message)
249                pytest.fail(message)
250
251            logger.write_result_to_html_report("All responded commands are known and documented")
252
253
254def request_data(end_of_message: bool = True) -> bool:
255    """Send a RTS."""
256    cts_pgn = PGN["ECUID"].id
257    end_acknowledge_frame = CANFrame(  # Memory access only works in maintenance mode
258        destination_address=BATTERY_ADDRESS,
259        pgn=PGN["TP.CM"],
260        data=[17, 0x0203, 0xFF, 0xFF, cts_pgn],  # End of Message Acknowledge, bytes, packets, reserved, PGN
261    )
262    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[cts_pgn])
263
264    with CANBus(BATTERY_CHANNEL) as bus:
265        # Get request to send
266        logger.write_info_to_report("Sending RTS")
267        rts_frame = bus.process_call(cts_r)
268        logger.write_info_to_report(f"Got response:\n{rts_frame}")
269        logger.write_info_to_report("Not sending CTS. Purposefully waiting")
270
271        # Prep for End of Message
272        expected_bytes = int(rts_frame.data[1])
273        expected_packets = int(rts_frame.data[2])
274        rts_pgn_id = int(rts_frame.data[-1])
275
276        # Acknowledge packets
277        if end_of_message:
278            end_acknowledge_frame.data[1] = expected_bytes
279            end_acknowledge_frame.data[2] = expected_packets
280            end_acknowledge_frame.data[-1] = rts_pgn_id
281            op_frame = bus.process_call(end_acknowledge_frame)
282            logger.write_info_to_report(f"End of message response:\n{op_frame}")
283            return True
284
285        time.sleep(10)
286        if response_frame := bus.read_frame():
287            if response_frame.data[0] == 255:
288                if response_frame.data[1] == 3:
289                    logger.write_info_to_report(f"Got timeout response:\n{response_frame}")
290                else:
291                    logger.write_info_to_report(f"Received different connection abort message:\n{response_frame}")
292                return True
293
294            logger.write_info_to_report(f"May not have received a timeout response:\n{response_frame}")
295
296        logger.write_info_to_report("No response after waiting...")
297        return False
298
299
300def request_data_cts_test(size: int | None = None, sequence_id: int | None = None, end_of_message: bool = True) -> bool:
301    """Send a CTS."""
302    test_passed = True
303    cts_pgn = PGN["ECUID"].id
304    cts_frame = CANFrame(
305        destination_address=BATTERY_ADDRESS,
306        pgn=PGN[0xEC00],
307        data=[17, 0x0203, 0xFF, 0xFF, cts_pgn],  # Packets that can be sent, next packet#, reserved, PGN
308    )
309    end_acknowledge_frame = CANFrame(  # Memory access only works in maintenance mode
310        destination_address=BATTERY_ADDRESS,
311        pgn=PGN["TP.CM"],
312        data=[19, 0, 0, 0xFF, 0],  # End of Message Acknowledge, bytes, packets, reserved, PGN
313    )
314    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["Request"], data=[cts_pgn])
315
316    with CANBus(BATTERY_CHANNEL) as bus:
317        # Get request to send
318        logger.write_info_to_report("Sending RTS")
319        rts_frame = bus.process_call(cts_r)
320        logger.write_info_to_report(f"Got response:\n{rts_frame}")
321
322        # Send malicious CTS
323        expected_bytes = int(rts_frame.data[1])
324        expected_packets = int(rts_frame.data[2])
325        rts_pgn_id = int(rts_frame.data[-1])
326        cts_frame.data[1] = expected_packets if size is None else size
327        cts_frame.data[2] = 1 if sequence_id is None else sequence_id
328        cts_frame.data[-1] = rts_pgn_id
329
330        logger.write_info_to_report("Sending CTS")
331        bus.send_frame(cts_frame)
332        packet_count = 0
333        while response_frame := bus.read_frame():
334
335            logger.write_info_to_report(f"Got response:\n{response_frame}")
336
337            if response_frame.pgn.id == PGN["TP.CM", [32]].id:
338                continue
339
340            packet_count += 1
341
342            if sequence_id is not None:
343                if expected_packets >= sequence_id > response_frame.data[0]:
344
345                    cmp_text = cmp(response_frame.data[0], ">=", sequence_id)
346
347                    logger.write_warning_to_html_report(
348                        f"Received sequence ID {response_frame.data[0]}, "
349                        f"which is before the requested sequence ID: {cmp_text}"
350                    )
351                    # Pass BT for now with warning, special case
352                    if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
353                        continue
354                    test_passed = False
355
356        # Acknowledge packets
357        if end_of_message:
358            end_acknowledge_frame.data[1] = expected_bytes
359            end_acknowledge_frame.data[2] = expected_packets
360            end_acknowledge_frame.data[-1] = rts_pgn_id
361            op_frame = bus.process_call(end_acknowledge_frame)
362            logger.write_info_to_report(f"End of message response:\n{op_frame}")
363        else:
364            time.sleep(10)
365            if response_frame := bus.read_frame():
366                logger.write_info_to_report(f"Got possible timeout response:\n{response_frame}")
367            else:
368                logger.write_info_to_report("No response after waiting...")
369
370        if size is not None:
371            cmp_text = cmp(packet_count, "==", expected_packets)
372            if size == 0:
373                if packet_count != 0:
374                    logger.write_warning_to_html_report(
375                        f"Did not receive expected packets with invalid size: {cmp(packet_count, '==', 0)}"
376                    )
377                    test_passed = False
378                else:
379                    logger.write_result_to_html_report(f"Received expected packets: {cmp(packet_count, '==', 0)}")
380            elif size > expected_packets:
381                if packet_count != expected_packets:
382                    logger.write_warning_to_html_report(f"Did not receive expected packets: {cmp_text}")
383                    test_passed = False
384                else:
385                    logger.write_result_to_html_report(f"Received expected packets: {cmp_text}")
386            elif size != packet_count:
387                logger.write_warning_to_html_report(f"Did not receive expected packets: {cmp_text}")
388                test_passed = False
389            else:
390                logger.write_result_to_html_report(f"Received expected packets: {cmp(packet_count, '==', size)}")
391
392        if sequence_id is not None:
393            if sequence_id == 0 or sequence_id > expected_packets:
394                cmp_text = cmp(packet_count, "==", 0)
395                if packet_count != 0:
396                    logger.write_warning_to_html_report(
397                        f"Did not receive expected number of packets with invalid request: {cmp_text}"
398                    )
399                    test_passed = False
400                else:
401                    logger.write_result_to_html_report(
402                        f"Received expected packets with invalid sequence request: {cmp_text}"
403                    )
404            else:
405                overall_expected_packets = expected_packets - sequence_id + 1
406                cmp_text = cmp(packet_count, "==", overall_expected_packets)
407                if packet_count != abs(expected_packets - sequence_id + 1):
408                    logger.write_warning_to_html_report(f"Did not receive expected number of packets: {cmp_text}")
409                else:
410                    logger.write_result_to_html_report(f"Received expected number of packets: {cmp_text}")
411
412        if sequence_id is None and size is None:
413            if expected_packets == packet_count:
414                logger.write_result_to_html_report(
415                    f"Received expected number of packets: {cmp(packet_count, '==', expected_packets)}"
416                )
417            else:
418                logger.write_warning_to_html_report(
419                    f"Did not receive expected number of packets: {cmp(packet_count, '==', expected_packets)}"
420                )
421
422        if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS and test_passed is False:
423            return True
424
425        return test_passed
426
427
428class TestMaliciousCTS:
429    """Send a CTS with bad sequence ID or size."""
430
431    def test_malicious_cts(self):
432        """
433        | Description          | Send a CTS with bad sequence ID or size                          |
434        | :------------------- | :--------------------------------------------------------------- |
435        | Instructions         | 1. Send a normal CTS                                        </br>\
436                                 2. Send a CTS with sequence ID of zero                      </br>\
437                                 3. Send a CTS with sequence ID too high                     </br>\
438                                 4. Send a CTS with zero size                                </br>\
439                                 5. Send a CTS with small size                               </br>\
440                                 6. Send a CTS with size too big                                  |
441        | Estimated Duration   | 85 seconds                                                       |
442        """
443        tests_failed = []
444        TestLocate6T().test_profile()
445
446        cts_tests = [
447            {"sequence_id": None, "size": None, "log_text": "Sending normal CTS"},
448            {"sequence_id": 0, "size": None, "log_text": "Sending CTS with sequence ID of zero"},
449            {"sequence_id": 3, "size": None, "log_text": "Sending CTS with sequence ID of three"},
450            {"sequence_id": 200, "size": None, "log_text": "Sending CTS with sequence ID too high"},
451            {"sequence_id": None, "size": 0, "log_text": "Sending CTS with zero size"},
452            {"sequence_id": None, "size": 1, "log_text": "Sending CTS with small size"},
453            {"sequence_id": None, "size": 200, "log_text": "Sending CTS with size too big"},
454        ]
455
456        for cts_test in cts_tests:
457            if cts_test["sequence_id"] is None:
458                sequence_id = None
459            else:
460                sequence_id = cts_test["sequence_id"]
461
462            if cts_test["size"] is None:
463                size = None
464            else:
465                size = cts_test["size"]
466
467            logger.write_result_to_html_report(cts_test["log_text"])
468            did_test_pass = request_data_cts_test(size, sequence_id)
469
470            if did_test_pass is False:
471                logger.write_failure_to_html_report(f"Test failed when {cts_test['log_text']}.")
472                tests_failed.append(cts_test["log_text"])
473
474        if len(tests_failed) > 0:
475            pytest.fail("One or more tests failed Malicious CTS vulnerability scan")
476        elif BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
477            logger.write_warning_to_html_report("See warnings for possible discrepancies with J1939 expectations")
478        else:
479            logger.write_result_to_html_report("Malicious CTS vulnerability scans met expectations")
480
481
482class TestConnectionExhaustion:
483    """Start a connection, but don't close it."""
484
485    def test_connection_exhaustion(self):
486        """
487        | Description          | Start a connection, but don't close it                           |
488        | :------------------- | :--------------------------------------------------------------- |
489        | Instructions         | 1. Open a connection                                        </br>\
490                                 2. Refuse to close it and check for timeouts                     |
491        | Estimated Duration   | 85 seconds                                                       |
492        """
493        TestLocate6T().test_profile()
494
495        logger.write_info_to_report("Establishing connection...")
496        test_passed = request_data(end_of_message=False)
497
498        if test_passed is False:
499            logger.write_failure_to_html_report("No Connection Abort message was received after waiting 10 seconds")
500            pytest.fail("No Connection Abort message was received after waiting 10 seconds")
501        else:
502            logger.write_result_to_html_report("Connection Abort message was received after waiting 10 seconds")
BATTERY_CHANNEL = 'can0'

Channel identification. Expected type is backend dependent.

HARD_RESET = False

Whether to use a soft or hard reset before each test.

class ManufacturerID(enum.IntEnum):
55class ManufacturerID(IntEnum):
56    """Enum to hold Manufacturer IDs."""
57
58    SAFT = 269  # NOTE: unused
59    BRENTRONICS = 822

Enum to hold Manufacturer IDs.

SAFT = <ManufacturerID.SAFT: 269>
BRENTRONICS = <ManufacturerID.BRENTRONICS: 822>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
is_integer
real
imag
numerator
denominator
BATTERY_ADDRESS = 0
class BatteryInfo:
65class BatteryInfo:
66    """class to store Battery Information storage"""
67
68    def __init__(self):
69        self.data = {}
70
71    def get_item(self, key):
72        """Gets a specific item from data"""
73        return self.data[key]
74
75    def set_item(self, key, value):
76        """Sets a specific item from data"""
77        self.data[key] = value

class to store Battery Information storage

data
def get_item(self, key):
71    def get_item(self, key):
72        """Gets a specific item from data"""
73        return self.data[key]

Gets a specific item from data

def set_item(self, key, value):
75    def set_item(self, key, value):
76        """Sets a specific item from data"""
77        self.data[key] = value

Sets a specific item from data

BATTERY_INFO = <BatteryInfo object>
def cmp( a: float | str, sign: Literal['<', '<=', '>', '>=', '=='], b: float | str, unit_a: str = '', unit_b: str = '', form: str = '') -> str:
 83def cmp(
 84    a: float | str,
 85    sign: Literal["<", "<=", ">", ">=", "=="],
 86    b: float | str,
 87    unit_a: str = "",
 88    unit_b: str = "",
 89    form: str = "",
 90) -> str:
 91    """Generate a formatted string based on a comparison."""
 92    if not unit_b:
 93        unit_b = unit_a
 94
 95    if isinstance(a, str) or isinstance(b, str):
 96        return f"{a:{form}}{unit_a} {('≠', '=')[a == b]} {b:{form}}{unit_b}"
 97
 98    sign_str = {
 99        "<": ("≮", "<")[a < b],
100        "<=": ("≰", "≤")[a <= b],
101        ">": ("≯", ">")[a > b],
102        ">=": ("≱", "≥")[a >= b],
103        "==": ("≠", "=")[a == b],
104    }
105
106    return f"{a:{form}}{unit_a} {sign_str[sign]} {b:{form}}{unit_b}"

Generate a formatted string based on a comparison.

@pytest.fixture(scope='class', autouse=True)
def reset_test_environment():
109@pytest.fixture(scope="class", autouse=True)
110def reset_test_environment():
111    """Before each test class, reset the 6T."""
112
113    if BATTERY_ADDRESS:
114        power_cycle_frame = CANFrame(
115            destination_address=BATTERY_ADDRESS,
116            pgn=PGN["PropA"],
117            data=[0, 0, 1, 1, 1, not HARD_RESET, 1, 3, 0, -1],
118        )
119        maintenance_mode_frame = CANFrame(
120            destination_address=BATTERY_ADDRESS,
121            pgn=PGN["PropA"],
122            data=[0, 0, 1, 1, 1, 3, 1, 3, 0, -1],
123        )
124        factory_reset_frame = CANFrame(
125            destination_address=BATTERY_ADDRESS,
126            pgn=PGN["PropA"],
127            data=[1, 0, 0, -1, 3, 0xF, 3, 0, 0x1F, -1],
128        )
129        with CANBus(BATTERY_CHANNEL) as bus:
130            logger.write_info_to_report("Power-Cycling 6T")
131            bus.process_call(power_cycle_frame)
132            time.sleep(10)
133            logger.write_info_to_report("Entering maintenance mode")
134            bus.process_call(maintenance_mode_frame)
135            logger.write_info_to_report("Factory resetting 6T")
136            bus.process_call(factory_reset_frame)
137            time.sleep(10)
138    else:  # Battery has not yet been found
139        return

Before each test class, reset the 6T.

class TestLocate6T:
142class TestLocate6T:
143    """Scan for 6T battery."""
144
145    MAX_ATTEMPTS = 12  # 2 minutes worth of attempts
146
147    def __init__(self):
148        self.id = 0
149        self.manufacturer_code = 0
150        self.manufacturer_name = ""
151
152    def test_profile(self) -> None:
153        """
154        | Description          | Scan the bus for devices                                         |
155        | :------------------- | :--------------------------------------------------------------- |
156        | Instructions         | 1. Request the names of everyone on the bus                 </br>\
157                                 2. Use the response data for all communication                   |
158        | Estimated Duration   | 1 second                                                         |
159        """
160        global BATTERY_ADDRESS
161
162        name_request = CANFrame(pgn=PGN["Request"], data=[PGN["Address Claimed"].id])
163        with CANBus(BATTERY_CHANNEL) as bus:
164            for attempt in range(self.MAX_ATTEMPTS):
165                with suppress(IndexError):
166                    if name_frame := bus.process_call(name_request):
167                        # Save responses to profile
168                        self.id = int(name_frame.data[0])
169                        self.manufacturer_code = name_frame.data[1]
170                        BATTERY_INFO.set_item("manufacturer_code", name_frame.data[1])
171                        if (mfg_name := spn_types.manufacturer_id(name_frame.data[1])) == "Reserved":
172                            mfg_name = f"Unknown ({name_frame.data[1]})"
173                        self.manufacturer_name = mfg_name
174                        BATTERY_ADDRESS = name_frame.source_address
175                        break
176                logger.write_warning_to_html_report(f"Failed to locate 6T. Retrying, attempt {attempt + 1}")
177                time.sleep(10)
178            else:
179                message = "Could not locate 6T"
180                logger.write_warning_to_html_report(message)
181                pytest.exit(message)
182
183        # Log results
184        printable_id = "".join(chr(i) if chr(i).isprintable() else "." for i in self.id.to_bytes(3, byteorder="big"))
185        logger.write_result_to_html_report(
186            f"Found {self.manufacturer_name} 6T at address {BATTERY_ADDRESS} "
187            f"(ID: {self.id:06X}, ASCII ID: {printable_id})"
188        )

Scan for 6T battery.

MAX_ATTEMPTS = 12
id
manufacturer_code
manufacturer_name
def test_profile(self) -> None:
152    def test_profile(self) -> None:
153        """
154        | Description          | Scan the bus for devices                                         |
155        | :------------------- | :--------------------------------------------------------------- |
156        | Instructions         | 1. Request the names of everyone on the bus                 </br>\
157                                 2. Use the response data for all communication                   |
158        | Estimated Duration   | 1 second                                                         |
159        """
160        global BATTERY_ADDRESS
161
162        name_request = CANFrame(pgn=PGN["Request"], data=[PGN["Address Claimed"].id])
163        with CANBus(BATTERY_CHANNEL) as bus:
164            for attempt in range(self.MAX_ATTEMPTS):
165                with suppress(IndexError):
166                    if name_frame := bus.process_call(name_request):
167                        # Save responses to profile
168                        self.id = int(name_frame.data[0])
169                        self.manufacturer_code = name_frame.data[1]
170                        BATTERY_INFO.set_item("manufacturer_code", name_frame.data[1])
171                        if (mfg_name := spn_types.manufacturer_id(name_frame.data[1])) == "Reserved":
172                            mfg_name = f"Unknown ({name_frame.data[1]})"
173                        self.manufacturer_name = mfg_name
174                        BATTERY_ADDRESS = name_frame.source_address
175                        break
176                logger.write_warning_to_html_report(f"Failed to locate 6T. Retrying, attempt {attempt + 1}")
177                time.sleep(10)
178            else:
179                message = "Could not locate 6T"
180                logger.write_warning_to_html_report(message)
181                pytest.exit(message)
182
183        # Log results
184        printable_id = "".join(chr(i) if chr(i).isprintable() else "." for i in self.id.to_bytes(3, byteorder="big"))
185        logger.write_result_to_html_report(
186            f"Found {self.manufacturer_name} 6T at address {BATTERY_ADDRESS} "
187            f"(ID: {self.id:06X}, ASCII ID: {printable_id})"
188        )
Description Scan the bus for devices
Instructions 1. Request the names of everyone on the bus
2. Use the response data for all communication
Estimated Duration 1 second
class TestScanForProprietary:
191class TestScanForProprietary:
192    """Scan all 512 proprietary commands for any responses."""
193
194    def test_scan_for_proprietary(self):
195        """
196        | Description          | Scan all 512 proprietary commands for any responses              |
197        | :------------------- | :--------------------------------------------------------------- |
198        | Instructions         | 1. Scan addresses $EFxx and $FFxx                           </br>\
199                                 2. Log any successful responses                             </br>\
200                                 3. Check if responded commands are not documented                |
201        | Estimated Duration   | 85 seconds                                                       |
202        """
203        TestLocate6T().test_profile()
204        proprietary_request = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[0])
205        ack_responses: dict[int, CANFrame] = {}  # type: ignore # address: response
206        with CANBus(BATTERY_CHANNEL) as bus:
207            for address in itertools.chain(range(0xEF00, 0xF000), range(0xFF00, 0x10000)):
208                proprietary_request.data = [address]
209                if response_frame := bus.process_call(proprietary_request):  # Log if response is not NACK
210                    if not (response_frame.pgn.id == PGN["Acknowledgement"].id and response_frame.data[0] == 1):
211                        ack_responses[address] = response_frame
212                        logger.write_info_to_report(f"Got response at address {address:04X}:\n{response_frame}")
213
214            milprf_commands = [0xFF00, 0xFF01, 0xFF02, 0xFF03, 0xFF04, 0xFF05, 0xFF06, 0xFF07, 0xFF08]
215            j1939_transmit_commands = [0xE800, 0xEE00, 0xFEE6, 0xFCB6, 0xFECA, 0xFECB, 0xFE50, 0xFDC5, 0xFEDA, 0xD800]
216
217            if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.SAFT:
218                saft_commands = [
219                    0xFFD2,
220                    0xFFD4,
221                    0xFFD5,
222                    0xFFD6,
223                    0xFFD7,
224                    0xFFD8,
225                    0xFFDC,
226                    0xFFDD,
227                    0xFFDE,
228                    0xFFE0,
229                    0xFFE1,
230                    0xFFE1,
231                    0xFFE4,
232                    0xFFE9,
233                    0x1EF00,
234                ]
235                documented_commands = list(dict.fromkeys(milprf_commands + j1939_transmit_commands + saft_commands))
236            else:
237                documented_commands = milprf_commands + j1939_transmit_commands
238
239            if unknown_proprietary_commands := set(ack_responses) - set(documented_commands):
240                message = (
241                    f"Unknown Proprietary Command{'s' if len(unknown_proprietary_commands) > 1 else ''}: "
242                    f"{', '.join(map(str, unknown_proprietary_commands))}"
243                )
244                logger.write_warning_to_html_report(message)
245                message = (
246                    f"{len(unknown_proprietary_commands)} command"
247                    f"{'s' if len(unknown_proprietary_commands) > 1 else ''} missing known documentation"
248                )
249                logger.write_failure_to_html_report(message)
250                pytest.fail(message)
251
252            logger.write_result_to_html_report("All responded commands are known and documented")

Scan all 512 proprietary commands for any responses.

def test_scan_for_proprietary(self):
194    def test_scan_for_proprietary(self):
195        """
196        | Description          | Scan all 512 proprietary commands for any responses              |
197        | :------------------- | :--------------------------------------------------------------- |
198        | Instructions         | 1. Scan addresses $EFxx and $FFxx                           </br>\
199                                 2. Log any successful responses                             </br>\
200                                 3. Check if responded commands are not documented                |
201        | Estimated Duration   | 85 seconds                                                       |
202        """
203        TestLocate6T().test_profile()
204        proprietary_request = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[0])
205        ack_responses: dict[int, CANFrame] = {}  # type: ignore # address: response
206        with CANBus(BATTERY_CHANNEL) as bus:
207            for address in itertools.chain(range(0xEF00, 0xF000), range(0xFF00, 0x10000)):
208                proprietary_request.data = [address]
209                if response_frame := bus.process_call(proprietary_request):  # Log if response is not NACK
210                    if not (response_frame.pgn.id == PGN["Acknowledgement"].id and response_frame.data[0] == 1):
211                        ack_responses[address] = response_frame
212                        logger.write_info_to_report(f"Got response at address {address:04X}:\n{response_frame}")
213
214            milprf_commands = [0xFF00, 0xFF01, 0xFF02, 0xFF03, 0xFF04, 0xFF05, 0xFF06, 0xFF07, 0xFF08]
215            j1939_transmit_commands = [0xE800, 0xEE00, 0xFEE6, 0xFCB6, 0xFECA, 0xFECB, 0xFE50, 0xFDC5, 0xFEDA, 0xD800]
216
217            if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.SAFT:
218                saft_commands = [
219                    0xFFD2,
220                    0xFFD4,
221                    0xFFD5,
222                    0xFFD6,
223                    0xFFD7,
224                    0xFFD8,
225                    0xFFDC,
226                    0xFFDD,
227                    0xFFDE,
228                    0xFFE0,
229                    0xFFE1,
230                    0xFFE1,
231                    0xFFE4,
232                    0xFFE9,
233                    0x1EF00,
234                ]
235                documented_commands = list(dict.fromkeys(milprf_commands + j1939_transmit_commands + saft_commands))
236            else:
237                documented_commands = milprf_commands + j1939_transmit_commands
238
239            if unknown_proprietary_commands := set(ack_responses) - set(documented_commands):
240                message = (
241                    f"Unknown Proprietary Command{'s' if len(unknown_proprietary_commands) > 1 else ''}: "
242                    f"{', '.join(map(str, unknown_proprietary_commands))}"
243                )
244                logger.write_warning_to_html_report(message)
245                message = (
246                    f"{len(unknown_proprietary_commands)} command"
247                    f"{'s' if len(unknown_proprietary_commands) > 1 else ''} missing known documentation"
248                )
249                logger.write_failure_to_html_report(message)
250                pytest.fail(message)
251
252            logger.write_result_to_html_report("All responded commands are known and documented")
Description Scan all 512 proprietary commands for any responses
Instructions 1. Scan addresses $EFxx and $FFxx
2. Log any successful responses
3. Check if responded commands are not documented
Estimated Duration 85 seconds
def request_data(end_of_message: bool = True) -> bool:
255def request_data(end_of_message: bool = True) -> bool:
256    """Send a RTS."""
257    cts_pgn = PGN["ECUID"].id
258    end_acknowledge_frame = CANFrame(  # Memory access only works in maintenance mode
259        destination_address=BATTERY_ADDRESS,
260        pgn=PGN["TP.CM"],
261        data=[17, 0x0203, 0xFF, 0xFF, cts_pgn],  # End of Message Acknowledge, bytes, packets, reserved, PGN
262    )
263    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["RQST"], data=[cts_pgn])
264
265    with CANBus(BATTERY_CHANNEL) as bus:
266        # Get request to send
267        logger.write_info_to_report("Sending RTS")
268        rts_frame = bus.process_call(cts_r)
269        logger.write_info_to_report(f"Got response:\n{rts_frame}")
270        logger.write_info_to_report("Not sending CTS. Purposefully waiting")
271
272        # Prep for End of Message
273        expected_bytes = int(rts_frame.data[1])
274        expected_packets = int(rts_frame.data[2])
275        rts_pgn_id = int(rts_frame.data[-1])
276
277        # Acknowledge packets
278        if end_of_message:
279            end_acknowledge_frame.data[1] = expected_bytes
280            end_acknowledge_frame.data[2] = expected_packets
281            end_acknowledge_frame.data[-1] = rts_pgn_id
282            op_frame = bus.process_call(end_acknowledge_frame)
283            logger.write_info_to_report(f"End of message response:\n{op_frame}")
284            return True
285
286        time.sleep(10)
287        if response_frame := bus.read_frame():
288            if response_frame.data[0] == 255:
289                if response_frame.data[1] == 3:
290                    logger.write_info_to_report(f"Got timeout response:\n{response_frame}")
291                else:
292                    logger.write_info_to_report(f"Received different connection abort message:\n{response_frame}")
293                return True
294
295            logger.write_info_to_report(f"May not have received a timeout response:\n{response_frame}")
296
297        logger.write_info_to_report("No response after waiting...")
298        return False

Send a RTS.

def request_data_cts_test( size: int | None = None, sequence_id: int | None = None, end_of_message: bool = True) -> bool:
301def request_data_cts_test(size: int | None = None, sequence_id: int | None = None, end_of_message: bool = True) -> bool:
302    """Send a CTS."""
303    test_passed = True
304    cts_pgn = PGN["ECUID"].id
305    cts_frame = CANFrame(
306        destination_address=BATTERY_ADDRESS,
307        pgn=PGN[0xEC00],
308        data=[17, 0x0203, 0xFF, 0xFF, cts_pgn],  # Packets that can be sent, next packet#, reserved, PGN
309    )
310    end_acknowledge_frame = CANFrame(  # Memory access only works in maintenance mode
311        destination_address=BATTERY_ADDRESS,
312        pgn=PGN["TP.CM"],
313        data=[19, 0, 0, 0xFF, 0],  # End of Message Acknowledge, bytes, packets, reserved, PGN
314    )
315    cts_r = CANFrame(destination_address=BATTERY_ADDRESS, pgn=PGN["Request"], data=[cts_pgn])
316
317    with CANBus(BATTERY_CHANNEL) as bus:
318        # Get request to send
319        logger.write_info_to_report("Sending RTS")
320        rts_frame = bus.process_call(cts_r)
321        logger.write_info_to_report(f"Got response:\n{rts_frame}")
322
323        # Send malicious CTS
324        expected_bytes = int(rts_frame.data[1])
325        expected_packets = int(rts_frame.data[2])
326        rts_pgn_id = int(rts_frame.data[-1])
327        cts_frame.data[1] = expected_packets if size is None else size
328        cts_frame.data[2] = 1 if sequence_id is None else sequence_id
329        cts_frame.data[-1] = rts_pgn_id
330
331        logger.write_info_to_report("Sending CTS")
332        bus.send_frame(cts_frame)
333        packet_count = 0
334        while response_frame := bus.read_frame():
335
336            logger.write_info_to_report(f"Got response:\n{response_frame}")
337
338            if response_frame.pgn.id == PGN["TP.CM", [32]].id:
339                continue
340
341            packet_count += 1
342
343            if sequence_id is not None:
344                if expected_packets >= sequence_id > response_frame.data[0]:
345
346                    cmp_text = cmp(response_frame.data[0], ">=", sequence_id)
347
348                    logger.write_warning_to_html_report(
349                        f"Received sequence ID {response_frame.data[0]}, "
350                        f"which is before the requested sequence ID: {cmp_text}"
351                    )
352                    # Pass BT for now with warning, special case
353                    if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
354                        continue
355                    test_passed = False
356
357        # Acknowledge packets
358        if end_of_message:
359            end_acknowledge_frame.data[1] = expected_bytes
360            end_acknowledge_frame.data[2] = expected_packets
361            end_acknowledge_frame.data[-1] = rts_pgn_id
362            op_frame = bus.process_call(end_acknowledge_frame)
363            logger.write_info_to_report(f"End of message response:\n{op_frame}")
364        else:
365            time.sleep(10)
366            if response_frame := bus.read_frame():
367                logger.write_info_to_report(f"Got possible timeout response:\n{response_frame}")
368            else:
369                logger.write_info_to_report("No response after waiting...")
370
371        if size is not None:
372            cmp_text = cmp(packet_count, "==", expected_packets)
373            if size == 0:
374                if packet_count != 0:
375                    logger.write_warning_to_html_report(
376                        f"Did not receive expected packets with invalid size: {cmp(packet_count, '==', 0)}"
377                    )
378                    test_passed = False
379                else:
380                    logger.write_result_to_html_report(f"Received expected packets: {cmp(packet_count, '==', 0)}")
381            elif size > expected_packets:
382                if packet_count != expected_packets:
383                    logger.write_warning_to_html_report(f"Did not receive expected packets: {cmp_text}")
384                    test_passed = False
385                else:
386                    logger.write_result_to_html_report(f"Received expected packets: {cmp_text}")
387            elif size != packet_count:
388                logger.write_warning_to_html_report(f"Did not receive expected packets: {cmp_text}")
389                test_passed = False
390            else:
391                logger.write_result_to_html_report(f"Received expected packets: {cmp(packet_count, '==', size)}")
392
393        if sequence_id is not None:
394            if sequence_id == 0 or sequence_id > expected_packets:
395                cmp_text = cmp(packet_count, "==", 0)
396                if packet_count != 0:
397                    logger.write_warning_to_html_report(
398                        f"Did not receive expected number of packets with invalid request: {cmp_text}"
399                    )
400                    test_passed = False
401                else:
402                    logger.write_result_to_html_report(
403                        f"Received expected packets with invalid sequence request: {cmp_text}"
404                    )
405            else:
406                overall_expected_packets = expected_packets - sequence_id + 1
407                cmp_text = cmp(packet_count, "==", overall_expected_packets)
408                if packet_count != abs(expected_packets - sequence_id + 1):
409                    logger.write_warning_to_html_report(f"Did not receive expected number of packets: {cmp_text}")
410                else:
411                    logger.write_result_to_html_report(f"Received expected number of packets: {cmp_text}")
412
413        if sequence_id is None and size is None:
414            if expected_packets == packet_count:
415                logger.write_result_to_html_report(
416                    f"Received expected number of packets: {cmp(packet_count, '==', expected_packets)}"
417                )
418            else:
419                logger.write_warning_to_html_report(
420                    f"Did not receive expected number of packets: {cmp(packet_count, '==', expected_packets)}"
421                )
422
423        if BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS and test_passed is False:
424            return True
425
426        return test_passed

Send a CTS.

class TestMaliciousCTS:
429class TestMaliciousCTS:
430    """Send a CTS with bad sequence ID or size."""
431
432    def test_malicious_cts(self):
433        """
434        | Description          | Send a CTS with bad sequence ID or size                          |
435        | :------------------- | :--------------------------------------------------------------- |
436        | Instructions         | 1. Send a normal CTS                                        </br>\
437                                 2. Send a CTS with sequence ID of zero                      </br>\
438                                 3. Send a CTS with sequence ID too high                     </br>\
439                                 4. Send a CTS with zero size                                </br>\
440                                 5. Send a CTS with small size                               </br>\
441                                 6. Send a CTS with size too big                                  |
442        | Estimated Duration   | 85 seconds                                                       |
443        """
444        tests_failed = []
445        TestLocate6T().test_profile()
446
447        cts_tests = [
448            {"sequence_id": None, "size": None, "log_text": "Sending normal CTS"},
449            {"sequence_id": 0, "size": None, "log_text": "Sending CTS with sequence ID of zero"},
450            {"sequence_id": 3, "size": None, "log_text": "Sending CTS with sequence ID of three"},
451            {"sequence_id": 200, "size": None, "log_text": "Sending CTS with sequence ID too high"},
452            {"sequence_id": None, "size": 0, "log_text": "Sending CTS with zero size"},
453            {"sequence_id": None, "size": 1, "log_text": "Sending CTS with small size"},
454            {"sequence_id": None, "size": 200, "log_text": "Sending CTS with size too big"},
455        ]
456
457        for cts_test in cts_tests:
458            if cts_test["sequence_id"] is None:
459                sequence_id = None
460            else:
461                sequence_id = cts_test["sequence_id"]
462
463            if cts_test["size"] is None:
464                size = None
465            else:
466                size = cts_test["size"]
467
468            logger.write_result_to_html_report(cts_test["log_text"])
469            did_test_pass = request_data_cts_test(size, sequence_id)
470
471            if did_test_pass is False:
472                logger.write_failure_to_html_report(f"Test failed when {cts_test['log_text']}.")
473                tests_failed.append(cts_test["log_text"])
474
475        if len(tests_failed) > 0:
476            pytest.fail("One or more tests failed Malicious CTS vulnerability scan")
477        elif BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
478            logger.write_warning_to_html_report("See warnings for possible discrepancies with J1939 expectations")
479        else:
480            logger.write_result_to_html_report("Malicious CTS vulnerability scans met expectations")

Send a CTS with bad sequence ID or size.

def test_malicious_cts(self):
432    def test_malicious_cts(self):
433        """
434        | Description          | Send a CTS with bad sequence ID or size                          |
435        | :------------------- | :--------------------------------------------------------------- |
436        | Instructions         | 1. Send a normal CTS                                        </br>\
437                                 2. Send a CTS with sequence ID of zero                      </br>\
438                                 3. Send a CTS with sequence ID too high                     </br>\
439                                 4. Send a CTS with zero size                                </br>\
440                                 5. Send a CTS with small size                               </br>\
441                                 6. Send a CTS with size too big                                  |
442        | Estimated Duration   | 85 seconds                                                       |
443        """
444        tests_failed = []
445        TestLocate6T().test_profile()
446
447        cts_tests = [
448            {"sequence_id": None, "size": None, "log_text": "Sending normal CTS"},
449            {"sequence_id": 0, "size": None, "log_text": "Sending CTS with sequence ID of zero"},
450            {"sequence_id": 3, "size": None, "log_text": "Sending CTS with sequence ID of three"},
451            {"sequence_id": 200, "size": None, "log_text": "Sending CTS with sequence ID too high"},
452            {"sequence_id": None, "size": 0, "log_text": "Sending CTS with zero size"},
453            {"sequence_id": None, "size": 1, "log_text": "Sending CTS with small size"},
454            {"sequence_id": None, "size": 200, "log_text": "Sending CTS with size too big"},
455        ]
456
457        for cts_test in cts_tests:
458            if cts_test["sequence_id"] is None:
459                sequence_id = None
460            else:
461                sequence_id = cts_test["sequence_id"]
462
463            if cts_test["size"] is None:
464                size = None
465            else:
466                size = cts_test["size"]
467
468            logger.write_result_to_html_report(cts_test["log_text"])
469            did_test_pass = request_data_cts_test(size, sequence_id)
470
471            if did_test_pass is False:
472                logger.write_failure_to_html_report(f"Test failed when {cts_test['log_text']}.")
473                tests_failed.append(cts_test["log_text"])
474
475        if len(tests_failed) > 0:
476            pytest.fail("One or more tests failed Malicious CTS vulnerability scan")
477        elif BATTERY_INFO.get_item("manufacturer_code") == ManufacturerID.BRENTRONICS:
478            logger.write_warning_to_html_report("See warnings for possible discrepancies with J1939 expectations")
479        else:
480            logger.write_result_to_html_report("Malicious CTS vulnerability scans met expectations")
Description Send a CTS with bad sequence ID or size
Instructions 1. Send a normal CTS
2. Send a CTS with sequence ID of zero
3. Send a CTS with sequence ID too high
4. Send a CTS with zero size
5. Send a CTS with small size
6. Send a CTS with size too big
Estimated Duration 85 seconds
class TestConnectionExhaustion:
483class TestConnectionExhaustion:
484    """Start a connection, but don't close it."""
485
486    def test_connection_exhaustion(self):
487        """
488        | Description          | Start a connection, but don't close it                           |
489        | :------------------- | :--------------------------------------------------------------- |
490        | Instructions         | 1. Open a connection                                        </br>\
491                                 2. Refuse to close it and check for timeouts                     |
492        | Estimated Duration   | 85 seconds                                                       |
493        """
494        TestLocate6T().test_profile()
495
496        logger.write_info_to_report("Establishing connection...")
497        test_passed = request_data(end_of_message=False)
498
499        if test_passed is False:
500            logger.write_failure_to_html_report("No Connection Abort message was received after waiting 10 seconds")
501            pytest.fail("No Connection Abort message was received after waiting 10 seconds")
502        else:
503            logger.write_result_to_html_report("Connection Abort message was received after waiting 10 seconds")

Start a connection, but don't close it.

def test_connection_exhaustion(self):
486    def test_connection_exhaustion(self):
487        """
488        | Description          | Start a connection, but don't close it                           |
489        | :------------------- | :--------------------------------------------------------------- |
490        | Instructions         | 1. Open a connection                                        </br>\
491                                 2. Refuse to close it and check for timeouts                     |
492        | Estimated Duration   | 85 seconds                                                       |
493        """
494        TestLocate6T().test_profile()
495
496        logger.write_info_to_report("Establishing connection...")
497        test_passed = request_data(end_of_message=False)
498
499        if test_passed is False:
500            logger.write_failure_to_html_report("No Connection Abort message was received after waiting 10 seconds")
501            pytest.fail("No Connection Abort message was received after waiting 10 seconds")
502        else:
503            logger.write_result_to_html_report("Connection Abort message was received after waiting 10 seconds")
Description Start a connection, but don't close it
Instructions 1. Open a connection
2. Refuse to close it and check for timeouts
Estimated Duration 85 seconds