Module genshin.client.components.auth.subclients.web

Web sub client for AuthClient.

Covers HoYoLAB and Miyoushe web auth endpoints.

Classes

class WebAuthClient (cookies: ForwardRef('http.cookies.BaseCookie[Any]') | Mapping[Any, Any] | str | Sequence[ForwardRef('http.cookies.BaseCookie[Any]') | Mapping[Any, Any] | str] | None = None,
*,
authkey: str | None = None,
lang: str = 'en-us',
region: Region = Region.OVERSEAS,
proxy: str | None = None,
game: Game | None = None,
uid: int | None = None,
hoyolab_id: int | None = None,
device_id: str | None = None,
device_fp: str | None = None,
headers: Mapping[str, str] | Mapping[multidict._multidict.istr, str] | multidict._multidict.CIMultiDict | multidict._multidict.CIMultiDictProxy | Iterable[Tuple[str | multidict._multidict.istr, str]] | None = None,
cache: BaseCache | None = None,
debug: bool = False)
Expand source code
class WebAuthClient(base.BaseClient):
    """Web sub client for AuthClient."""

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: SessionMMTResult,
    ) -> WebLoginResult: ...

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, WebLoginResult]: ...

    @base.region_specific(types.Region.OVERSEAS)
    async def _os_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, WebLoginResult]:
        """Login with a password using web endpoint.

        Returns either data from aigis header or cookies.
        """
        headers = {**auth_utility.WEB_LOGIN_HEADERS}
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 1),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 1),
            "token_type": token_type,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()
                cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        if data["data"].get("stoken"):
            cookies["stoken"] = data["data"]["stoken"]

        self.set_cookies(cookies)
        return WebLoginResult(**cookies)

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: SessionMMTResult,
    ) -> CNWebLoginResult: ...

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]: ...

    @base.region_specific(types.Region.CHINESE)
    async def _cn_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]:
        """
        Login with account and password using Miyoushe loginByPassword endpoint.

        Returns data from aigis header or cookies.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 2),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.CN_WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3102:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return CNWebLoginResult(**cookies)

    async def _send_mobile_otp(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[None, SessionMMTv4]:
        """Attempt to send OTP to the provided mobile number.

        May return aigis headers if captcha is triggered, None otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_OTP_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMTv4(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        return None

    async def _login_with_mobile_otp(self, mobile: str, otp: str, *, encrypted: bool = False) -> MobileLoginResult:
        """Login with OTP and mobile number.

        Returns cookies if OTP matches the one sent, raises an error otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
            "captcha": otp,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return MobileLoginResult(**cookies)

Web sub client for AuthClient.

Ancestors

Subclasses

Class variables

var logger : logging.Logger

Instance variables

var authkeys : dict[Game, str]
Expand source code
class WebAuthClient(base.BaseClient):
    """Web sub client for AuthClient."""

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: SessionMMTResult,
    ) -> WebLoginResult: ...

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, WebLoginResult]: ...

    @base.region_specific(types.Region.OVERSEAS)
    async def _os_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, WebLoginResult]:
        """Login with a password using web endpoint.

        Returns either data from aigis header or cookies.
        """
        headers = {**auth_utility.WEB_LOGIN_HEADERS}
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 1),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 1),
            "token_type": token_type,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()
                cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        if data["data"].get("stoken"):
            cookies["stoken"] = data["data"]["stoken"]

        self.set_cookies(cookies)
        return WebLoginResult(**cookies)

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: SessionMMTResult,
    ) -> CNWebLoginResult: ...

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]: ...

    @base.region_specific(types.Region.CHINESE)
    async def _cn_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]:
        """
        Login with account and password using Miyoushe loginByPassword endpoint.

        Returns data from aigis header or cookies.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 2),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.CN_WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3102:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return CNWebLoginResult(**cookies)

    async def _send_mobile_otp(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[None, SessionMMTv4]:
        """Attempt to send OTP to the provided mobile number.

        May return aigis headers if captcha is triggered, None otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_OTP_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMTv4(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        return None

    async def _login_with_mobile_otp(self, mobile: str, otp: str, *, encrypted: bool = False) -> MobileLoginResult:
        """Login with OTP and mobile number.

        Returns cookies if OTP matches the one sent, raises an error otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
            "captcha": otp,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return MobileLoginResult(**cookies)
var cacheBaseCache
Expand source code
class WebAuthClient(base.BaseClient):
    """Web sub client for AuthClient."""

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: SessionMMTResult,
    ) -> WebLoginResult: ...

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, WebLoginResult]: ...

    @base.region_specific(types.Region.OVERSEAS)
    async def _os_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, WebLoginResult]:
        """Login with a password using web endpoint.

        Returns either data from aigis header or cookies.
        """
        headers = {**auth_utility.WEB_LOGIN_HEADERS}
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 1),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 1),
            "token_type": token_type,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()
                cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        if data["data"].get("stoken"):
            cookies["stoken"] = data["data"]["stoken"]

        self.set_cookies(cookies)
        return WebLoginResult(**cookies)

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: SessionMMTResult,
    ) -> CNWebLoginResult: ...

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]: ...

    @base.region_specific(types.Region.CHINESE)
    async def _cn_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]:
        """
        Login with account and password using Miyoushe loginByPassword endpoint.

        Returns data from aigis header or cookies.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 2),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.CN_WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3102:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return CNWebLoginResult(**cookies)

    async def _send_mobile_otp(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[None, SessionMMTv4]:
        """Attempt to send OTP to the provided mobile number.

        May return aigis headers if captcha is triggered, None otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_OTP_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMTv4(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        return None

    async def _login_with_mobile_otp(self, mobile: str, otp: str, *, encrypted: bool = False) -> MobileLoginResult:
        """Login with OTP and mobile number.

        Returns cookies if OTP matches the one sent, raises an error otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
            "captcha": otp,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return MobileLoginResult(**cookies)
var cookie_managerBaseCookieManager
Expand source code
class WebAuthClient(base.BaseClient):
    """Web sub client for AuthClient."""

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: SessionMMTResult,
    ) -> WebLoginResult: ...

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, WebLoginResult]: ...

    @base.region_specific(types.Region.OVERSEAS)
    async def _os_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, WebLoginResult]:
        """Login with a password using web endpoint.

        Returns either data from aigis header or cookies.
        """
        headers = {**auth_utility.WEB_LOGIN_HEADERS}
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 1),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 1),
            "token_type": token_type,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()
                cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        if data["data"].get("stoken"):
            cookies["stoken"] = data["data"]["stoken"]

        self.set_cookies(cookies)
        return WebLoginResult(**cookies)

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: SessionMMTResult,
    ) -> CNWebLoginResult: ...

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]: ...

    @base.region_specific(types.Region.CHINESE)
    async def _cn_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]:
        """
        Login with account and password using Miyoushe loginByPassword endpoint.

        Returns data from aigis header or cookies.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 2),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.CN_WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3102:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return CNWebLoginResult(**cookies)

    async def _send_mobile_otp(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[None, SessionMMTv4]:
        """Attempt to send OTP to the provided mobile number.

        May return aigis headers if captcha is triggered, None otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_OTP_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMTv4(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        return None

    async def _login_with_mobile_otp(self, mobile: str, otp: str, *, encrypted: bool = False) -> MobileLoginResult:
        """Login with OTP and mobile number.

        Returns cookies if OTP matches the one sent, raises an error otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
            "captcha": otp,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return MobileLoginResult(**cookies)
var custom_headers : multidict._multidict.CIMultiDict[str]
Expand source code
class WebAuthClient(base.BaseClient):
    """Web sub client for AuthClient."""

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: SessionMMTResult,
    ) -> WebLoginResult: ...

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, WebLoginResult]: ...

    @base.region_specific(types.Region.OVERSEAS)
    async def _os_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, WebLoginResult]:
        """Login with a password using web endpoint.

        Returns either data from aigis header or cookies.
        """
        headers = {**auth_utility.WEB_LOGIN_HEADERS}
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 1),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 1),
            "token_type": token_type,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()
                cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        if data["data"].get("stoken"):
            cookies["stoken"] = data["data"]["stoken"]

        self.set_cookies(cookies)
        return WebLoginResult(**cookies)

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: SessionMMTResult,
    ) -> CNWebLoginResult: ...

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]: ...

    @base.region_specific(types.Region.CHINESE)
    async def _cn_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]:
        """
        Login with account and password using Miyoushe loginByPassword endpoint.

        Returns data from aigis header or cookies.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 2),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.CN_WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3102:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return CNWebLoginResult(**cookies)

    async def _send_mobile_otp(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[None, SessionMMTv4]:
        """Attempt to send OTP to the provided mobile number.

        May return aigis headers if captcha is triggered, None otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_OTP_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMTv4(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        return None

    async def _login_with_mobile_otp(self, mobile: str, otp: str, *, encrypted: bool = False) -> MobileLoginResult:
        """Login with OTP and mobile number.

        Returns cookies if OTP matches the one sent, raises an error otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
            "captcha": otp,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return MobileLoginResult(**cookies)
var uids : dict[Game, int]
Expand source code
class WebAuthClient(base.BaseClient):
    """Web sub client for AuthClient."""

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: SessionMMTResult,
    ) -> WebLoginResult: ...

    @typing.overload
    async def _os_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        token_type: typing.Optional[int] = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, WebLoginResult]: ...

    @base.region_specific(types.Region.OVERSEAS)
    async def _os_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        token_type: typing.Optional[int] = 6,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, WebLoginResult]:
        """Login with a password using web endpoint.

        Returns either data from aigis header or cookies.
        """
        headers = {**auth_utility.WEB_LOGIN_HEADERS}
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 1),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 1),
            "token_type": token_type,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()
                cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        if data["data"].get("stoken"):
            cookies["stoken"] = data["data"]["stoken"]

        self.set_cookies(cookies)
        return WebLoginResult(**cookies)

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: SessionMMTResult,
    ) -> CNWebLoginResult: ...

    @typing.overload
    async def _cn_web_login(  # noqa: D102 missing docstring in overload?
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = ...,
        mmt_result: None = ...,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]: ...

    @base.region_specific(types.Region.CHINESE)
    async def _cn_web_login(
        self,
        account: str,
        password: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[SessionMMT, CNWebLoginResult]:
        """
        Login with account and password using Miyoushe loginByPassword endpoint.

        Returns data from aigis header or cookies.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "account": account if encrypted else auth_utility.encrypt_credentials(account, 2),
            "password": password if encrypted else auth_utility.encrypt_credentials(password, 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.CN_WEB_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3102:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMT(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return CNWebLoginResult(**cookies)

    async def _send_mobile_otp(
        self,
        mobile: str,
        *,
        encrypted: bool = False,
        mmt_result: typing.Optional[SessionMMTResult] = None,
    ) -> typing.Union[None, SessionMMTv4]:
        """Attempt to send OTP to the provided mobile number.

        May return aigis headers if captcha is triggered, None otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }
        if mmt_result:
            headers["x-rpc-aigis"] = mmt_result.to_aigis_header()

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_OTP_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if data["retcode"] == -3101:
            # Captcha triggered
            aigis = json.loads(r.headers["x-rpc-aigis"])
            return SessionMMTv4(**aigis)

        if not data["data"]:
            errors.raise_for_retcode(data)

        return None

    async def _login_with_mobile_otp(self, mobile: str, otp: str, *, encrypted: bool = False) -> MobileLoginResult:
        """Login with OTP and mobile number.

        Returns cookies if OTP matches the one sent, raises an error otherwise.
        """
        headers = {
            **auth_utility.CN_LOGIN_HEADERS,
            "ds": ds_utility.generate_dynamic_secret(constants.DS_SALT["cn_signin"]),
        }

        payload = {
            "mobile": mobile if encrypted else auth_utility.encrypt_credentials(mobile, 2),
            "area_code": auth_utility.encrypt_credentials("+86", 2),
            "captcha": otp,
        }

        async with self.cookie_manager.create_session() as session:
            async with session.post(
                routes.MOBILE_LOGIN_URL.get_url(),
                json=payload,
                headers=headers,
            ) as r:
                data = await r.json()

        if not data["data"]:
            errors.raise_for_retcode(data)

        cookies = {cookie.key: cookie.value for cookie in r.cookies.values()}
        self.set_cookies(cookies)

        return MobileLoginResult(**cookies)

Inherited members