Module genshin.utility.concurrency

Utilities for concurrency optimizations.

Functions

def prevent_concurrency(func: CallableT) ‑> ~CallableT
Expand source code
def prevent_concurrency(func: CallableT) -> CallableT:
    """Prevent function from running concurrently.

    This should be done exclusively for functions that cache their result.
    """

    def wrapper(func: AnyCallable) -> AnyCallable:
        lock: typing.Optional[asyncio.Lock] = None

        @functools.wraps(func)
        async def inner(*args: typing.Any, **kwargs: typing.Any) -> typing.Any:
            nonlocal lock
            if lock is None:
                lock = asyncio.Lock()

            async with lock:
                return await func(*args, **kwargs)

        return inner

    return typing.cast("CallableT", MethodDecorator(func, wrapper))

Prevent function from running concurrently.

This should be done exclusively for functions that cache their result.