hitl_tester.modules.file_lock

Provides access to a basic file lock for synchronizing multiple processes.

(c) 2020-2024 TurnAround Factor, Inc.

CUI DISTRIBUTION CONTROL Controlled by: DLA J68 R&D SBIP CUI Category: Small Business Research and Technology Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS POC: GOV SBIP Program Manager Denise Price, 571-767-0111 Distribution authorized to U.S. Government Agencies only, to protect information not owned by the U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317, Fort Belvoir, VA 22060-6221

SBIR DATA RIGHTS Contract No.:SP4701-23-C-0083 Contractor Name: TurnAround Factor, Inc. Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005 Expiration of SBIR Data Rights Period: September 24, 2029 The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause contained in the above identified contract. No restrictions apply after the expiration date shown above. Any reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce the markings.

  1"""
  2Provides access to a basic file lock for synchronizing multiple processes.
  3
  4(c) 2020-2024 TurnAround Factor, Inc.
  5
  6CUI DISTRIBUTION CONTROL
  7Controlled by: DLA J68 R&D SBIP
  8CUI Category: Small Business Research and Technology
  9Distribution/Dissemination Controls: PROTECTED BY SBIR DATA RIGHTS
 10POC: GOV SBIP Program Manager Denise Price, 571-767-0111
 11Distribution authorized to U.S. Government Agencies only, to protect information not owned by the
 12U.S. Government and protected by a contractor’s ‘limited rights’ statement, or received with the understanding that
 13it is not routinely transmitted outside the U.S. Government (determination made September 14, 2024). Other requests
 14for this document shall be referred to ACOR DLA Logistics Operations (J-68), 8725 John J. Kingman Rd., Suite 4317,
 15Fort Belvoir, VA 22060-6221
 16
 17SBIR DATA RIGHTS
 18Contract No.:SP4701-23-C-0083
 19Contractor Name: TurnAround Factor, Inc.
 20Contractor Address: 10365 Wood Park Ct. Suite 313 / Ashland, VA 23005
 21Expiration of SBIR Data Rights Period: September 24, 2029
 22The Government's rights to use, modify, reproduce, release, perform, display, or disclose technical data or computer
 23software marked with this legend are restricted during the period shown as provided in paragraph (b)(4) of the Rights
 24in Noncommercial Technical Data and Computer Software--Small Business Innovative Research (SBIR) Program clause
 25contained in the above identified contract. No restrictions apply after the expiration date shown above. Any
 26reproduction of technical data, computer software, or portions thereof marked with this legend must also reproduce
 27the markings.
 28"""
 29
 30from __future__ import annotations
 31
 32import _thread
 33import fcntl
 34import shutil
 35import signal
 36import threading
 37import time
 38import traceback
 39import uuid
 40from pathlib import Path
 41from typing import cast, Union, TypedDict
 42
 43import pyvisa
 44from pyvisa.resources import MessageBasedResource
 45
 46from hitl_tester.modules.logger import logger
 47
 48MAX_ATTEMPTS = 4  # Number of tries when opening a lock file or a VISA resource before giving up
 49ATTEMPT_WAIT = 1  # Seconds slept (time.sleep) after each failed try
 50
 51
 52class DelaySIGINT:
 53    """Delay processing of SIGINT."""
 54
 55    BUFFER_SIZE = 3  # Raise after buffer is full
 56
 57    def __init__(self):
 58        self.signal_received_count = 0
 59        self.old_handle = None
 60
 61    def signal_recorder(self, _signo, _stack_frame):
 62        """Record that a signal was received."""
 63        self.signal_received_count += 1
 64        remaining = DelaySIGINT.BUFFER_SIZE - self.signal_received_count
 65        logger.write_critical_to_report(
 66            f"Delaying {signal.Signals(_signo).name}, "
 67            f"press CTRL-C {remaining} more time(s) to trigger immediately (unsafe)."
 68        )
 69        if self.signal_received_count >= DelaySIGINT.BUFFER_SIZE:
 70            self.try_raise()
 71
 72    def suppress(self):
 73        """Suppress, but record, and SIGINT received."""
 74        if threading.current_thread() is threading.main_thread():  # Signals only work in main thread
 75            self.signal_received_count = 0
 76            self.old_handle = signal.signal(signal.SIGINT, self.signal_recorder)
 77
 78    def try_raise(self):
 79        """Raise the signal if it was received."""
 80        if threading.current_thread() is threading.main_thread():  # Signals only work in main thread
 81            signal.signal(signal.SIGINT, self.old_handle)
 82            if self.signal_received_count:
 83                signal.raise_signal(signal.SIGINT)
 84
 85
 86delay_sig = DelaySIGINT()  # Module-level instance used by ResourceLock.acquire() and ResourceLock.release()
 87
 88
 89class ThreadLock:
 90    """A deterministic thread lock for synchronization."""
 91
 92    ThreadType = Union[_thread.LockType, _thread.RLock]  # type: ignore[name-defined]
 93    THREAD_LOCK_REGISTRY: dict[str, ThreadType] = {}
 94
 95    def __init__(self, lock_name: str, reentrant: bool = False):
 96        lock: type[ThreadLock.ThreadType] = threading.RLock if reentrant else threading.Lock
 97        if lock is not type(ThreadLock.THREAD_LOCK_REGISTRY.get(lock_name)):
 98            ThreadLock.THREAD_LOCK_REGISTRY[lock_name] = lock()
 99        self._lock = ThreadLock.THREAD_LOCK_REGISTRY[lock_name]
