hitl_tester.modules.cyber_6t.lcd_manager
LCD controller for Crystalfontz screens.
(c) 2020-2024 TurnAround Factor, Inc.
#
CUI DISTRIBUTION CONTROL
Controlled by: DLA J68 R&D SBIP
CUI Category: Small Business Research and Technology
Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
POC: GOV SBIP Program Manager Denise Price, 571-767-0111
Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
Fort Belvoir, VA 22060-6221
#
SBIR DATA RIGHTS
Contract No.:SP4701-23-C-0083
Contractor Name: TurnAround Factor, Inc.
Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
Expiration of SBIR Data Rights Period: September 24, 2029
The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
the markings.
1""" 2LCD controller for Crystalfontz screens. 3 4# (c) 2020-2024 TurnAround Factor, Inc. 5# 6# CUI DISTRIBUTION CONTROL 7# Controlled by: DLA J68 R&D SBIP 8# CUI Category: Small Business Research and Technology 9# Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10# POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11# Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12# U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13# it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14# for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15# Fort Belvoir, VA 22060-6221 16# 17# SBIR DATA RIGHTS 18# Contract No.:SP4701-23-C-0083 19# Contractor Name: TurnAround Factor, Inc. 20# Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21# Expiration of SBIR Data Rights Period: September 24, 2029 22# The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23# software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24# in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25# contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26# reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27# the markings. 
28""" 29 30import csv 31import errno 32import itertools 33import logging 34import os 35import platform 36import queue 37import shlex 38import signal 39import subprocess 40import sys 41import threading 42import time 43import traceback 44from collections import deque 45from contextlib import suppress 46from dataclasses import dataclass, asdict, replace 47from datetime import datetime, timezone 48from enum import IntEnum 49from multiprocessing.managers import BaseManager 50from pathlib import Path 51from queue import Queue 52from subprocess import DEVNULL 53from typing import ClassVar, Any, Self 54 55import serial 56import serial.tools.list_ports 57 58 59WIDTH = 20 60HEIGHT = 4 61PORT = 51152 62AUTH_KEY = b"HITL" 63REPORT_NAME = Path(f"{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%S.%f')}_{platform.node()}_lcd_manager") 64LOG_FOLDER = Path(__file__).parent.parent.resolve() / "logs" / REPORT_NAME 65LOG_FILE = LOG_FOLDER / REPORT_NAME.with_suffix(".txt") 66logger = logging.getLogger("HITL-LCD") 67 68 69def signal_handler(_sig, _frame): 70 """Exit gracefully.""" 71 logger.info("Shutting down...") 72 sys.exit(0) 73 74 75@dataclass 76class Crystalfontz: 77 """Hardware info for Crystalfontz LCD.""" 78 79 models: ClassVar[list[Self]] = [] 80 pid: int 81 vid: int = 8763 82 baud: int = 115200 83 84 def __post_init__(self): 85 self.models.append(self) 86 87 88class LCDPort: 89 """Manager for serial port.""" 90 91 cfa835 = Crystalfontz(pid=5) 92 # cfa635 = Crystalfontz(pid=11) 93 94 def __init__(self) -> None: 95 self.index = 0 96 self.buffer: list[int] = [] 97 98 # Get serial port 99 for comport, model in itertools.product(serial.tools.list_ports.comports(), Crystalfontz.models): 100 if comport.vid == model.vid and comport.pid == model.pid: 101 logger.debug(f"Found LCD VID: {comport.vid}, PID: {comport.pid}") 102 break 103 else: 104 raise RuntimeError(f"LCD could not be found: {', '.join(map(repr, Crystalfontz.models))}") 105 self.serial = serial.Serial(port=comport.device, 
baudrate=model.baud, timeout=0) 106 107 def write(self, data: bytes, timeout: float | None = None): 108 """Write data to the serial port.""" 109 self.serial.write_timeout = timeout # 0 seems to block? 110 logger.debug(f"Writing bytes: {data!r}") 111 with suppress(serial.serialutil.SerialTimeoutException): 112 self.serial.write(data) 113 self.serial.flush() 114 115 def consume(self, timeout: float = 0.1): 116 """Fetch the current byte and advance.""" 117 start_time = time.perf_counter() 118 while self.serial.in_waiting > 0 or self.index >= len(self.buffer): 119 if self.serial.in_waiting > 0: 120 try: 121 self.buffer += list(self.serial.read(self.serial.in_waiting)) 122 except serial.serialutil.SerialException as e: 123 logger.debug(f"Got error: {e}") 124 time.sleep(1) 125 start_time = time.perf_counter() 126 if timeout and time.perf_counter() - start_time > timeout: 127 raise TimeoutError(f"No data after {timeout} seconds.") 128 result = self.buffer[self.index] 129 self.index += 1 130 return result 131 132 def clear_history(self): 133 """Discard data before the current index.""" 134 self.buffer = self.buffer[self.index :] 135 self.index = 0 136 137 138@dataclass 139class RawPacket: 140 """A raw LCD packet.""" 141 142 command: int = 0 143 data_length: int = 0 144 data: bytes = b"" 145 crc: int = 0 146 147 def to_bytes(self) -> bytes: 148 """Convert everything but the CRC to bytes.""" 149 return ( 150 bytes([self.command & 0xFF, self.data_length & 0xFF]) + self.data + self.crc.to_bytes(2, byteorder="little") 151 ) 152 153 def calculate_crc(self): 154 """Fill in CRC with the correct value.""" 155 crc = 0xFFFF # Preset to all 1's, prevent loss of leading zeros 156 for byte in self.to_bytes()[:-2]: # Strip current CRC bytes 157 for _ in range(8): 158 if (crc ^ byte) & 0x01: 159 crc >>= 1 160 crc ^= 0x8408 161 else: 162 crc >>= 1 163 byte >>= 1 164 self.crc = (~crc) & 0xFFFF 165 166 167class LCDMonitor: 168 """Manage LCD events.""" 169 170 # Supported commands: hw info 
(0x1), brightness (0x0E, 0x0D), cursor (0x0C), error (0xC0), leds (0x22) 171 172 MAX_ATTEMPTS = 2 173 174 class ResponseError(IntEnum): 175 """Possible LCD error responses.""" 176 177 UNKNOWN_ERROR = 1 178 UNKNOWN_COMMAND = 2 179 INVALID_COMMAND_LENGTH_OPTIONS = 3 180 WRITING_FLASH_MEM_FAILED = 4 181 READING_FLASH_MEM_FAILED = 5 182 CFA_FBSCAB_NOT_PRESENT_AT_INDEX = 6 183 CFA_FBSCAB_DID_NOT_REPLY_TO_REG = 7 184 MICRO_SD_NOT_INSERTED_OR_BAD = 8 185 MICRO_SD_NOT_FORMATTED = 9 186 MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = 10 187 MICRO_SD_UNKNOWN_ERROR = 11 188 MICRO_SD_FILE_COULD_NOT_BE_READ = 12 189 MICRO_SD_COULD_NOT_BE_WRITTEN = 13 190 FILE_HEADER_IS_INVALID = 14 191 MICRO_SD_FILE_IS_ALREADY_OPEN = 15 192 MICRO_SD_FILE_OPERATION_FAILED = 16 193 MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = 17 194 GFX_STREAM_ALREADY_STARTED = 18 195 GFX_IS_OUT_OF_LCD_BOUNDS = 19 196 VIDEO_IS_NOT_OPEN_IN_SLOT = 20 197 GFX_STREAM_HAS_TIMED_OUT = 21 198 GPIO_NOT_SET_FOR_ATX_USE = 22 199 INTERFACE_NOT_ENABLED = 23 200 INTERFACE_NOT_AVAILABLE_ = 24 201 202 def __init__(self, packet_buffer_size: int = 4096): 203 self.dropped_packets = 0 204 self.packet_deque: deque[RawPacket] = deque(maxlen=packet_buffer_size) 205 206 # Start serial monitor thread 207 self.serial_port = LCDPort() 208 209 # Create log file 210 with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile: 211 csv.writer(csvfile).writerow(asdict(RawPacket())) 212 213 # Start monitor thread 214 self.daemon = threading.Thread(target=self.runtime, daemon=True) 215 self.daemon.start() 216 217 def runtime(self): 218 """Fetch packets as quickly as possible.""" 219 220 def fetch_packet() -> RawPacket: 221 """Decode serial stream to a valid packet.""" 222 while True: 223 # Fetch raw packet 224 try: 225 raw_packet = RawPacket( 226 command=self.serial_port.consume(timeout=0), 227 data_length=(length := self.serial_port.consume()), 228 data=bytes(self.serial_port.consume() for _ in range(length)), 229 crc=self.serial_port.consume() 
| (self.serial_port.consume() << 8), 230 ) 231 except TimeoutError: 232 logger.warning( 233 "Timeout when attempting to read packet. Possibly an invalid packet cause by dropped bytes." 234 ) 235 else: 236 # Validate CRC 237 logger.debug(f"Packet?: {asdict(raw_packet)}") 238 valid_packet = replace(raw_packet) # Calculate the expected CRC 239 valid_packet.calculate_crc() 240 if raw_packet.crc == valid_packet.crc: 241 logger.debug("got lcd packet") 242 self.serial_port.clear_history() 243 return raw_packet 244 logger.debug(f"Invalid CRC 0x{raw_packet.crc:04X}, expected 0x{valid_packet.crc:04X}.") 245 logger.debug(f"LCD Buffer: {self.serial_port.buffer}") 246 logger.debug(f"Discarding lcd byte: {self.serial_port.buffer[0]:02X}") 247 self.serial_port.index = 1 248 self.serial_port.clear_history() 249 250 while True: 251 packet = fetch_packet() 252 if len(self.packet_deque) == self.packet_deque.maxlen: # Deque is full 253 self.dropped_packets += 1 254 self.packet_deque.append(packet) # Save to deque 255 logger.debug(f"{len(self.packet_deque)} packet(s) in queue: {', '.join(map(str, self.packet_deque))}") 256 257 # Save to file 258 with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile: 259 csv.writer(csvfile).writerow(asdict(packet).values()) 260 261 time.sleep(0.1) # Packets only come in when a 262 263 def read(self, blocking: bool = True, timeout: float = 0) -> RawPacket | None: 264 """Get keys or acknowledgment.""" 265 start_time = time.perf_counter() 266 while not timeout or time.perf_counter() - start_time < timeout: 267 with suppress(IndexError): 268 return self.packet_deque.popleft() 269 if not blocking: 270 return None 271 time.sleep(0.1) # Packets only come in on user input 272 raise TimeoutError(f"A packet was not received after {timeout:.1f} seconds.") 273 274 def write(self, command_packet: RawPacket, timeout: float = 2): 275 """Send command to LCD""" 276 for attempt_number in range(self.MAX_ATTEMPTS): 277 
self.serial_port.write(command_packet.to_bytes(), timeout) 278 with suppress(TimeoutError): 279 response = self.read(timeout=timeout) 280 if response and (0x40 | command_packet.command) == response.command: 281 break 282 if response and (response.command & 0xC0) == 0xC0: 283 error_enum = self.ResponseError(response.command & ((~0xC0) & 0xFF)) 284 error_message = error_enum.name.capitalize().replace("_", " ") 285 logger.warning(f"Received error response: {error_message}.") 286 break 287 logger.error(f"LCD response not received or invalid for attempt {attempt_number + 1}.") 288 time.sleep(0.1) 289 290 291class KeyCode(IntEnum): 292 """The key pressed on the keypad.""" 293 294 KEY_UP_PRESS = 1 295 KEY_DOWN_PRESS = 2 296 KEY_LEFT_PRESS = 3 297 KEY_RIGHT_PRESS = 4 298 KEY_ENTER_PRESS = 5 299 KEY_EXIT_PRESS = 6 300 KEY_UP_RELEASE = 7 301 KEY_DOWN_RELEASE = 8 302 KEY_LEFT_RELEASE = 9 303 KEY_RIGHT_RELEASE = 10 304 KEY_ENTER_RELEASE = 11 305 KEY_EXIT_RELEASE = 12 306 307 308class LEDColor(IntEnum): 309 """The LED color.""" 310 311 OFF = 0 312 RED = 1 313 GREEN = 2 314 YELLOW = 3 315 316 317class UIHandler: 318 """Manage LCD UI.""" 319 320 FREQUENCY = 20 # How often to check for input / refresh screen 321 322 def __init__(self, lcd_monitor: LCDMonitor): 323 self.lcd_monitor = lcd_monitor 324 self.page: PageType = TestMenuType() 325 self.stack: list[Any] = [] # Allows pages to communicate 326 # self.reset() 327 self.clear_screen() 328 self.page.draw(self) 329 330 # Progress queue for communicating with the test 331 class QueueManager(BaseManager): 332 """Manager for multiprocess queue.""" 333 334 self._backing_queue: Queue[dict[str, str | float]] = Queue() 335 QueueManager.register("get_queue", callable=lambda: self._backing_queue) 336 self._queue_manager = QueueManager(address=("", PORT), authkey=AUTH_KEY) 337 self._queue_manager.start() # pylint: disable=consider-using-with 338 self.queue: Queue[dict[str, str | float]] = self._queue_manager.get_queue() # type: 
ignore[attr-defined] # pylint: disable=no-member 339 340 def run_forever(self): 341 """Process LCD events forever.""" 342 while True: 343 try: 344 # Check if a test is running and show progress if so 345 if not isinstance(self.page, ProgressType): 346 current_name = Path("/proc/self/comm").read_text(encoding="ascii") 347 for pid in Path("/proc").glob("[0-9]*"): 348 with suppress(FileNotFoundError): 349 process_name = (pid / "comm").read_text(encoding="ascii") 350 if process_name != current_name and process_name.startswith("HITL") and pid.stem.isdigit(): 351 logger.debug(f"Found running test at PID: {pid.stem}") 352 self.stack.append(int(pid.stem)) 353 self.page = ProgressType() 354 355 # Check for new key inputs 356 if (packet := self.lcd_monitor.read(blocking=False)) and packet.command == 0x80: 357 self.key_event(packet) 358 self.page.refresh(self) 359 360 time.sleep(1 / self.FREQUENCY) # Don't waste cycles when nothing happens 99% of the time 361 except Exception: # pylint: disable=broad-exception-caught 362 logger.error(traceback.format_exc()) 363 time.sleep(1) 364 365 def key_event(self, packet: RawPacket): 366 """Process key event.""" 367 self.page.process_key(self, KeyCode(packet.data[0] if packet.data else 0)) 368 self.page.draw(self) 369 370 def draw_image(self, filename: str): 371 """Draw an image to the LCD.""" 372 raw_filename = filename.encode("latin1") # Map bytes 1-to-1 373 enable_transparency = False # pixel value 0 is transparent 374 invert_image_shade = False # will invert transparency value also 375 x_start = 0 376 y_start = 0 377 packet = RawPacket( 378 command=0x28, 379 data_length=4 + len(raw_filename or "EE"), 380 data=bytes([3, enable_transparency | (invert_image_shade << 1), x_start, y_start, *raw_filename]), 381 ) 382 packet.calculate_crc() 383 self.lcd_monitor.write(packet) 384 385 def write_text(self, row: int, column: int, text: str): 386 """Write text to the LCD.""" 387 raw_text = text.encode("latin1") # Map bytes 1-to-1 388 packet = 
RawPacket( 389 command=0x1F, 390 data_length=2 + len(raw_text or "E"), 391 data=bytes([column, row, *raw_text]), 392 ) 393 packet.calculate_crc() 394 self.lcd_monitor.write(packet) 395 396 def clear_screen(self): 397 """Clear text from the screen.""" 398 packet = RawPacket(command=0x06, data_length=0) 399 packet.calculate_crc() 400 self.lcd_monitor.write(packet) 401 402 def reset(self): 403 """Reset the LCD device.""" 404 packet = RawPacket(command=0x05, data_length=3, data=bytes([8, 25, 48])) 405 packet.calculate_crc() 406 self.lcd_monitor.write(packet) 407 time.sleep(4) # May not respond for up to 3 seconds 408 409 def set_led(self, colors: tuple[LEDColor, LEDColor, LEDColor, LEDColor]): 410 """Set the on-board LCD values.""" 411 for led_id in range(4): 412 packet = RawPacket( 413 command=0x22, data_length=2, data=bytes([5 + led_id * 2, 100 * bool(colors[led_id] & 2)]) 414 ) 415 packet.calculate_crc() 416 self.lcd_monitor.write(packet) 417 packet = RawPacket( 418 command=0x22, data_length=2, data=bytes([6 + led_id * 2, 100 * bool(colors[led_id] & 1)]) 419 ) 420 packet.calculate_crc() 421 self.lcd_monitor.write(packet) 422 423 def save_state(self): 424 """Save state as boot state""" 425 packet = RawPacket(command=0x4, data_length=0) 426 packet.calculate_crc() 427 self.lcd_monitor.write(packet) 428 429 430class PageType: 431 """An LCD page.""" 432 433 title = "" 434 435 def process_key(self, lcd_manager: UIHandler, key: KeyCode): 436 """Perform some action based on the key.""" 437 438 def draw(self, lcd_manager: UIHandler): 439 """Draw to screen.""" 440 441 def refresh(self, lcd_manager: UIHandler): 442 """Update internal values. 
Called continuously.""" 443 444 445class ProfileType(PageType): 446 """Test selection menu""" 447 448 title = "Profile Selection" 449 450 def __init__(self): 451 profile_paths = (profile_path for profile_path in Path("/var/www/logs/profile/").glob("*.json")) 452 profile_paths = sorted(profile_paths, key=lambda path: path.stat().st_mtime) 453 self.items = {path.stem: path for path in profile_paths} 454 self.index = 0 455 try: 456 self.version = Path("/var/www/.version").read_text(encoding="utf-8").strip("\r\n") 457 except Exception: # pylint: disable=broad-exception-caught 458 logger.error(traceback.format_exc()) 459 self.version = "1.0.1" 460 461 def process_key(self, lcd_manager: UIHandler, key: KeyCode): 462 """Handle profile selection UI.""" 463 if key is KeyCode.KEY_RIGHT_PRESS: 464 self.index = (self.index + 1) % len(self.items) 465 elif key is KeyCode.KEY_LEFT_PRESS: 466 self.index = (self.index - 1) % len(self.items) 467 elif key is KeyCode.KEY_ENTER_PRESS: 468 selected_profile = list(self.items.values())[self.index] 469 command = f"/opt/hitl/test fingerprint.plan -c -v -D PROFILE={selected_profile}" 470 lcd_manager.stack.append( 471 subprocess.Popen( # pylint: disable=consider-using-with 472 shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True 473 ).pid 474 ) 475 lcd_manager.page = ProgressType() 476 elif key is KeyCode.KEY_EXIT_PRESS: 477 lcd_manager.page = TestMenuType() 478 479 def draw(self, lcd_manager: UIHandler): 480 """Draw item to screen.""" 481 max_text_width = WIDTH - 2 482 manufacturer_name, _, serial_id = list(self.items)[self.index].partition("_6T_") 483 manufacturer_name = manufacturer_name[:WIDTH] 484 item_name = serial_id[:max_text_width] 485 486 lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}") 487 lcd_manager.write_text(1, 0, f"{manufacturer_name:<20}") 488 lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>") 489 lcd_manager.write_text(3, 0, f"v{self.version:<19}") 490 491 492class 
TestMenuType(PageType): 493 """Test selection menu""" 494 495 title = "Test Selection" 496 497 def __init__(self): 498 self.items = { 499 "Generate profile": self.generate_profile, 500 "Compare profiles": self.compare_profiles, 501 } 502 self.index = 0 503 try: 504 self.version = Path("/var/www/.version").read_text(encoding="utf-8").strip("\r\n") 505 except Exception: # pylint: disable=broad-exception-caught 506 logger.error(traceback.format_exc()) 507 self.version = "1.0.1" 508 509 def generate_profile(self, lcd_manager: UIHandler): 510 """Run profile generation test and go to progress page.""" 511 command = "/opt/hitl/test fingerprint.plan -c -v" 512 lcd_manager.stack.append( 513 subprocess.Popen( # pylint: disable=consider-using-with 514 shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True 515 ).pid 516 ) 517 lcd_manager.page = ProgressType() 518 519 def compare_profiles(self, lcd_manager: UIHandler): 520 """Go to profile selection page.""" 521 lcd_manager.page = ProfileType() 522 523 def process_key(self, lcd_manager: UIHandler, key: KeyCode): 524 """Handle profile selection UI.""" 525 if key is KeyCode.KEY_RIGHT_PRESS: 526 self.index = (self.index + 1) % len(self.items) 527 elif key is KeyCode.KEY_LEFT_PRESS: 528 self.index = (self.index - 1) % len(self.items) 529 elif key is KeyCode.KEY_ENTER_PRESS: 530 self.items[list(self.items)[self.index]](lcd_manager) 531 elif key is KeyCode.KEY_UP_PRESS: 532 lcd_manager.page = LogoType() 533 534 def draw(self, lcd_manager: UIHandler): 535 """Draw item to screen.""" 536 max_text_width = WIDTH - 2 537 item_name = list(self.items)[self.index][:max_text_width] 538 539 lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}") 540 lcd_manager.write_text(1, 0, " " * WIDTH) 541 lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>") 542 lcd_manager.write_text(3, 0, f"v{self.version:<19}") 543 544 lcd_manager.set_led((LEDColor.RED, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF)) 545 546 547class 
ProgressType(PageType): 548 """Test progress.""" 549 550 SCROLL_RATE = 2 # Frequency of screen text scroll 551 552 def __init__(self) -> None: 553 self.data: dict[str, str | float] = {} 554 self.text_offset = 0 555 self.last_scroll_time = 0.0 556 557 def _pid_is_alive(self, pid: int): 558 """Check if PID exists.""" 559 try: 560 os.kill(pid, 0) 561 except OSError as error: 562 match (error.errno): 563 case errno.ESRCH: # ESRCH == No such process 564 return False 565 case errno.EPERM: # EPERM clearly means there's a process to deny access to 566 return True 567 case _: 568 raise 569 else: 570 return True 571 572 def process_key(self, lcd_manager: UIHandler, key: KeyCode): 573 if key is KeyCode.KEY_EXIT_PRESS: 574 lcd_manager.stack.pop() 575 command = "/opt/hitl/kill" 576 subprocess.Popen( # pylint: disable=consider-using-with 577 shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True 578 ) 579 lcd_manager.page = TestMenuType() 580 581 def draw(self, lcd_manager: UIHandler): 582 """Draw item to screen.""" 583 if isinstance(lcd_manager.stack[-1], int) and self._pid_is_alive(lcd_manager.stack[-1]): 584 if self.data: 585 # Set LED color 586 lcd_manager.set_led( 587 ( 588 LEDColor.RED if self.data["name"] == "Test Completed" else LEDColor.GREEN, 589 LEDColor.OFF, 590 LEDColor.OFF, 591 LEDColor.OFF, 592 ) 593 ) 594 # Apply a marquee effect to the test title text 595 name = str(self.data["name"]) + " " 596 name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20] 597 ete_minutes = int(self.data["ete"]) 598 ete = f"{self.data.get('mode', '???????')}, ETR: {ete_minutes} min" 599 pass_fail = f"{self.data['passed']:>2} Passed,{self.data['failed']:>2} Failed" 600 prog_bar = f"\xFA{'=' * int(18 * self.data['progress']):-<18}\xFC" 601 lcd_manager.write_text(0, 0, f"{name:<20}") 602 lcd_manager.write_text(1, 0, f"{ete:<20}") 603 lcd_manager.write_text(2, 0, f"{pass_fail:^20}") 604 lcd_manager.write_text(3, 0, prog_bar) 605 else: 
606 lcd_manager.clear_screen() 607 lcd_manager.write_text(2, 0, f"{'Preparing...':^20}") 608 else: 609 lcd_manager.stack.pop() 610 lcd_manager.page = TestMenuType() 611 612 def refresh(self, lcd_manager: UIHandler): 613 """Output test progress.""" 614 old_data = self.data 615 with suppress(queue.Empty, KeyError): 616 self.data = lcd_manager.queue.get_nowait() 617 self.data["name"] = str(self.data["name"]).partition(" - ")[0].removeprefix("Test") 618 619 if self.data: 620 name_length = len(str(self.data.get("name", ""))) 621 if old_data.get("name") != self.data["name"]: # New test 622 self.text_offset = -1 # Reset offset when new title is added 623 self.draw(lcd_manager) 624 elif name_length > 20 and time.perf_counter() - self.last_scroll_time > 1 / self.SCROLL_RATE: # Scroll name 625 self.last_scroll_time = time.perf_counter() 626 self.text_offset = (self.text_offset + 1) % name_length 627 name = str(self.data["name"]) + " " 628 name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20] 629 lcd_manager.write_text(0, 0, f"{name:<20}") 630 elif old_data != self.data: # Don't draw when not needed 631 self.draw(lcd_manager) 632 633 634class LogoType(PageType): 635 """Display logo.""" 636 637 def process_key(self, lcd_manager: UIHandler, key: KeyCode): 638 """Handle profile selection UI.""" 639 if key <= KeyCode.KEY_EXIT_PRESS: # Any key press 640 lcd_manager.save_state() 641 lcd_manager.page = TestMenuType() 642 643 def draw(self, lcd_manager: UIHandler): 644 """Draw image to screen.""" 645 lcd_manager.draw_image("TAFLogo.bmp") 646 647 648def test_main(): 649 """Set up logging and run manager.""" 650 logger.setLevel(logging.DEBUG) 651 formatter = logging.Formatter( 652 "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p" 653 ) 654 655 # Set up console logging 656 console_logger = logging.StreamHandler() 657 console_logger.setLevel(logging.DEBUG) 658 console_logger.setFormatter(formatter) 659 
console_logger.terminator = "\r\n" 660 logger.addHandler(console_logger) 661 662 # Set up file logging 663 LOG_FILE.parent.mkdir(parents=True, exist_ok=True) 664 file_logger = logging.FileHandler(LOG_FILE) 665 file_logger.setLevel(logging.DEBUG) 666 file_logger.setFormatter(formatter) 667 logger.addHandler(file_logger) 668 logger.debug(f"Log Filename: {LOG_FILE}") 669 670 monitor = LCDMonitor() # Begin monitoring for packets 671 screen = UIHandler(monitor) # Manage LCD UI 672 screen.run_forever() 673 674 675if __name__ == "__main__": 676 signal.signal(signal.SIGINT, signal_handler) 677 test_main()
def signal_handler(_sig, _frame):
    """Log the shutdown and terminate the process with exit status 0.

    Both arguments (signal number and current stack frame) are ignored; they are only
    present because the ``signal`` module passes them to every handler.
    """
    logger.info("Shutting down...")
    raise SystemExit(0)  # Same effect as sys.exit(0)
Exit gracefully.
76@dataclass 77class Crystalfontz: 78 """Hardware info for Crystalfontz LCD.""" 79 80 models: ClassVar[list[Self]] = [] 81 pid: int 82 vid: int = 8763 83 baud: int = 115200 84 85 def __post_init__(self): 86 self.models.append(self)
Hardware info for Crystalfontz LCD.
class LCDPort:
    """Manager for serial port.

    Received bytes are accumulated in ``buffer``; ``index`` is the read cursor into it. Bytes
    before the cursor are kept until ``clear_history`` is called so a caller can rewind.
    """

    cfa835 = Crystalfontz(pid=5)
    # cfa635 = Crystalfontz(pid=11)

    def __init__(self) -> None:
        """
        Open the first serial port whose VID/PID matches a registered Crystalfontz model.

        :raises RuntimeError: if no matching port is present.
        """
        self.index = 0
        self.buffer: list[int] = []

        # Get serial port
        for comport, model in itertools.product(serial.tools.list_ports.comports(), Crystalfontz.models):
            if comport.vid == model.vid and comport.pid == model.pid:
                logger.debug(f"Found LCD VID: {comport.vid}, PID: {comport.pid}")
                break
        else:
            raise RuntimeError(f"LCD could not be found: {', '.join(map(repr, Crystalfontz.models))}")
        self.serial = serial.Serial(port=comport.device, baudrate=model.baud, timeout=0)

    def write(self, data: bytes, timeout: float | None = None):
        """
        Write data to the serial port.

        A write timeout is best-effort: it is logged rather than raised, so callers that need
        confirmation must wait for the device's response.

        :param data: the raw bytes to send.
        :param timeout: the write timeout in seconds, None to wait indefinitely.
        """
        self.serial.write_timeout = timeout  # 0 seems to block?
        logger.debug(f"Writing bytes: {data!r}")
        try:
            self.serial.write(data)
            self.serial.flush()
        except serial.serialutil.SerialTimeoutException:
            logger.warning(f"Timed out writing {len(data)} byte(s) to the LCD.")

    def consume(self, timeout: float = 0.1, poll_interval: float = 0.001):
        """
        Fetch the current byte and advance.

        :param timeout: seconds to wait for a byte since data last arrived; 0 waits forever.
        :param poll_interval: seconds to sleep between polls while no data is available.
        :return: the byte at the read cursor, as an int.
        :raises TimeoutError: if no byte became available within ``timeout`` seconds.
        """
        start_time = time.perf_counter()
        while True:
            try:
                pending = self.serial.in_waiting
                if pending > 0:
                    self.buffer += list(self.serial.read(pending))
                    start_time = time.perf_counter()  # Data arrived, restart the timeout window
            except serial.serialutil.SerialException as error:
                logger.debug(f"Got error: {error}")
                time.sleep(1)
                start_time = time.perf_counter()  # Don't count the back-off against the timeout
                continue
            if self.index < len(self.buffer):
                break
            if timeout and time.perf_counter() - start_time > timeout:
                raise TimeoutError(f"No data after {timeout} seconds.")
            time.sleep(poll_interval)  # Yield instead of spinning at full speed
        result = self.buffer[self.index]
        self.index += 1
        return result

    def clear_history(self):
        """Discard data before the current index."""
        self.buffer = self.buffer[self.index :]
        self.index = 0
Manager for serial port.
def write(self, data: bytes, timeout: float | None = None):
    """
    Write data to the serial port.

    A write timeout is best-effort: it is logged rather than raised, so callers that need
    confirmation must wait for the device's response.

    :param data: the raw bytes to send.
    :param timeout: the write timeout in seconds, None to wait indefinitely.
    """
    self.serial.write_timeout = timeout  # 0 seems to block?
    logger.debug(f"Writing bytes: {data!r}")
    try:
        self.serial.write(data)
        self.serial.flush()
    except serial.serialutil.SerialTimeoutException:
        # Previously swallowed silently; leave a trace so dropped commands can be diagnosed.
        logger.warning(f"Timed out writing {len(data)} byte(s) to the LCD.")
Write data to the serial port.
def consume(self, timeout: float = 0.1, poll_interval: float = 0.001):
    """
    Fetch the current byte and advance.

    Pending serial data is appended to ``self.buffer`` first; the byte at ``self.index`` is
    then returned and the index moved forward by one.

    :param timeout: seconds to wait for a byte since data last arrived; 0 waits forever.
    :param poll_interval: seconds to sleep between polls while no data is available.
    :return: the byte at the read cursor, as an int.
    :raises TimeoutError: if no byte became available within ``timeout`` seconds.
    """
    start_time = time.perf_counter()
    while True:
        try:
            # Polled inside the try so a port error while querying is handled like a read error
            pending = self.serial.in_waiting
            if pending > 0:
                self.buffer += list(self.serial.read(pending))
                start_time = time.perf_counter()  # Data arrived, restart the timeout window
        except serial.serialutil.SerialException as error:
            logger.debug(f"Got error: {error}")
            time.sleep(1)
            start_time = time.perf_counter()  # Don't count the back-off against the timeout
            continue
        if self.index < len(self.buffer):
            break
        if timeout and time.perf_counter() - start_time > timeout:
            raise TimeoutError(f"No data after {timeout} seconds.")
        time.sleep(poll_interval)  # Yield instead of spinning at full speed
    result = self.buffer[self.index]
    self.index += 1
    return result
Fetch the current byte and advance.
@dataclass
class RawPacket:
    """A raw LCD packet: command byte, data length byte, payload and 16-bit CRC."""

    command: int = 0
    data_length: int = 0
    data: bytes = b""
    crc: int = 0

    def to_bytes(self) -> bytes:
        """
        Serialize the whole packet: command, data length, data, then the CRC (little-endian).

        Command and data length are truncated to one byte and the CRC to two bytes, so an
        out-of-range stored CRC no longer raises OverflowError.
        """
        header = bytes([self.command & 0xFF, self.data_length & 0xFF])
        return header + self.data + (self.crc & 0xFFFF).to_bytes(2, byteorder="little")

    def calculate_crc(self):
        """Fill in CRC with the correct value, computed over command, data length and data."""
        crc = 0xFFFF  # Preset to all 1's, prevent loss of leading zeros
        for byte in self.to_bytes()[:-2]:  # Strip current CRC bytes
            for _ in range(8):
                if (crc ^ byte) & 0x01:
                    crc >>= 1
                    crc ^= 0x8408
                else:
                    crc >>= 1
                byte >>= 1
        self.crc = (~crc) & 0xFFFF
A raw LCD packet.
def to_bytes(self) -> bytes:
    """
    Serialize the whole packet: command, data length, data, then the CRC (little-endian).

    Command and data length are truncated to one byte and the CRC to two bytes, so an
    out-of-range stored CRC is truncated like the other fields instead of raising
    OverflowError.
    """
    header = bytes([self.command & 0xFF, self.data_length & 0xFF])
    return header + self.data + (self.crc & 0xFFFF).to_bytes(2, byteorder="little")
Serialize the whole packet to bytes: command, data length, data, then the CRC (little-endian).
def calculate_crc(self):
    """Compute the 16-bit CRC of the packet and store it in ``self.crc``.

    The checksum covers everything ``to_bytes`` emits except its last two bytes (the
    current CRC). It is processed least-significant bit first with polynomial 0x8408,
    starting from 0xFFFF, and the result is inverted.
    """
    payload = self.to_bytes()[:-2]  # Drop the CRC bytes currently stored
    register = 0xFFFF  # All ones, so leading zero bytes still affect the result
    for octet in payload:
        register ^= octet  # Fold the whole byte in, then clock out eight bits
        for _ in range(8):
            lsb_set = register & 0x0001
            register >>= 1
            if lsb_set:
                register ^= 0x8408
    self.crc = register ^ 0xFFFF
Fill in CRC with the correct value.
class LCDMonitor:
    """Manage LCD events.

    A daemon thread (``runtime``) decodes packets from the serial port into a bounded deque and a CSV log.
    ``read`` pops packets from that deque; ``write`` sends a command and waits for the LCD's reply.
    """

    # Supported commands: hw info (0x1), brightness (0x0E, 0x0D), cursor (0x0C), error (0xC0), leds (0x22)

    MAX_ATTEMPTS = 2  # Number of times write() sends a command before giving up

    class ResponseError(IntEnum):
        """Possible LCD error responses."""

        UNKNOWN_ERROR = 1
        UNKNOWN_COMMAND = 2
        INVALID_COMMAND_LENGTH_OPTIONS = 3
        WRITING_FLASH_MEM_FAILED = 4
        READING_FLASH_MEM_FAILED = 5
        CFA_FBSCAB_NOT_PRESENT_AT_INDEX = 6
        CFA_FBSCAB_DID_NOT_REPLY_TO_REG = 7
        MICRO_SD_NOT_INSERTED_OR_BAD = 8
        MICRO_SD_NOT_FORMATTED = 9
        MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = 10
        MICRO_SD_UNKNOWN_ERROR = 11
        MICRO_SD_FILE_COULD_NOT_BE_READ = 12
        MICRO_SD_COULD_NOT_BE_WRITTEN = 13
        FILE_HEADER_IS_INVALID = 14
        MICRO_SD_FILE_IS_ALREADY_OPEN = 15
        MICRO_SD_FILE_OPERATION_FAILED = 16
        MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = 17
        GFX_STREAM_ALREADY_STARTED = 18
        GFX_IS_OUT_OF_LCD_BOUNDS = 19
        VIDEO_IS_NOT_OPEN_IN_SLOT = 20
        GFX_STREAM_HAS_TIMED_OUT = 21
        GPIO_NOT_SET_FOR_ATX_USE = 22
        INTERFACE_NOT_ENABLED = 23
        INTERFACE_NOT_AVAILABLE_ = 24

    def __init__(self, packet_buffer_size: int = 4096):
        """Open the serial port, start the CSV packet log and launch the monitor thread.

        Args:
            packet_buffer_size: Maximum number of packets kept in the deque.
        """
        self.dropped_packets = 0  # Incremented by runtime() each time a packet arrives while the deque is full
        self.packet_deque: deque[RawPacket] = deque(maxlen=packet_buffer_size)

        # Open the serial port
        self.serial_port = LCDPort()

        # Create log file (writes the RawPacket field names as a header row)
        with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
            csv.writer(csvfile).writerow(asdict(RawPacket()))

        # Start monitor thread
        self.daemon = threading.Thread(target=self.runtime, daemon=True)
        self.daemon.start()

    def runtime(self):
        """Fetch packets as quickly as possible.

        Runs forever: each CRC-valid packet is appended to ``self.packet_deque`` and to the CSV log.
        """

        def fetch_packet() -> RawPacket:
            """Decode serial stream to a valid packet."""
            while True:
                # Fetch raw packet: command byte, length byte, `length` data bytes, then CRC (low byte first)
                try:
                    raw_packet = RawPacket(
                        command=self.serial_port.consume(timeout=0),
                        data_length=(length := self.serial_port.consume()),
                        data=bytes(self.serial_port.consume() for _ in range(length)),
                        crc=self.serial_port.consume() | (self.serial_port.consume() << 8),
                    )
                except TimeoutError:
                    logger.warning(
                        "Timeout when attempting to read packet. Possibly an invalid packet cause by dropped bytes."
                    )
                else:
                    # Validate CRC
                    logger.debug(f"Packet?: {asdict(raw_packet)}")
                    valid_packet = replace(raw_packet)  # Calculate the expected CRC
                    valid_packet.calculate_crc()
                    if raw_packet.crc == valid_packet.crc:
                        logger.debug("got lcd packet")
                        self.serial_port.clear_history()
                        return raw_packet
                    logger.debug(f"Invalid CRC 0x{raw_packet.crc:04X}, expected 0x{valid_packet.crc:04X}.")
                # NOTE(review): the flattened listing does not show whether this resync also runs after a
                # timeout; it is reproduced here at loop level - confirm against the original file.
                logger.debug(f"LCD Buffer: {self.serial_port.buffer}")
                logger.debug(f"Discarding lcd byte: {self.serial_port.buffer[0]:02X}")
                self.serial_port.index = 1
                self.serial_port.clear_history()

        while True:
            packet = fetch_packet()
            if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
                self.dropped_packets += 1
            self.packet_deque.append(packet)  # Save to deque
            logger.debug(f"{len(self.packet_deque)} packet(s) in queue: {', '.join(map(str, self.packet_deque))}")

            # Save to file
            with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
                csv.writer(csvfile).writerow(asdict(packet).values())

            time.sleep(0.1)  # Packets only come in on user input or as command acknowledgments

    def read(self, blocking: bool = True, timeout: float = 0) -> RawPacket | None:
        """Get keys or acknowledgment.

        Args:
            blocking: When false, return ``None`` immediately if no packet is queued.
            timeout: Maximum wait in seconds; ``0`` waits indefinitely.

        Returns:
            The oldest queued packet, or ``None`` for a non-blocking read of an empty queue.

        Raises:
            TimeoutError: No packet arrived within ``timeout`` seconds.
        """
        start_time = time.perf_counter()
        while not timeout or time.perf_counter() - start_time < timeout:
            with suppress(IndexError):
                return self.packet_deque.popleft()
            if not blocking:
                return None
            time.sleep(0.1)  # Packets only come in on user input
        raise TimeoutError(f"A packet was not received after {timeout:.1f} seconds.")

    def write(self, command_packet: RawPacket, timeout: float = 2):
        """Send command to LCD and wait for its response.

        The command is sent up to ``MAX_ATTEMPTS`` times. Retrying stops as soon as the matching
        acknowledgment (``0x40 | command``) or an error response (both top bits set) is read.

        Args:
            command_packet: Packet to transmit.
            timeout: Seconds allowed for the serial write and for each response read.
        """
        for attempt_number in range(self.MAX_ATTEMPTS):
            self.serial_port.write(command_packet.to_bytes(), timeout)
            response = None  # Never evaluate a response left over from a previous attempt
            with suppress(TimeoutError):
                response = self.read(timeout=timeout)
            if response and (0x40 | command_packet.command) == response.command:
                break
            if response and (response.command & 0xC0) == 0xC0:
                error_code = response.command & 0x3F
                try:
                    error_message = self.ResponseError(error_code).name.capitalize().replace("_", " ")
                except ValueError:  # Code is not listed in ResponseError
                    error_message = f"Unrecognized error code {error_code}"
                logger.warning(f"Received error response: {error_message}.")
                break
            logger.error(f"LCD response not received or invalid for attempt {attempt_number + 1}.")
            time.sleep(0.1)
Manage LCD events.
def __init__(self, packet_buffer_size: int = 4096):
    """Open the LCD serial port, start the CSV packet log and launch the packet-fetching thread.

    Args:
        packet_buffer_size: Maximum number of packets kept in the deque.
    """
    self.dropped_packets = 0
    self.packet_deque: deque[RawPacket] = deque(maxlen=packet_buffer_size)

    # Serial link to the LCD
    self.serial_port = LCDPort()

    # Append a header row (the RawPacket field names) to the CSV packet log
    csv_path = LOG_FILE.with_suffix(".csv")
    with open(csv_path, "a", encoding="UTF-8") as log_handle:
        header_writer = csv.writer(log_handle)
        header_writer.writerow(asdict(RawPacket()))

    # Background thread that runs self.runtime
    monitor_thread = threading.Thread(target=self.runtime, daemon=True)
    self.daemon = monitor_thread
    monitor_thread.start()
def runtime(self):
    """Fetch packets as quickly as possible.

    Runs forever: each CRC-valid packet is appended to ``self.packet_deque`` and to the CSV log.
    ``self.dropped_packets`` is incremented whenever the deque is already full before appending.
    """

    def fetch_packet() -> RawPacket:
        """Decode serial stream to a valid packet."""
        while True:
            # Fetch raw packet: command byte, length byte, `length` data bytes, then CRC (low byte first)
            try:
                raw_packet = RawPacket(
                    command=self.serial_port.consume(timeout=0),
                    data_length=(length := self.serial_port.consume()),
                    data=bytes(self.serial_port.consume() for _ in range(length)),
                    crc=self.serial_port.consume() | (self.serial_port.consume() << 8),
                )
            except TimeoutError:
                logger.warning(
                    "Timeout when attempting to read packet. Possibly an invalid packet cause by dropped bytes."
                )
            else:
                # Validate CRC by recomputing it on a copy of the received packet
                logger.debug(f"Packet?: {asdict(raw_packet)}")
                valid_packet = replace(raw_packet)  # Calculate the expected CRC
                valid_packet.calculate_crc()
                if raw_packet.crc == valid_packet.crc:
                    logger.debug("got lcd packet")
                    self.serial_port.clear_history()
                    return raw_packet
                logger.debug(f"Invalid CRC 0x{raw_packet.crc:04X}, expected 0x{valid_packet.crc:04X}.")
            # NOTE(review): the flattened listing does not show whether this resync also runs after a
            # timeout; it is reproduced here at loop level - confirm against the original file.
            logger.debug(f"LCD Buffer: {self.serial_port.buffer}")
            logger.debug(f"Discarding lcd byte: {self.serial_port.buffer[0]:02X}")
            self.serial_port.index = 1
            self.serial_port.clear_history()

    while True:
        packet = fetch_packet()
        if len(self.packet_deque) == self.packet_deque.maxlen:  # Deque is full
            self.dropped_packets += 1
        self.packet_deque.append(packet)  # Save to deque
        logger.debug(f"{len(self.packet_deque)} packet(s) in queue: {', '.join(map(str, self.packet_deque))}")

        # Save to file
        with open(LOG_FILE.with_suffix(".csv"), "a", encoding="UTF-8") as csvfile:
            csv.writer(csvfile).writerow(asdict(packet).values())

        time.sleep(0.1)  # Packets only come in on user input or as command acknowledgments
Fetch packets as quickly as possible.
def read(self, blocking: bool = True, timeout: float = 0) -> RawPacket | None:
    """Pop the oldest queued packet (a key event or an acknowledgment).

    Args:
        blocking: When false, return ``None`` right away if nothing is queued.
        timeout: Maximum wait in seconds; ``0`` waits indefinitely.

    Returns:
        The oldest queued packet, or ``None`` for a non-blocking read of an empty queue.

    Raises:
        TimeoutError: No packet arrived within ``timeout`` seconds.
    """
    began = time.perf_counter()
    while True:
        if timeout and time.perf_counter() - began >= timeout:
            raise TimeoutError(f"A packet was not received after {timeout:.1f} seconds.")
        try:
            return self.packet_deque.popleft()
        except IndexError:
            if not blocking:
                return None
        time.sleep(0.1)  # Packets only come in on user input
Get keys or acknowledgment.
def write(self, command_packet: RawPacket, timeout: float = 2):
    """Send command to LCD and wait for its response.

    The command is sent up to ``MAX_ATTEMPTS`` times. Retrying stops as soon as the matching
    acknowledgment (``0x40 | command``) or an error response (both top bits set) is read.

    Args:
        command_packet: Packet to transmit.
        timeout: Seconds allowed for the serial write and for each response read.
    """
    for attempt_number in range(self.MAX_ATTEMPTS):
        self.serial_port.write(command_packet.to_bytes(), timeout)
        response = None  # Never evaluate a response left over from a previous attempt
        with suppress(TimeoutError):
            response = self.read(timeout=timeout)
        if response and (0x40 | command_packet.command) == response.command:
            break
        if response and (response.command & 0xC0) == 0xC0:
            error_code = response.command & 0x3F
            try:
                error_message = self.ResponseError(error_code).name.capitalize().replace("_", " ")
            except ValueError:  # Code is not listed in ResponseError
                error_message = f"Unrecognized error code {error_code}"
            logger.warning(f"Received error response: {error_message}.")
            break
        logger.error(f"LCD response not received or invalid for attempt {attempt_number + 1}.")
        time.sleep(0.1)
Send command to LCD
class ResponseError(IntEnum):
    """Possible LCD error responses.

    Each value is the error code carried in the low six bits of an error response's command byte.
    """

    # General command errors
    UNKNOWN_ERROR = 1
    UNKNOWN_COMMAND = 2
    INVALID_COMMAND_LENGTH_OPTIONS = 3
    # Flash memory errors
    WRITING_FLASH_MEM_FAILED = 4
    READING_FLASH_MEM_FAILED = 5
    # CFA-FBSCAB errors
    CFA_FBSCAB_NOT_PRESENT_AT_INDEX = 6
    CFA_FBSCAB_DID_NOT_REPLY_TO_REG = 7
    # microSD / file errors
    MICRO_SD_NOT_INSERTED_OR_BAD = 8
    MICRO_SD_NOT_FORMATTED = 9
    MICRO_SD_FILE_COULD_NOT_BE_FOUND_OPENED = 10
    MICRO_SD_UNKNOWN_ERROR = 11
    MICRO_SD_FILE_COULD_NOT_BE_READ = 12
    MICRO_SD_COULD_NOT_BE_WRITTEN = 13
    FILE_HEADER_IS_INVALID = 14
    MICRO_SD_FILE_IS_ALREADY_OPEN = 15
    MICRO_SD_FILE_OPERATION_FAILED = 16
    MICRO_SD_FILE_HAS_NOT_BEEN_OPENED = 17
    # Graphics / video errors
    GFX_STREAM_ALREADY_STARTED = 18
    GFX_IS_OUT_OF_LCD_BOUNDS = 19
    VIDEO_IS_NOT_OPEN_IN_SLOT = 20
    GFX_STREAM_HAS_TIMED_OUT = 21
    # GPIO / interface errors
    GPIO_NOT_SET_FOR_ATX_USE = 22
    INTERFACE_NOT_ENABLED = 23
    INTERFACE_NOT_AVAILABLE_ = 24
Possible LCD error responses.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
class KeyCode(IntEnum):
    """The key pressed on the keypad.

    Values 1-6 are press events and 7-12 are the matching release events.
    """

    # Press events
    KEY_UP_PRESS = 1
    KEY_DOWN_PRESS = 2
    KEY_LEFT_PRESS = 3
    KEY_RIGHT_PRESS = 4
    KEY_ENTER_PRESS = 5
    KEY_EXIT_PRESS = 6
    # Release events
    KEY_UP_RELEASE = 7
    KEY_DOWN_RELEASE = 8
    KEY_LEFT_RELEASE = 9
    KEY_RIGHT_RELEASE = 10
    KEY_ENTER_RELEASE = 11
    KEY_EXIT_RELEASE = 12
The key pressed on the keypad.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
class LEDColor(IntEnum):
    """The LED color.

    The value is used as a two-bit mask: bit 0 and bit 1 are tested separately when the LEDs are set,
    so YELLOW (3) sets both.
    """

    OFF = 0
    RED = 1  # Bit 0 only
    GREEN = 2  # Bit 1 only
    YELLOW = 3  # Bits 0 and 1
The LED color.
Inherited Members
- enum.Enum
- name
- value
- builtins.int
- conjugate
- bit_length
- bit_count
- to_bytes
- from_bytes
- as_integer_ratio
- is_integer
- real
- imag
- numerator
- denominator
class UIHandler:
    """Manage LCD UI.

    Owns the current page, forwards key events to it and provides the drawing primitives
    (text, image, LEDs, clear, reset) that pages use.
    """

    FREQUENCY = 20  # How often to check for input / refresh screen

    def __init__(self, lcd_monitor: LCDMonitor):
        """Draw the initial page and start the progress-queue manager.

        Args:
            lcd_monitor: Monitor used to send commands to and read packets from the LCD.
        """
        self.lcd_monitor = lcd_monitor
        self.page: PageType = TestMenuType()
        self.stack: list[Any] = []  # Allows pages to communicate
        # self.reset()
        self.clear_screen()
        self.page.draw(self)

        # Progress queue for communicating with the test
        class QueueManager(BaseManager):
            """Manager for multiprocess queue."""

        self._backing_queue: Queue[dict[str, str | float]] = Queue()
        QueueManager.register("get_queue", callable=lambda: self._backing_queue)
        self._queue_manager = QueueManager(address=("", PORT), authkey=AUTH_KEY)
        self._queue_manager.start()  # pylint: disable=consider-using-with
        self.queue: Queue[dict[str, str | float]] = self._queue_manager.get_queue()  # type: ignore[attr-defined]  # pylint: disable=no-member

    def run_forever(self):
        """Process LCD events forever.

        Each iteration looks for an already-running test process, handles at most one key packet and
        refreshes the current page. Any exception is logged and followed by a one-second pause.
        """
        while True:
            try:
                # Check if a test is running and show progress if so
                if not isinstance(self.page, ProgressType):
                    current_name = Path("/proc/self/comm").read_text(encoding="ascii")
                    for pid in Path("/proc").glob("[0-9]*"):
                        try:
                            process_name = (pid / "comm").read_text(encoding="ascii")
                        except FileNotFoundError:
                            continue  # Process exited during the scan; never reuse a previous name
                        if process_name != current_name and process_name.startswith("HITL") and pid.stem.isdigit():
                            logger.debug(f"Found running test at PID: {pid.stem}")
                            self.stack.append(int(pid.stem))
                            self.page = ProgressType()

                # Check for new key inputs
                if (packet := self.lcd_monitor.read(blocking=False)) and packet.command == 0x80:
                    self.key_event(packet)
                self.page.refresh(self)

                time.sleep(1 / self.FREQUENCY)  # Don't waste cycles when nothing happens 99% of the time
            except Exception:  # pylint: disable=broad-exception-caught
                logger.error(traceback.format_exc())
                time.sleep(1)

    def key_event(self, packet: RawPacket):
        """Process key event.

        The first data byte is the key code. Packets with no data or an unknown code are logged and ignored.
        """
        raw_code = packet.data[0] if packet.data else 0
        try:
            key = KeyCode(raw_code)
        except ValueError:  # 0 (no data) and unknown codes are not KeyCode members
            logger.warning(f"Ignoring unrecognized key code: {raw_code}")
            return
        self.page.process_key(self, key)
        self.page.draw(self)

    def draw_image(self, filename: str):
        """Draw an image to the LCD."""
        raw_filename = filename.encode("latin1")  # Map bytes 1-to-1
        enable_transparency = False  # pixel value 0 is transparent
        invert_image_shade = False  # will invert transparency value also
        x_start = 0
        y_start = 0
        # NOTE(review): an empty filename declares two more bytes than are sent - confirm this is intended.
        packet = RawPacket(
            command=0x28,
            data_length=4 + len(raw_filename or "EE"),
            data=bytes([3, enable_transparency | (invert_image_shade << 1), x_start, y_start, *raw_filename]),
        )
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def write_text(self, row: int, column: int, text: str):
        """Write text to the LCD starting at the given row and column."""
        raw_text = text.encode("latin1")  # Map bytes 1-to-1
        # NOTE(review): empty text declares one more byte than is sent - confirm this is intended.
        packet = RawPacket(
            command=0x1F,
            data_length=2 + len(raw_text or "E"),
            data=bytes([column, row, *raw_text]),
        )
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def clear_screen(self):
        """Clear text from the screen."""
        packet = RawPacket(command=0x06, data_length=0)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)

    def reset(self):
        """Reset the LCD device."""
        packet = RawPacket(command=0x05, data_length=3, data=bytes([8, 25, 48]))
        packet.calculate_crc()
        self.lcd_monitor.write(packet)
        time.sleep(4)  # May not respond for up to 3 seconds

    def set_led(self, colors: tuple[LEDColor, LEDColor, LEDColor, LEDColor]):
        """Set the on-board LED values.

        Two packets are sent per LED: one driven by bit 1 of the color and one by bit 0, each either 0 or 100.
        """
        for led_id in range(4):
            packet = RawPacket(
                command=0x22, data_length=2, data=bytes([5 + led_id * 2, 100 * bool(colors[led_id] & 2)])
            )
            packet.calculate_crc()
            self.lcd_monitor.write(packet)
            packet = RawPacket(
                command=0x22, data_length=2, data=bytes([6 + led_id * 2, 100 * bool(colors[led_id] & 1)])
            )
            packet.calculate_crc()
            self.lcd_monitor.write(packet)

    def save_state(self):
        """Save state as boot state"""
        packet = RawPacket(command=0x4, data_length=0)
        packet.calculate_crc()
        self.lcd_monitor.write(packet)
Manage LCD UI.
def __init__(self, lcd_monitor: LCDMonitor):
    """Draw the initial page and start the progress-queue manager.

    Args:
        lcd_monitor: Monitor used to send commands to and read packets from the LCD.
    """
    self.lcd_monitor = lcd_monitor
    self.page: PageType = TestMenuType()  # The test menu is the first page shown
    self.stack: list[Any] = []  # Allows pages to communicate
    # self.reset()
    self.clear_screen()
    self.page.draw(self)

    # Progress queue for communicating with the test
    class QueueManager(BaseManager):
        """Manager for multiprocess queue."""

    self._backing_queue: Queue[dict[str, str | float]] = Queue()
    # "get_queue" must be registered before the manager is instantiated and started
    QueueManager.register("get_queue", callable=lambda: self._backing_queue)
    self._queue_manager = QueueManager(address=("", PORT), authkey=AUTH_KEY)
    self._queue_manager.start()  # pylint: disable=consider-using-with
    # Pages read progress updates through the object returned by the manager
    self.queue: Queue[dict[str, str | float]] = self._queue_manager.get_queue()  # type: ignore[attr-defined]  # pylint: disable=no-member
def run_forever(self):
    """Process LCD events forever.

    Each iteration looks for an already-running test process, handles at most one key packet and
    refreshes the current page. Any exception is logged and followed by a one-second pause.
    """
    while True:
        try:
            # Check if a test is running and show progress if so
            if not isinstance(self.page, ProgressType):
                current_name = Path("/proc/self/comm").read_text(encoding="ascii")
                for pid in Path("/proc").glob("[0-9]*"):
                    try:
                        process_name = (pid / "comm").read_text(encoding="ascii")
                    except FileNotFoundError:
                        continue  # Process exited during the scan; never reuse a previous name
                    if process_name != current_name and process_name.startswith("HITL") and pid.stem.isdigit():
                        logger.debug(f"Found running test at PID: {pid.stem}")
                        self.stack.append(int(pid.stem))
                        self.page = ProgressType()

            # Check for new key inputs
            if (packet := self.lcd_monitor.read(blocking=False)) and packet.command == 0x80:
                self.key_event(packet)
            self.page.refresh(self)

            time.sleep(1 / self.FREQUENCY)  # Don't waste cycles when nothing happens 99% of the time
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            time.sleep(1)
Process LCD events forever.
def key_event(self, packet: RawPacket):
    """Process key event.

    The first data byte is the key code. Packets with no data or an unknown code are logged and ignored
    rather than raising ``ValueError``.

    Args:
        packet: Key packet read from the LCD.
    """
    raw_code = packet.data[0] if packet.data else 0
    try:
        key = KeyCode(raw_code)
    except ValueError:  # 0 (no data) and unknown codes are not KeyCode members
        logger.warning(f"Ignoring unrecognized key code: {raw_code}")
        return
    self.page.process_key(self, key)
    self.page.draw(self)
Process key event.
def draw_image(self, filename: str):
    """Send the command that draws the named image file at the top-left corner of the LCD.

    Args:
        filename: Name of the image file; encoded as latin1 into the packet.
    """
    name_bytes = filename.encode("latin1")  # Map bytes 1-to-1
    transparent = False  # pixel value 0 is transparent
    inverted = False  # will invert transparency value also
    origin_x, origin_y = 0, 0
    option_flags = transparent | (inverted << 1)
    declared_length = 4 + len(name_bytes or "EE")
    payload = bytes([3, option_flags, origin_x, origin_y, *name_bytes])
    packet = RawPacket(command=0x28, data_length=declared_length, data=payload)
    packet.calculate_crc()
    self.lcd_monitor.write(packet)
Draw an image to the LCD.
def write_text(self, row: int, column: int, text: str):
    """Send the command that writes ``text`` to the LCD starting at ``row`` / ``column``.

    Args:
        row: Row to write to.
        column: Column of the first character.
        text: Characters to write; encoded as latin1 into the packet.
    """
    encoded = text.encode("latin1")  # Map bytes 1-to-1
    declared_length = 2 + len(encoded or "E")
    payload = bytes([column, row, *encoded])  # The column byte precedes the row byte
    packet = RawPacket(command=0x1F, data_length=declared_length, data=payload)
    packet.calculate_crc()
    self.lcd_monitor.write(packet)
Write text to the LCD.
def clear_screen(self):
    """Send the data-less clear command (0x06) to wipe text from the LCD."""
    clear_command = RawPacket(command=0x06, data_length=0)
    clear_command.calculate_crc()
    self.lcd_monitor.write(clear_command)
Clear text from the screen.
def reset(self):
    """Send the reset command (0x05) to the LCD, then pause while it restarts."""
    reset_payload = bytes([8, 25, 48])
    reset_command = RawPacket(command=0x05, data_length=3, data=reset_payload)
    reset_command.calculate_crc()
    self.lcd_monitor.write(reset_command)
    time.sleep(4)  # May not respond for up to 3 seconds
Reset the LCD device.
def set_led(self, colors: tuple[LEDColor, LEDColor, LEDColor, LEDColor]):
    """Set the four on-board LEDs.

    Two packets are sent per LED: indices ``5 + 2 * led`` and ``6 + 2 * led`` receive 100 when bit 1 /
    bit 0 of the color is set and 0 otherwise.

    Args:
        colors: One color per LED.
    """
    for led_id in range(4):
        color = colors[led_id]
        for first_index, color_bit in ((5, 2), (6, 1)):
            level = 100 * bool(color & color_bit)
            packet = RawPacket(command=0x22, data_length=2, data=bytes([first_index + led_id * 2, level]))
            packet.calculate_crc()
            self.lcd_monitor.write(packet)
Set the on-board LED values.
class PageType:
    """An LCD page.

    Base class for pages: every hook here does nothing, so subclasses override only what they need.
    """

    title = ""  # Heading text; subclasses that draw a heading set it

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Perform some action based on the key."""

    def draw(self, lcd_manager: UIHandler):
        """Draw to screen."""

    def refresh(self, lcd_manager: UIHandler):
        """Update internal values. Called continuously."""
