hitl_tester.modules.cyber_6t.lcd_manager

LCD controller for Crystalfontz screens.

(c) 2020-2024 TurnAround Factor, Inc.

#

CUI DISTRIBUTION CONTROL

Controlled by: DLA J68 R&D SBIP

CUI Category: Small Business Research and Technology

Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS

POC: GOV SBIP Program Manager Denise Price, 571-767-0111

Distribution authorized to U.S. Government Agencies only, to protect information not owned by the

U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that

it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests

for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,

Fort Belvoir, VA 22060-6221

#

SBIR DATA RIGHTS

Contract No.:SP4701-23-C-0083

Contractor Name: TurnAround Factor, Inc.

Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005

Expiration of SBIR Data Rights Period: September 24, 2029

The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer

software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights

in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause

contained in the above identified contract. No restrictions apply after the expiration date shown above. Any

reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce

the markings.

  1"""
  2LCD controller for Crystalfontz screens.
  3
  4# (c) 2020-2024 TurnAround Factor, Inc.
  5#
  6# CUI DISTRIBUTION CONTROL
  7# Controlled by: DLA J68 R&D SBIP
  8# CUI Category: Small Business Research and Technology
  9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15# Fort Belvoir, VA 22060-6221
 16#
 17# SBIR DATA RIGHTS
 18# Contract No.:SP4701-23-C-0083
 19# Contractor Name: TurnAround Factor, Inc.
 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21# Expiration of SBIR Data Rights Period: September 24, 2029
 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27# the markings.
 28"""
 29
 30import csv
 31import errno
 32import itertools
 33import logging
 34import os
 35import platform
 36import queue
 37import shlex
 38import signal
 39import subprocess
 40import sys
 41import threading
 42import time
 43import traceback
 44from collections import deque
 45from contextlib import suppress
 46from dataclasses import dataclass, asdict, replace
 47from datetime import datetime, timezone
 48from enum import IntEnum
 49from multiprocessing.managers import BaseManager
 50from pathlib import Path
 51from queue import Queue
 52from subprocess import DEVNULL
 53from typing import ClassVar, Any, Self
 54
 55import serial
 56import serial.tools.list_ports
 57
 58
 59WIDTH = 20
 60HEIGHT = 4
 61PORT = 51152
 62AUTH_KEY = b"HITL"
 63REPORT_NAME = Path(f"{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S.%f')}_{platform.node()}_lcd_manager")
 64LOG_FOLDER = Path(__file__).parent.parent.resolve() / "logs" / REPORT_NAME
 65LOG_FILE = LOG_FOLDER / REPORT_NAME.with_suffix(".txt")
 66logger = logging.getLogger("HITL-LCD")
 67
 68
 69def signal_handler(_sig, _frame):
 70    """Exit gracefully."""
 71    logger.info("Shutting down...")
 72    sys.exit(0)
 73
 74
 75@dataclass
 76class Crystalfontz:
 77    """Hardware info for Crystalfontz LCD."""
 78
 79    models: ClassVar[list[Self]] = []
 80    pid: int
 81    vid: int = 8763
 82    baud: int = 115200
 83
 84    def __post_init__(self):
 85        self.models.append(self)
 86
 87
 88class LCDPort:
 89    """Manager for serial port."""
 90
 91    cfa835 = Crystalfontz(pid=5)
 92    # cfa635 = Crystalfontz(pid=11)
 93
 94    def __init__(self) -> None:
 95        self.index = 0
 96        self.buffer: list[int] = []
 97
 98        # Get serial port
 99        for comport, model in itertools.product(serial.tools.list_ports.comports(), Crystalfontz.models):
100            if comport.vid == model.vid and comport.pid == model.pid:
101                logger.debug(f"Found LCD VID: {comport.vid}, PID: {comport.pid}")
102                break
103        else:
104            raise RuntimeError(f"LCD could not be found: {', '.join(map(repr, Crystalfontz.models))}")
105        self.serial = serial.Serial(port=comport.device, baudrate=model.baud, timeout=0)
106
107    def write(self, data: bytes, timeout: float | None = None):
108        """Write data to the serial port."""
109        self.serial.write_timeout = timeout  # 0 seems to block?
110        logger.debug(f"Writing bytes: {data!r}")
111        with suppress(serial.serialutil.SerialTimeoutException):
112            self.serial.write(data)
113            self.serial.flush()
114
115    def consume(self, timeout: float = 0.1):
116        """Fetch the current byte and advance."""
117        start_time = time.perf_counter()
118        while self.serial.in_waiting > 0 or self.index >= len(self.buffer):
119            if self.serial.in_waiting > 0:
120                try:
121                    self.buffer += list(self.serial.read(self.serial.in_waiting))
122                except serial.serialutil.SerialException as e:
123                    logger.debug(f"Got error: {e}")
124                    time.sleep(1)
125                start_time = time.perf_counter()
126            if timeout and time.perf_counter() - start_time > timeout:
127                raise TimeoutError(f"No data after {timeout} seconds.")
128        result = self.buffer[self.index]
129        self.index += 1
130        return result
131
132    def clear_history(self):
133        """Discard data before the current index."""
134        self.buffer = self.buffer[self.index :]
135        self.index = 0
136
137
@dataclass
class RawPacket:
    """One LCD packet: command byte, length byte, payload and 16-bit CRC."""

    command: int = 0
    data_length: int = 0
    data: bytes = b""
    crc: int = 0

    def to_bytes(self) -> bytes:
        """Serialize the whole packet, CRC included.

        Command and length are masked to one byte each; the CRC is appended as two little-endian bytes.
        """
        header = bytes((self.command & 0xFF, self.data_length & 0xFF))
        framed = header + self.data
        return framed + self.crc.to_bytes(2, byteorder="little")

    def calculate_crc(self):
        """Recompute ``crc`` over the command, length and data bytes and store it."""
        body = self.to_bytes()[:-2]  # Everything except the two CRC bytes
        remainder = 0xFFFF  # Start from all ones so leading zero bytes still affect the result
        for octet in body:
            for shift in range(8):  # Least significant bit first
                feedback = (remainder ^ (octet >> shift)) & 0x01
                remainder >>= 1
                if feedback:
                    remainder ^= 0x8408
        self.crc = remainder ^ 0xFFFF  # Final inversion, kept to 16 bits
165
166
class LCDMonitor:
    """Receive packets from the LCD on a background thread and send commands to it.

    Every packet with a valid CRC is appended to ``packet_deque`` and logged to a CSV file next to ``LOG_FILE``.
    """

    # Supported commands: hw info (0x1), brightness (0x0E, 0x0D), cursor (0x0C), error (0xC0), leds (0x22)

    MAX_ATTEMPTS = 2  # How many times write() sends a command before giving up
    RESPONSE_FLAG = 0x40  # Set in the command byte of a reply from the LCD
    ERROR_FLAG = 0xC0  # Both bits set: the LCD answered with an error

    class ResponseError(IntEnum):
        """Possible LCD error responses."""

        UNKNOWN_ERROR = 1
        UNKNOWN_COMMAND = 2
        INVALID_COMMAND_LENGTH_OPTIONS = 3
        WRITING_FLASH_MEM_FAILED = 4
        READING_FLASH_MEM_FAILED = 5
        CFA_FBSCAB_NOT_PRESENT_AT_INDEX = 6
        CFA_FBSCAB_DID_NOT_REPLY_TO_REG = 7
        MICRO_SD_NOT_INSERTED_OR_BAD = 8
        MICRO_SD_NOT_FORMATTED = 9
        MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = 10
        MICRO_SD_UNKNOWN_ERROR = 11
        MICRO_SD_FILE_COULD_NOT_BE_READ = 12
        MICRO_SD_COULD_NOT_BE_WRITTEN = 13
        FILE_HEADER_IS_INVALID = 14
        MICRO_SD_FILE_IS_ALREADY_OPEN = 15
        MICRO_SD_FILE_OPERATION_FAILED = 16
        MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = 17
        GFX_STREAM_ALREADY_STARTED = 18
        GFX_IS_OUT_OF_LCD_BOUNDS = 19
        VIDEO_IS_NOT_OPEN_IN_SLOT = 20
        GFX_STREAM_HAS_TIMED_OUT = 21
        GPIO_NOT_SET_FOR_ATX_USE = 22
        INTERFACE_NOT_ENABLED = 23
        INTERFACE_NOT_AVAILABLE_ = 24

    def __init__(self, packet_buffer_size: int = 4096):
        """Open the serial port, write the CSV header and start the monitor thread.

        Args:
            packet_buffer_size: Maximum number of unread packets kept; older ones are dropped when full.
        """
        self.dropped_packets = 0
        self.packet_deque: deque[RawPacket] = deque(maxlen=packet_buffer_size)

        # Open the serial port
        self.serial_port = LCDPort()

        # Create log file; iterating the dict yields the field names, which form the header row
        with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8", newline="") as csvfile:
            csv.writer(csvfile).writerow(asdict(RawPacket()))

        # Start monitor thread
        self.daemon = threading.Thread(target=self.runtime, daemon=True)
        self.daemon.start()

    def runtime(self):
        """Decode packets from the serial stream forever, queueing and logging each one."""

        def fetch_packet() -> RawPacket:
            """Decode serial stream to a valid packet.

            On a timeout or CRC mismatch the first buffered byte is dropped and decoding restarts from the
            next one, which resynchronizes the stream after lost or corrupt bytes.
            """
            port = self.serial_port
            while True:
                # Fetch raw packet
                try:
                    command = port.consume(timeout=0)  # Wait indefinitely for the start of a packet
                    length = port.consume()
                    data = bytes(port.consume() for _ in range(length))
                    crc_low = port.consume()
                    crc_high = port.consume()
                except TimeoutError:
                    logger.warning(
                        "Timeout when attempting to read packet. Possibly an invalid packet cause by dropped bytes."
                    )
                else:
                    raw_packet = RawPacket(
                        command=command, data_length=length, data=data, crc=crc_low | (crc_high << 8)
                    )
                    # Validate CRC
                    logger.debug(f"Packet?: {asdict(raw_packet)}")
                    valid_packet = replace(raw_packet)  # Calculate the expected CRC
                    valid_packet.calculate_crc()
                    if raw_packet.crc == valid_packet.crc:
                        logger.debug("got lcd packet")
                        port.clear_history()
                        return raw_packet
                    logger.debug(f"Invalid CRC 0x{raw_packet.crc:04X}, expected 0x{valid_packet.crc:04X}.")
                logger.debug(f"LCD Buffer: {port.buffer}")
                logger.debug(f"Discarding lcd byte: {port.buffer[0]:02X}")
                port.index = 1
                port.clear_history()

        while True:
            packet = fetch_packet()
            if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
                self.dropped_packets += 1
            self.packet_deque.append(packet)  # Save to deque
            logger.debug(f"{len(self.packet_deque)} packet(s) in queue: {', '.join(map(str, self.packet_deque))}")

            # Save to file
            with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8", newline="") as csvfile:
                csv.writer(csvfile).writerow(asdict(packet).values())

            time.sleep(0.1)  # Pause between packets; they only arrive as key events or command replies

    def read(self, blocking: bool = True, timeout: float = 0) -> RawPacket | None:
        """Pop the oldest received packet (key event or acknowledgment).

        Args:
            blocking: When False, return None immediately if nothing is queued.
            timeout: Seconds to wait for a packet when blocking; a falsy value (0) waits indefinitely.

        Raises:
            TimeoutError: Nothing arrived within ``timeout`` seconds.
        """
        start_time = time.perf_counter()
        while not timeout or time.perf_counter() - start_time < timeout:
            with suppress(IndexError):
                return self.packet_deque.popleft()
            if not blocking:
                return None
            time.sleep(0.1)  # Packets only come in on user input
        raise TimeoutError(f"A packet was not received after {timeout:.1f} seconds.")

    def _await_reply(self, command: int, timeout: float) -> bool:
        """Wait for the LCD's answer to ``command``.

        Packets that are not replies (for example key events, command 0x80) are put back at the front of the
        queue in their original order so a later ``read`` still sees them; replies to other commands are
        treated as stale and dropped.

        Returns:
            True when the matching reply or an error reply arrived, False when ``timeout`` elapsed first.
        """
        deferred: list[RawPacket] = []
        start_time = time.perf_counter()
        try:
            while True:
                remaining = timeout - (time.perf_counter() - start_time) if timeout else 0
                if timeout and remaining <= 0:
                    return False
                try:
                    response = self.read(timeout=remaining)
                except TimeoutError:
                    return False
                if response is None:
                    continue
                if response.command == (self.RESPONSE_FLAG | command):
                    return True
                if (response.command & self.ERROR_FLAG) == self.ERROR_FLAG:
                    error_code = response.command & ~self.ERROR_FLAG & 0xFF
                    # NOTE(review): the code is taken from the command byte as before; confirm against the
                    # datasheet whether the LCD reports it in the data payload instead.
                    try:
                        error_message = self.ResponseError(error_code).name.capitalize().replace("_", " ")
                    except ValueError:  # Not every value is a known error; must not crash the caller
                        error_message = f"Unrecognized error code {error_code}"
                    logger.warning(f"Received error response: {error_message}.")
                    return True
                if response.command & self.RESPONSE_FLAG:
                    logger.debug(f"Discarding stale response: {response}")
                else:
                    deferred.append(response)
        finally:
            # Restore unrelated packets; if the deque is full the newest entries fall off the right end
            self.packet_deque.extendleft(reversed(deferred))

    def write(self, command_packet: RawPacket, timeout: float = 2):
        """Send a command to the LCD and wait for its reply, retrying up to ``MAX_ATTEMPTS`` times.

        Args:
            command_packet: Packet to send; its CRC must already be calculated.
            timeout: Seconds allowed for the serial write and for the reply of each attempt.
        """
        for attempt_number in range(self.MAX_ATTEMPTS):
            self.serial_port.write(command_packet.to_bytes(), timeout)
            if self._await_reply(command_packet.command, timeout):
                break
            logger.error(f"LCD response not received or invalid for attempt {attempt_number + 1}.")
            time.sleep(0.1)