100
101    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
102        """Acquire thread lock."""
103        return self._lock.acquire(blocking, timeout)
104
105    def release(self):
106        """Release thread lock."""
107        self._lock.release()
108
109
class FileLock:
    """A file lock for process/thread synchronization.

    Each lock is a file named ``LCK..<lock_name>`` under ``BASE_PATH`` that is locked with
    ``fcntl.flock``, paired with a ``ThreadLock`` of the same name that is taken first.
    Can be used as a context manager; ``__enter__`` returns the result of ``acquire()``.
    """

    BASE_PATH = Path("/run/lock/hitl/")  # In-memory temporary path

    def __init__(self, lock_name: str):
        """
        Create the lock directory and file if needed and open the lock file.

        :param lock_name: suffix of the lock file name and name of the associated thread lock.
        :raises PermissionError: if the lock file could not be opened after MAX_ATTEMPTS attempts.
        """
        # Create files/directory
        try:
            # Attempt the mkdir directly instead of checking exists() first: another process may
            # create the directory between the check and the call.
            FileLock.BASE_PATH.mkdir(mode=0o777)
        except FileExistsError:
            pass
        else:
            try:
                shutil.chown(FileLock.BASE_PATH, group="plugdev")
            except (PermissionError, LookupError):  # LookupError: the group does not exist
                logger.write_error_to_report(
                    f"Could not change {FileLock.BASE_PATH} group to plugdev. May cause permission issues."
                )

        lock_path = FileLock.BASE_PATH / f"LCK..{lock_name}"
        for i in range(1, MAX_ATTEMPTS + 1):
            try:
                if not lock_path.exists():
                    lock_path.touch(mode=0o444)
                self._lock_fd = lock_path.open("r", encoding="ascii")  # pylint: disable=consider-using-with
            except PermissionError:
                logger.write_error_to_report(f"Error opening lock (attempt {i}): {traceback.format_exc()}")
                time.sleep(ATTEMPT_WAIT)
            else:
                break
        else:
            logger.write_critical_to_report(f"Failed to open lock file after {MAX_ATTEMPTS} attempts.")
            raise PermissionError(str(lock_path))

        # Get locks
        self._thread_lock = ThreadLock(lock_name)
        self._locked = False  # True while this instance holds the file lock

    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
        """
        Acquire lock file.

        :param blocking: wait for the lock when True; otherwise fail immediately if it is taken.
        :param shared: request a shared (LOCK_SH) instead of an exclusive (LOCK_EX) file lock.
        :return: True if both the thread lock and the file lock were obtained, else False.
        """
        if not self._thread_lock.acquire(blocking):
            return False
        operation = (fcntl.LOCK_SH if shared else fcntl.LOCK_EX) | (0 if blocking else fcntl.LOCK_NB)
        try:
            fcntl.flock(self._lock_fd, operation)
        except OSError:
            # Give the thread lock back: the caller will not call release() after a failed
            # acquire, so keeping it would make every later acquire()/locked() see it as taken.
            self._thread_lock.release()
            return False
        self._locked = True
        return True

    def release(self):
        """Release lock file, then the thread lock."""
        self._locked = False
        fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
        self._thread_lock.release()

    def locked(self) -> bool:
        """
        Check if the file is locked.

        Returns True when this instance holds the lock. Otherwise probes with a non-blocking
        exclusive acquire (released again at once) and reports True if that probe fails.
        """
        if self._locked:
            return True
        if unlocked := self.acquire(blocking=False):
            self.release()
        return not unlocked

    def __enter__(self):
        return self.acquire()

    def __exit__(self, exc_type, exc_value, exc_traceback):
        self.release()
