Module genshin.client.components.auth.client

Main auth client.

Classes

class AuthClient (cookies: Union[ForwardRef('http.cookies.BaseCookie[Any]'), Mapping[Any, Any], str, Sequence[Union[ForwardRef('http.cookies.BaseCookie[Any]'), Mapping[Any, Any], str]], ForwardRef(None)] = None, *, authkey: Optional[str] = None, lang: str = 'en-us', region: Region = Region.OVERSEAS, proxy: Optional[str] = None, game: Optional[Game] = None, uid: Optional[int] = None, hoyolab_id: Optional[int] = None, device_id: Optional[str] = None, device_fp: Optional[str] = None, headers: Union[Mapping[str, str], Mapping[multidict._multidict.istr, str], multidict._multidict.CIMultiDict, multidict._multidict.CIMultiDictProxy, Iterable[Tuple[Union[str, multidict._multidict.istr], str]], ForwardRef(None)] = None, cache: Optional[BaseCache] = None, debug: bool = False)

Auth client component.

Expand source code
class AuthClient(subclients.AppAuthClient, subclients.WebAuthClient, subclients.GameAuthClient):
    """Auth client component."""

    async def login_with_password(
        self,
        account: str,
        password: str,
        *,
        port: int = 5000,
        encrypted: bool = False,
        geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
    ) -> typing.Union[WebLoginResult, CNWebLoginResult]:
        """Log in with a password through the web endpoint of the client's region.

        This is a thin dispatcher: clients whose region is Chinese are routed
        to `cn_login_with_password`, every other client is routed to
        `os_login_with_password`. All arguments are forwarded unchanged.

        Note that a webserver will be started if a captcha is
        triggered and `geetest_solver` is not passed.

        Raises
        ------
        - AccountLoginFail: Invalid password provided.
        - AccountDoesNotExist: Invalid email/username.
        """
        if self.region is not types.Region.CHINESE:
            return await self.os_login_with_password(
                account, password, port=port, encrypted=encrypted, geetest_solver=geetest_solver
            )

        return await self.cn_login_with_password(
            account, password, encrypted=encrypted, port=port, geetest_solver=geetest_solver
        )

    @base.region_specific(types.Region.OVERSEAS)
    async def os_login_with_password(
        self,
        account: str,
        password: str,
        *,
        port: int = 5000,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
    ) -> WebLoginResult:
        """Log in with a password through the overseas web endpoint.

        The login is attempted once. If that attempt yields a captcha
        challenge (`SessionMMT`), the challenge is solved and the login is
        retried with the solution attached.

        Note that a webserver will be started on `port` if a captcha is
        triggered and `geetest_solver` is not passed.

        Raises
        ------
        - AccountLoginFail: Invalid password provided.
        - AccountDoesNotExist: Invalid email/username.
        """
        first_attempt = await self._os_web_login(account, password, encrypted=encrypted, token_type=token_type)

        if isinstance(first_attempt, SessionMMT):
            # Captcha triggered: prefer the caller's solver, otherwise fall back
            # to the local webserver helper.
            if geetest_solver:
                solved = await geetest_solver(first_attempt)
            else:
                solved = await server.solve_geetest(first_attempt, port=port)

            return await self._os_web_login(
                account, password, encrypted=encrypted, token_type=token_type, mmt_result=solved
            )

        # No captcha: the first attempt already is the login result.
        return first_attempt

    @base.region_specific(types.Region.CHINESE)
    async def cn_login_with_password(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        port: int = 5000,
        geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
    ) -> CNWebLoginResult:
        """Log in with a password through the Miyoushe loginByPassword endpoint.

        The login is attempted once. If that attempt yields a captcha
        challenge (`SessionMMT`), the challenge is solved and the login is
        retried with the solution attached.

        Note that a webserver will be started on `port` if a captcha is
        triggered and `geetest_solver` is not passed.
        """
        first_attempt = await self._cn_web_login(account, password, encrypted=encrypted)

        if isinstance(first_attempt, SessionMMT):
            # Captcha triggered: prefer the caller's solver, otherwise fall back
            # to the local webserver helper.
            if geetest_solver:
                solved = await geetest_solver(first_attempt)
            else:
                solved = await server.solve_geetest(first_attempt, port=port)

            return await self._cn_web_login(account, password, encrypted=encrypted, mmt_result=solved)

        # No captcha: the first attempt already is the login result.
        return first_attempt

    @base.region_specific(types.Region.CHINESE)
    async def check_mobile_number_validity(self, mobile: str) -> bool:
        """Check if a mobile number is valid (it's registered on Miyoushe).

        Miyoushe is the Chinese-region platform, so this check is restricted
        to Chinese-region clients, matching `login_with_mobile_number`.

        Parameters
        ----------
        mobile:
            The mobile number to look up; it is sent as the `mobile` query
            parameter.

        Returns True if the mobile number is valid, False otherwise.
        """
        # A standalone session is used here: the request carries no cookies.
        async with aiohttp.ClientSession() as session:
            async with session.get(
                routes.CHECK_MOBILE_VALIDITY_URL.get_url(),
                params={"mobile": mobile},
            ) as r:
                data = await r.json()

        # The number is reported as valid when the two response fields differ.
        # NOTE(review): the exact meaning of `status` / `is_registable` is defined
        # by the remote API and is not visible here - confirm against the API.
        return data["data"]["status"] != data["data"]["is_registable"]

    @base.region_specific(types.Region.CHINESE)
    async def login_with_mobile_number(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        port: int = 5000,
    ) -> MobileLoginResult:
        """Log in with a mobile number and a one-time password, returning cookies.

        Only works for Chinese region (Miyoushe) users, do not include
        area code (+86) in the mobile number.

        Flow:
        1. Request an OTP for the given mobile number.
        2. If that request triggers a captcha, let the user solve it through
           the local webserver and request the OTP again with the solution.
        3. Let the user enter the OTP through the local webserver.
        4. Log in with the OTP and return the result.
        """
        otp_request = await self._send_mobile_otp(mobile, encrypted=encrypted)

        if isinstance(otp_request, SessionMMT):
            # Captcha triggered: solve it, then re-send the OTP with the solution.
            solved = await server.solve_geetest(otp_request, port=port)
            await self._send_mobile_otp(mobile, encrypted=encrypted, mmt_result=solved)

        entered_code = await server.enter_code(port=port)
        return await self._login_with_mobile_otp(mobile, entered_code, encrypted=encrypted)

    @base.region_specific(types.Region.OVERSEAS)
    async def login_with_app_password(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        port: int = 5000,
        geetest_solver: typing.Optional[typing.Callable[[SessionMMT], typing.Awaitable[SessionMMTResult]]] = None,
    ) -> AppLoginResult:
        """Log in with a password through the HoYoLab app endpoint.

        A webserver will be started on `port` in either of these cases:

        1. A captcha is triggered and `geetest_solver` is not passed.
        2. Email verification is triggered (can happen if you
        first login with a new device).

        Raises
        ------
        - AccountLoginFail: Invalid password provided.
        - AccountDoesNotExist: Invalid email/username.
        - VerificationCodeRateLimited: Too many verification code requests.
        """

        async def solve_captcha(challenge: SessionMMT) -> SessionMMTResult:
            # Prefer the caller's solver; otherwise use the local webserver helper.
            if geetest_solver:
                return await geetest_solver(challenge)
            return await server.solve_geetest(challenge, port=port)

        outcome = await self._app_login(account, password, encrypted=encrypted)

        if isinstance(outcome, SessionMMT):
            # Captcha triggered on the login itself: solve it and retry.
            login_solution = await solve_captcha(outcome)
            outcome = await self._app_login(account, password, encrypted=encrypted, mmt_result=login_solution)

        if not isinstance(outcome, ActionTicket):
            return outcome

        # Email verification required.
        ticket = outcome
        email_challenge = await self._send_verification_email(ticket)
        if email_challenge:
            # Sending the email triggered a captcha of its own.
            email_solution = await solve_captcha(email_challenge)

            await asyncio.sleep(2)  # Add delay to prevent [-3206]
            await self._send_verification_email(ticket, mmt_result=email_solution)

        entered_code = await server.enter_code(port=port)
        await self._verify_email(entered_code, ticket)

        return await self._app_login(account, password, encrypted=encrypted, ticket=ticket)

    @base.region_specific(types.Region.CHINESE)
    async def login_with_qrcode(self) -> QRLoginResult:
        """Log in by scanning a QR code, only available for Miyoushe users.

        A QR code is created and displayed, then its status is polled once
        per second until the login is confirmed. The cookies from the
        confirming poll are stored on the client and returned.
        """
        # Imported lazily so the dependency is only needed when this method is called.
        import qrcode
        import qrcode.image.pil
        from qrcode.constants import ERROR_CORRECT_L

        created = await self._create_qrcode()
        image: qrcode.image.pil.PilImage = qrcode.make(created.url, error_correction=ERROR_CORRECT_L)  # type: ignore
        image.show()

        scan_logged = False
        while True:
            status, cookies = await self._check_qrcode(created.ticket)
            if status is QRCodeStatus.CONFIRMED:
                LOGGER_.info("QR code login confirmed")
                break
            if status is QRCodeStatus.SCANNED and not scan_logged:
                # Log the scan only the first time it is observed.
                LOGGER_.info("QR code scanned")
                scan_logged = True

            await asyncio.sleep(1)

        self.set_cookies(cookies)
        return QRLoginResult(**{name: morsel.value for name, morsel in cookies.items()})

    @managers.no_multi
    async def create_mmt(self) -> MMT:
        """Create a geetest challenge.

        Requires a default game to be set on the client; a `ValueError` is
        raised otherwise. When the response carries no `data`, the response
        is handed to `errors.raise_for_retcode`.
        """
        game = self.default_game
        if game is None:
            raise ValueError("No default game set.")

        region = self.region

        # "DS" is inserted first so region-specific headers are layered on top of it.
        request_headers = {"DS": ds_utility.generate_geetest_ds(region)}
        request_headers.update(auth_utility.CREATE_MMT_HEADERS[region])

        endpoint = routes.CREATE_MMT_URL.get_url(region)
        if region is types.Region.OVERSEAS:
            # Only the overseas endpoint receives the per-game app key.
            endpoint = endpoint.update_query(app_key=constants.GEETEST_RECORD_KEYS[game])

        assert isinstance(self.cookie_manager, managers.CookieManager)
        async with self.cookie_manager.create_session() as session:
            async with session.get(
                endpoint, headers=request_headers, cookies=self.cookie_manager.cookies
            ) as response:
                payload = await response.json()

        if not payload["data"]:
            errors.raise_for_retcode(payload)

        return MMT(**payload["data"])

    @base.region_specific(types.Region.OVERSEAS)
    @managers.no_multi
    async def verify_mmt(self, mmt_result: MMTResult) -> None:
        """Verify a geetest challenge.

        Requires a default game to be set on the client; a `ValueError` is
        raised otherwise. The dumped `mmt_result`, extended with the game's
        `app_key`, is posted as JSON. When the response carries no `data`,
        the response is handed to `errors.raise_for_retcode`.
        """
        game = self.default_game
        if game is None:
            raise ValueError("No default game set.")

        region = self.region

        # "DS" is inserted first so region-specific headers are layered on top of it.
        request_headers = {"DS": ds_utility.generate_geetest_ds(region)}
        request_headers.update(auth_utility.CREATE_MMT_HEADERS[region])

        request_body = mmt_result.model_dump()
        request_body["app_key"] = constants.GEETEST_RECORD_KEYS[game]

        assert isinstance(self.cookie_manager, managers.CookieManager)
        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.VERIFY_MMT_URL.get_url(),
                json=request_body,
                headers=request_headers,
                cookies=self.cookie_manager.cookies,
            ) as response:
                payload = await response.json()

        if not payload["data"]:
            errors.raise_for_retcode(payload)

    async def os_game_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        port: int = 5000,
        geetest_solver: typing.Optional[typing.Callable[[RiskyCheckMMT], typing.Awaitable[RiskyCheckMMTResult]]] = None,
    ) -> GameLoginResult:
        """Perform a login to the game.

        Flow:
        1. Perform the shield login; if it returns a captcha challenge, solve
           it and retry with the solution.
        2. If no device grant is required, finish the game login directly.
        3. Otherwise send a verification email (solving a captcha if sending
           triggers one), let the user enter the emailed code, verify it, and
           finish the game login with the token from the verification.

        Parameters
        ----------
        account, password:
            Credentials forwarded to the shield login.
        encrypted:
            Forwarded to the shield login unchanged.
        port:
            Port used by the local webserver helpers, both for captcha
            solving and for entering the verification code.
        geetest_solver:
            Optional coroutine function used to solve captchas instead of
            the local webserver.

        Raises
        ------
        - IncorrectGameAccount: Invalid account provided.
        - IncorrectGamePassword: Invalid password provided.
        """
        # The geetest API host differs between the Chinese region and all others.
        api_server = "api.geetest.com" if self.region is types.Region.CHINESE else "api-na.geetest.com"

        result = await self._shield_login(account, password, encrypted=encrypted)

        if isinstance(result, RiskyCheckMMT):
            # Captcha triggered on the login itself.
            if geetest_solver:
                mmt_result = await geetest_solver(result)
            else:
                mmt_result = await server.solve_geetest(result, port=port, api_server=api_server)

            result = await self._shield_login(account, password, encrypted=encrypted, mmt_result=mmt_result)

        if not result.device_grant_required:
            return await self._os_game_login(result.account.uid, result.account.token)

        # Device grant required: verify through an emailed code.
        mmt = await self._send_game_verification_email(result.account.device_grant_ticket)
        if mmt:
            # Sending the email triggered a captcha of its own.
            if geetest_solver:
                mmt_result = await geetest_solver(mmt)
            else:
                mmt_result = await server.solve_geetest(mmt, port=port, api_server=api_server)

            await self._send_game_verification_email(result.account.device_grant_ticket, mmt_result=mmt_result)

        # Honour the caller's port for the code-entry page as well, like the
        # other login flows do; previously the helper's default was always used.
        code = await server.enter_code(port=port)
        verification_result = await self._verify_game_email(code, result.account.device_grant_ticket)

        return await self._os_game_login(result.account.uid, verification_result.game_token)

    def _gen_random_fp(self) -> str:
        """Return a random 13-character lowercase-hex string.

        The value is used as the seed fingerprint when requesting an
        authentic device fingerprint.
        """
        hex_alphabet = digits + "abcdef"
        picked = random.choices(hex_alphabet, k=13)
        return "".join(picked)

    def _gen_ext_fields(self, oaid: str, board: str) -> str:
        """Serialize the extra device fields as a JSON string.

        The identifier is stored under `"oaid"` for Chinese-region clients
        and under `"adid"` for every other region; the board is always
        stored under `"board"`.
        """
        if self.region is types.Region.CHINESE:
            id_field = "oaid"
        else:
            id_field = "adid"

        return json.dumps({id_field: oaid, "board": board})

    async def generate_fp(
        self,
        *,
        device_id: str,
        device_board: str,
        oaid: str,
    ) -> str:
        """Generate an authentic device fingerprint.

        A payload describing the device is posted to the fingerprint endpoint
        of the client's region, and the `device_fp` field of the response is
        returned.

        Parameters
        ----------
        device_id:
            Sent as the payload's `device_id`.
        device_board:
            Sent as `board` inside the JSON-encoded `ext_fields`.
        oaid:
            Sent inside `ext_fields`, under `oaid` (Chinese region) or
            `adid` (other regions).

        Raises
        ------
        - GenshinException: The response's inner `code` is not 200.
        """
        is_chinese = self.region is types.Region.CHINESE
        device_id_key = "bbs_device_id" if is_chinese else "hoyolab_device_id"
        payload = {
            "device_id": device_id,
            # Random seed fingerprint; the server answers with the authentic one.
            "device_fp": self._gen_random_fp(),
            "seed_id": str(uuid.uuid4()).lower(),
            "seed_time": str(int(time.time() * 1000)),  # milliseconds since the epoch
            "platform": "2",
            "app_name": "bbs_cn" if is_chinese else "bbs_oversea",
            "ext_fields": self._gen_ext_fields(oaid, device_board),
            # NOTE(review): this field receives a fresh random UUID rather than
            # `device_id` - confirm that is intended.
            device_id_key: str(uuid.uuid4()).lower(),
        }

        # Nested context managers, matching the rest of this class; the
        # parenthesized multi-item form is only official syntax from Python 3.10.
        async with aiohttp.ClientSession() as session:
            async with session.post(routes.GET_FP_URL.get_url(self.region), json=payload) as r:
                data = await r.json()

        if not data.get("data"):
            # Error envelope without a payload: report it the same way as
            # `create_mmt` / `verify_mmt` instead of failing on the lookups below.
            errors.raise_for_retcode(data)

        if data["data"]["code"] != 200:
            raise errors.GenshinException(data, data["data"]["msg"])

        return data["data"]["device_fp"]