289
290
class KeyCode(IntEnum):
    """Keypad event codes: values 1-6 are presses, 7-12 the matching releases."""

    # Press events
    KEY_UP_PRESS = 1
    KEY_DOWN_PRESS = 2
    KEY_LEFT_PRESS = 3
    KEY_RIGHT_PRESS = 4
    KEY_ENTER_PRESS = 5
    KEY_EXIT_PRESS = 6

    # Release events
    KEY_UP_RELEASE = 7
    KEY_DOWN_RELEASE = 8
    KEY_LEFT_RELEASE = 9
    KEY_RIGHT_RELEASE = 10
    KEY_ENTER_RELEASE = 11
    KEY_EXIT_RELEASE = 12
306
307
class LEDColor(IntEnum):
    """Color of a status LED; bit 0 is the red component, bit 1 the green one."""

    OFF = 0
    RED = 1
    GREEN = 2
    YELLOW = RED | GREEN  # Both components on (3)
315
316
class UIHandler:
    """Manage LCD UI: owns the current page, forwards key events to it and sends drawing commands."""

    FREQUENCY = 20  # How often to check for input / refresh screen

    def __init__(self, lcd_monitor: LCDMonitor):
        """Draw the initial page and start the queue manager that tests report progress through."""
        self.lcd_monitor = lcd_monitor
        self.page: PageType = TestMenuType()
        self.stack: list[Any] = []  # Allows pages to communicate
        # self.reset()
        self.clear_screen()
        self.page.draw(self)

        # Progress queue for communicating with the test
        class QueueManager(BaseManager):
            """Manager for multiprocess queue."""

        self._backing_queue: Queue[dict[str, str | float]] = Queue()
        QueueManager.register("get_queue", callable=lambda: self._backing_queue)
        self._queue_manager = QueueManager(address=("", PORT), authkey=AUTH_KEY)
        self._queue_manager.start()  # pylint: disable=consider-using-with
        self.queue: Queue[dict[str, str | float]] = self._queue_manager.get_queue()  # type: ignore[attr-defined]  # pylint: disable=no-member

    def run_forever(self):
        """Process LCD events forever.

        Each cycle looks for a running test process (unless progress is already shown), handles at most one
        key packet and lets the current page refresh itself. Any exception is logged and the loop continues.
        """
        while True:
            try:
                # Check if a test is running and show progress if so
                if not isinstance(self.page, ProgressType):
                    current_name = Path("/proc/self/comm").read_text(encoding="ascii", errors="replace")
                    for pid in Path("/proc").glob("[0-9]*"):
                        # A process may exit while it is being inspected; that is not an error
                        with suppress(FileNotFoundError, ProcessLookupError):
                            process_name = (pid / "comm").read_text(encoding="ascii", errors="replace")
                            if process_name != current_name and process_name.startswith("HITL") and pid.stem.isdigit():
                                logger.debug(f"Found running test at PID: {pid.stem}")
                                self.stack.append(int(pid.stem))
                                self.page = ProgressType()
                                self.page.draw(self)
                                break  # Track one test only; further matches would pile up on the stack

                # Check for new key inputs
                if (packet := self.lcd_monitor.read(blocking=False)) and packet.command == 0x80:
                    self.key_event(packet)
                self.page.refresh(self)

                time.sleep(1 / self.FREQUENCY)  # Don't waste cycles when nothing happens 99% of the time
            except Exception:  # pylint: disable=broad-exception-caught
                logger.error(traceback.format_exc())
                time.sleep(1)

    def key_event(self, packet: RawPacket):
        """Pass the key in ``packet`` to the current page, then redraw whichever page is current afterwards.

        Packets without data or with a value that is not a ``KeyCode`` are logged and ignored.
        """
        raw_code = packet.data[0] if packet.data else 0
        try:
            key = KeyCode(raw_code)
        except ValueError:
            logger.warning(f"Ignoring unknown key code: {raw_code}")
            return
        self.page.process_key(self, key)
        self.page.draw(self)

    def _send(self, command: int, data: bytes = b""):
        """Build a packet for ``command`` whose length field always matches ``data``, add the CRC and send it."""
        packet = RawPacket(command=command, data_length=len(data), data=data)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def draw_image(self, filename: str):
        """Draw an image to the LCD.

        Args:
            filename: Name of the image file, sent to the LCD as latin-1 bytes.
        """
        raw_filename = filename.encode("latin1")  # Map bytes 1-to-1
        enable_transparency = False  # pixel value 0 is transparent
        invert_image_shade = False  # will invert transparency value also
        x_start = 0
        y_start = 0
        self._send(
            0x28, bytes([3, enable_transparency | (invert_image_shade << 1), x_start, y_start, *raw_filename])
        )

    def write_text(self, row: int, column: int, text: str):
        """Write text to the LCD starting at the given row and column."""
        raw_text = text.encode("latin1")  # Map bytes 1-to-1
        self._send(0x1F, bytes([column, row, *raw_text]))

    def clear_screen(self):
        """Clear text from the screen."""
        self._send(0x06)

    def reset(self):
        """Reset the LCD device."""
        self._send(0x05, bytes([8, 25, 48]))
        time.sleep(4)  # May not respond for up to 3 seconds

    def set_led(self, colors: tuple[LEDColor, LEDColor, LEDColor, LEDColor]):
        """Set the four on-board LEDs.

        Each LED takes two commands: the green component (bit 1 of the color) goes to output ``5 + 2 * n``
        and the red component (bit 0) to output ``6 + 2 * n``, each either 0 or 100.
        """
        for led_id in range(4):
            color = colors[led_id]
            self._send(0x22, bytes([5 + led_id * 2, 100 * bool(color & 2)]))
            self._send(0x22, bytes([6 + led_id * 2, 100 * bool(color & 1)]))

    def save_state(self):
        """Save state as boot state"""
        self._send(0x4)
428
429
class PageType:
    """Base class of all LCD pages; every hook does nothing until a subclass overrides it."""

    title = ""

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """React to a keypad event."""
        return None

    def draw(self, lcd_manager: UIHandler):
        """Render the page on the LCD."""
        return None

    def refresh(self, lcd_manager: UIHandler):
        """Update internal state; called on every cycle of the UI loop."""
        return None
443
444
class ProfileType(PageType):
    """Profile selection menu: pick a stored profile and start a comparison test against it."""

    title = "Profile Selection"

    def __init__(self):
        """Collect the available profile files (oldest modification time first) and read the version."""
        profile_paths = sorted(Path("/var/www/logs/profile/").glob("*.json"), key=lambda path: path.stat().st_mtime)
        self.items = {path.stem: path for path in profile_paths}
        self.index = 0
        try:
            self.version = Path("/var/www/.version").read_text(encoding="utf-8").strip("\r\n")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            self.version = "1.0.1"

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Handle profile selection UI.

        Left/right cycle through the profiles, enter starts the test with the selected profile and switches
        to the progress page, exit returns to the test menu. Without any profile only exit has an effect.
        """
        if key is KeyCode.KEY_EXIT_PRESS:
            lcd_manager.page = TestMenuType()
        elif not self.items:
            return  # Nothing to browse or select
        elif key is KeyCode.KEY_RIGHT_PRESS:
            self.index = (self.index + 1) % len(self.items)
        elif key is KeyCode.KEY_LEFT_PRESS:
            self.index = (self.index - 1) % len(self.items)
        elif key is KeyCode.KEY_ENTER_PRESS:
            selected_profile = list(self.items.values())[self.index]
            # Pass the arguments as a list so a path containing spaces or quotes stays one argument
            command = ["/opt/hitl/test", "fingerprint.plan", "-c", "-v", "-D", f"PROFILE={selected_profile}"]
            lcd_manager.stack.append(
                subprocess.Popen(  # pylint: disable=consider-using-with
                    command, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
                ).pid
            )
            lcd_manager.page = ProgressType()

    def draw(self, lcd_manager: UIHandler):
        """Draw the selected profile (or a notice when there is none) to the screen."""
        lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}")
        if not self.items:
            lcd_manager.write_text(1, 0, " " * WIDTH)
            lcd_manager.write_text(2, 0, f"{'No profiles found':^{WIDTH}}")
            lcd_manager.write_text(3, 0, f"v{self.version:<{WIDTH - 1}}")
            return

        max_text_width = WIDTH - 2  # Leave room for the "<" and ">" markers
        # The file stem is split at "_6T_": the first part goes on row 1, the rest on row 2
        manufacturer_name, _, serial_id = list(self.items)[self.index].partition("_6T_")
        manufacturer_name = manufacturer_name[:WIDTH]
        item_name = serial_id[:max_text_width]

        lcd_manager.write_text(1, 0, f"{manufacturer_name:<{WIDTH}}")
        lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>")
        lcd_manager.write_text(3, 0, f"v{self.version:<{WIDTH - 1}}")
490
491
class TestMenuType(PageType):
    """Top-level menu listing the available test actions."""

    title = "Test Selection"

    def __init__(self):
        """Set up the menu entries and read the version string shown on the bottom row."""
        self.index = 0
        self.items = {
            "Generate profile": self.generate_profile,
            "Compare profiles": self.compare_profiles,
        }
        version_file = Path("/var/www/.version")
        try:
            self.version = version_file.read_text(encoding="utf-8").strip("\r\n")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            self.version = "1.0.1"

    def generate_profile(self, lcd_manager: UIHandler):
        """Launch the profile generation test, push its PID on the stack and show the progress page."""
        argv = shlex.split("/opt/hitl/test fingerprint.plan -c -v")
        process = subprocess.Popen(  # pylint: disable=consider-using-with
            argv, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
        )
        lcd_manager.stack.append(process.pid)
        lcd_manager.page = ProgressType()

    def compare_profiles(self, lcd_manager: UIHandler):
        """Switch to the profile selection page."""
        lcd_manager.page = ProfileType()

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Handle menu navigation: left/right cycle entries, enter runs one, up shows the logo."""
        if key is KeyCode.KEY_UP_PRESS:
            lcd_manager.page = LogoType()
            return
        if key is KeyCode.KEY_ENTER_PRESS:
            action = list(self.items.values())[self.index]
            action(lcd_manager)
            return
        entry_count = len(self.items)
        if key is KeyCode.KEY_RIGHT_PRESS:
            self.index = (self.index + 1) % entry_count
            return
        if key is KeyCode.KEY_LEFT_PRESS:
            self.index = (self.index - 1) % entry_count

    def draw(self, lcd_manager: UIHandler):
        """Draw the title, the selected entry and the version, and light the first LED red."""
        max_text_width = WIDTH - 2
        labels = list(self.items)
        item_name = labels[self.index][:max_text_width]

        rows = (
            f"{self.title:=^{WIDTH}}",
            " " * WIDTH,
            f"<{item_name:^{max_text_width}}>",
            f"v{self.version:<19}",
        )
        for row_number, row_text in enumerate(rows):
            lcd_manager.write_text(row_number, 0, row_text)

        lcd_manager.set_led((LEDColor.RED, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))