177
178
class FileEvent(FileLock):
    """Manage a multiprocess event using file-locks.

    The event counts as set while a lock is held on the underlying lock file: ``set()`` takes a shared
    lock, and ``is_set()`` reports whether this instance holds the lock or an exclusive probe fails.
    """

    def is_set(self) -> bool:
        """Check if the event is set or clear."""
        return super().locked()

    def set(self):
        """Set the event. Does nothing if this instance has already set it."""
        if self._locked:
            # A second blocking acquire on the same instance would wait forever on the
            # non-reentrant thread lock this instance already holds.
            return
        super().acquire(shared=True)

    def clear(self):
        """Clear the event. This will only work if we were the ones that set it."""
        if self._locked:
            super().release()

    def wait(self, timeout: float | None = None) -> bool:
        """
        Wait until the event is set, returning True if it was or False if the timeout expired.

        :param timeout: maximum number of seconds to wait; None waits indefinitely.
        """
        deadline = None if timeout is None else time.perf_counter() + timeout
        while not (was_set := self.is_set()):
            if deadline is None:
                time.sleep(1)
                continue
            remaining = deadline - time.perf_counter()
            if remaining <= 0:
                break
            # Poll once per second, but never sleep past the deadline.
            time.sleep(min(1, remaining))
        return was_set
201
202
class RFileLock(FileLock):
    """A reentrant file lock for process/thread synchronization.

    The thread that holds the lock may acquire it again; the file lock itself is taken on the first
    acquisition and dropped when the matching outermost ``release()`` is called.
    """

    def __init__(self, lock_name: str):
        """
        Open the lock file and switch the thread lock to a reentrant one.

        :param lock_name: suffix of the lock file name and name of the associated thread lock.
        """
        super().__init__(lock_name)
        self._thread_lock = ThreadLock(lock_name, reentrant=True)
        self._recursion_level = 0  # Number of acquisitions currently held through this instance

    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
        """
        Acquire lock file, optionally blocking.

        :param blocking: wait for the lock when True; otherwise fail immediately if it is taken.
        :param shared: request a shared instead of an exclusive file lock (first acquisition only).
        :return: True if the lock was obtained, else False.
        """
        # Take the reentrant thread lock before looking at the recursion level. Reading the level
        # first lets a second thread see another thread's non-zero level, wait on the thread lock
        # only, and then continue without ever holding the file lock.
        if not self._thread_lock.acquire(blocking):
            return False
        if self._recursion_level == 0:
            operation = (fcntl.LOCK_SH if shared else fcntl.LOCK_EX) | (0 if blocking else fcntl.LOCK_NB)
            try:
                fcntl.flock(self._lock_fd, operation)
            except OSError:
                self._thread_lock.release()  # Do not keep the thread lock for a failed acquire
                return False
            self._locked = True
        self._recursion_level += 1
        return True

    def release(self):
        """Release lock file; the file lock is only dropped when the outermost acquisition is released."""
        self._recursion_level = max(0, self._recursion_level - 1)
        if self._recursion_level == 0:
            super().release()
        else:
            self._thread_lock.release()