An LCD page.
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """Perform some action based on the key.

    The base implementation does nothing.
    """
Perform some action based on the key.
class ProfileType(PageType):
    """Profile selection menu.

    Lists the ``*.json`` profiles found in ``/var/www/logs/profile/`` (oldest modification time first)
    and launches a comparison test against the selected one.
    """

    title = "Profile Selection"

    def __init__(self):
        """Collect the available profiles and read the version string."""
        profile_paths = sorted(Path("/var/www/logs/profile/").glob("*.json"), key=lambda path: path.stat().st_mtime)
        self.items = {path.stem: path for path in profile_paths}
        self.index = 0
        try:
            self.version = Path("/var/www/.version").read_text(encoding="utf-8").strip("\r\n")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            self.version = "1.0.1"

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Handle profile selection UI.

        LEFT/RIGHT cycle through the profiles, ENTER launches the test with the selected profile and
        EXIT returns to the test menu. With no profiles available only EXIT has an effect.
        """
        if key is KeyCode.KEY_EXIT_PRESS:
            lcd_manager.page = TestMenuType()
            return
        if not self.items:
            return  # Nothing to browse or launch; avoids modulo-by-zero and indexing an empty list
        if key is KeyCode.KEY_RIGHT_PRESS:
            self.index = (self.index + 1) % len(self.items)
        elif key is KeyCode.KEY_LEFT_PRESS:
            self.index = (self.index - 1) % len(self.items)
        elif key is KeyCode.KEY_ENTER_PRESS:
            selected_profile = list(self.items.values())[self.index]
            # Build the argument list directly so a profile path containing whitespace stays one argument
            command = ["/opt/hitl/test", "fingerprint.plan", "-c", "-v", "-D", f"PROFILE={selected_profile}"]
            lcd_manager.stack.append(
                subprocess.Popen(  # pylint: disable=consider-using-with
                    command, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
                ).pid
            )
            lcd_manager.page = ProgressType()

    def draw(self, lcd_manager: UIHandler):
        """Draw item to screen.

        The profile name is split at ``_6T_``: the part before it is shown on row 1 and the part after
        it on row 2. A placeholder is shown when no profiles exist.
        """
        max_text_width = WIDTH - 2
        if self.items:
            manufacturer_name, _, serial_id = list(self.items)[self.index].partition("_6T_")
            manufacturer_name = manufacturer_name[:WIDTH]
            item_name = serial_id[:max_text_width]
        else:
            manufacturer_name, item_name = "", "No profiles"

        lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}")
        lcd_manager.write_text(1, 0, f"{manufacturer_name:<20}")
        lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>")
        lcd_manager.write_text(3, 0, f"v{self.version:<19}")
Profile selection menu
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """Handle profile selection UI.

    LEFT/RIGHT cycle through the profiles, ENTER launches the test with the selected profile and
    EXIT returns to the test menu. With no profiles available only EXIT has an effect.
    """
    if key is KeyCode.KEY_EXIT_PRESS:
        lcd_manager.page = TestMenuType()
        return
    if not self.items:
        return  # Nothing to browse or launch; avoids modulo-by-zero and indexing an empty list
    if key is KeyCode.KEY_RIGHT_PRESS:
        self.index = (self.index + 1) % len(self.items)
    elif key is KeyCode.KEY_LEFT_PRESS:
        self.index = (self.index - 1) % len(self.items)
    elif key is KeyCode.KEY_ENTER_PRESS:
        selected_profile = list(self.items.values())[self.index]
        # Build the argument list directly so a profile path containing whitespace stays one argument
        command = ["/opt/hitl/test", "fingerprint.plan", "-c", "-v", "-D", f"PROFILE={selected_profile}"]
        lcd_manager.stack.append(
            subprocess.Popen(  # pylint: disable=consider-using-with
                command, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
            ).pid
        )
        lcd_manager.page = ProgressType()
Handle profile selection UI.
def draw(self, lcd_manager: UIHandler):
    """Draw item to screen.

    The profile name is split at ``_6T_``: the part before it is shown on row 1 and the part after it
    on row 2. A placeholder is shown when no profiles exist instead of raising ``IndexError``.
    """
    max_text_width = WIDTH - 2
    if self.items:
        manufacturer_name, _, serial_id = list(self.items)[self.index].partition("_6T_")
        manufacturer_name = manufacturer_name[:WIDTH]
        item_name = serial_id[:max_text_width]
    else:
        manufacturer_name, item_name = "", "No profiles"

    lcd_manager.write_text(0, 0, f"{self.title:=^{WIDTH}}")
    lcd_manager.write_text(1, 0, f"{manufacturer_name:<20}")
    lcd_manager.write_text(2, 0, f"<{item_name:^{max_text_width}}>")
    lcd_manager.write_text(3, 0, f"v{self.version:<19}")
Draw item to screen.
class TestMenuType(PageType):
    """Top-level menu from which a test is chosen."""

    title = "Test Selection"

    def __init__(self):
        """Build the menu entries and read the version string shown on the bottom row."""
        self.items = {
            "Generate profile": self.generate_profile,
            "Compare profiles": self.compare_profiles,
        }
        self.index = 0
        version_file = Path("/var/www/.version")
        try:
            self.version = version_file.read_text(encoding="utf-8").strip("\r\n")
        except Exception:  # pylint: disable=broad-exception-caught
            logger.error(traceback.format_exc())
            self.version = "1.0.1"

    def generate_profile(self, lcd_manager: UIHandler):
        """Launch the profile generation test and switch to the progress page."""
        launch_args = shlex.split("/opt/hitl/test fingerprint.plan -c -v")
        test_process = subprocess.Popen(  # pylint: disable=consider-using-with
            launch_args, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
        )
        lcd_manager.stack.append(test_process.pid)
        lcd_manager.page = ProgressType()

    def compare_profiles(self, lcd_manager: UIHandler):
        """Switch to the profile selection page."""
        lcd_manager.page = ProfileType()

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """Handle test selection UI: LEFT/RIGHT cycle entries, ENTER runs one, UP shows the logo."""
        if key is KeyCode.KEY_ENTER_PRESS:
            chosen_label = list(self.items)[self.index]
            self.items[chosen_label](lcd_manager)
            return
        if key is KeyCode.KEY_UP_PRESS:
            lcd_manager.page = LogoType()
            return
        if key is KeyCode.KEY_RIGHT_PRESS:
            step = 1
        elif key is KeyCode.KEY_LEFT_PRESS:
            step = -1
        else:
            return
        self.index = (self.index + step) % len(self.items)

    def draw(self, lcd_manager: UIHandler):
        """Draw the title, the selected entry and the version, then set the first LED red."""
        inner_width = WIDTH - 2
        label = list(self.items)[self.index][:inner_width]
        rows = (
            f"{self.title:=^{WIDTH}}",
            " " * WIDTH,
            f"<{label:^{inner_width}}>",
            f"v{self.version:<19}",
        )
        for row_number, row_text in enumerate(rows):
            lcd_manager.write_text(row_number, 0, row_text)

        lcd_manager.set_led((LEDColor.RED, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))
Test selection menu
def generate_profile(self, lcd_manager: UIHandler):
    """Launch the profile generation test and switch to the progress page.

    The PID of the launched process is pushed onto ``lcd_manager.stack``.
    """
    launch_args = shlex.split("/opt/hitl/test fingerprint.plan -c -v")
    test_process = subprocess.Popen(  # pylint: disable=consider-using-with
        launch_args, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
    )
    lcd_manager.stack.append(test_process.pid)
    lcd_manager.page = ProgressType()
Run profile generation test and go to progress page.
def compare_profiles(self, lcd_manager: UIHandler):
    """Go to profile selection page.

    Replaces the current page with a newly created ``ProfileType``.
    """
    lcd_manager.page = ProfileType()
Go to profile selection page.
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """Handle test selection UI: LEFT/RIGHT cycle entries, ENTER runs one, UP shows the logo."""
    if key is KeyCode.KEY_ENTER_PRESS:
        chosen_label = list(self.items)[self.index]
        self.items[chosen_label](lcd_manager)
        return
    if key is KeyCode.KEY_UP_PRESS:
        lcd_manager.page = LogoType()
        return
    if key is KeyCode.KEY_RIGHT_PRESS:
        step = 1
    elif key is KeyCode.KEY_LEFT_PRESS:
        step = -1
    else:
        return
    self.index = (self.index + step) % len(self.items)
Handle test selection UI.
def draw(self, lcd_manager: UIHandler):
    """Draw the title, the selected entry and the version, then set the first LED red."""
    inner_width = WIDTH - 2
    label = list(self.items)[self.index][:inner_width]
    rows = (
        f"{self.title:=^{WIDTH}}",
        " " * WIDTH,
        f"<{label:^{inner_width}}>",
        f"v{self.version:<19}",
    )
    for row_number, row_text in enumerate(rows):
        lcd_manager.write_text(row_number, 0, row_text)

    lcd_manager.set_led((LEDColor.RED, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))
Draw item to screen.
548class ProgressType(PageType): 549 """Test progress.""" 550 551 SCROLL_RATE = 2 # Frequency of screen text scroll 552 553 def __init__(self) -> None: 554 self.data: dict[str, str | float] = {} 555 self.text_offset = 0 556 self.last_scroll_time = 0.0 557 558 def _pid_is_alive(self, pid: int): 559 """Check if PID exists.""" 560 try: 561 os.kill(pid, 0) 562 except OSError as error: 563 match (error.errno): 564 case errno.ESRCH: # ESRCH == No such process 565 return False 566 case errno.EPERM: # EPERM clearly means there's a process to deny access to 567 return True 568 case _: 569 raise 570 else: 571 return True 572 573 def process_key(self, lcd_manager: UIHandler, key: KeyCode): 574 if key is KeyCode.KEY_EXIT_PRESS: 575 lcd_manager.stack.pop() 576 command = "/opt/hitl/kill" 577 subprocess.Popen( # pylint: disable=consider-using-with 578 shlex.split(command), stdout=DEVNULL, stderr=DEVNULL, start_new_session=True 579 ) 580 lcd_manager.page = TestMenuType() 581 582 def draw(self, lcd_manager: UIHandler): 583 """Draw item to screen.""" 584 if isinstance(lcd_manager.stack[-1], int) and self._pid_is_alive(lcd_manager.stack[-1]): 585 if self.data: 586 # Set LED color 587 lcd_manager.set_led( 588 ( 589 LEDColor.RED if self.data["name"] == "Test Completed" else LEDColor.GREEN, 590 LEDColor.OFF, 591 LEDColor.OFF, 592 LEDColor.OFF, 593 ) 594 ) 595 # Apply a marquee effect to the test title text 596 name = str(self.data["name"]) + " " 597 name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20] 598 ete_minutes = int(self.data["ete"]) 599 ete = f"{self.data.get('mode', '???????')}, ETR: {ete_minutes} min" 600 pass_fail = f"{self.data['passed']:>2} Passed,{self.data['failed']:>2} Failed" 601 prog_bar = f"\xFA{'=' * int(18 * self.data['progress']):-<18}\xFC" 602 lcd_manager.write_text(0, 0, f"{name:<20}") 603 lcd_manager.write_text(1, 0, f"{ete:<20}") 604 lcd_manager.write_text(2, 0, f"{pass_fail:^20}") 605 lcd_manager.write_text(3, 0, prog_bar) 
606 else: 607 lcd_manager.clear_screen() 608 lcd_manager.write_text(2, 0, f"{'Preparing...':^20}") 609 else: 610 lcd_manager.stack.pop() 611 lcd_manager.page = TestMenuType() 612 613 def refresh(self, lcd_manager: UIHandler): 614 """Output test progress.""" 615 old_data = self.data 616 with suppress(queue.Empty, KeyError): 617 self.data = lcd_manager.queue.get_nowait() 618 self.data["name"] = str(self.data["name"]).partition(" - ")[0].removeprefix("Test") 619 620 if self.data: 621 name_length = len(str(self.data.get("name", ""))) 622 if old_data.get("name") != self.data["name"]: # New test 623 self.text_offset = -1 # Reset offset when new title is added 624 self.draw(lcd_manager) 625 elif name_length > 20 and time.perf_counter() - self.last_scroll_time > 1 / self.SCROLL_RATE: # Scroll name 626 self.last_scroll_time = time.perf_counter() 627 self.text_offset = (self.text_offset + 1) % name_length 628 name = str(self.data["name"]) + " " 629 name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20] 630 lcd_manager.write_text(0, 0, f"{name:<20}") 631 elif old_data != self.data: # Don't draw when not needed 632 self.draw(lcd_manager)
Test progress.
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """On EXIT, drop the tracked PID, run ``/opt/hitl/kill`` and return to the test menu."""
    if key is not KeyCode.KEY_EXIT_PRESS:
        return
    lcd_manager.stack.pop()
    kill_args = shlex.split("/opt/hitl/kill")
    subprocess.Popen(  # pylint: disable=consider-using-with
        kill_args, stdout=DEVNULL, stderr=DEVNULL, start_new_session=True
    )
    lcd_manager.page = TestMenuType()
Perform some action based on the key.
def draw(self, lcd_manager: UIHandler):
    """Render the progress screen for the process on top of ``lcd_manager.stack``.

    If that entry is not the PID of a live process, pop it and switch to the test menu. If no
    progress update has arrived yet, show "Preparing...". Otherwise show the test name, mode and
    remaining minutes, pass/fail counts and a progress bar.
    """
    tracked = lcd_manager.stack[-1]
    if not (isinstance(tracked, int) and self._pid_is_alive(tracked)):
        lcd_manager.stack.pop()
        lcd_manager.page = TestMenuType()
        return

    if not self.data:
        lcd_manager.clear_screen()
        lcd_manager.write_text(2, 0, f"{'Preparing...':^20}")
        return

    # Set LED color
    first_led = LEDColor.RED if self.data["name"] == "Test Completed" else LEDColor.GREEN
    lcd_manager.set_led((first_led, LEDColor.OFF, LEDColor.OFF, LEDColor.OFF))

    # Apply a marquee effect to the test title text
    padded = str(self.data["name"]) + " "
    shift = self.text_offset % len(padded)
    name = (padded[shift:] + padded[:shift])[:20]
    ete_minutes = int(self.data["ete"])
    ete = f"{self.data.get('mode', '???????')}, ETR: {ete_minutes} min"
    pass_fail = f"{self.data['passed']:>2} Passed,{self.data['failed']:>2} Failed"
    prog_bar = f"\xFA{'=' * int(18 * self.data['progress']):-<18}\xFC"
    lcd_manager.write_text(0, 0, f"{name:<20}")
    lcd_manager.write_text(1, 0, f"{ete:<20}")
    lcd_manager.write_text(2, 0, f"{pass_fail:^20}")
    lcd_manager.write_text(3, 0, prog_bar)
Draw item to screen.
def refresh(self, lcd_manager: UIHandler):
    """Output test progress.

    Takes at most one update from the queue, then redraws when the test name or any value changed,
    or scrolls the name when it is longer than 20 characters. An update without a "name" entry is
    ignored so that it cannot replace the last good update and break later refreshes.
    """
    old_data = self.data
    try:
        new_data = lcd_manager.queue.get_nowait()
    except queue.Empty:
        pass
    else:
        if "name" in new_data:
            # Keep the part before " - " and drop a leading "Test"
            new_data["name"] = str(new_data["name"]).partition(" - ")[0].removeprefix("Test")
            self.data = new_data

    if self.data:
        name_length = len(str(self.data.get("name", "")))
        if old_data.get("name") != self.data["name"]:  # New test
            self.text_offset = -1  # Reset offset when new title is added
            self.draw(lcd_manager)
        elif name_length > 20 and time.perf_counter() - self.last_scroll_time > 1 / self.SCROLL_RATE:  # Scroll name
            self.last_scroll_time = time.perf_counter()
            self.text_offset = (self.text_offset + 1) % name_length
            name = str(self.data["name"]) + " "
            name = (name[self.text_offset % len(name) :] + name[: self.text_offset % len(name)])[:20]
            lcd_manager.write_text(0, 0, f"{name:<20}")
        elif old_data != self.data:  # Don't draw when not needed
            self.draw(lcd_manager)
Output test progress.
class LogoType(PageType):
    """Page that shows the logo image."""

    def process_key(self, lcd_manager: UIHandler, key: KeyCode):
        """On any key press, save the LCD state and return to the test menu; ignore key releases."""
        if key > KeyCode.KEY_EXIT_PRESS:  # Release events have higher values than every press event
            return
        lcd_manager.save_state()
        lcd_manager.page = TestMenuType()

    def draw(self, lcd_manager: UIHandler):
        """Draw the logo image file to the screen."""
        logo_filename = "TAFLogo.bmp"
        lcd_manager.draw_image(logo_filename)
Display logo.
def process_key(self, lcd_manager: UIHandler, key: KeyCode):
    """On any key press, save the LCD state and return to the test menu; ignore key releases."""
    if key > KeyCode.KEY_EXIT_PRESS:  # Release events have higher values than every press event
        return
    lcd_manager.save_state()
    lcd_manager.page = TestMenuType()
Handle key presses on the logo page.
def test_main():
    """Configure console and file logging at DEBUG level, then run the LCD UI forever."""
    logger.setLevel(logging.DEBUG)
    log_format = logging.Formatter(
        "%(asctime)s - %(name)s - %(levelname)s - %(message)s", datefmt="%m/%d/%Y %I:%M:%S %p"
    )

    def attach(handler: logging.Handler) -> None:
        """Apply the shared level and format to a handler and register it."""
        handler.setLevel(logging.DEBUG)
        handler.setFormatter(log_format)
        logger.addHandler(handler)

    # Console output uses CRLF line endings
    console_handler = logging.StreamHandler()
    console_handler.terminator = "\r\n"
    attach(console_handler)

    # File output; create the log directory first
    LOG_FILE.parent.mkdir(parents=True, exist_ok=True)
    attach(logging.FileHandler(LOG_FILE))
    logger.debug(f"Log Filename: {LOG_FILE}")

    monitor = LCDMonitor()  # Begin monitoring for packets
    UIHandler(monitor).run_forever()  # Manage LCD UI
Set up logging and run manager.