545
546
547class ProgressType(PageType):
548    """Test progress."""
549
550    SCROLL_RATE = 2  # Frequency of screen text scroll
551
552    def __init__(self) -> None:
553        self.data: dict[str, str | float] = {}
554        self.text_offset = 0
555        self.last_scroll_time = 0.0
556
557    def _pid_is_alive(self, pid: int):
558        """Check if PID exists."""
559        try:
560            os.kill(pid, 0)
561        except OSError as error:
562            match (error.errno):
563                case errno.ESRCH:  # ESRCH == No such process
564                    return False
565                case errno.EPERM:  # EPERM clearly means there's a process to deny access to
566                    return True
567                case _:
568                    raise
569        else:
570            return True
571
572    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
573        if key is KeyCode.KEY_EXIT_PRESS:
574            lcd_manager.stack.pop()
575            command = "/opt/hitl/kill"
576            subprocess.Popen(  # pylint: disable=consider-using-with
577                shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
578            )
579            lcd_manager.page = TestMenuType()
580
581    def draw(self, lcd_manager: UIHandler):
582        """Draw item to screen."""
583        if isinstance(lcd_manager.stack[-1], int) and self._pid_is_alive(lcd_manager.stack[-1]):
584            if self.data:
585                # Set LED color
586                lcd_manager.set_led(
587                    (
588                        LEDColor.RED if self.data["name"] == "Test Completed" else LEDColor.GREEN,
589                        LEDColor.OFF,
590                        LEDColor.OFF,
591                        LEDColor.OFF,
592                    )
593                )
594                # Apply a marquee effect to the test title text
595                name = str(self.data["name"]) + "     "
596                name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20]
597                ete_minutes = int(self.data["ete"])
598                ete = f"{self.data.get('mode', '???????')}, ETR: {ete_minutes} min"
599                pass_fail = f"{self.data['passed']:>2} Passed,{self.data['failed']:>2} Failed"
600                prog_bar = f"\xFA{'=' * int(18 * self.data['progress']):-<18}\xFC"
601                lcd_manager.write_text(0, 0, f"{name:<20}")
602                lcd_manager.write_text(1, 0, f"{ete:<20}")
603                lcd_manager.write_text(2, 0, f"{pass_fail:^20}")
604                lcd_manager.write_text(3, 0, prog_bar)
605            else:
606                lcd_manager.clear_screen()
607                lcd_manager.write_text(2, 0, f"{'Preparing...':^20}")
608        else:
609            lcd_manager.stack.pop()
610            lcd_manager.page = TestMenuType()
611
612    def refresh(self, lcd_manager: UIHandler):
613        """Output test progress."""
614        old_data = self.data
615        with suppress(queue.Empty, KeyError):
616            self.data = lcd_manager.queue.get_nowait()
617            self.data["name"] = str(self.data["name"]).partition(" - ")[0].removeprefix("Test")
618
619        if self.data:
620            name_length = len(str(self.data.get("name", "")))
621            if old_data.get("name") != self.data["name"]:  # New test
622                self.text_offset = -1  # Reset offset when new title is added
623                self.draw(lcd_manager)
624            elif name_length > 20 and time.perf_counter() - self.last_scroll_time > 1 / self.SCROLL_RATE:  # Scroll name
625                self.last_scroll_time = time.perf_counter()
626                self.text_offset = (self.text_offset + 1) % name_length
627                name = str(self.data["name"]) + "     "
628                name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20]
629                lcd_manager.write_text(0, 0, f"{name:<20}")
630            elif old_data != self.data:  # Don't draw when not needed
631                self.draw(lcd_manager)
632
633
class LogoType(PageType):
    """Page that shows the logo image."""

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """On any key press, save the LCD state and return to the test menu; releases are ignored."""
        if key > KeyCode.KEY_EXIT_PRESS:  # Codes above the last press code are release events
            return
        lcd_manager.save_state()
        lcd_manager.page = TestMenuType()

    def draw(self, lcd_manager: UIHandler):
        """Show the logo bitmap on the LCD."""
        logo_file = "TAFLogo.bmp"
        lcd_manager.draw_image(logo_file)
646
647
def test_main():
    """Configure console and file logging, then run the LCD manager until the process ends."""
    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p"
    )

    def attach(handler):
        """Give the handler the shared level and format and register it on the module logger."""
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(formatter)
        logger.addHandler(handler)

    # Console output, with explicit carriage returns at the end of each record
    console_logger = logging.StreamHandler()
    console_logger.terminator = "\r\n"
    attach(console_logger)

    # File output; the log folder is created on demand
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    attach(logging.FileHandler(LOG_FILE))
    logger.debug(f"Log Filename: {LOG_FILE}")

    # The monitor starts collecting packets as soon as it is created; the UI handler then takes over
    UIHandler(LCDMonitor()).run_forever()
673
674
if __name__ == "__main__":
    # Register the SIGINT handler before starting, so an interrupt logs the shutdown and exits with status 0
    signal.signal(signal.SIGINT, signal_handler)
    test_main()
WIDTH = 20
HEIGHT = 4
PORT = 51152
AUTH_KEY = b'HITL'
REPORT_NAME = PosixPath('20241117T002505.148761_fv-az1714-684_lcd_manager')
LOG_FOLDER = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/logs/20241117T002505.148761_fv-az1714-684_lcd_manager')
LOG_FILE = PosixPath('/opt/hostedtoolcache/Python/3.12.7/x64/lib/python3.12/site-packages/hitl_tester/modules/logs/20241117T002505.148761_fv-az1714-684_lcd_manager/20241117T002505.txt')
logger = <Logger HITL-LCD (WARNING)>
def signal_handler(_sig, _frame):
def signal_handler(_sig, _frame):
    """Log the shutdown and end the process with exit status 0.

    Both arguments (signal number and stack frame) are ignored.
    """
    logger.info("Shutting down...")
    raise SystemExit(0)

Exit gracefully.

@dataclass
class Crystalfontz:
76@dataclass
77class Crystalfontz:
78    """Hardware info for Crystalfontz LCD."""
79
80    models: ClassVar[list[Self]] = []
81    pid: int
82    vid: int = 8763
83    baud: int = 115200
84
85    def __post_init__(self):
86        self.models.append(self)

Hardware info for Crystalfontz LCD.

Crystalfontz(pid: int, vid: int = 8763, baud: int = 115200)
models: ClassVar[list[Self]] = [Crystalfontz(pid=5, vid=8763, baud=115200)]
pid: int
vid: int = 8763
baud: int = 115200
class LCDPort:
    """Serial connection to the LCD plus a byte buffer with a read cursor.

    Received bytes accumulate in ``buffer``; ``index`` is the position of the next byte
    ``consume`` will return. Bytes before ``index`` stay in the buffer until
    ``clear_history`` is called, so a caller can rewind by resetting ``index``.
    """

    cfa835 = Crystalfontz(pid=5)
    # cfa635 = Crystalfontz(pid=11)

    def __init__(self) -> None:
        """Open the first attached serial port that matches a registered Crystalfontz model.

        Raises:
            RuntimeError: If no attached port has the VID/PID of any registered model.
        """
        self.index = 0
        self.buffer: list[int] = []

        # Get serial port: try every (attached port, known model) pair until VID and PID both match
        for comport, model in itertools.product(serial.tools.list_ports.comports(), Crystalfontz.models):
            if comport.vid == model.vid and comport.pid == model.pid:
                logger.debug(f"Found LCD VID: {comport.vid}, PID: {comport.pid}")
                break
        else:
            raise RuntimeError(f"LCD could not be found: {', '.join(map(repr, Crystalfontz.models))}")
        # timeout=0 makes reads non-blocking; consume() does its own waiting
        self.serial = serial.Serial(port=comport.device, baudrate=model.baud, timeout=0)

    def write(self, data: bytes, timeout: float | None = None):
        """Write data to the serial port and flush it.

        A write timeout is not fatal: it is logged as a warning and the method returns
        normally, leaving any retry to the caller.

        Args:
            data: Raw bytes to send.
            timeout: Write timeout in seconds, assigned to the port before writing.
        """
        self.serial.write_timeout = timeout  # 0 seems to block?
        logger.debug(f"Writing bytes: {data!r}")
        try:
            self.serial.write(data)
            self.serial.flush()
        except serial.serialutil.SerialTimeoutException:
            # Previously discarded silently; keep it best-effort but make it visible in the log
            logger.warning(f"Timed out after {timeout} seconds while writing to the LCD.")

    def consume(self, timeout: float = 0.1):
        """Fetch the current byte and advance.

        Drains everything waiting on the port into ``buffer`` first, then returns the byte at
        ``index``. If the buffer has no unread byte, waits for one to arrive.

        Args:
            timeout: Seconds to wait without receiving anything before giving up.
                A falsy value (0) waits indefinitely.

        Returns:
            The next unread byte as an int.

        Raises:
            TimeoutError: If no data arrived within ``timeout`` seconds.
        """
        start_time = time.perf_counter()
        while self.serial.in_waiting > 0 or self.index >= len(self.buffer):
            if self.serial.in_waiting > 0:
                try:
                    self.buffer += list(self.serial.read(self.serial.in_waiting))
                except serial.serialutil.SerialException as e:
                    logger.debug(f"Got error: {e}")
                    time.sleep(1)
                start_time = time.perf_counter()  # activity on the port restarts the timeout window
                continue
            if timeout and time.perf_counter() - start_time > timeout:
                raise TimeoutError(f"No data after {timeout} seconds.")
            # Nothing to read yet: yield the CPU briefly instead of spinning at 100%
            time.sleep(0.001)
        result = self.buffer[self.index]
        self.index += 1
        return result

    def clear_history(self):
        """Discard data before the current index."""
        self.buffer = self.buffer[self.index :]
        self.index = 0