226
227
class ResourceLock(RFileLock):
    """Manages the VISA resource.

    Acquiring the lock for the first time opens the VISA resource and stores it in ``resource``;
    releasing the outermost acquisition closes it. SIGINT is delayed (via ``delay_sig``) between
    ``acquire()`` and the matching ``release()``.
    """

    class OpenResourceKwargs(TypedDict, total=False):
        """Help MyPy understand this dict is for kwargs only."""

        read_termination: str
        write_termination: str

    def __init__(self, resource_address: str | tuple[str, str, str], baud_rate: int = 9600):
        """
        Create the lock for a VISA resource.

        :param resource_address: the resource address, or a tuple of
            (address, read_termination, write_termination).
        :param baud_rate: baud rate applied after opening when the resource is a serial instrument.
        """
        self.kwargs: ResourceLock.OpenResourceKwargs = {}
        if isinstance(resource_address, tuple):
            resource_address, read_termination, write_termination = resource_address
            self.kwargs["read_termination"] = read_termination
            self.kwargs["write_termination"] = write_termination
        # The lock name is derived from the address, so equal addresses map to the same lock file.
        lock_name = uuid.uuid5(uuid.NAMESPACE_URL, resource_address).hex.upper()
        super().__init__(lock_name)
        self._resource_address = resource_address
        self._resource_manager = pyvisa.ResourceManager()
        self.resource: MessageBasedResource | None = None
        self.baud_rate = baud_rate

    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
        """
        Acquire lock file, optionally blocking, and open the resource on the first acquisition.

        :param blocking: wait for the lock when True; otherwise fail immediately if it is taken.
        :param shared: request a shared instead of an exclusive file lock.
        :return: True if the lock was obtained, else False.
        :raises RuntimeError: if the resource could not be opened after MAX_ATTEMPTS attempts;
            the lock is released again before raising.
        """
        delay_sig.suppress()
        if not super().acquire(blocking, shared):
            # No release() will follow a failed acquire, so undo the suppression here.
            delay_sig.try_raise()
            return False
        if self._recursion_level == 1:
            for i in range(1, MAX_ATTEMPTS + 1):
                try:
                    self.resource = cast(
                        MessageBasedResource,
                        self._resource_manager.open_resource(self._resource_address, **self.kwargs),
                    )
                except Exception:  # pylint: disable=broad-exception-caught
                    logger.write_error_to_report(f"Error opening resource (attempt {i}): {traceback.format_exc()}")
                    time.sleep(ATTEMPT_WAIT)
                else:
                    if isinstance(self.resource, pyvisa.resources.serial.SerialInstrument):
                        self.resource.baud_rate = self.baud_rate
                    break
            else:
                logger.write_critical_to_report(f"Failed to connect after {MAX_ATTEMPTS} attempts.")
                # The caller never gets to release() when acquire() raises (e.g. inside `with`),
                # so give the lock back and stop delaying SIGINT before reporting the failure.
                try:
                    super().release()
                finally:
                    delay_sig.try_raise()
                raise RuntimeError("Could not get lock")
        return True

    def release(self):
        """Release lock file, closing the resource when the outermost acquisition is released."""
        try:
            if self._recursion_level == 1 and self.resource is not None:
                self.resource.close()
        finally:
            # Always unlock and re-enable SIGINT, even if closing the resource failed.
            try:
                super().release()
            finally:
                delay_sig.try_raise()
MAX_ATTEMPTS = 4
ATTEMPT_WAIT = 1
class DelaySIGINT:
53class DelaySIGINT:
54    """Delay processing of SIGINT."""
55
56    BUFFER_SIZE = 3  # Raise after buffer is full
57
58    def __init__(self):
59        self.signal_received_count = 0
60        self.old_handle = None
61
62    def signal_recorder(self, _signo, _stack_frame):
63        """Record that a signal was received."""
64        self.signal_received_count += 1
65        remaining = DelaySIGINT.BUFFER_SIZE - self.signal_received_count
66        logger.write_critical_to_report(
67            f"Delaying {signal.Signals(_signo).name}, "
68            f"press CTRL-C {remaining} more time(s) to trigger immediately (unsafe)."
69        )
70        if self.signal_received_count >= DelaySIGINT.BUFFER_SIZE:
71            self.try_raise()
72
73    def suppress(self):
74        """Suppress, but record, and SIGINT received."""
75        if threading.current_thread() is threading.main_thread():  # Signals only work in main thread
76            self.signal_received_count = 0
77            self.old_handle = signal.signal(signal.SIGINT, self.signal_recorder)
78
79    def try_raise(self):
80        """Raise the signal if it was received."""
81        if threading.current_thread() is threading.main_thread():  # Signals only work in main thread
82            signal.signal(signal.SIGINT, self.old_handle)
83            if self.signal_received_count:
84                signal.raise_signal(signal.SIGINT)

