Module genshin.paginators.base
Base paginators.
Classes
class BufferedPaginator (*, limit: typing.Optional[int] = None)
-
Paginator with a support for buffers.
Expand source code
class BufferedPaginator(typing.Generic[T], Paginator[T], abc.ABC): """Paginator with a support for buffers.""" __slots__ = ("limit", "_buffer", "_counter") limit: typing.Optional[int] """Limit of items to be yielded.""" _buffer: typing.Optional[typing.Iterator[T]] """Item buffer. If none then exhausted.""" _counter: int """Amount of yielded items so far. No guarantee to be synchronized.""" def __init__(self, *, limit: typing.Optional[int] = None) -> None: self.limit = limit self._buffer = iter(()) self._counter = 0 @property def exhausted(self) -> bool: """Whether all pages have been fetched.""" return self._buffer is None def _complete(self) -> typing.NoReturn: self._buffer = None super()._complete() raise # pyright bug @abc.abstractmethod async def next_page(self) -> typing.Optional[typing.Iterable[T]]: """Get the next page of the paginator.""" async def __anext__(self) -> T: if not self._buffer: self._complete() if self.limit and self._counter >= self.limit: self._complete() self._counter += 1 try: return next(self._buffer) except StopIteration: pass buffer = await self.next_page() if not buffer: self._complete() self._buffer = iter(buffer) return next(self._buffer)
Ancestors
- Paginator
- typing.Generic
- abc.ABC
Subclasses
- genshin.paginators.api.APIPaginator
Instance variables
prop exhausted : bool
-
Whether all pages have been fetched.
Expand source code
@property def exhausted(self) -> bool: """Whether all pages have been fetched.""" return self._buffer is None
var limit : Optional[int]
-
Limit of items to be yielded.
Methods
async def next_page(self) ‑> Optional[Iterable[~T]]
-
Get the next page of the paginator.
Inherited members
class MergedPaginator (iterables: typing.Collection[typing.AsyncIterable[T]], *, key: typing.Optional[typing.Callable[[T], typing.Any]] = None, limit: typing.Optional[int] = None)
-
A paginator merging a collection of iterators.
Expand source code
class MergedPaginator(typing.Generic[T], Paginator[T]): """A paginator merging a collection of iterators.""" __slots__ = ("iterators", "_heap", "limit", "_key", "_prepared", "_counter") # TODO: Use named tuples for the heap iterators: typing.Sequence[typing.AsyncIterator[T]] """Entry iterators. Only used as pointers to a heap. """ _heap: list[tuple[typing.Any, int, T, typing.AsyncIterator[T]]] """Underlying heap queue. List of (comparable, unique order id, value, iterator) """ limit: typing.Optional[int] """Limit of items to be yielded""" _key: typing.Optional[typing.Callable[[T], typing.Any]] """Sorting key.""" _prepared: bool """Whether the paginator is prepared""" _counter: int """Amount of yielded items so far. No guarantee to be synchronized.""" def __init__( self, iterables: typing.Collection[typing.AsyncIterable[T]], *, key: typing.Optional[typing.Callable[[T], typing.Any]] = None, limit: typing.Optional[int] = None, ) -> None: self.iterators = [iterable.__aiter__() for iterable in iterables] self._key = key self.limit = limit self._prepared = False self._counter = 0 def _complete(self) -> typing.NoReturn: """Mark paginator as complete and clear memory.""" # free memory in heaps self._heap = [] self.iterators = [] super()._complete() raise # pyright bug def _create_heap_item( self, value: T, iterator: typing.AsyncIterator[T], order: typing.Optional[int] = None, ) -> tuple[typing.Any, int, T, typing.AsyncIterator[T]]: """Create a new item for the heap queue.""" sort_value = self._key(value) if self._key else value if order is None: order = random.getrandbits(16) return (sort_value, order, value, iterator) async def _prepare(self) -> None: """Prepare the heap queue by filling it with initial values.""" coros = (it.__anext__() for it in self.iterators) first_values = await asyncio.gather(*coros, return_exceptions=True) self._heap = [] for order, (it, value) in enumerate(zip(self.iterators, first_values)): if isinstance(value, BaseException): if isinstance(value, StopAsyncIteration): continue raise value heapq.heappush(self._heap, self._create_heap_item(value, iterator=it, order=order)) self._prepared = True async def __anext__(self) -> T: if not self._prepared: await self._prepare() if not self._heap: self._complete() if self.limit and self._counter >= self.limit: self._complete() self._counter += 1 _, order, value, it = self._heap[0] try: new_value = await it.__anext__() except StopAsyncIteration: heapq.heappop(self._heap) return value heapq.heapreplace(self._heap, self._create_heap_item(new_value, iterator=it, order=order)) return value async def flatten(self, *, lazy: bool = False) -> typing.Sequence[T]: """Flatten the paginator.""" if self.limit is not None and lazy: return [item async for item in self] coros = (flatten(i) for i in self.iterators) lists: typing.Sequence[typing.Sequence[T]] = await asyncio.gather(*coros) # pyright: ignore return list(heapq.merge(*lists, key=self._key))[: self.limit] # pyright: ignore
Ancestors
- Paginator
- typing.Generic
- abc.ABC
Instance variables
var iterators : Sequence[AsyncIterator[~T]]
-
Entry iterators.
Only used as pointers to a heap.
var limit : Optional[int]
-
Limit of items to be yielded
Inherited members
class Paginator
-
Base paginator.
Expand source code
class Paginator(typing.Generic[T], abc.ABC): """Base paginator.""" __slots__ = () @property def _repr_attributes(self) -> typing.Sequence[str]: """Attributes to be used in repr.""" return [ attribute for subclass in self.__class__.__mro__ for attribute in getattr(subclass, "__slots__", ()) if not attribute.startswith("_") ] def __repr__(self) -> str: kwargs = ", ".join(f"{name}={getattr(self, name, 'undefined')!r}" for name in self._repr_attributes) return f"{self.__class__.__name__}({kwargs})" def __pretty__( self, fmt: typing.Callable[[typing.Any], str], **kwargs: typing.Any, ) -> typing.Iterator[typing.Any]: """Devtools pretty formatting.""" yield self.__class__.__name__ yield "(" yield 1 for name in self._repr_attributes: yield name yield "=" if hasattr(self, name): yield fmt(getattr(self, name)) else: yield "<undefined>" yield 0 yield -1 yield ")" async def next(self) -> T: """Return the next element.""" try: return await self.__anext__() except StopAsyncIteration: raise LookupError("No elements were found") from None def _complete(self) -> typing.NoReturn: """Mark paginator as complete and clear memory.""" raise StopAsyncIteration("No more items exist in this paginator. It has been exhausted.") from None def __aiter__(self) -> Paginator[T]: return self async def flatten(self) -> typing.Sequence[T]: """Flatten the paginator.""" return [item async for item in self] def __await__(self) -> typing.Generator[None, None, typing.Sequence[T]]: return self.flatten().__await__() @abc.abstractmethod async def __anext__(self) -> T: ...
Ancestors
- typing.Generic
- abc.ABC
Subclasses
- genshin.paginators.base.BasicPaginator
- BufferedPaginator
- MergedPaginator
Methods
async def flatten(self) ‑> Sequence[~T]
-
Flatten the paginator.
async def next(self) ‑> ~T
-
Return the next element.