Manager for serial port.

cfa835 = Crystalfontz(pid=5, vid=8763, baud=115200)
index
buffer: list[int]
serial
def write(self, data: bytes, timeout: float | None = None):
    """Write data to the serial port and flush it.

    A write timeout is not fatal: it is logged as a warning and the method returns
    normally, leaving any retry to the caller.

    Args:
        data: Raw bytes to send.
        timeout: Write timeout in seconds, assigned to the port before writing.
    """
    self.serial.write_timeout = timeout  # 0 seems to block?
    logger.debug(f"Writing bytes: {data!r}")
    try:
        self.serial.write(data)
        self.serial.flush()
    except serial.serialutil.SerialTimeoutException:
        # Previously discarded silently; keep it best-effort but make it visible in the log
        logger.warning(f"Timed out after {timeout} seconds while writing to the LCD.")

Write data to the serial port.

def consume(self, timeout: float = 0.1):
    """Fetch the current byte and advance.

    Drains everything waiting on the port into ``buffer`` first, then returns the byte at
    ``index``. If the buffer has no unread byte, waits for one to arrive.

    Args:
        timeout: Seconds to wait without receiving anything before giving up.
            A falsy value (0) waits indefinitely.

    Returns:
        The next unread byte as an int.

    Raises:
        TimeoutError: If no data arrived within ``timeout`` seconds.
    """
    start_time = time.perf_counter()
    while self.serial.in_waiting > 0 or self.index >= len(self.buffer):
        if self.serial.in_waiting > 0:
            try:
                self.buffer += list(self.serial.read(self.serial.in_waiting))
            except serial.serialutil.SerialException as e:
                logger.debug(f"Got error: {e}")
                time.sleep(1)
            start_time = time.perf_counter()  # activity on the port restarts the timeout window
            continue
        if timeout and time.perf_counter() - start_time > timeout:
            raise TimeoutError(f"No data after {timeout} seconds.")
        # Nothing to read yet: yield the CPU briefly instead of spinning at 100%
        time.sleep(0.001)
    result = self.buffer[self.index]
    self.index += 1
    return result

Fetch the current byte and advance.

def clear_history(self):
    """Drop every byte before the read cursor and rewind the cursor to 0."""
    unread = self.buffer[self.index :]
    self.buffer, self.index = unread, 0

Discard data before the current index.

@dataclass
class RawPacket:
    """One LCD packet: command byte, data length byte, data and a 16-bit CRC."""

    command: int = 0
    data_length: int = 0
    data: bytes = b""
    crc: int = 0

    def to_bytes(self) -> bytes:
        """Serialize the whole packet, CRC included.

        Layout: command (low 8 bits), data_length (low 8 bits), the data bytes,
        then the CRC as two bytes, least significant first.
        """
        header = bytes((self.command & 0xFF, self.data_length & 0xFF))
        trailer = self.crc.to_bytes(2, byteorder="little")
        return header + self.data + trailer

    def calculate_crc(self):
        """Compute the CRC over command, data_length and data, and store it in ``crc``."""
        payload = self.to_bytes()[:-2]  # everything except the two trailing CRC bytes
        register = 0xFFFF  # start with all ones so leading zero bytes still affect the result
        for octet in payload:
            register ^= octet
            for _ in range(8):
                # Shift out one bit (LSB first); apply the polynomial when that bit was set
                register = (register >> 1) ^ 0x8408 if register & 0x01 else register >> 1
        self.crc = register ^ 0xFFFF  # final one's complement, kept to 16 bits

A raw LCD packet.

RawPacket( command: int = 0, data_length: int = 0, data: bytes = b'', crc: int = 0)
command: int = 0
data_length: int = 0
data: bytes = b''
crc: int = 0
def to_bytes(self) -> bytes:
148    def to_bytes(self) -> bytes:
149        """Convert everything but the CRC to bytes."""
150        return (
151            bytes([self.command & 0xFF, self.data_length & 0xFF]) + self.data + self.crc.to_bytes(2, byteorder="little")
152        )

Serialize the whole packet to bytes: command, data length, data, then the CRC (two bytes, least significant first).

def calculate_crc(self):
154    def calculate_crc(self):
155        """Fill in CRC with the correct value."""
156        crc = 0xFFFF  # Preset to all 1's, prevent loss of leading zeros
157        for byte in self.to_bytes()[:-2]:  # Strip current CRC bytes
158            for _ in range(8):
159                if (crc ^ byte) & 0x01:
160                    crc >>= 1
161                    crc ^= 0x8408
162                else:
163                    crc >>= 1
164                byte >>= 1
165        self.crc = (~crc) & 0xFFFF

Fill in CRC with the correct value.

class LCDMonitor:
    """Manage LCD events.

    Owns the serial port, runs a daemon thread that decodes incoming packets into
    ``packet_deque`` (and a CSV log), and sends command packets with retry.
    """

    # Supported commands: hw info (0x1), brightness (0x0E, 0x0D), cursor (0x0C), error (0xC0), leds (0x22)

    MAX_ATTEMPTS = 2  # number of times write() sends a command before giving up

    class ResponseError(IntEnum):
        """Possible LCD error responses."""

        UNKNOWN_ERROR = 1
        UNKNOWN_COMMAND = 2
        INVALID_COMMAND_LENGTH_OPTIONS = 3
        WRITING_FLASH_MEM_FAILED = 4
        READING_FLASH_MEM_FAILED = 5
        CFA_FBSCAB_NOT_PRESENT_AT_INDEX = 6
        CFA_FBSCAB_DID_NOT_REPLY_TO_REG = 7
        MICRO_SD_NOT_INSERTED_OR_BAD = 8
        MICRO_SD_NOT_FORMATTED = 9
        MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = 10
        MICRO_SD_UNKNOWN_ERROR = 11
        MICRO_SD_FILE_COULD_NOT_BE_READ = 12
        MICRO_SD_COULD_NOT_BE_WRITTEN = 13
        FILE_HEADER_IS_INVALID = 14
        MICRO_SD_FILE_IS_ALREADY_OPEN = 15
        MICRO_SD_FILE_OPERATION_FAILED = 16
        MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = 17
        GFX_STREAM_ALREADY_STARTED = 18
        GFX_IS_OUT_OF_LCD_BOUNDS = 19
        VIDEO_IS_NOT_OPEN_IN_SLOT = 20
        GFX_STREAM_HAS_TIMED_OUT = 21
        GPIO_NOT_SET_FOR_ATX_USE = 22
        INTERFACE_NOT_ENABLED = 23
        INTERFACE_NOT_AVAILABLE_ = 24

    def __init__(self, packet_buffer_size: int = 4096):
        """Open the LCD port, start the CSV log and launch the packet-reading thread.

        Args:
            packet_buffer_size: Maximum number of packets held in ``packet_deque``; once
                full, appending a new packet evicts the oldest one.
        """
        self.dropped_packets = 0
        self.packet_deque: deque[RawPacket] = deque(maxlen=packet_buffer_size)

        # Open the serial connection to the LCD
        self.serial_port = LCDPort()

        # Create log file: append a header row made of the RawPacket field names
        with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
            csv.writer(csvfile).writerow(asdict(RawPacket()))

        # Start monitor thread
        self.daemon = threading.Thread(target=self.runtime, daemon=True)
        self.daemon.start()

    def runtime(self):
        """Fetch packets as quickly as possible.

        Never returns; it is the target of the daemon thread started in ``__init__``. Every
        packet with a valid CRC is appended to ``packet_deque`` and to the CSV log.
        """

        def fetch_packet() -> RawPacket:
            """Decode serial stream to a valid packet."""
            while True:
                # Fetch raw packet
                try:
                    # Wire order: command, data length, data bytes, CRC low byte, CRC high byte.
                    # Only the first byte waits indefinitely (timeout=0); the rest use consume()'s default.
                    raw_packet = RawPacket(
                        command=self.serial_port.consume(timeout=0),
                        data_length=(length := self.serial_port.consume()),
                        data=bytes(self.serial_port.consume() for _ in range(length)),
                        crc=self.serial_port.consume() | (self.serial_port.consume() << 8),
                    )
                except TimeoutError:
                    logger.warning(
                        "Timeout when attempting to read packet. Possibly an invalid packet cause by dropped bytes."
                    )
                else:
                    # Validate CRC
                    logger.debug(f"Packet?: {asdict(raw_packet)}")
                    valid_packet = replace(raw_packet)  # Calculate the expected CRC
                    valid_packet.calculate_crc()
                    if raw_packet.crc == valid_packet.crc:
                        logger.debug("got lcd packet")
                        self.serial_port.clear_history()
                        return raw_packet
                    logger.debug(f"Invalid CRC 0x{raw_packet.crc:04X}, expected 0x{valid_packet.crc:04X}.")
                # Resynchronize: drop only the oldest buffered byte and retry parsing from the next one
                logger.debug(f"LCD Buffer: {self.serial_port.buffer}")
                logger.debug(f"Discarding lcd byte: {self.serial_port.buffer[0]:02X}")
                self.serial_port.index = 1
                self.serial_port.clear_history()

        while True:
            packet = fetch_packet()
            if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
                self.dropped_packets += 1  # the append below evicts the oldest packet
            self.packet_deque.append(packet)  # Save to deque
            logger.debug(f"{len(self.packet_deque)} packet(s) in queue: {', '.join(map(str, self.packet_deque))}")

            # Save to file
            with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
                csv.writer(csvfile).writerow(asdict(packet).values())

            time.sleep(0.1)  # Brief pause between packets; presumably they mostly arrive on user input

    def read(self, blocking: bool = True, timeout: float = 0) -> RawPacket | None:
        """Get keys or acknowledgment.

        Args:
            blocking: When False, return None immediately if no packet is queued.
            timeout: Maximum seconds to wait; a falsy value (0) waits indefinitely.

        Returns:
            The oldest queued packet, or None for a non-blocking call with an empty queue.

        Raises:
            TimeoutError: If ``timeout`` elapsed without a packet.
        """
        start_time = time.perf_counter()
        while not timeout or time.perf_counter() - start_time < timeout:
            with suppress(IndexError):
                return self.packet_deque.popleft()
            if not blocking:
                return None
            time.sleep(0.1)  # Packets only come in on user input
        raise TimeoutError(f"A packet was not received after {timeout:.1f} seconds.")

    def write(self, command_packet: RawPacket, timeout: float = 2):
        """Send command to LCD and wait for its response, retrying up to ``MAX_ATTEMPTS`` times.

        An attempt ends successfully when the next queued packet has command
        ``0x40 | command_packet.command``, or when it is an error response (top two bits
        set), which is logged as a warning. Any other packet, or none within ``timeout``,
        counts as a failed attempt and is logged as an error.

        Args:
            command_packet: Packet to send; its CRC must already be filled in.
            timeout: Seconds used both as write timeout and as response wait per attempt.
        """
        for attempt_number in range(self.MAX_ATTEMPTS):
            self.serial_port.write(command_packet.to_bytes(), timeout)
            with suppress(TimeoutError):
                response = self.read(timeout=timeout)
                if response and (0x40 | command_packet.command) == response.command:
                    break
                if response and (response.command & 0xC0) == 0xC0:
                    # NOTE(review): the low six bits are treated as the error code here; confirm against
                    # the datasheet that the code is not carried in the data payload instead.
                    error_code = response.command & ((~0xC0) & 0xFF)
                    try:
                        error_message = self.ResponseError(error_code).name.capitalize().replace("_", " ")
                    except ValueError:
                        # Value outside the known table: report it instead of raising out of write()
                        error_message = f"Unrecognized error code {error_code}"
                    logger.warning(f"Received error response: {error_message}.")
                    break
            logger.error(f"LCD response not received or invalid for attempt {attempt_number + 1}.")
            time.sleep(0.1)