Delay processing of SIGINT.

BUFFER_SIZE = 3
signal_received_count
old_handle
def signal_recorder(self, _signo, _stack_frame):
62    def signal_recorder(self, _signo, _stack_frame):
63        """Record that a signal was received."""
64        self.signal_received_count += 1
65        remaining = DelaySIGINT.BUFFER_SIZE - self.signal_received_count
66        logger.write_critical_to_report(
67            f"Delaying {signal.Signals(_signo).name}, "
68            f"press CTRL-C {remaining} more time(s) to trigger immediately (unsafe)."
69        )
70        if self.signal_received_count >= DelaySIGINT.BUFFER_SIZE:
71            self.try_raise()

Record that a signal was received.

def suppress(self):
73    def suppress(self):
74        """Suppress, but record, and SIGINT received."""
75        if threading.current_thread() is threading.main_thread():  # Signals only work in main thread
76            self.signal_received_count = 0
77            self.old_handle = signal.signal(signal.SIGINT, self.signal_recorder)

Suppress, but record, any SIGINT received.

def try_raise(self):
79    def try_raise(self):
80        """Raise the signal if it was received."""
81        if threading.current_thread() is threading.main_thread():  # Signals only work in main thread
82            signal.signal(signal.SIGINT, self.old_handle)
83            if self.signal_received_count:
84                signal.raise_signal(signal.SIGINT)

Raise the signal if it was received.

delay_sig = <DelaySIGINT object>
class ThreadLock:
 90class ThreadLock:
 91    """A deterministic thread lock for synchronization."""
 92
 93    ThreadType = Union[_thread.LockType, _thread.RLock]  # type: ignore[name-defined]
 94    THREAD_LOCK_REGISTRY: dict[str, ThreadType] = {}
 95
 96    def __init__(self, lock_name: str, reentrant: bool = False):
 97        lock: type[ThreadLock.ThreadType] = threading.RLock if reentrant else threading.Lock
 98        if lock is not type(ThreadLock.THREAD_LOCK_REGISTRY.get(lock_name)):
 99            ThreadLock.THREAD_LOCK_REGISTRY[lock_name] = lock()
100        self._lock = ThreadLock.THREAD_LOCK_REGISTRY[lock_name]
101
102    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
103        """Acquire thread lock."""
104        return self._lock.acquire(blocking, timeout)
105
106    def release(self):
107        """Release thread lock."""
108        self._lock.release()

A deterministic thread lock for synchronization.

ThreadLock(lock_name: str, reentrant: bool = False)
 96    def __init__(self, lock_name: str, reentrant: bool = False):
 97        lock: type[ThreadLock.ThreadType] = threading.RLock if reentrant else threading.Lock
 98        if lock is not type(ThreadLock.THREAD_LOCK_REGISTRY.get(lock_name)):
 99            ThreadLock.THREAD_LOCK_REGISTRY[lock_name] = lock()
100        self._lock = ThreadLock.THREAD_LOCK_REGISTRY[lock_name]
ThreadType = typing.Union[_thread.lock, _thread.RLock]
THREAD_LOCK_REGISTRY: dict[str, typing.Union[_thread.lock, _thread.RLock]] = {'piplate': <unlocked _thread.RLock object owner=0 count=0>, 'CE': <unlocked _thread.lock object>, 'test_alive': <unlocked _thread.lock object>}
def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
102    def acquire(self, blocking: bool = True, timeout: float = -1) -> bool:
103        """Acquire thread lock."""
104        return self._lock.acquire(blocking, timeout)

Acquire thread lock.

def release(self):
106    def release(self):
107        """Release thread lock."""
108        self._lock.release()

Release thread lock.