Ancestors

Subclasses

Class variables

var logger : logging.Logger

Instance variables

var authkeys : dict[Game, str]
var cache : BaseCache
var cookie_manager : BaseCookieManager
var custom_headers : multidict._multidict.CIMultiDict[str]
var uids : dict[Game, int]

Methods

async def check_mobile_number_validity(self, mobile: str) ‑> bool

Check if a mobile number is valid (it's registered on Miyoushe).

Returns True if the mobile number is valid, False otherwise.

async def cn_login_with_password(self, account: str, password: str, *, encrypted: bool = False, port: int = 5000, geetest_solver: Optional[Callable[[SessionMMT], Awaitable[SessionMMTResult]]] = None) ‑> CNWebLoginResult

Login with a password via Miyoushe loginByPassword endpoint.

Note that this will start a webserver if captcha is triggered and geetest_solver is not passed.

async def create_mmt(self) ‑> MMT

Create a geetest challenge.

async def generate_fp(self, *, device_id: str, device_board: str, oaid: str) ‑> str

Generate an authentic device fingerprint.

async def login_with_app_password(self, account: str, password: str, *, encrypted: bool = False, port: int = 5000, geetest_solver: Optional[Callable[[SessionMMT], Awaitable[SessionMMTResult]]] = None) ‑> AppLoginResult

Login with a password via HoYoLab app endpoint.

Note that this will start a webserver if either of the following happens:

  1. Captcha is triggered and geetest_solver is not passed.
  2. Email verification is triggered (can happen if you first login with a new device).

Raises

  • AccountLoginFail: Invalid password provided.
  • AccountDoesNotExist: Invalid email/username.
  • VerificationCodeRateLimited: Too many verification code requests.
async def login_with_mobile_number(self, mobile: str, *, encrypted: bool = False, port: int = 5000) ‑> MobileLoginResult

Login with mobile number, returns cookies.

Only works for Chinese region (Miyoushe) users, do not include area code (+86) in the mobile number.

Steps:

  1. Sends OTP to the provided mobile number.
  2. If captcha is triggered, prompts the user to solve it.
  3. Lets user enter the OTP.
  4. Logs in with the OTP.
  5. Returns cookies.

async def login_with_password(self, account: str, password: str, *, port: int = 5000, encrypted: bool = False, geetest_solver: Optional[Callable[[SessionMMT], Awaitable[SessionMMTResult]]] = None) ‑> Union[WebLoginResult, CNWebLoginResult]

Login with a password via web endpoint.

Endpoint is chosen based on client region.

Note that this will start a webserver if captcha is triggered and geetest_solver is not passed.

Raises

  • AccountLoginFail: Invalid password provided.
  • AccountDoesNotExist: Invalid email/username.
async def login_with_qrcode(self) ‑> QRLoginResult

Login with QR code, only available for Miyoushe users.

async def os_game_login(self, account: str, password: str, *, encrypted: bool = False, port: int = 5000, geetest_solver: Optional[Callable[[RiskyCheckMMT], Awaitable[RiskyCheckMMTResult]]] = None) ‑> GameLoginResult

Perform a login to the game.

Raises

  • IncorrectGameAccount: Invalid account provided.
  • IncorrectGamePassword: Invalid password provided.
async def os_login_with_password(self, account: str, password: str, *, port: int = 5000, encrypted: bool = False, token_type: Optional[int] = 6, geetest_solver: Optional[Callable[[SessionMMT], Awaitable[SessionMMTResult]]] = None) ‑> WebLoginResult

Login with a password via web endpoint.

Note that this will start a webserver if captcha is triggered and geetest_solver is not passed.

Raises

  • AccountLoginFail: Invalid password provided.
  • AccountDoesNotExist: Invalid email/username.
async def verify_mmt(self, mmt_result: MMTResult) ‑> None

Verify a geetest challenge.

Inherited members