Manage LCD events.

def __init__(self, packet_buffer_size: int = 4096):
    """Open the LCD port, start the CSV log and launch the packet-reading thread.

    Args:
        packet_buffer_size: Maximum number of packets held in ``packet_deque``.
    """
    self.dropped_packets = 0
    self.packet_deque: deque[RawPacket] = deque(maxlen=packet_buffer_size)

    # Open the serial connection to the LCD
    self.serial_port = LCDPort()

    # Append a header row (the RawPacket field names) to the CSV packet log
    column_names = list(asdict(RawPacket()))
    with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
        csv.writer(csvfile).writerow(column_names)

    # Launch the background reader as a daemon thread
    reader = threading.Thread(target=self.runtime, daemon=True)
    self.daemon = reader
    reader.start()
MAX_ATTEMPTS = 2
dropped_packets
packet_deque: collections.deque[RawPacket]
serial_port
daemon
def runtime(self):
    """Decode incoming packets in an endless loop.

    Never returns. Every packet with a valid CRC is appended to ``packet_deque`` and
    written as a row to the CSV log.
    """

    def fetch_packet() -> RawPacket:
        """Return the next packet from the serial stream whose CRC checks out."""
        port = self.serial_port
        while True:
            try:
                # Wire order: command, data length, data bytes, CRC low byte, CRC high byte.
                # Only the first byte waits indefinitely (timeout=0); the rest use consume()'s default.
                raw_packet = RawPacket(
                    command=port.consume(timeout=0),
                    data_length=(length := port.consume()),
                    data=bytes(port.consume() for _ in range(length)),
                    crc=port.consume() | (port.consume() << 8),
                )
            except TimeoutError:
                logger.warning(
                    "Timeout when attempting to read packet. Possibly an invalid packet cause by dropped bytes."
                )
            else:
                logger.debug(f"Packet?: {asdict(raw_packet)}")
                # Recompute the CRC on a copy so the received value stays untouched
                expected = replace(raw_packet)
                expected.calculate_crc()
                if raw_packet.crc == expected.crc:
                    logger.debug("got lcd packet")
                    port.clear_history()
                    return raw_packet
                logger.debug(f"Invalid CRC 0x{raw_packet.crc:04X}, expected 0x{expected.crc:04X}.")
            # Resynchronize: drop only the oldest buffered byte and retry parsing from the next one
            logger.debug(f"LCD Buffer: {port.buffer}")
            logger.debug(f"Discarding lcd byte: {port.buffer[0]:02X}")
            port.index = 1
            port.clear_history()

    while True:
        packet = fetch_packet()
        queue_is_full = len(self.packet_deque) == self.packet_deque.maxlen
        if queue_is_full:
            self.dropped_packets += 1  # the append below evicts the oldest packet
        self.packet_deque.append(packet)
        logger.debug(f"{len(self.packet_deque)} packet(s) in queue: {', '.join(map(str, self.packet_deque))}")

        # Append the packet's field values as one CSV row
        with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
            csv.writer(csvfile).writerow(asdict(packet).values())

        time.sleep(0.1)  # Brief pause between packets; presumably they mostly arrive on user input

Fetch packets as quickly as possible.

def read(self, blocking: bool = True, timeout: float = 0) -> RawPacket | None:
    """Pop the oldest queued packet (key report or command response).

    Args:
        blocking: When False, return None immediately if no packet is queued.
        timeout: Maximum seconds to wait; a falsy value (0) waits indefinitely.

    Returns:
        The oldest queued packet, or None for a non-blocking call with an empty queue.

    Raises:
        TimeoutError: If ``timeout`` elapsed without a packet.
    """
    began = time.perf_counter()
    while True:
        if timeout and time.perf_counter() - began >= timeout:
            break
        try:
            return self.packet_deque.popleft()
        except IndexError:
            pass  # queue is empty right now
        if not blocking:
            return None
        time.sleep(0.1)  # Packets only come in on user input
    raise TimeoutError(f"A packet was not received after {timeout:.1f} seconds.")

Get keys or acknowledgment.

def write(self, command_packet: RawPacket, timeout: float = 2):
    """Send command to LCD and wait for its response, retrying up to ``MAX_ATTEMPTS`` times.

    An attempt ends successfully when the next queued packet has command
    ``0x40 | command_packet.command``, or when it is an error response (top two bits
    set), which is logged as a warning. Any other packet, or none within ``timeout``,
    counts as a failed attempt and is logged as an error.

    Args:
        command_packet: Packet to send; its CRC must already be filled in.
        timeout: Seconds used both as write timeout and as response wait per attempt.
    """
    for attempt_number in range(self.MAX_ATTEMPTS):
        self.serial_port.write(command_packet.to_bytes(), timeout)
        with suppress(TimeoutError):
            response = self.read(timeout=timeout)
            if response and (0x40 | command_packet.command) == response.command:
                break
            if response and (response.command & 0xC0) == 0xC0:
                # NOTE(review): the low six bits are treated as the error code here; confirm against
                # the datasheet that the code is not carried in the data payload instead.
                error_code = response.command & ((~0xC0) & 0xFF)
                try:
                    error_message = self.ResponseError(error_code).name.capitalize().replace("_", " ")
                except ValueError:
                    # Value outside the known table: report it instead of raising out of write()
                    error_message = f"Unrecognized error code {error_code}"
                logger.warning(f"Received error response: {error_message}.")
                break
        logger.error(f"LCD response not received or invalid for attempt {attempt_number + 1}.")
        time.sleep(0.1)

Send command to LCD

class LCDMonitor.ResponseError(enum.IntEnum):
175    class ResponseError(IntEnum):
176        """Possible LCD error responses."""
177
178        UNKNOWN_ERROR = 1  # LCDMonitor.write looks members up by value and logs the name with "_" replaced by spaces
179        UNKNOWN_COMMAND = 2
180        INVALID_COMMAND_LENGTH_OPTIONS = 3
181        WRITING_FLASH_MEM_FAILED = 4
182        READING_FLASH_MEM_FAILED = 5
183        CFA_FBSCAB_NOT_PRESENT_AT_INDEX = 6
184        CFA_FBSCAB_DID_NOT_REPLY_TO_REG = 7
185        MICRO_SD_NOT_INSERTED_OR_BAD = 8
186        MICRO_SD_NOT_FORMATTED = 9
187        MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = 10
188        MICRO_SD_UNKNOWN_ERROR = 11
189        MICRO_SD_FILE_COULD_NOT_BE_READ = 12
190        MICRO_SD_COULD_NOT_BE_WRITTEN = 13
191        FILE_HEADER_IS_INVALID = 14
192        MICRO_SD_FILE_IS_ALREADY_OPEN = 15
193        MICRO_SD_FILE_OPERATION_FAILED = 16
194        MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = 17
195        GFX_STREAM_ALREADY_STARTED = 18
196        GFX_IS_OUT_OF_LCD_BOUNDS = 19
197        VIDEO_IS_NOT_OPEN_IN_SLOT = 20
198        GFX_STREAM_HAS_TIMED_OUT = 21
199        GPIO_NOT_SET_FOR_ATX_USE = 22
200        INTERFACE_NOT_ENABLED = 23
201        INTERFACE_NOT_AVAILABLE_ = 24  # trailing underscore is part of the member name; values outside 1-24 are not members

Possible LCD error responses.

UNKNOWN_ERROR = <ResponseError.UNKNOWN_ERROR: 1>
UNKNOWN_COMMAND = <ResponseError.UNKNOWN_COMMAND: 2>
INVALID_COMMAND_LENGTH_OPTIONS = <ResponseError.INVALID_COMMAND_LENGTH_OPTIONS: 3>
WRITING_FLASH_MEM_FAILED = <ResponseError.WRITING_FLASH_MEM_FAILED: 4>
READING_FLASH_MEM_FAILED = <ResponseError.READING_FLASH_MEM_FAILED: 5>
CFA_FBSCAB_NOT_PRESENT_AT_INDEX = <ResponseError.CFA_FBSCAB_NOT_PRESENT_AT_INDEX: 6>
CFA_FBSCAB_DID_NOT_REPLY_TO_REG = <ResponseError.CFA_FBSCAB_DID_NOT_REPLY_TO_REG: 7>
MICRO_SD_NOT_INSERTED_OR_BAD = <ResponseError.MICRO_SD_NOT_INSERTED_OR_BAD: 8>
MICRO_SD_NOT_FORMATTED = <ResponseError.MICRO_SD_NOT_FORMATTED: 9>
MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = <ResponseError.MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED: 10>
MICRO_SD_UNKNOWN_ERROR = <ResponseError.MICRO_SD_UNKNOWN_ERROR: 11>
MICRO_SD_FILE_COULD_NOT_BE_READ = <ResponseError.MICRO_SD_FILE_COULD_NOT_BE_READ: 12>
MICRO_SD_COULD_NOT_BE_WRITTEN = <ResponseError.MICRO_SD_COULD_NOT_BE_WRITTEN: 13>
FILE_HEADER_IS_INVALID = <ResponseError.FILE_HEADER_IS_INVALID: 14>
MICRO_SD_FILE_IS_ALREADY_OPEN = <ResponseError.MICRO_SD_FILE_IS_ALREADY_OPEN: 15>
MICRO_SD_FILE_OPERATION_FAILED = <ResponseError.MICRO_SD_FILE_OPERATION_FAILED: 16>
MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = <ResponseError.MICRO_SD_FILE_HAS_NOT_BEEN_OPENED: 17>
GFX_STREAM_ALREADY_STARTED = <ResponseError.GFX_STREAM_ALREADY_STARTED: 18>
GFX_IS_OUT_OF_LCD_BOUNDS = <ResponseError.GFX_IS_OUT_OF_LCD_BOUNDS: 19>
VIDEO_IS_NOT_OPEN_IN_SLOT = <ResponseError.VIDEO_IS_NOT_OPEN_IN_SLOT: 20>
GFX_STREAM_HAS_TIMED_OUT = <ResponseError.GFX_STREAM_HAS_TIMED_OUT: 21>
GPIO_NOT_SET_FOR_ATX_USE = <ResponseError.GPIO_NOT_SET_FOR_ATX_USE: 22>
INTERFACE_NOT_ENABLED = <ResponseError.INTERFACE_NOT_ENABLED: 23>
INTERFACE_NOT_AVAILABLE_ = <ResponseError.INTERFACE_NOT_AVAILABLE_: 24>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
is_integer
real
imag
numerator
denominator
class KeyCode(IntEnum):
    """Keypad event codes: one code per key for press and one for release."""

    # Press events
    KEY_UP_PRESS = 0x01
    KEY_DOWN_PRESS = 0x02
    KEY_LEFT_PRESS = 0x03
    KEY_RIGHT_PRESS = 0x04
    KEY_ENTER_PRESS = 0x05
    KEY_EXIT_PRESS = 0x06

    # Release events (same key order, offset by 6)
    KEY_UP_RELEASE = 0x07
    KEY_DOWN_RELEASE = 0x08
    KEY_LEFT_RELEASE = 0x09
    KEY_RIGHT_RELEASE = 0x0A
    KEY_ENTER_RELEASE = 0x0B
    KEY_EXIT_RELEASE = 0x0C

The key pressed on the keypad.