class FileLock:
111class FileLock:
112    """A file lock for process/thread synchronization."""
113
114    BASE_PATH = Path("/run/lock/hitl/")  # In-memory temporary path
115
116    def __init__(self, lock_name: str):
117        # Create files/directory
118        if not FileLock.BASE_PATH.exists():
119            FileLock.BASE_PATH.mkdir(mode=0o777)
120            try:
121                shutil.chown(FileLock.BASE_PATH, group="plugdev")
122            except PermissionError:
123                logger.write_error_to_report(
124                    f"Could not change {FileLock.BASE_PATH} group to plugdev. May cause permission issues."
125                )
126
127        lock_path = FileLock.BASE_PATH / f"LCK..{lock_name}"
128        for i in range(1, MAX_ATTEMPTS + 1):
129            try:
130                if not lock_path.exists():
131                    lock_path.touch(mode=0o444)
132                self._lock_fd = lock_path.open("r", encoding="ascii")  # pylint: disable=consider-using-with
133            except PermissionError:
134                logger.write_error_to_report(f"Error opening lock (attempt {i}): {traceback.format_exc()}")
135                time.sleep(ATTEMPT_WAIT)
136            else:
137                break
138        else:
139            logger.write_critical_to_report(f"Failed to open lock file after {MAX_ATTEMPTS} attempts.")
140            raise PermissionError(str(lock_path))
141
142        # Get locks
143        self._thread_lock = ThreadLock(lock_name)
144        self._locked = False
145
146    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
147        """Acquire lock file."""
148        if self._thread_lock.acquire(blocking):
149            try:
150                fcntl.flock(
151                    self._lock_fd, (fcntl.LOCK_SH if shared else fcntl.LOCK_EX) | (0 if blocking else fcntl.LOCK_NB)
152                )
153            except OSError:
154                return False
155            self._locked = True
156            return True
157        return False
158
159    def release(self):
160        """Release lock file."""
161        self._locked = False
162        fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
163        self._thread_lock.release()
164
165    def locked(self) -> bool:
166        """Check if the file is locked."""
167        if self._locked:
168            return True
169        if unlocked := self.acquire(blocking=False):
170            self.release()
171        return not unlocked
172
173    def __enter__(self):
174        return self.acquire()
175
176    def __exit__(self, exc_type, exc_value, exc_traceback):
177        self.release()

A file lock for process/thread synchronization.

FileLock(lock_name: str)
116    def __init__(self, lock_name: str):
117        # Create files/directory
118        if not FileLock.BASE_PATH.exists():
119            FileLock.BASE_PATH.mkdir(mode=0o777)
120            try:
121                shutil.chown(FileLock.BASE_PATH, group="plugdev")
122            except PermissionError:
123                logger.write_error_to_report(
124                    f"Could not change {FileLock.BASE_PATH} group to plugdev. May cause permission issues."
125                )
126
127        lock_path = FileLock.BASE_PATH / f"LCK..{lock_name}"
128        for i in range(1, MAX_ATTEMPTS + 1):
129            try:
130                if not lock_path.exists():
131                    lock_path.touch(mode=0o444)
132                self._lock_fd = lock_path.open("r", encoding="ascii")  # pylint: disable=consider-using-with
133            except PermissionError:
134                logger.write_error_to_report(f"Error opening lock (attempt {i}): {traceback.format_exc()}")
135                time.sleep(ATTEMPT_WAIT)
136            else:
137                break
138        else:
139            logger.write_critical_to_report(f"Failed to open lock file after {MAX_ATTEMPTS} attempts.")
140            raise PermissionError(str(lock_path))
141
142        # Get locks
143        self._thread_lock = ThreadLock(lock_name)
144        self._locked = False
BASE_PATH = PosixPath('/run/lock/hitl')
def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
146    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
147        """Acquire lock file."""
148        if self._thread_lock.acquire(blocking):
149            try:
150                fcntl.flock(
151                    self._lock_fd, (fcntl.LOCK_SH if shared else fcntl.LOCK_EX) | (0 if blocking else fcntl.LOCK_NB)
152                )
153            except OSError:
154                return False
155            self._locked = True
156            return True
157        return False

Acquire lock file.

def release(self):
159    def release(self):
160        """Release lock file."""
161        self._locked = False
162        fcntl.flock(self._lock_fd, fcntl.LOCK_UN)
163        self._thread_lock.release()

Release lock file.

