Module genshin.client.components.auth.server
Aiohttp webserver used for captcha solving and verification.
Functions
async def enter_code(*, port: int = 5000) ‑> str
-
Expand source code
async def enter_code(*, port: int = 5000) -> str: """Get email or phone number verification code from user.""" return await launch_webapp("enter-code", port=port)
Get email or phone number verification code from user.
async def launch_webapp(page: "typing.Literal['captcha', 'enter-code']",
*,
mmt: typing.Optional[typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT]] = None,
lang: typing.Optional[str] = None,
api_server: typing.Optional[str] = None,
port: int = 5000) ‑> MMTResult | MMTv4Result | SessionMMTResult | SessionMMTv4Result | RiskyCheckMMTResult | str-
Expand source code
async def launch_webapp( page: typing.Literal["captcha", "enter-code"], *, mmt: typing.Optional[typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT]] = None, lang: typing.Optional[str] = None, api_server: typing.Optional[str] = None, port: int = 5000, ) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult, str]: """Create and run a webapp to solve captcha or enter a verification code.""" routes = web.RouteTableDef() future: asyncio.Future[typing.Any] = asyncio.Future() @routes.get("/") async def index(request: web.Request) -> web.StreamResponse: body = PAGES[page] body = body.replace("{gt_version}", "4" if isinstance(mmt, MMTv4) else "3") body = body.replace("{api_server}", api_server or "api-na.geetest.com") body = body.replace("{lang}", lang or "en") return web.Response(body=body, content_type="text/html") @routes.get("/gt/{version}.js") async def gt(request: web.Request) -> web.StreamResponse: version = request.match_info.get("version", "v3") gt_url = GT_V4_URL if version == "v4" else GT_V3_URL async with aiohttp.ClientSession() as session: r = await session.get(gt_url) content = await r.read() return web.Response(body=content, content_type="text/javascript") @routes.get("/mmt") async def mmt_endpoint(request: web.Request) -> web.Response: return web.json_response(mmt.model_dump() if mmt else {}) @routes.post("/send-data") async def send_data_endpoint(request: web.Request) -> web.Response: result = await request.json() if "code" in result: result = result["code"] else: if isinstance(mmt, RiskyCheckMMT): result = RiskyCheckMMTResult(**result) elif isinstance(mmt, SessionMMT): result = SessionMMTResult(**result) elif isinstance(mmt, SessionMMTv4): result = SessionMMTv4Result(**result) elif isinstance(mmt, MMT): result = MMTResult(**result) elif isinstance(mmt, MMTv4): result = MMTv4Result(**result) future.set_result(result) return web.Response(status=204) app = web.Application() app.add_routes(routes) runner = web.AppRunner(app) await runner.setup() site = web.TCPSite(runner, host="localhost", port=port) print(f"Opening http://localhost:{port} in browser...") # noqa webbrowser.open_new_tab(f"http://localhost:{port}") await site.start() try: data = await future finally: await asyncio.sleep(0.3) await runner.shutdown() await runner.cleanup() return data
Create and run a webapp to solve captcha or enter a verification code.
async def solve_geetest(mmt: typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT],
*,
lang: str = 'en-us',
api_server: str = 'api-na.geetest.com',
port: int = 5000) ‑> MMTResult | MMTv4Result | SessionMMTResult | SessionMMTv4Result | RiskyCheckMMTResult-
Expand source code
async def solve_geetest( mmt: typing.Union[MMT, MMTv4, SessionMMT, SessionMMTv4, RiskyCheckMMT], *, lang: str = "en-us", api_server: str = "api-na.geetest.com", port: int = 5000, ) -> typing.Union[MMTResult, MMTv4Result, SessionMMTResult, SessionMMTv4Result, RiskyCheckMMTResult]: """Start a web server and manually solve geetest captcha.""" lang = auth_utility.lang_to_geetest_lang(lang) return await launch_webapp( "captcha", mmt=mmt, lang=lang, api_server=api_server, port=port, )
Start a web server and manually solve geetest captcha.