KEY_UP_PRESS = <KeyCode.KEY_UP_PRESS: 1>
KEY_DOWN_PRESS = <KeyCode.KEY_DOWN_PRESS: 2>
KEY_LEFT_PRESS = <KeyCode.KEY_LEFT_PRESS: 3>
KEY_RIGHT_PRESS = <KeyCode.KEY_RIGHT_PRESS: 4>
KEY_ENTER_PRESS = <KeyCode.KEY_ENTER_PRESS: 5>
KEY_EXIT_PRESS = <KeyCode.KEY_EXIT_PRESS: 6>
KEY_UP_RELEASE = <KeyCode.KEY_UP_RELEASE: 7>
KEY_DOWN_RELEASE = <KeyCode.KEY_DOWN_RELEASE: 8>
KEY_LEFT_RELEASE = <KeyCode.KEY_LEFT_RELEASE: 9>
KEY_RIGHT_RELEASE = <KeyCode.KEY_RIGHT_RELEASE: 10>
KEY_ENTER_RELEASE = <KeyCode.KEY_ENTER_RELEASE: 11>
KEY_EXIT_RELEASE = <KeyCode.KEY_EXIT_RELEASE: 12>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
is_integer
real
imag
numerator
denominator
class LEDColor(IntEnum):
    """LED colors encoded as two bits: bit 0 is red, bit 1 is green."""

    OFF = 0b00
    RED = 0b01
    GREEN = 0b10
    YELLOW = RED | GREEN  # both bits set

The LED color.

OFF = <LEDColor.OFF: 0>
RED = <LEDColor.RED: 1>
GREEN = <LEDColor.GREEN: 2>
YELLOW = <LEDColor.YELLOW: 3>
Inherited Members
enum.Enum
name
value
builtins.int
conjugate
bit_length
bit_count
to_bytes
from_bytes
as_integer_ratio
is_integer
real
imag
numerator
denominator
class UIHandler:
    """Manage LCD UI.

    Holds the current page, forwards key events to it, and offers helpers that build
    and send the command packets used to draw on the LCD.
    """

    FREQUENCY = 20  # How often to check for input / refresh screen

    def __init__(self, lcd_monitor: LCDMonitor):
        """Show the initial menu page and start the progress-queue server.

        Args:
            lcd_monitor: Monitor used to send commands to, and read packets from, the LCD.
        """
        self.lcd_monitor = lcd_monitor
        self.page: PageType = TestMenuType()
        self.stack: list[Any] = []  # Allows pages to communicate
        # self.reset()
        self.clear_screen()
        self.page.draw(self)

        # Progress queue for communicating with the test
        class QueueManager(BaseManager):
            """Manager for multiprocess queue."""

        self._backing_queue: Queue[dict[str, str | float]] = Queue()
        QueueManager.register("get_queue", callable=lambda: self._backing_queue)
        self._queue_manager = QueueManager(address=("", PORT), authkey=AUTH_KEY)
        self._queue_manager.start()  # pylint: disable=consider-using-with
        self.queue: Queue[dict[str, str | float]] = self._queue_manager.get_queue()  # type: ignore[attr-defined]  # pylint: disable=no-member

    def run_forever(self):
        """Process LCD events forever.

        Each cycle: unless a progress page is already shown, scan /proc for another process
        whose name starts with "HITL" and switch to a progress page for it; then handle at
        most one queued key packet and refresh the current page. Any exception is logged
        and followed by a one second pause.
        """
        while True:
            try:
                # Check if a test is running and show progress if so
                if not isinstance(self.page, ProgressType):
                    current_name = Path("/proc/self/comm").read_text(encoding="ascii")
                    for pid in Path("/proc").glob("[0-9]*"):
                        # A process may exit while /proc is being scanned; skip it rather than abort the cycle
                        with suppress(FileNotFoundError, ProcessLookupError):
                            process_name = (pid / "comm").read_text(encoding="ascii")
                            if process_name != current_name and process_name.startswith("HITL") and pid.stem.isdigit():
                                logger.debug(f"Found running test at PID: {pid.stem}")
                                self.stack.append(int(pid.stem))
                                self.page = ProgressType()

                # Check for new key inputs
                if (packet := self.lcd_monitor.read(blocking=False)) and packet.command == 0x80:
                    self.key_event(packet)
                self.page.refresh(self)

                time.sleep(1 / self.FREQUENCY)  # Don't waste cycles when nothing happens 99% of the time
            except Exception:  # pylint: disable=broad-exception-caught
                logger.error(traceback.format_exc())
                time.sleep(1)

    def key_event(self, packet: RawPacket):
        """Process key event.

        The key code is the first data byte. A packet with no data or with a code that is
        not a ``KeyCode`` member is logged and ignored.
        """
        raw_code = packet.data[0] if packet.data else 0
        try:
            key = KeyCode(raw_code)
        except ValueError:
            logger.warning(f"Ignoring key event with unknown key code: {raw_code}")
            return
        self.page.process_key(self, key)
        self.page.draw(self)

    def draw_image(self, filename: str):
        """Draw an image to the LCD.

        Args:
            filename: Name of the image, sent to the LCD encoded as latin1. An empty name
                is ignored because there is nothing to draw.
        """
        raw_filename = filename.encode("latin1")  # Map bytes 1-to-1
        if not raw_filename:
            # The old code declared a data length two bytes longer than the payload here
            logger.debug("Ignoring request to draw an image with an empty filename.")
            return
        enable_transparency = False  # pixel value 0 is transparent
        invert_image_shade = False  # will invert transparency value also
        x_start = 0
        y_start = 0
        data = bytes([3, enable_transparency | (invert_image_shade << 1), x_start, y_start, *raw_filename])
        packet = RawPacket(command=0x28, data_length=len(data), data=data)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def write_text(self, row: int, column: int, text: str):
        """Write text to the LCD.

        Args:
            row: Row to write at.
            column: Column to write at.
            text: Text to show, sent encoded as latin1. Empty text is ignored.
        """
        raw_text = text.encode("latin1")  # Map bytes 1-to-1
        if not raw_text:
            # The old code declared a data length one byte longer than the payload here
            logger.debug("Ignoring request to write empty text.")
            return
        data = bytes([column, row, *raw_text])  # column comes before row on the wire
        packet = RawPacket(command=0x1F, data_length=len(data), data=data)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def clear_screen(self):
        """Clear text from the screen."""
        packet = RawPacket(command=0x06, data_length=0)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def reset(self):
        """Reset the LCD device."""
        packet = RawPacket(command=0x05, data_length=3, data=bytes([8, 25, 48]))
        packet.calculate_crc()
        self.lcd_monitor.write(packet)
        time.sleep(4)  # May not respond for up to 3 seconds

    def set_led(self, colors: tuple[LEDColor, LEDColor, LEDColor, LEDColor]):
        """Set the four on-board LEDs.

        For each LED two packets are sent: index ``5 + 2 * led_id`` gets 100 when the
        color has the green bit (2) set, index ``6 + 2 * led_id`` gets 100 when it has
        the red bit (1) set; otherwise 0 is sent.

        Args:
            colors: One color per LED.
        """
        for led_id in range(4):
            packet = RawPacket(
                command=0x22, data_length=2, data=bytes([5 + led_id * 2, 100 * bool(colors[led_id] & 2)])
            )
            packet.calculate_crc()
            self.lcd_monitor.write(packet)
            packet = RawPacket(
                command=0x22, data_length=2, data=bytes([6 + led_id * 2, 100 * bool(colors[led_id] & 1)])
            )
            packet.calculate_crc()
            self.lcd_monitor.write(packet)

    def save_state(self):
        """Save state as boot state"""
        packet = RawPacket(command=0x4, data_length=0)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

Manage LCD UI.

def __init__(self, lcd_monitor: LCDMonitor):
    """Show the initial menu page and start the progress-queue server.

    Args:
        lcd_monitor: Monitor used to send commands to, and read packets from, the LCD.
    """
    self.lcd_monitor = lcd_monitor
    self.page: PageType = TestMenuType()
    self.stack: list[Any] = []  # shared scratch space that lets pages pass values to each other
    # self.reset()
    self.clear_screen()
    self.page.draw(self)

    # Queue served over a manager so the test can report progress to this process
    class QueueManager(BaseManager):
        """Manager for multiprocess queue."""

    self._backing_queue: Queue[dict[str, str | float]] = Queue()
    QueueManager.register("get_queue", callable=lambda: self._backing_queue)
    listen_address = ("", PORT)
    self._queue_manager = QueueManager(address=listen_address, authkey=AUTH_KEY)
    self._queue_manager.start()  # pylint: disable=consider-using-with
    self.queue: Queue[dict[str, str | float]] = self._queue_manager.get_queue()  # type: ignore[attr-defined]  # pylint: disable=no-member
FREQUENCY = 20
lcd_monitor
page: PageType
stack: list[typing.Any]
queue: queue.Queue[dict[str, str | float]]
def run_forever(self):
    """Process LCD events forever.

    Each cycle: unless a progress page is already shown, scan /proc for another process
    whose name starts with "HITL" and switch to a progress page for it; then handle at
    most one queued key packet and refresh the current page. Any exception is logged
    and followed by a one second pause.
    """
    while True:
        try:
            # Check if a test is running and show progress if so
            if not isinstance(self.page, ProgressType):
                current_name = Path("/proc/self/comm").read_text(encoding="ascii")
                for pid in Path("/proc").glob("[0-9]*"):
                    # A process may exit while /proc is being scanned; skip it rather than abort the cycle
                    with suppress(FileNotFoundError, ProcessLookupError):
                        process_name = (pid / "comm").read_text(encoding="ascii")
                        if process_name != current_name and process_name.startswith("HITL") and pid.stem.isdigit():
                            logger.debug(f"Found running test at PID: {pid.stem}")
                            self.stack.append(int(pid.stem))
                            self.page = ProgressType()

            # Check for new key inputs
            if (packet := self.lcd_monitor.read(blocking=False)) and packet.command == 0x80:
                self.key_event(packet)
            self.page.refresh(self)

            time.sleep(1 / self.FREQUENCY)  # Don't waste cycles when nothing happens 99% of the time
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            time.sleep(1)

Process LCD events forever.

def key_event(self, packet: RawPacket):
    """Process key event.

    The key code is the first data byte. A packet with no data or with a code that is
    not a ``KeyCode`` member is logged and ignored.
    """
    raw_code = packet.data[0] if packet.data else 0
    try:
        key = KeyCode(raw_code)
    except ValueError:
        logger.warning(f"Ignoring key event with unknown key code: {raw_code}")
        return
    self.page.process_key(self, key)
    self.page.draw(self)

Process key event.

def draw_image(self, filename: str):
    """Draw an image to the LCD.

    Args:
        filename: Name of the image, sent to the LCD encoded as latin1. An empty name
            is ignored because there is nothing to draw.
    """
    raw_filename = filename.encode("latin1")  # Map bytes 1-to-1
    if not raw_filename:
        # The old code declared a data length two bytes longer than the payload here
        logger.debug("Ignoring request to draw an image with an empty filename.")
        return
    enable_transparency = False  # pixel value 0 is transparent
    invert_image_shade = False  # will invert transparency value also
    x_start = 0
    y_start = 0
    data = bytes([3, enable_transparency | (invert_image_shade << 1), x_start, y_start, *raw_filename])
    packet = RawPacket(command=0x28, data_length=len(data), data=data)
    packet.calculate_crc()
    self.lcd_monitor.write(packet)

Draw an image to the LCD.

def write_text(self, row: int, column: int, text: str):
    """Write text to the LCD.

    Args:
        row: Row to write at.
        column: Column to write at.
        text: Text to show, sent encoded as latin1. Empty text is ignored.
    """
    raw_text = text.encode("latin1")  # Map bytes 1-to-1
    if not raw_text:
        # The old code declared a data length one byte longer than the payload here
        logger.debug("Ignoring request to write empty text.")
        return
    data = bytes([column, row, *raw_text])  # column comes before row on the wire
    packet = RawPacket(command=0x1F, data_length=len(data), data=data)
    packet.calculate_crc()
    self.lcd_monitor.write(packet)