def locked(self) -> bool:
165    def locked(self) -> bool:
166        """Check if the file is locked."""
167        if self._locked:
168            return True
169        if unlocked := self.acquire(blocking=False):
170            self.release()
171        return not unlocked

Check if the file is locked.

class FileEvent(FileLock):
180class FileEvent(FileLock):
181    """Manage a multiprocess event using file-locks."""
182
183    def is_set(self) -> bool:
184        """Check if the event is set or clear."""
185        return super().locked()
186
187    def set(self):
188        """Set the event."""
189        super().acquire(shared=True)
190
191    def clear(self):
192        """Clear the event. This will only work if we were the ones that set it."""
193        if self._locked:
194            super().release()
195
196    def wait(self, timeout: float | None = None) -> bool:
197        """Wait until the event is set, returning True if it was or False if the timeout expired."""
198        start_time = time.perf_counter()
199        while not (was_set := self.is_set()) and (timeout is None or time.perf_counter() - start_time <= timeout):
200            time.sleep(1)
201        return was_set

Manage a multiprocess event using file-locks.

def is_set(self) -> bool:
183    def is_set(self) -> bool:
184        """Check if the event is set or clear."""
185        return super().locked()

Check if the event is set or clear.

def set(self):
187    def set(self):
188        """Set the event."""
189        super().acquire(shared=True)

Set the event.

def clear(self):
191    def clear(self):
192        """Clear the event. This will only work if we were the ones that set it."""
193        if self._locked:
194            super().release()

Clear the event. This will only work if we were the ones that set it.

def wait(self, timeout: float | None = None) -> bool:
196    def wait(self, timeout: float | None = None) -> bool:
197        """Wait until the event is set, returning True if it was or False if the timeout expired."""
198        start_time = time.perf_counter()
199        while not (was_set := self.is_set()) and (timeout is None or time.perf_counter() - start_time <= timeout):
200            time.sleep(1)
201        return was_set

Wait until the event is set, returning True if it was or False if the timeout expired.

class RFileLock(FileLock):
204class RFileLock(FileLock):
205    """A reentrant file lock for process/thread synchronization."""
206
207    def __init__(self, lock_name: str):
208        super().__init__(lock_name)
209        self._thread_lock = ThreadLock(lock_name, reentrant=True)
210        self._recursion_level = 0
211
212    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
213        """Acquire lock file, blocking."""
214        result = (
215            super().acquire(blocking, shared) if self._recursion_level == 0 else self._thread_lock.acquire(blocking)
216        )
217        self._recursion_level += result
218        return result
219
220    def release(self):
221        """Release lock file."""
222        self._recursion_level = max(0, self._recursion_level - 1)
223        if self._recursion_level == 0:
224            super().release()
225        else:
226            self._thread_lock.release()

A reentrant file lock for process/thread synchronization.

RFileLock(lock_name: str)
207    def __init__(self, lock_name: str):
208        super().__init__(lock_name)
209        self._thread_lock = ThreadLock(lock_name, reentrant=True)
210        self._recursion_level = 0
def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
212    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
213        """Acquire lock file, blocking."""
214        result = (
215            super().acquire(blocking, shared) if self._recursion_level == 0 else self._thread_lock.acquire(blocking)
216        )
217        self._recursion_level += result
218        return result

Acquire lock file, optionally blocking.

def release(self):
220    def release(self):
221        """Release lock file."""
222        self._recursion_level = max(0, self._recursion_level - 1)
223        if self._recursion_level == 0:
224            super().release()
225        else:
226            self._thread_lock.release()

Release lock file.

