hitl_tester.modules.file_lock
Provides access to a basic file lock for synchronizing multiple processes.
(c) 2020-2024 TurnAround Factor, Inc.
CUI DISTRIBUTION CONTROL Controlled by: DLA J68 R&D SBIP CUI Category: Small Business Research and Technology Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS POC: GOV SBIP Program Manager Denise Price, 571-767-0111 Distribution authorized to U.S. Government Agencies only, to protect information not owned by the U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, Fort Belvoir, VA 22060-6221
SBIR DATA RIGHTS Contract No.:SP4701-23-C-0083 Contractor Name: TurnAround Factor, Inc. Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 Expiration of SBIR Data Rights Period: September 24, 2029 The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause contained in the above identified contract. No restrictions apply after the expiration date shown above. Any reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce the markings.
1""" 2Provides access to a basic file lock for synchronizing multiple processes. 3 4(c) 2020-2024 TurnAround Factor, Inc. 5 6CUI DISTRIBUTION CONTROL 7Controlled by: DLA J68 R&D SBIP 8CUI Category: Small Business Research and Technology 9Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS 10POC: GOV SBIP Program Manager Denise Price, 571-767-0111 11Distribution authorized to U.S. Government Agencies only, to protect information not owned by the 12U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that 13it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests 14for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, 15Fort Belvoir, VA 22060-6221 16 17SBIR DATA RIGHTS 18Contract No.:SP4701-23-C-0083 19Contractor Name: TurnAround Factor, Inc. 20Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 21Expiration of SBIR Data Rights Period: September 24, 2029 22The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer 23software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights 24in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause 25contained in the above identified contract. No restrictions apply after the expiration date shown above. Any 26reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce 27the markings. 
28""" 29 30from __future__ import annotations 31 32import _thread 33import fcntl 34import shutil 35import signal 36import threading 37import time 38import traceback 39import uuid 40from pathlib import Path 41from typing import cast, Union, TypedDict 42 43import pyvisa 44from pyvisa.resources import MessageBasedResource 45 46from hitl_tester.modules.logger import logger 47 48MAX_ATTEMPTS = 4 49ATTEMPT_WAIT = 1 50 51 52class DelaySIGINT: 53 """Delay processing of SIGINT.""" 54 55 BUFFER_SIZE = 3 # Raise after buffer is full 56 57 def __init__(self): 58 self.signal_received_count = 0 59 self.old_handle = None 60 61 def signal_recorder(self, _signo, _stack_frame): 62 """Record that a signal was received.""" 63 self.signal_received_count += 1 64 remaining = DelaySIGINT.BUFFER_SIZE - self.signal_received_count 65 logger.write_critical_to_report( 66 f"Delaying {signal.Signals(_signo).name}, " 67 f"press CTRL-C {remaining} more time(s) to trigger immediately (unsafe)." 68 ) 69 if self.signal_received_count >= DelaySIGINT.BUFFER_SIZE: 70 self.try_raise() 71 72 def suppress(self): 73 """Suppress, but record, and SIGINT received.""" 74 if threading.current_thread() is threading.main_thread(): # Signals only work in main thread 75 self.signal_received_count = 0 76 self.old_handle = signal.signal(signal.SIGINT, self.signal_recorder) 77 78 def try_raise(self): 79 """Raise the signal if it was received.""" 80 if threading.current_thread() is threading.main_thread(): # Signals only work in main thread 81 signal.signal(signal.SIGINT, self.old_handle) 82 if self.signal_received_count: 83 signal.raise_signal(signal.SIGINT) 84 85 86delay_sig = DelaySIGINT() 87 88 89class ThreadLock: 90 """A deterministic thread lock for synchronization.""" 91 92 ThreadType = Union[_thread.LockType, _thread.RLock] # type: ignore[name-defined] 93 THREAD_LOCK_REGISTRY: dict[str, ThreadType] = {} 94 95 def __init__(self, lock_name: str, reentrant: bool = False): 96 lock: type[ThreadLock.ThreadType] = 
threading.RLock if reentrant else threading.Lock 97 if lock is not type(ThreadLock.THREAD_LOCK_REGISTRY.get(lock_name)): 98 ThreadLock.THREAD_LOCK_REGISTRY[lock_name] = lock() 99 self._lock = ThreadLock.THREAD_LOCK_REGISTRY[lock_name] 100 101 def acquire(self, blocking: bool = True, timeout: float = -1) -> bool: 102 """Acquire thread lock.""" 103 return self._lock.acquire(blocking, timeout) 104 105 def release(self): 106 """Release thread lock.""" 107 self._lock.release() 108 109 110class FileLock: 111 """A file lock for process/thread synchronization.""" 112 113 BASE_PATH = Path("/run/lock/hitl/") # In-memory temporary path 114 115 def __init__(self, lock_name: str): 116 # Create files/directory 117 if not FileLock.BASE_PATH.exists(): 118 FileLock.BASE_PATH.mkdir(mode=0o777) 119 try: 120 shutil.chown(FileLock.BASE_PATH, group="plugdev") 121 except PermissionError: 122 logger.write_error_to_report( 123 f"Could not change {FileLock.BASE_PATH} group to plugdev. May cause permission issues." 
124 ) 125 126 lock_path = FileLock.BASE_PATH / f"LCK..{lock_name}" 127 for i in range(1, MAX_ATTEMPTS + 1): 128 try: 129 if not lock_path.exists(): 130 lock_path.touch(mode=0o444) 131 self._lock_fd = lock_path.open("r", encoding="ascii") # pylint: disable=consider-using-with 132 except PermissionError: 133 logger.write_error_to_report(f"Error opening lock (attempt {i}): {traceback.format_exc()}") 134 time.sleep(ATTEMPT_WAIT) 135 else: 136 break 137 else: 138 logger.write_critical_to_report(f"Failed to open lock file after {MAX_ATTEMPTS} attempts.") 139 raise PermissionError(str(lock_path)) 140 141 # Get locks 142 self._thread_lock = ThreadLock(lock_name) 143 self._locked = False 144 145 def acquire(self, blocking: bool = True, shared: bool = False) -> bool: 146 """Acquire lock file.""" 147 if self._thread_lock.acquire(blocking): 148 try: 149 fcntl.flock( 150 self._lock_fd, (fcntl.LOCK_SH if shared else fcntl.LOCK_EX) | (0 if blocking else fcntl.LOCK_NB) 151 ) 152 except OSError: 153 return False 154 self._locked = True 155 return True 156 return False 157 158 def release(self): 159 """Release lock file.""" 160 self._locked = False 161 fcntl.flock(self._lock_fd, fcntl.LOCK_UN) 162 self._thread_lock.release() 163 164 def locked(self) -> bool: 165 """Check if the file is locked.""" 166 if self._locked: 167 return True 168 if unlocked := self.acquire(blocking=False): 169 self.release() 170 return not unlocked 171 172 def __enter__(self): 173 return self.acquire() 174 175 def __exit__(self, exc_type, exc_value, exc_traceback): 176 self.release() 177 178 179class FileEvent(FileLock): 180 """Manage a multiprocess event using file-locks.""" 181 182 def is_set(self) -> bool: 183 """Check if the event is set or clear.""" 184 return super().locked() 185 186 def set(self): 187 """Set the event.""" 188 super().acquire(shared=True) 189 190 def clear(self): 191 """Clear the event. 
This will only work if we were the ones that set it.""" 192 if self._locked: 193 super().release() 194 195 def wait(self, timeout: float | None = None) -> bool: 196 """Wait until the event is set, returning True if it was or False if the timeout expired.""" 197 start_time = time.perf_counter() 198 while not (was_set := self.is_set()) and (timeout is None or time.perf_counter() - start_time <= timeout): 199 time.sleep(1) 200 return was_set 201 202 203class RFileLock(FileLock): 204 """A reentrant file lock for process/thread synchronization.""" 205 206 def __init__(self, lock_name: str): 207 super().__init__(lock_name) 208 self._thread_lock = ThreadLock(lock_name, reentrant=True) 209 self._recursion_level = 0 210 211 def acquire(self, blocking: bool = True, shared: bool = False) -> bool: 212 """Acquire lock file, blocking.""" 213 result = ( 214 super().acquire(blocking, shared) if self._recursion_level == 0 else self._thread_lock.acquire(blocking) 215 ) 216 self._recursion_level += result 217 return result 218 219 def release(self): 220 """Release lock file.""" 221 self._recursion_level = max(0, self._recursion_level - 1) 222 if self._recursion_level == 0: 223 super().release() 224 else: 225 self._thread_lock.release() 226 227 228class ResourceLock(RFileLock): 229 """Manages the VISA resource.""" 230 231 class OpenResourceKwargs(TypedDict, total=False): 232 """Help MyPy understand this dict is for kwargs only.""" 233 234 read_termination: str 235 write_termination: str 236 237 def __init__(self, resource_address: str | tuple[str, str, str], baud_rate: int = 9600): 238 self.kwargs: ResourceLock.OpenResourceKwargs = {} 239 if isinstance(resource_address, tuple): 240 resource_address, read_termination, write_termination = resource_address 241 self.kwargs["read_termination"] = read_termination 242 self.kwargs["write_termination"] = write_termination 243 lock_name = uuid.uuid5(uuid.NAMESPACE_URL, resource_address).hex.upper() 244 super().__init__(lock_name) 245 
self._resource_address = resource_address 246 self._resource_manager = pyvisa.ResourceManager() 247 self.resource: MessageBasedResource | None = None 248 self.baud_rate = baud_rate 249 250 def acquire(self, blocking: bool = True, shared: bool = False) -> bool: 251 """Acquire lock file, blocking.""" 252 delay_sig.suppress() 253 if (result := super().acquire(blocking, shared)) and self._recursion_level == 1: 254 for i in range(1, MAX_ATTEMPTS + 1): 255 try: 256 self.resource = cast( 257 MessageBasedResource, 258 self._resource_manager.open_resource(self._resource_address, **self.kwargs), 259 ) 260 except Exception: # pylint: disable=broad-exception-caught 261 logger.write_error_to_report(f"Error opening resource (attempt {i}): {traceback.format_exc()}") 262 time.sleep(ATTEMPT_WAIT) 263 else: 264 if isinstance(self.resource, pyvisa.resources.serial.SerialInstrument): 265 self.resource.baud_rate = self.baud_rate 266 break 267 else: 268 logger.write_critical_to_report(f"Failed to connect after {MAX_ATTEMPTS} attempts.") 269 raise RuntimeError("Could not get lock") 270 return result 271 272 def release(self): 273 """Release lock file.""" 274 if self._recursion_level == 1: 275 self.resource.close() 276 super().release() 277 delay_sig.try_raise()
class DelaySIGINT:
    """
    Delay processing of SIGINT.

    ``suppress()`` replaces the SIGINT handler with one that only counts and logs the signal;
    ``try_raise()`` restores the previous handler and re-raises SIGINT if any was counted.
    Calls may be nested: only the outermost ``suppress()`` swaps the handler and only the
    matching outermost ``try_raise()`` restores it. Both do nothing outside the main thread.
    """

    BUFFER_SIZE = 3  # Raise after buffer is full

    def __init__(self):
        self.signal_received_count = 0
        self.old_handle = None
        self._depth = 0  # suppress() calls not yet matched by try_raise()

    def signal_recorder(self, _signo, _stack_frame):
        """Record that a signal was received; force the raise once BUFFER_SIZE were recorded."""
        self.signal_received_count += 1
        remaining = DelaySIGINT.BUFFER_SIZE - self.signal_received_count
        logger.write_critical_to_report(
            f"Delaying {signal.Signals(_signo).name}, "
            f"press CTRL-C {remaining} more time(s) to trigger immediately (unsafe)."
        )
        if self.signal_received_count >= DelaySIGINT.BUFFER_SIZE:
            # Collapse any nesting so that try_raise() restores the handler right now.
            self._depth = min(self._depth, 1)
            self.try_raise()

    def suppress(self):
        """Suppress, but record, any SIGINT received."""
        if threading.current_thread() is not threading.main_thread():  # Signals only work in main thread
            return
        self._depth += 1
        if self._depth == 1:
            # Only the outermost call saves the handler; a nested call would otherwise save
            # our own recorder and the real handler could never be restored.
            self.signal_received_count = 0
            self.old_handle = signal.signal(signal.SIGINT, self.signal_recorder)

    def try_raise(self):
        """Restore the previous handler (outermost call only) and raise the signal if it was received."""
        if threading.current_thread() is not threading.main_thread():  # Signals only work in main thread
            return
        if self._depth > 1:
            self._depth -= 1  # Still inside an outer suppress(); keep delaying
            return
        if self._depth == 1:
            self._depth = 0
            # signal.signal() reports a handler that was not installed from Python as None, and
            # None cannot be re-installed, so fall back to Python's own SIGINT handler.
            previous = self.old_handle if self.old_handle is not None else signal.default_int_handler
            signal.signal(signal.SIGINT, previous)
        # Consume the count so that one delayed CTRL-C is delivered exactly once.
        pending, self.signal_received_count = self.signal_received_count, 0
        if pending:
            signal.raise_signal(signal.SIGINT)
Delay processing of SIGINT.
def signal_recorder(self, _signo, _stack_frame):
    """
    SIGINT handler used while the signal is being delayed.

    Counts the signal, reports how many more presses are needed to force it through, and
    calls ``try_raise()`` once ``BUFFER_SIZE`` signals have been counted.
    """
    self.signal_received_count += 1
    remaining = DelaySIGINT.BUFFER_SIZE - self.signal_received_count
    logger.write_critical_to_report(
        f"Delaying {signal.Signals(_signo).name}, "
        f"press CTRL-C {remaining} more time(s) to trigger immediately (unsafe)."
    )
    if remaining > 0:
        return  # Buffer not full yet; keep delaying
    self.try_raise()
Record that a signal was received.
73 def suppress(self): 74 """Suppress, but record, and SIGINT received.""" 75 if threading.current_thread() is threading.main_thread(): # Signals only work in main thread 76 self.signal_received_count = 0 77 self.old_handle = signal.signal(signal.SIGINT, self.signal_recorder)
Suppress, but record, any SIGINT received.
79 def try_raise(self): 80 """Raise the signal if it was received.""" 81 if threading.current_thread() is threading.main_thread(): # Signals only work in main thread 82 signal.signal(signal.SIGINT, self.old_handle) 83 if self.signal_received_count: 84 signal.raise_signal(signal.SIGINT)
Raise the signal if it was received.
class ThreadLock:
    """
    A deterministic thread lock for synchronization.

    Locks are kept in a class-level registry keyed by name, so two ``ThreadLock`` objects
    created with the same name and the same ``reentrant`` flag wrap the same lock. Asking for
    a name with the other kind of lock replaces the registered lock.
    """

    ThreadType = Union[_thread.LockType, _thread.RLock]  # type: ignore[name-defined]
    THREAD_LOCK_REGISTRY: dict[str, ThreadType] = {}

    def __init__(self, lock_name: str, reentrant: bool = False):
        # threading.Lock / threading.RLock are factories, so the registered lock must be
        # checked against the concrete lock types rather than against the factories.
        if reentrant:
            factory, wanted_type = threading.RLock, _thread.RLock
        else:
            factory, wanted_type = threading.Lock, _thread.LockType
        registry = ThreadLock.THREAD_LOCK_REGISTRY
        if not isinstance(registry.get(lock_name), wanted_type):
            registry[lock_name] = factory()
        self._lock = registry[lock_name]

    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
        """Acquire thread lock. Returns True if the lock was taken."""
        return self._lock.acquire(blocking, timeout)

    def release(self):
        """Release thread lock."""
        self._lock.release()
A deterministic thread lock for synchronization.
96 def __init__(self, lock_name: str, reentrant: bool = False): 97 lock: type[ThreadLock.ThreadType] = threading.RLock if reentrant else threading.Lock 98 if lock is not type(ThreadLock.THREAD_LOCK_REGISTRY.get(lock_name)): 99 ThreadLock.THREAD_LOCK_REGISTRY[lock_name] = lock() 100 self._lock = ThreadLock.THREAD_LOCK_REGISTRY[lock_name]
class FileLock:
    """
    A file lock for process/thread synchronization.

    Combines a named in-process ``ThreadLock`` with an ``flock`` on a lock file under
    ``BASE_PATH``. Usable as a context manager; ``__enter__`` returns the result of ``acquire()``.
    """

    BASE_PATH = Path("/run/lock/hitl/")  # In-memory temporary path

    def __init__(self, lock_name: str):
        """Create the lock directory and file if needed and open the lock file.

        Raises PermissionError if the lock file cannot be opened after MAX_ATTEMPTS tries.
        """
        # Create files/directory. Attempt the mkdir directly: checking exists() first races
        # with other processes that create the directory at the same moment.
        try:
            FileLock.BASE_PATH.mkdir(mode=0o777)
        except FileExistsError:
            pass
        else:
            try:
                shutil.chown(FileLock.BASE_PATH, group="plugdev")
            except (PermissionError, LookupError):  # LookupError: the group does not exist
                logger.write_error_to_report(
                    f"Could not change {FileLock.BASE_PATH} group to plugdev. May cause permission issues."
                )

        lock_path = FileLock.BASE_PATH / f"LCK..{lock_name}"
        for i in range(1, MAX_ATTEMPTS + 1):
            try:
                if not lock_path.exists():
                    lock_path.touch(mode=0o444)
                self._lock_fd = lock_path.open("r", encoding="ascii")  # pylint: disable=consider-using-with
            except PermissionError:
                logger.write_error_to_report(f"Error opening lock (attempt {i}): {traceback.format_exc()}")
                time.sleep(ATTEMPT_WAIT)
            else:
                break
        else:
            logger.write_critical_to_report(f"Failed to open lock file after {MAX_ATTEMPTS} attempts.")
            raise PermissionError(str(lock_path))

        # Get locks
        self._thread_lock = ThreadLock(lock_name)
        self._locked = False

    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
        """Acquire lock file. Returns True on success, False if either lock could not be taken."""
        if not self._thread_lock.acquire(blocking):
            return False
        operation = fcntl.LOCK_SH if shared else fcntl.LOCK_EX
        if not blocking:
            operation |= fcntl.LOCK_NB
        acquired = False
        try:
            fcntl.flock(self._lock_fd, operation)
            acquired = True
        except OSError:
            pass  # Reported to the caller as False
        finally:
            if not acquired:
                # Never keep the thread lock without the file lock, or later acquires hang.
                self._thread_lock.release()
        if acquired:
            self._locked = True
        return acquired

    def release(self):
        """Release lock file, then the thread lock."""
        self._locked = False
        try:
            fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
        finally:
            self._thread_lock.release()

    def locked(self) -> bool:
        """Check if the file is locked, by this object or (via a non-blocking probe) by anyone else."""
        if self._locked:
            return True
        if unlocked := self.acquire(blocking=False):
            self.release()
        return not unlocked

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.release()
A file lock for process/thread synchronization.
def __init__(self, lock_name: str):
    """
    Create the lock directory and file if needed, open the lock file and get the thread lock.

    The lock file is ``BASE_PATH / "LCK..<lock_name>"``. Opening it is tried up to
    ``MAX_ATTEMPTS`` times, waiting ``ATTEMPT_WAIT`` seconds after each PermissionError.

    Raises:
        PermissionError: the lock file could not be opened after ``MAX_ATTEMPTS`` attempts.
    """
    # Create files/directory. Attempt the mkdir directly: checking exists() first races with
    # other processes that create the directory at the same moment.
    try:
        FileLock.BASE_PATH.mkdir(mode=0o777)
    except FileExistsError:
        pass
    else:
        try:
            shutil.chown(FileLock.BASE_PATH, group="plugdev")
        except (PermissionError, LookupError):  # LookupError: the group does not exist
            logger.write_error_to_report(
                f"Could not change {FileLock.BASE_PATH} group to plugdev. May cause permission issues."
            )

    lock_path = FileLock.BASE_PATH / f"LCK..{lock_name}"
    for i in range(1, MAX_ATTEMPTS + 1):
        try:
            if not lock_path.exists():
                lock_path.touch(mode=0o444)
            self._lock_fd = lock_path.open("r", encoding="ascii")  # pylint: disable=consider-using-with
        except PermissionError:
            logger.write_error_to_report(f"Error opening lock (attempt {i}): {traceback.format_exc()}")
            time.sleep(ATTEMPT_WAIT)
        else:
            break
    else:
        logger.write_critical_to_report(f"Failed to open lock file after {MAX_ATTEMPTS} attempts.")
        raise PermissionError(str(lock_path))

    # Get locks
    self._thread_lock = ThreadLock(lock_name)
    self._locked = False
146 def acquire(self, blocking: bool = True, shared: bool = False) -> bool: 147 """Acquire lock file.""" 148 if self._thread_lock.acquire(blocking): 149 try: 150 fcntl.flock( 151 self._lock_fd, (fcntl.LOCK_SH if shared else fcntl.LOCK_EX) | (0 if blocking else fcntl.LOCK_NB) 152 ) 153 except OSError: 154 return False 155 self._locked = True 156 return True 157 return False
Acquire lock file.
class FileEvent(FileLock):
    """
    Manage a multiprocess event using file-locks.

    The event counts as set while a lock is held on the underlying lock file: ``set()`` takes a
    shared lock, ``clear()`` releases the lock this object holds.
    """

    def is_set(self) -> bool:
        """Check if the event is set or clear."""
        return super().locked()

    def set(self):
        """Set the event. Does nothing if this object already set it."""
        if not self._locked:  # A second blocking acquire would deadlock on the thread lock
            super().acquire(shared=True)

    def clear(self):
        """Clear the event. This will only work if we were the ones that set it."""
        if self._locked:
            super().release()

    def wait(self, timeout: float | None = None) -> bool:
        """Wait until the event is set, returning True if it was or False if the timeout expired.

        The event is polled at most once per second; ``timeout=None`` waits indefinitely.
        """
        deadline = None if timeout is None else time.perf_counter() + timeout
        while not (was_set := self.is_set()):
            if deadline is None:
                time.sleep(1)
                continue
            remaining = deadline - time.perf_counter()
            if remaining < 0:
                break
            time.sleep(min(1, remaining))  # Never sleep past the deadline
        return was_set
Manage a multiprocess event using file-locks.
def is_set(self) -> bool:
    """Report whether the event is currently set, i.e. whether the parent class sees the lock as held."""
    currently_set = super().locked()
    return currently_set
Check if the event is set or clear.
def clear(self):
    """Clear the event by releasing the lock; a no-op unless this object is the one holding it."""
    if not self._locked:
        return
    super().release()
Clear the event. This will only work if we were the ones that set it.
def wait(self, timeout: float | None = None) -> bool:
    """
    Wait until the event is set, returning True if it was or False if the timeout expired.

    The event is polled at most once per second. ``timeout`` is in seconds; ``None`` waits
    indefinitely. The final sleep is shortened so the call does not overshoot the timeout.
    """
    deadline = None if timeout is None else time.perf_counter() + timeout
    while not (was_set := self.is_set()):
        if deadline is None:
            time.sleep(1)
            continue
        remaining = deadline - time.perf_counter()
        if remaining < 0:
            break
        time.sleep(min(1, remaining))  # Never sleep past the deadline
    return was_set
Wait until the event is set, returning True if it was or False if the timeout expired.
class RFileLock(FileLock):
    """
    A reentrant file lock for process/thread synchronization.

    The thread that holds the lock may acquire it again; only the outermost acquire/release
    touches the file lock, nested ones only adjust ``_recursion_level`` and the thread lock.
    """

    def __init__(self, lock_name: str):
        super().__init__(lock_name)
        self._thread_lock = ThreadLock(lock_name, reentrant=True)
        self._recursion_level = 0

    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
        """Acquire lock file. Returns True on success, False otherwise."""
        # Hold the reentrant thread lock BEFORE reading _recursion_level. Reading it first lets a
        # second thread take the nested branch, wait for the owner to finish, and then return
        # True without ever holding the file lock.
        if not self._thread_lock.acquire(blocking):
            return False
        if self._recursion_level > 0:
            self._recursion_level += 1  # Nested acquire by the owning thread
            return True
        try:
            result = super().acquire(blocking, shared)
        finally:
            # super().acquire() took its own hold on the thread lock; drop the guard hold.
            self._thread_lock.release()
        self._recursion_level += result
        return result

    def release(self):
        """Release lock file; the file lock itself is released only by the outermost release."""
        self._recursion_level = max(0, self._recursion_level - 1)
        if self._recursion_level == 0:
            super().release()
        else:
            self._thread_lock.release()
A reentrant file lock for process/thread synchronization.
def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
    """
    Acquire lock file; nested calls from the owning thread only deepen the recursion level.

    Returns:
        True if the lock was acquired (or re-acquired by its owner), False otherwise.
    """
    # Hold the reentrant thread lock BEFORE reading _recursion_level. Reading it first lets a
    # second thread take the nested branch, wait for the owner to finish, and then return True
    # without ever holding the file lock.
    if not self._thread_lock.acquire(blocking):
        return False
    if self._recursion_level > 0:
        self._recursion_level += 1  # Nested acquire by the owning thread
        return True
    try:
        result = super().acquire(blocking, shared)
    finally:
        # super().acquire() took its own hold on the thread lock; drop the guard hold.
        self._thread_lock.release()
    self._recursion_level += result
    return result
Acquire the lock file re-entrantly (blocks by default; pass blocking=False to return immediately).
class ResourceLock(RFileLock):
    """
    Manages the VISA resource.

    The lock name is derived from the resource address. The outermost ``acquire()`` opens the
    resource and the outermost ``release()`` closes it; SIGINT is delayed in between.
    """

    class OpenResourceKwargs(TypedDict, total=False):
        """Help MyPy understand this dict is for kwargs only."""

        read_termination: str
        write_termination: str

    def __init__(self, resource_address: str | tuple[str, str, str], baud_rate: int = 9600):
        """``resource_address`` is an address, or an (address, read termination, write termination) tuple."""
        self.kwargs: ResourceLock.OpenResourceKwargs = {}
        if isinstance(resource_address, tuple):
            resource_address, read_termination, write_termination = resource_address
            self.kwargs["read_termination"] = read_termination
            self.kwargs["write_termination"] = write_termination
        lock_name = uuid.uuid5(uuid.NAMESPACE_URL, resource_address).hex.upper()
        super().__init__(lock_name)
        self._resource_address = resource_address
        self._resource_manager = pyvisa.ResourceManager()
        self.resource: MessageBasedResource | None = None
        self.baud_rate = baud_rate

    def _open_resource(self):
        """Open the VISA resource, trying MAX_ATTEMPTS times; raise RuntimeError if every attempt fails."""
        last_error = None
        for i in range(1, MAX_ATTEMPTS + 1):
            try:
                self.resource = cast(
                    MessageBasedResource,
                    self._resource_manager.open_resource(self._resource_address, **self.kwargs),
                )
            except Exception as error:  # pylint: disable=broad-exception-caught
                last_error = error
                logger.write_error_to_report(f"Error opening resource (attempt {i}): {traceback.format_exc()}")
                time.sleep(ATTEMPT_WAIT)
            else:
                if isinstance(self.resource, pyvisa.resources.serial.SerialInstrument):
                    self.resource.baud_rate = self.baud_rate
                return
        logger.write_critical_to_report(f"Failed to connect after {MAX_ATTEMPTS} attempts.")
        raise RuntimeError("Could not get lock") from last_error

    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
        """Acquire lock file and, on the outermost acquisition, open the VISA resource.

        Raises:
            RuntimeError: the resource could not be opened; the lock is released again first.
        """
        delay_sig.suppress()
        acquired = False
        try:
            acquired = super().acquire(blocking, shared)
        finally:
            if not acquired:
                delay_sig.try_raise()  # Nothing is locked, so stop delaying SIGINT
        if acquired and self._recursion_level == 1:
            try:
                self._open_resource()
            except BaseException:
                # Do not leave the lock held (and SIGINT delayed) when the caller gets an error.
                super().release()
                delay_sig.try_raise()
                raise
        return acquired

    def release(self):
        """Release lock file, closing the VISA resource on the outermost release."""
        try:
            if self._recursion_level == 1 and self.resource is not None:
                self.resource.close()
        finally:
            # Always give the lock back and re-enable SIGINT, even if closing failed.
            try:
                super().release()
            finally:
                delay_sig.try_raise()
Manages the VISA resource.
def __init__(self, resource_address: str | tuple[str, str, str], baud_rate: int = 9600):
    """
    Prepare a lock for one VISA resource; the resource itself is not opened here.

    ``resource_address`` is either the address string, or an
    ``(address, read_termination, write_termination)`` tuple whose terminations are stored in
    ``self.kwargs``. The lock name is the upper-case hex UUID5 of the address.
    """
    self.kwargs: ResourceLock.OpenResourceKwargs = {}
    if isinstance(resource_address, tuple):
        address, read_term, write_term = resource_address
        self.kwargs["read_termination"] = read_term
        self.kwargs["write_termination"] = write_term
    else:
        address = resource_address
    address_uuid = uuid.uuid5(uuid.NAMESPACE_URL, address)
    super().__init__(address_uuid.hex.upper())
    self._resource_address = address
    self._resource_manager = pyvisa.ResourceManager()
    self.resource: MessageBasedResource | None = None
    self.baud_rate = baud_rate
def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
    """
    Acquire lock file and, on the outermost acquisition, open the VISA resource.

    SIGINT is delayed from here until the lock is given up again. If the lock is not obtained,
    or the resource cannot be opened, SIGINT handling is re-enabled before returning/raising.

    Returns:
        True if the lock was acquired, False otherwise.

    Raises:
        RuntimeError: the resource could not be opened; the lock is released again first.
    """
    delay_sig.suppress()
    acquired = False
    try:
        acquired = super().acquire(blocking, shared)
    finally:
        if not acquired:
            delay_sig.try_raise()  # Nothing is locked, so stop delaying SIGINT
    if acquired and self._recursion_level == 1:
        try:
            self._open_resource()
        except BaseException:
            # Do not leave the lock held (and SIGINT delayed) when the caller gets an error:
            # neither a `with` block nor a try/finally after acquire() would release it.
            super().release()
            delay_sig.try_raise()
            raise
    return acquired

def _open_resource(self):
    """Open the VISA resource, trying MAX_ATTEMPTS times; raise RuntimeError if every attempt fails."""
    last_error = None
    for i in range(1, MAX_ATTEMPTS + 1):
        try:
            self.resource = cast(
                MessageBasedResource,
                self._resource_manager.open_resource(self._resource_address, **self.kwargs),
            )
        except Exception as error:  # pylint: disable=broad-exception-caught
            last_error = error
            logger.write_error_to_report(f"Error opening resource (attempt {i}): {traceback.format_exc()}")
            time.sleep(ATTEMPT_WAIT)
        else:
            if isinstance(self.resource, pyvisa.resources.serial.SerialInstrument):
                self.resource.baud_rate = self.baud_rate
            return
    logger.write_critical_to_report(f"Failed to connect after {MAX_ATTEMPTS} attempts.")
    raise RuntimeError("Could not get lock") from last_error
Acquire the lock file and, on the outermost acquisition, open the VISA resource.
class OpenResourceKwargs(TypedDict, total=False):
    """Help MyPy understand this dict is for kwargs only.

    ``total=False`` makes both keys optional, so an empty dict is a valid value.
    """

    # Both entries are expanded as keyword arguments of ``open_resource`` by ``ResourceLock.acquire``.
    read_termination: str
    write_termination: str
Help MyPy understand this dict is for kwargs only.
Inherited Members
- builtins.dict
- get
- setdefault
- pop
- popitem
- keys
- items
- values
- update
- fromkeys
- clear
- copy