Write text to the LCD.

def clear_screen(self):
    """Send the data-less command 0x06 that clears text from the screen."""
    clear_command = RawPacket(command=0x06, data_length=0)
    clear_command.calculate_crc()
    self.lcd_monitor.write(clear_command)

Clear text from the screen.

def reset(self):
    """Send the reset command (0x05) to the LCD, then pause for four seconds."""
    payload = bytes((8, 25, 48))
    reset_command = RawPacket(command=0x05, data_length=3, data=payload)
    reset_command.calculate_crc()
    self.lcd_monitor.write(reset_command)
    time.sleep(4)  # May not respond for up to 3 seconds

Reset the LCD device.

def set_led(self, colors: tuple[LEDColor, LEDColor, LEDColor, LEDColor]):
    """Set the four on-board LEDs.

    For each LED two packets are sent with command 0x22: index ``5 + 2 * led_id`` gets
    100 when the color has bit 2 set, index ``6 + 2 * led_id`` gets 100 when it has
    bit 1 set; otherwise 0 is sent.

    Args:
        colors: One color per LED.
    """
    for led_id in range(4):
        color = colors[led_id]
        base = 5 + led_id * 2
        # (target index, color bit) pairs, sent in this order
        for target, bit in ((base, 2), (base + 1, 1)):
            level = 100 * bool(color & bit)
            packet = RawPacket(command=0x22, data_length=2, data=bytes([target, level]))
            packet.calculate_crc()
            self.lcd_monitor.write(packet)

Set the on-board LED values.

def save_state(self):
def save_state(self):
    """Save the current state as the boot state (empty command ``0x4`` packet)."""
    store_request = RawPacket(data_length=0, command=0x4)
    store_request.calculate_crc()
    self.lcd_monitor.write(store_request)

Save the current state as the boot state.

class PageType:
class PageType:
    """An LCD page.

    Base class for the pages shown on the display. Every hook is a no-op here, so
    subclasses override only the ones they need.
    """

    title: str = ""

    def process_key(self, lcd_manager: UIHandler, key: KeyCode) -> None:
        """Perform some action based on the key."""
        return None

    def draw(self, lcd_manager: UIHandler) -> None:
        """Draw to screen."""
        return None

    def refresh(self, lcd_manager: UIHandler) -> None:
        """Update internal values. Called continuously."""
        return None

An LCD page.

title = ''
def process_key( self, lcd_manager: UIHandler, key: KeyCode):
def process_key(self, lcd_manager: UIHandler, key: KeyCode) -> None:
    """Perform some action based on the key. The base implementation does nothing."""
    return None

Perform some action based on the key.

def draw( self, lcd_manager: UIHandler):
def draw(self, lcd_manager: UIHandler) -> None:
    """Draw to screen. The base implementation does nothing."""
    return None

Draw to screen.

def refresh( self, lcd_manager: UIHandler):
def refresh(self, lcd_manager: UIHandler) -> None:
    """Update internal values. Called continuously. The base implementation does nothing."""
    return None

Update internal values. Called continuously.

class ProfileType(PageType):
class ProfileType(PageType):
    """Profile selection menu.

    Lists the ``*.json`` files found in ``/var/www/logs/profile/`` (sorted by modification
    time) and launches a run against the profile the user picks.
    """

    title = "Profile Selection"

    def __init__(self):
        profile_paths = sorted(
            Path("/var/www/logs/profile/").glob("*.json"), key=lambda path: path.stat().st_mtime
        )
        # Displayed name (file stem) -> full path handed to the test runner
        self.items = {path.stem: path for path in profile_paths}
        self.index = 0
        try:
            self.version = Path("/var/www/.version").read_text(encoding="utf-8").strip("\r\n")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            self.version = "1.0.1"  # Shown when the version file cannot be read

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Handle profile selection UI.

        Right/left cycle through the profiles, enter starts a run with the highlighted
        profile and switches to the progress page, exit returns to the test menu. When no
        profiles exist only exit has an effect.
        """
        if key is KeyCode.KEY_EXIT_PRESS:
            lcd_manager.page = TestMenuType()
            return
        if not self.items:
            return  # Nothing to cycle through or select; avoids modulo by zero / IndexError

        if key is KeyCode.KEY_RIGHT_PRESS:
            self.index = (self.index + 1) % len(self.items)
        elif key is KeyCode.KEY_LEFT_PRESS:
            self.index = (self.index - 1) % len(self.items)
        elif key is KeyCode.KEY_ENTER_PRESS:
            selected_profile = list(self.items.values())[self.index]
            # The PROFILE definition is appended as a single argument so that whitespace or
            # quotes in the file name are never re-tokenized by shlex.
            command = [*shlex.split("/opt/hitl/test fingerprint.plan -c -v -D"), f"PROFILE={selected_profile}"]
            lcd_manager.stack.append(
                subprocess.Popen(  # pylint: disable=consider-using-with
                    command, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
                ).pid
            )
            lcd_manager.page = ProgressType()

    def draw(self, lcd_manager: UIHandler):
        """Draw item to screen.

        Row 0 holds the title, row 1 the part of the profile name before ``_6T_``, row 2
        the part after it between ``<`` and ``>``, and row 3 the version. A placeholder is
        shown on row 2 when there are no profiles.
        """
        max_text_width = WIDTH - 2
        if self.items:
            manufacturer_name, _, serial_id = list(self.items)[self.index].partition("_6T_")
        else:
            manufacturer_name, serial_id = "", "No profiles"
        manufacturer_name = manufacturer_name[:WIDTH]
        item_name = serial_id[:max_text_width]

        lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}")
        lcd_manager.write_text(1, 0, f"{manufacturer_name:<20}")
        lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>")
        lcd_manager.write_text(3, 0, f"v{self.version:<19}")

Profile selection menu.

title = 'Profile Selection'
items
index
def process_key( self, lcd_manager: UIHandler, key: KeyCode):
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """Handle profile selection UI.

    Right/left cycle through the profiles, enter starts a run with the highlighted
    profile and switches to the progress page, exit returns to the test menu. When no
    profiles exist only exit has an effect.
    """
    if key is KeyCode.KEY_EXIT_PRESS:
        lcd_manager.page = TestMenuType()
        return
    if not self.items:
        return  # Nothing to cycle through or select; avoids modulo by zero / IndexError

    if key is KeyCode.KEY_RIGHT_PRESS:
        self.index = (self.index + 1) % len(self.items)
    elif key is KeyCode.KEY_LEFT_PRESS:
        self.index = (self.index - 1) % len(self.items)
    elif key is KeyCode.KEY_ENTER_PRESS:
        selected_profile = list(self.items.values())[self.index]
        # The PROFILE definition is appended as a single argument so that whitespace or
        # quotes in the file name are never re-tokenized by shlex.
        command = [*shlex.split("/opt/hitl/test fingerprint.plan -c -v -D"), f"PROFILE={selected_profile}"]
        lcd_manager.stack.append(
            subprocess.Popen(  # pylint: disable=consider-using-with
                command, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
            ).pid
        )
        lcd_manager.page = ProgressType()

Handle profile selection UI.

def draw( self, lcd_manager: UIHandler):
def draw(self, lcd_manager: UIHandler):
    """Draw item to screen.

    Row 0 holds the title, row 1 the part of the profile name before ``_6T_``, row 2 the
    part after it between ``<`` and ``>``, and row 3 the version. A placeholder is shown
    on row 2 when there are no profiles, instead of raising ``IndexError``.
    """
    max_text_width = WIDTH - 2
    if self.items:
        manufacturer_name, _, serial_id = list(self.items)[self.index].partition("_6T_")
    else:
        manufacturer_name, serial_id = "", "No profiles"
    manufacturer_name = manufacturer_name[:WIDTH]
    item_name = serial_id[:max_text_width]

    lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}")
    lcd_manager.write_text(1, 0, f"{manufacturer_name:<20}")
    lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>")
    lcd_manager.write_text(3, 0, f"v{self.version:<19}")

Draw item to screen.

Inherited Members
PageType
refresh
class TestMenuType(PageType):
class TestMenuType(PageType):
    """Test selection menu offering profile generation and profile comparison."""

    title = "Test Selection"

    def __init__(self):
        self.index = 0
        # Menu label -> bound method invoked with the UI handler when enter is pressed
        self.items = {
            "Generate profile": self.generate_profile,
            "Compare profiles": self.compare_profiles,
        }
        version_file = Path("/var/www/.version")
        try:
            self.version = version_file.read_text(encoding="utf-8").strip("\r\n")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            self.version = "1.0.1"

    def generate_profile(self, lcd_manager: UIHandler):
        """Run profile generation test and go to progress page."""
        command = "/opt/hitl/test fingerprint.plan -c -v"
        process = subprocess.Popen(  # pylint: disable=consider-using-with
            shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
        )
        lcd_manager.stack.append(process.pid)  # The progress page reads this PID from the stack
        lcd_manager.page = ProgressType()

    def compare_profiles(self, lcd_manager: UIHandler):
        """Go to profile selection page."""
        lcd_manager.page = ProfileType()

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Handle test selection UI.

        Right/left cycle through the entries, enter runs the highlighted entry's action
        and up shows the logo page.
        """
        entry_count = len(self.items)
        if key is KeyCode.KEY_RIGHT_PRESS:
            self.index = (self.index + 1) % entry_count
        elif key is KeyCode.KEY_LEFT_PRESS:
            self.index = (self.index - 1) % entry_count
        elif key is KeyCode.KEY_ENTER_PRESS:
            label = list(self.items)[self.index]
            action = self.items[label]
            action(lcd_manager)
        elif key is KeyCode.KEY_UP_PRESS:
            lcd_manager.page = LogoType()

    def draw(self, lcd_manager: UIHandler):
        """Draw item to screen and light the first LED red."""
        max_text_width = WIDTH - 2
        labels = list(self.items)
        item_name = labels[self.index][:max_text_width]

        rows = (
            f"{self.title:=^{WIDTH}}",
            " " * WIDTH,
            f"<{item_name:^{max_text_width}}>",
            f"v{self.version:<19}",
        )
        for row_number, row_text in enumerate(rows):
            lcd_manager.write_text(row_number, 0, row_text)

        lcd_manager.set_led((LEDColor.RED, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))

Test selection menu

title = 'Test Selection'
items
index
def generate_profile( self, lcd_manager: UIHandler):
def generate_profile(self, lcd_manager: UIHandler):
    """Run profile generation test and go to progress page."""
    command = "/opt/hitl/test fingerprint.plan -c -v"
    process = subprocess.Popen(  # pylint: disable=consider-using-with
        shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
    )
    lcd_manager.stack.append(process.pid)  # The progress page reads this PID from the stack
    lcd_manager.page = ProgressType()

Run profile generation test and go to progress page.

def compare_profiles( self, lcd_manager: UIHandler):
def compare_profiles(self, lcd_manager: UIHandler):
    """Go to profile selection page by installing a new ``ProfileType`` page."""
    profile_page = ProfileType()
    lcd_manager.page = profile_page

Go to profile selection page.