Inherited Members
FileLock
BASE_PATH
locked
class ResourceLock(RFileLock):
229class ResourceLock(RFileLock):
230    """Manages the VISA resource."""
231
232    class OpenResourceKwargs(TypedDict, total=False):
233        """Help MyPy understand this dict is for kwargs only."""
234
235        read_termination: str
236        write_termination: str
237
238    def __init__(self, resource_address: str | tuple[str, str, str], baud_rate: int = 9600):
239        self.kwargs: ResourceLock.OpenResourceKwargs = {}
240        if isinstance(resource_address, tuple):
241            resource_address, read_termination, write_termination = resource_address
242            self.kwargs["read_termination"] = read_termination
243            self.kwargs["write_termination"] = write_termination
244        lock_name = uuid.uuid5(uuid.NAMESPACE_URL, resource_address).hex.upper()
245        super().__init__(lock_name)
246        self._resource_address = resource_address
247        self._resource_manager = pyvisa.ResourceManager()
248        self.resource: MessageBasedResource | None = None
249        self.baud_rate = baud_rate
250
251    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
252        """Acquire lock file, blocking."""
253        delay_sig.suppress()
254        if (result := super().acquire(blocking, shared)) and self._recursion_level == 1:
255            for i in range(1, MAX_ATTEMPTS + 1):
256                try:
257                    self.resource = cast(
258                        MessageBasedResource,
259                        self._resource_manager.open_resource(self._resource_address, **self.kwargs),
260                    )
261                except Exception:  # pylint: disable=broad-exception-caught
262                    logger.write_error_to_report(f"Error opening resource (attempt {i}): {traceback.format_exc()}")
263                    time.sleep(ATTEMPT_WAIT)
264                else:
265                    if isinstance(self.resource, pyvisa.resources.serial.SerialInstrument):
266                        self.resource.baud_rate = self.baud_rate
267                    break
268            else:
269                logger.write_critical_to_report(f"Failed to connect after {MAX_ATTEMPTS} attempts.")
270                raise RuntimeError("Could not get lock")
271        return result
272
273    def release(self):
274        """Release lock file."""
275        if self._recursion_level == 1:
276            self.resource.close()
277        super().release()
278        delay_sig.try_raise()

Manages the VISA resource.

ResourceLock(resource_address: str | tuple[str, str, str], baud_rate: int = 9600)
238    def __init__(self, resource_address: str | tuple[str, str, str], baud_rate: int = 9600):
239        self.kwargs: ResourceLock.OpenResourceKwargs = {}
240        if isinstance(resource_address, tuple):
241            resource_address, read_termination, write_termination = resource_address
242            self.kwargs["read_termination"] = read_termination
243            self.kwargs["write_termination"] = write_termination
244        lock_name = uuid.uuid5(uuid.NAMESPACE_URL, resource_address).hex.upper()
245        super().__init__(lock_name)
246        self._resource_address = resource_address
247        self._resource_manager = pyvisa.ResourceManager()
248        self.resource: MessageBasedResource | None = None
249        self.baud_rate = baud_rate
resource: pyvisa.resources.messagebased.MessageBasedResource | None
baud_rate
def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
251    def acquire(self, blocking: bool = True, shared: bool = False) -> bool:
252        """Acquire lock file, blocking."""
253        delay_sig.suppress()
254        if (result := super().acquire(blocking, shared)) and self._recursion_level == 1:
255            for i in range(1, MAX_ATTEMPTS + 1):
256                try:
257                    self.resource = cast(
258                        MessageBasedResource,
259                        self._resource_manager.open_resource(self._resource_address, **self.kwargs),
260                    )
261                except Exception:  # pylint: disable=broad-exception-caught
262                    logger.write_error_to_report(f"Error opening resource (attempt {i}): {traceback.format_exc()}")
263                    time.sleep(ATTEMPT_WAIT)
264                else:
265                    if isinstance(self.resource, pyvisa.resources.serial.SerialInstrument):
266                        self.resource.baud_rate = self.baud_rate
267                    break
268            else:
269                logger.write_critical_to_report(f"Failed to connect after {MAX_ATTEMPTS} attempts.")
270                raise RuntimeError("Could not get lock")
271        return result

Acquire lock file, optionally blocking.

def release(self):
273    def release(self):
274        """Release lock file."""
275        if self._recursion_level == 1:
276            self.resource.close()
277        super().release()
278        delay_sig.try_raise()

Release lock file.

Inherited Members
FileLock
BASE_PATH
locked
class ResourceLock.OpenResourceKwargs(typing.TypedDict):
232    class OpenResourceKwargs(TypedDict, total=False):
233        """Help MyPy understand this dict is for kwargs only."""
234
235        read_termination: str
236        write_termination: str

Help MyPy understand this dict is for kwargs only.

read_termination: str
write_termination: str
Inherited Members
builtins.dict
get
setdefault
pop
popitem
keys
items
values
update
fromkeys
clear
copy