Module genshin.utility.concurrency
Utilities for concurrency optimizations.
Functions
def prevent_concurrency(func: CallableT) ‑> ~CallableT
-
Expand source code
def prevent_concurrency(func: CallableT) -> CallableT: """Prevent function from running concurrently. This should be done exclusively for functions that cache their result. """ def wrapper(func: AnyCallable) -> AnyCallable: lock: typing.Optional[asyncio.Lock] = None @functools.wraps(func) async def inner(*args: typing.Any, **kwargs: typing.Any) -> typing.Any: nonlocal lock if lock is None: lock = asyncio.Lock() async with lock: return await func(*args, **kwargs) return inner return typing.cast("CallableT", MethodDecorator(func, wrapper))
Prevent function from running concurrently.
This should be done exclusively for functions that cache their result.