def process_key( self, lcd_manager: UIHandler, key: KeyCode):
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """Handle test selection UI.

    Right/left cycle through the entries, enter runs the highlighted entry's action and
    up shows the logo page.
    """
    entry_count = len(self.items)
    if key is KeyCode.KEY_RIGHT_PRESS:
        self.index = (self.index + 1) % entry_count
    elif key is KeyCode.KEY_LEFT_PRESS:
        self.index = (self.index - 1) % entry_count
    elif key is KeyCode.KEY_ENTER_PRESS:
        label = list(self.items)[self.index]
        action = self.items[label]
        action(lcd_manager)
    elif key is KeyCode.KEY_UP_PRESS:
        lcd_manager.page = LogoType()

Handle test selection UI.

def draw( self, lcd_manager: UIHandler):
def draw(self, lcd_manager: UIHandler):
    """Draw item to screen and light the first LED red."""
    max_text_width = WIDTH - 2
    labels = list(self.items)
    item_name = labels[self.index][:max_text_width]

    rows = (
        f"{self.title:=^{WIDTH}}",
        " " * WIDTH,
        f"<{item_name:^{max_text_width}}>",
        f"v{self.version:<19}",
    )
    for row_number, row_text in enumerate(rows):
        lcd_manager.write_text(row_number, 0, row_text)

    lcd_manager.set_led((LEDColor.RED, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))

Draw item to screen.

Inherited Members
PageType
refresh
class ProgressType(PageType):
class ProgressType(PageType):
    """Test progress.

    Shows the running test's name (scrolled when longer than 20 characters), mode and
    remaining time, pass/fail counts and a progress bar, using the status dictionaries
    read from ``lcd_manager.queue``.
    """

    SCROLL_RATE = 2  # Frequency of screen text scroll

    def __init__(self) -> None:
        self.data: dict[str, str | float] = {}  # Latest status message; empty until one arrives
        self.text_offset = 0  # Current rotation of the scrolling name
        self.last_scroll_time = 0.0  # time.perf_counter() value of the last scroll step

    def _pid_is_alive(self, pid: int):
        """Check if PID exists."""
        try:
            os.kill(pid, 0)
        except OSError as error:
            if error.errno == errno.ESRCH:  # ESRCH == No such process
                return False
            if error.errno == errno.EPERM:  # EPERM clearly means there's a process to deny access to
                return True
            raise
        return True

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """On exit: drop the top stack entry, launch ``/opt/hitl/kill`` and show the test menu."""
        if key is KeyCode.KEY_EXIT_PRESS:
            lcd_manager.stack.pop()
            command = "/opt/hitl/kill"
            subprocess.Popen(  # pylint: disable=consider-using-with
                shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
            )
            lcd_manager.page = TestMenuType()

    def draw(self, lcd_manager: UIHandler):
        """Draw item to screen.

        While the PID on top of the stack is alive, the status rows (or a "Preparing..."
        placeholder before the first message) are drawn; otherwise the stack entry is
        dropped and the test menu is shown.
        """
        current = lcd_manager.stack[-1]
        if not (isinstance(current, int) and self._pid_is_alive(current)):
            lcd_manager.stack.pop()
            lcd_manager.page = TestMenuType()
            return

        if not self.data:
            lcd_manager.clear_screen()
            lcd_manager.write_text(2, 0, f"{'Preparing...':^20}")
            return

        # Set LED color
        # NOTE(review): refresh() strips a leading "Test" from the name, so this comparison
        # may never match for names that passed through it -- confirm with the sender.
        first_led = LEDColor.RED if self.data["name"] == "Test Completed" else LEDColor.GREEN
        lcd_manager.set_led((first_led, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))

        # Apply a marquee effect to the test title text
        name = str(self.data["name"]) + "     "
        name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20]
        ete_minutes = int(self.data["ete"])
        ete = f"{self.data.get('mode', '???????')}, ETR: {ete_minutes} min"
        pass_fail = f"{self.data['passed']:>2} Passed,{self.data['failed']:>2} Failed"
        prog_bar = f"\xFA{'=' * int(18 * self.data['progress']):-<18}\xFC"
        lcd_manager.write_text(0, 0, f"{name:<20}")
        lcd_manager.write_text(1, 0, f"{ete:<20}")
        lcd_manager.write_text(2, 0, f"{pass_fail:^20}")
        lcd_manager.write_text(3, 0, prog_bar)

    def refresh(self, lcd_manager: UIHandler):
        """Output test progress.

        Takes at most one pending message from ``lcd_manager.queue``. The screen is fully
        redrawn when the test name or any other value changed; a name longer than 20
        characters is scrolled by one character every ``1 / SCROLL_RATE`` seconds.
        """
        old_data = self.data
        with suppress(queue.Empty, KeyError):
            new_data = lcd_manager.queue.get_nowait()
            # Normalize first and publish afterwards: a message without "name" is dropped
            # as a whole instead of leaving self.data without the key used below.
            new_data["name"] = str(new_data["name"]).partition(" - ")[0].removeprefix("Test")
            self.data = new_data

        if not self.data:
            return

        padded_name = str(self.data["name"]) + "     "
        name_length = len(padded_name) - 5
        if old_data.get("name") != self.data["name"]:  # New test
            self.text_offset = -1  # Reset offset when new title is added
            self.draw(lcd_manager)
        elif name_length > 20 and time.perf_counter() - self.last_scroll_time > 1 / self.SCROLL_RATE:  # Scroll name
            self.last_scroll_time = time.perf_counter()
            # Wrap at the padded length so the scroll passes through the gap without a jump
            self.text_offset = (self.text_offset + 1) % len(padded_name)
            name = (padded_name[self.text_offset :] + padded_name[: self.text_offset])[:20]
            lcd_manager.write_text(0, 0, f"{name:<20}")
        elif old_data != self.data:  # Don't draw when not needed
            self.draw(lcd_manager)

Test progress.

SCROLL_RATE = 2
data: dict[str, str | float]
text_offset
last_scroll_time
def process_key( self, lcd_manager: UIHandler, key: KeyCode):
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """On exit: drop the top stack entry, launch ``/opt/hitl/kill`` and show the test menu."""
    if key is not KeyCode.KEY_EXIT_PRESS:
        return
    lcd_manager.stack.pop()
    command = "/opt/hitl/kill"
    kill_argv = shlex.split(command)
    subprocess.Popen(  # pylint: disable=consider-using-with
        kill_argv, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
    )
    lcd_manager.page = TestMenuType()

Perform some action based on the key.

def draw( self, lcd_manager: UIHandler):
def draw(self, lcd_manager: UIHandler):
    """Draw item to screen.

    While the PID on top of the stack is alive, the status rows (or a "Preparing..."
    placeholder before the first message) are drawn; otherwise the stack entry is dropped
    and the test menu is shown.
    """
    current = lcd_manager.stack[-1]
    if not (isinstance(current, int) and self._pid_is_alive(current)):
        lcd_manager.stack.pop()
        lcd_manager.page = TestMenuType()
        return

    if not self.data:
        lcd_manager.clear_screen()
        lcd_manager.write_text(2, 0, f"{'Preparing...':^20}")
        return

    # Set LED color
    # NOTE(review): refresh() strips a leading "Test" from the name, so this comparison may
    # never match for names that passed through it -- confirm with the sender.
    first_led = LEDColor.RED if self.data["name"] == "Test Completed" else LEDColor.GREEN
    lcd_manager.set_led((first_led, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))

    # Apply a marquee effect to the test title text
    padded_name = str(self.data["name"]) + "     "
    shift = self.text_offset % len(padded_name)
    name = (padded_name[shift:] + padded_name[:shift])[:20]
    ete_minutes = int(self.data["ete"])
    ete = f"{self.data.get('mode', '???????')}, ETR: {ete_minutes} min"
    pass_fail = f"{self.data['passed']:>2} Passed,{self.data['failed']:>2} Failed"
    prog_bar = f"\xFA{'=' * int(18 * self.data['progress']):-<18}\xFC"
    lcd_manager.write_text(0, 0, f"{name:<20}")
    lcd_manager.write_text(1, 0, f"{ete:<20}")
    lcd_manager.write_text(2, 0, f"{pass_fail:^20}")
    lcd_manager.write_text(3, 0, prog_bar)

Draw item to screen.

def refresh( self, lcd_manager: UIHandler):
def refresh(self, lcd_manager: UIHandler):
    """Output test progress.

    Takes at most one pending message from ``lcd_manager.queue``. The screen is fully
    redrawn when the test name or any other value changed; a name longer than 20
    characters is scrolled by one character every ``1 / SCROLL_RATE`` seconds.
    """
    old_data = self.data
    with suppress(queue.Empty, KeyError):
        new_data = lcd_manager.queue.get_nowait()
        # Normalize first and publish afterwards: a message without "name" is dropped as a
        # whole instead of leaving self.data without the key used below.
        new_data["name"] = str(new_data["name"]).partition(" - ")[0].removeprefix("Test")
        self.data = new_data

    if not self.data:
        return

    padded_name = str(self.data["name"]) + "     "
    name_length = len(padded_name) - 5
    if old_data.get("name") != self.data["name"]:  # New test
        self.text_offset = -1  # Reset offset when new title is added
        self.draw(lcd_manager)
    elif name_length > 20 and time.perf_counter() - self.last_scroll_time > 1 / self.SCROLL_RATE:  # Scroll name
        self.last_scroll_time = time.perf_counter()
        # Wrap at the padded length so the scroll passes through the gap without a jump
        self.text_offset = (self.text_offset + 1) % len(padded_name)
        name = (padded_name[self.text_offset :] + padded_name[: self.text_offset])[:20]
        lcd_manager.write_text(0, 0, f"{name:<20}")
    elif old_data != self.data:  # Don't draw when not needed
        self.draw(lcd_manager)

Output test progress.

Inherited Members
PageType
title
class LogoType(PageType):
class LogoType(PageType):
    """Display logo."""

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Save the LCD state and open the test menu on a key press."""
        if not key <= KeyCode.KEY_EXIT_PRESS:  # Any key press
            return
        lcd_manager.save_state()
        lcd_manager.page = TestMenuType()

    def draw(self, lcd_manager: UIHandler):
        """Draw image to screen."""
        logo_file = "TAFLogo.bmp"
        lcd_manager.draw_image(logo_file)

Display logo.

def process_key( self, lcd_manager: UIHandler, key: KeyCode):
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """Save the LCD state and open the test menu on a key press."""
    if not key <= KeyCode.KEY_EXIT_PRESS:  # Any key press
        return
    lcd_manager.save_state()
    lcd_manager.page = TestMenuType()

Save the LCD state and open the test menu on a key press.

def draw( self, lcd_manager: UIHandler):
def draw(self, lcd_manager: UIHandler):
    """Draw image to screen."""
    logo_file = "TAFLogo.bmp"
    lcd_manager.draw_image(logo_file)

Draw image to screen.

Inherited Members
PageType
title
refresh
def test_main():
def test_main():
    """Set up logging and run manager.

    Attaches a console handler (CRLF line endings) and a file handler for ``LOG_FILE`` to
    the module logger, both at DEBUG level, then starts the UI loop.
    """
    log_format = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p"
    )
    logger.setLevel(logging.DEBUG)

    def attach(handler: logging.Handler) -> None:
        """Apply the shared level and format to a handler and register it."""
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(log_format)
        logger.addHandler(handler)

    # Set up console logging
    console_handler = logging.StreamHandler()
    console_handler.terminator = "\r\n"
    attach(console_handler)

    # Set up file logging
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    attach(logging.FileHandler(LOG_FILE))
    logger.debug(f"Log Filename: {LOG_FILE}")

    packet_monitor = LCDMonitor()  # Begin monitoring for packets
    ui_handler = UIHandler(packet_monitor)  # Manage LCD UI
    ui_handler.run_forever()

Set up logging and run manager.