Module genshin.client.components.auth.server

Aiohttp webserver used for captcha solving and verification.

Functions

async def enter_code(*, port: int = 5000) ‑> str
Expand source code
async def enter_code(*, port: int = 5000) -> str:
    """Get email or phone number verification code from user."""
    return await launch_webapp("enter-code", port=port)

Get email or phone number verification code from user.

async def launch_webapp(page: "typing.Literal['captcha', 'enter-code']",
*,
mmt: typing.Optional[typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT]] = None,
lang: typing.Optional[str] = None,
api_server: typing.Optional[str] = None,
port: int = 5000) ‑> MMTResult | MMTv4Result | SessionMMTResult | SessionMMTv4Result | RiskyCheckMMTResult | str
Expand source code
async def launch_webapp(
    page: typing.Literal["captcha", "enter-code"],
    *,
    mmt: typing.Optional[typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT]] = None,
    lang: typing.Optional[str] = None,
    api_server: typing.Optional[str] = None,
    port: int = 5000,
) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult, str]:
    """Create and run a webapp to solve captcha or enter a verification code."""
    routes = web.RouteTableDef()
    future: asyncio.Future[typing.Any] = asyncio.Future()

    @routes.get("/")
    async def index(request: web.Request) -> web.StreamResponse:
        body = PAGES[page]
        body = body.replace("{gt_version}", "4" if isinstance(mmt, MMTv4) else "3")
        body = body.replace("{api_server}", api_server or "api-na.geetest.com")
        body = body.replace("{lang}", lang or "en")
        return web.Response(body=body, content_type="text/html")

    @routes.get("/gt/{version}.js")
    async def gt(request: web.Request) -> web.StreamResponse:
        version = request.match_info.get("version", "v3")
        gt_url = GT_V4_URL if version == "v4" else GT_V3_URL

        async with aiohttp.ClientSession() as session:
            r = await session.get(gt_url)
            content = await r.read()

        return web.Response(body=content, content_type="text/javascript")

    @routes.get("/mmt")
    async def mmt_endpoint(request: web.Request) -> web.Response:
        return web.json_response(mmt.model_dump() if mmt else {})

    @routes.post("/send-data")
    async def send_data_endpoint(request: web.Request) -> web.Response:
        result = await request.json()
        if "code" in result:
            result = result["code"]
        else:
            if isinstance(mmt, RiskyCheckMMT):
                result = RiskyCheckMMTResult(**result)
            elif isinstance(mmt, SessionMMT):
                result = SessionMMTResult(**result)
            elif isinstance(mmt, SessionMMTv4):
                result = SessionMMTv4Result(**result)
            elif isinstance(mmt, MMT):
                result = MMTResult(**result)
            elif isinstance(mmt, MMTv4):
                result = MMTv4Result(**result)

        future.set_result(result)
        return web.Response(status=204)

    app = web.Application()
    app.add_routes(routes)

    runner = web.AppRunner(app)
    await runner.setup()

    site = web.TCPSite(runner, host="localhost", port=port)
    print(f"Opening http://localhost:{port} in browser...")  # noqa
    webbrowser.open_new_tab(f"http://localhost:{port}")

    await site.start()

    try:
        data = await future
    finally:
        await asyncio.sleep(0.3)
        await runner.shutdown()
        await runner.cleanup()

    return data

Create and run a webapp to solve captcha or enter a verification code.

async def solve_geetest(mmt: typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT],
*,
lang: str = 'en-us',
api_server: str = 'api-na.geetest.com',
port: int = 5000) ‑> MMTResult | MMTv4Result | SessionMMTResult | SessionMMTv4Result | RiskyCheckMMTResult
Expand source code
async def solve_geetest(
    mmt: typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT],
    *,
    lang: str = "en-us",
    api_server: str = "api-na.geetest.com",
    port: int = 5000,
) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult]:
    """Start a web server and manually solve geetest captcha."""
    lang = auth_utility.lang_to_geetest_lang(lang)
    return await launch_webapp(
        "captcha",
        mmt=mmt,
        lang=lang,
        api_server=api_server,
        port=port,
    )

Start a web server and manually solve geetest captcha.