import json import os import re import sys import time import uuid import random import string import secrets import hashlib import base64 import argparse from pathlib import Path from datetime import datetime, timedelta from dataclasses import dataclass from typing import Any, Dict, Optional, List import urllib.parse import urllib.request import urllib.error import asyncio import requests as py_requests try: import aiohttp except ImportError: aiohttp = None from curl_cffi import requests OUT_DIR = Path(__file__).parent.resolve() UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36" AUTH_URL = "https://auth.openai.com/oauth/authorize" TOKEN_URL = "https://auth.openai.com/oauth/token" CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann" DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback" DEFAULT_SCOPE = "openid email profile offline_access" # ========== 临时邮箱提供商:GPTMail + TempMail.lol ========== class GPTMailClient: def __init__(self, proxies: Any = None): self.session = requests.Session(proxies=proxies, impersonate="chrome") self.session.headers.update({ "User-Agent": UA, "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-CN,zh;q=0.9", "Referer": "https://mail.chatgpt.org.uk/", }) self.base_url = "https://mail.chatgpt.org.uk" def _init_browser_session(self): try: resp = self.session.get(self.base_url, timeout=15) gm_sid = self.session.cookies.get("gm_sid") if gm_sid: self.session.headers.update({"Cookie": f"gm_sid={gm_sid}"}) token_match = re.search(r'(eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+)', resp.text) if token_match: self.session.headers.update({"x-inbox-token": token_match.group(1)}) except Exception: pass def generate_email(self) -> str: self._init_browser_session() resp = self.session.get(f"{self.base_url}/api/generate-email", timeout=15) if resp.status_code == 200: data = resp.json() email = data["data"]["email"] self.session.headers.update({"x-inbox-token": data["auth"]["token"]}) 
print(f"[+] 生成邮箱: {email} (GPTMail)") print("[*] 自动轮询已启动(GPTMail 会话已准备)") return email raise RuntimeError(f"GPTMail 生成失败: {resp.status_code}") def list_emails(self, email: str) -> List[Dict[str, Any]]: encoded_email = urllib.parse.quote(email) resp = self.session.get(f"{self.base_url}/api/emails?email={encoded_email}", timeout=15) if resp.status_code == 200: return resp.json().get("data", {}).get("emails", []) return [] class Message: def __init__(self, data: dict): self.from_addr = data.get("from", "") self.subject = data.get("subject", "") self.body = data.get("body", "") or "" self.html_body = data.get("html", "") or "" class EMail: def __init__(self, proxies: Any = None): self.s = requests.Session(proxies=proxies, impersonate="chrome") self.s.headers.update({ "User-Agent": UA, "Accept": "application/json", "Content-Type": "application/json", }) r = self.s.post("https://api.tempmail.lol/v2/inbox/create", json={}, timeout=15) r.raise_for_status() data = r.json() self.address = data["address"] self.token = data["token"] print(f"[+] 生成邮箱: {self.address} (TempMail.lol)") print("[*] 自动轮询已启动(token 已保存)") def _get_messages(self) -> List[Dict[str, Any]]: r = self.s.get(f"https://api.tempmail.lol/v2/inbox?token={self.token}", timeout=15) r.raise_for_status() return r.json().get("emails", []) class PMailClient: def __init__(self, proxies: Any = None): self.session = requests.Session(proxies=proxies, impersonate="chrome") self.session.headers.update({ "Accept": "application/json, text/plain, */*", "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6", "Cache-Control": "no-cache", "Content-Type": "application/json;charset=UTF-8;", "Lang": "zhCn", "Origin": "https://pmail.030208.xyz", "Pragma": "no-cache", "Priority": "u=1, i", "Referer": "https://pmail.030208.xyz/", "Sec-Ch-Ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Microsoft"', "Sec-Ch-Ua-Mobile": "?0", "Sec-Ch-Ua-Platform": '"Windows"', "Sec-Fetch-Dest": "empty", "Sec-Fetch-Mode": "cors", 
"Sec-Fetch-Site": "same-origin", "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0", "Cookie": "session=beG8AdZzfjgHWPwR0mrgoWGvjMU30zwPgvfNgT31sSk" }) def list_emails(self, target_email: str) -> List[Dict[str, Any]]: url = "https://pmail.030208.xyz/api/email/list" payload = {"tag": {"type": 0, "status": -1, "group_id": 0}, "page_size": 10} try: resp = self.session.post(url, json=payload, timeout=15) if resp.status_code == 200: data = resp.json() emails_list = data.get("data", {}).get("list", []) results = [] for item in emails_list[:3]: to_arr = item.get("to", []) if to_arr and len(to_arr) > 0: addr = to_arr[0].get("EmailAddress", "") if addr == target_email: results.append({ "subject": item.get("title", ""), "txt": item.get("title", ""), "from": addr }) return results except Exception: pass return [] def get_email_and_code_fetcher(proxies: Any = None, provider: str = "auto"): provider = (provider or "auto").strip().lower() if provider not in {"auto", "gptmail", "tempmail", "pmail"}: raise ValueError(f"不支持的邮箱提供商: {provider}") def _build_tempmail_bundle(): inbox = EMail(proxies) email = inbox.address def _extract_all_codes() -> List[str]: results: List[str] = [] try: msgs = inbox._get_messages() for msg_data in msgs: msg = Message(msg_data) body = msg.body or msg.html_body or msg.subject or "" results.extend(re.findall(r"\b(\d{6})\b", body)) except Exception: pass return results def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None: exclude = set(exclude_codes or []) start = time.monotonic() attempt = 0 while time.monotonic() - start < timeout_sec: attempt += 1 try: msgs = inbox._get_messages() print(f"[otp][tempmail] 轮询 #{attempt}, 收到 {len(msgs)} 封邮件, 目标: {email}") for msg_data in msgs: msg = Message(msg_data) body = msg.body or msg.html_body or msg.subject or "" for code in re.findall(r"\b(\d{6})\b", body): if 
code not in exclude: return code except Exception: pass time.sleep(poll) return None return email, _gen_password(), fetch_code, _extract_all_codes, "tempmail" def _build_gptmail_bundle(): client = GPTMailClient(proxies) email = client.generate_email() def _extract_all_codes() -> List[str]: regex = r"(? str | None: exclude = set(exclude_codes or []) start = time.monotonic() attempt = 0 while time.monotonic() - start < timeout_sec: attempt += 1 try: summaries = client.list_emails(email) print(f"[otp][gptmail] 轮询 #{attempt}, 收到 {len(summaries)} 封邮件, 目标: {email}") for s in summaries: body = " ".join([ str(s.get("subject", "") or ""), str(s.get("text", "") or ""), str(s.get("body", "") or ""), str(s.get("html", "") or ""), json.dumps(s, ensure_ascii=False), ]) for code in re.findall(r"(? List[str]: results: List[str] = [] try: summaries = client.list_emails(email) for s in summaries: body = f"{s.get('subject', '')} {s.get('txt', '')}" results.extend(re.findall(r"(? str | None: exclude = set(exclude_codes or []) start = time.monotonic() attempt = 0 while time.monotonic() - start < timeout_sec: attempt += 1 try: summaries = client.list_emails(email) print(f"[otp][pmail] 轮询 #{attempt}, 收到 {len(summaries)} 封目标邮件, 目标: {email}") for s in summaries: body = f"{s.get('subject', '')} {s.get('txt', '')}" for code in re.findall(r"(? 
def _gen_password() -> str:
    """Return a random 16-character account password.

    The result always contains at least one lowercase letter, one uppercase
    letter, one digit and one character from ``!@#$%^&*.-``. The password
    protects a real account, so it is drawn from the OS CSPRNG (``secrets``)
    rather than the predictable ``random`` module.
    """
    alphabet = string.ascii_letters + string.digits
    special = "!@#$%^&*.-"
    # One character from each required class, then 12 from the full pool.
    chars = [
        secrets.choice(string.ascii_lowercase),
        secrets.choice(string.ascii_uppercase),
        secrets.choice(string.digits),
        secrets.choice(special),
    ]
    chars += [secrets.choice(alphabet + special) for _ in range(12)]
    # Shuffle so the guaranteed classes are not always in the first 4 slots.
    secrets.SystemRandom().shuffle(chars)
    return "".join(chars)


def _random_name() -> str:
    """Return a random 7-letter name with only the first letter capitalised."""
    return "".join(random.choice(string.ascii_lowercase) for _ in range(7)).capitalize()


def _random_birthdate() -> str:
    """Return a random date in 1975-01-01..1999-12-31 as ``YYYY-MM-DD``."""
    start = datetime(1975, 1, 1)
    end = datetime(1999, 12, 31)
    day = start + timedelta(days=random.randrange((end - start).days + 1))
    return day.strftime("%Y-%m-%d")


def _b64url_no_pad(raw: bytes) -> str:
    """Base64url-encode *raw* and strip the trailing ``=`` padding."""
    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")


def _sha256_b64url_no_pad(s: str) -> str:
    """Return the unpadded base64url SHA-256 digest of the ASCII string *s*."""
    return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())


def _pkce_verifier() -> str:
    """Return a fresh random PKCE code verifier."""
    return secrets.token_urlsafe(64)


def _parse_callback_url(callback_url: str) -> Dict[str, Any]:
    """Extract ``code``/``state``/``error``/``error_description`` from a callback.

    Accepts a full URL, a host/path without scheme, a bare ``?query`` or a
    bare ``k=v&k=v`` string. Values are read from the query string; a value in
    the URL fragment is used when the query has no (non-blank) value for that
    key. Every key is always present in the result; missing values are ``""``.
    """
    candidate = (callback_url or "").strip()
    if not candidate:
        return {"code": "", "state": "", "error": "", "error_description": ""}

    # Normalise scheme-less input into something urlparse can split.
    if "://" not in candidate:
        if candidate.startswith("?"):
            candidate = f"http://localhost{candidate}"
        elif any(ch in candidate for ch in "/?#") or ":" in candidate:
            candidate = f"http://{candidate}"
        elif "=" in candidate:
            candidate = f"http://localhost/?{candidate}"

    parsed = urllib.parse.urlparse(candidate)
    query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
    fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
    for key, values in fragment.items():
        if key not in query or not query[key] or not (query[key][0] or "").strip():
            query[key] = values

    def get1(k: str) -> str:
        return (query.get(k, [""])[0] or "").strip()

    code = get1("code")
    state = get1("state")
    error = get1("error")
    error_description = get1("error_description")

    # Tolerate a "code#state" value packed into the code parameter.
    if code and not state and "#" in code:
        code, state = code.split("#", 1)
    # Promote a lone description so callers only need to check "error".
    if not error and error_description:
        error, error_description = error_description, ""

    return {"code": code, "state": state, "error": error, "error_description": error_description}


def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
    """Decode one base64url JSON segment of a JWT; return ``{}`` on any failure."""
    try:
        pad = "=" * ((4 - (len(seg) % 4)) % 4)
        return json.loads(base64.urlsafe_b64decode((seg + pad).encode("ascii")).decode("utf-8"))
    except Exception:
        return {}


def _jwt_claims_no_verify(token: str) -> Dict[str, Any]:
    """Return the payload claims of *token* WITHOUT verifying its signature.

    Returns ``{}`` when the token is empty or does not have three segments.
    """
    if not token or token.count(".") < 2:
        return {}
    return _decode_jwt_segment(token.split(".")[1])


def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
    """POST *data* as a URL-encoded form with urllib and return the JSON reply.

    Raises:
        RuntimeError: on a non-200 response or an ``HTTPError``; the message
            carries the status code and the decoded response body.
    """
    body = urllib.parse.urlencode(data).encode("utf-8")
    req = urllib.request.Request(
        url,
        data=body,
        method="POST",
        headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
    )
    try:
        with urllib.request.urlopen(req, timeout=timeout) as resp:
            raw = resp.read()
            if resp.status != 200:
                raise RuntimeError(f"Token 交换失败: {resp.status}: {raw.decode('utf-8', 'replace')}")
            return json.loads(raw.decode("utf-8"))
    except urllib.error.HTTPError as exc:
        raw = exc.read()
        raise RuntimeError(f"Token 交换失败: {exc.code}: {raw.decode('utf-8', 'replace')}") from exc


def _to_int(v: Any) -> int:
    """Convert *v* with ``int()``; return 0 when it cannot be converted."""
    try:
        return int(v)
    except (TypeError, ValueError):
        return 0


def _build_sentinel_payload(session, did: str, flow: str) -> str:
    """Request a Sentinel token and return the JSON header payload string.

    Args:
        session: HTTP session exposing ``post`` (the caller's session, so
            cookies are shared with the rest of the flow).
        did: device id sent as ``id``.
        flow: flow name sent as ``flow``.

    Raises:
        RuntimeError: when the Sentinel endpoint does not answer 200.
    """
    req_body = json.dumps({"p": "", "id": did, "flow": flow})
    resp = session.post(
        "https://sentinel.openai.com/backend-api/sentinel/req",
        headers={
            "origin": "https://sentinel.openai.com",
            "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
            "content-type": "text/plain;charset=UTF-8",
        },
        data=req_body,
        timeout=15,
    )
    if resp.status_code != 200:
        raise RuntimeError(f"Sentinel 验证失败: {resp.status_code}: {resp.text[:200]}")
    token = (resp.json() or {}).get("token", "")
    return json.dumps({"p": "", "t": "", "c": token, "id": did, "flow": flow})


@dataclass(frozen=True)
class OAuthStart:
    """Values produced when an OAuth authorization request is started."""

    auth_url: str  # full authorize URL including query string
    state: str  # anti-CSRF state to compare against the callback
    code_verifier: str  # PKCE verifier to send in the token exchange
    redirect_uri: str  # redirect URI used in the authorize request
def generate_oauth_url(redirect_uri: str = DEFAULT_REDIRECT_URI) -> OAuthStart:
    """Build an authorization URL with a fresh state and PKCE (S256) challenge."""
    state = secrets.token_urlsafe(16)
    verifier = _pkce_verifier()
    challenge = _sha256_b64url_no_pad(verifier)
    params = {
        "client_id": CLIENT_ID,
        "response_type": "code",
        "redirect_uri": redirect_uri,
        "scope": DEFAULT_SCOPE,
        "state": state,
        "code_challenge": challenge,
        "code_challenge_method": "S256",
        "prompt": "login",
        "id_token_add_organizations": "true",
        "codex_cli_simplified_flow": "true",
    }
    return OAuthStart(f"{AUTH_URL}?{urllib.parse.urlencode(params)}", state, verifier, redirect_uri)


def fetch_sentinel_token(flow: str, did: str, proxies: Any = None) -> Optional[str]:
    """Return the raw Sentinel token for *flow*, or ``None`` on any failure."""
    try:
        session = requests.Session(proxies=proxies, impersonate="chrome")
        payload = _build_sentinel_payload(session, did, flow)
        return (json.loads(payload) or {}).get("c")
    except Exception:
        return None


def submit_callback_url(callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str, session=None) -> str:
    """Exchange the authorization code in *callback_url* for tokens.

    Args:
        callback_url: the redirect target carrying ``code`` and ``state``.
        expected_state: state generated for this authorization request.
        code_verifier: PKCE verifier matching the request's challenge.
        redirect_uri: redirect URI used in the authorization request.
        session: optional HTTP session used for the token POST; when ``None``
            the exchange goes through urllib (``_post_form``).

    Returns:
        A pretty-printed JSON string with the tokens, account id, email,
        ``last_refresh`` and ``expired`` timestamps (UTC).

    Raises:
        RuntimeError: the callback carries an OAuth error, or the token
            endpoint does not answer 200.
        ValueError: ``code``/``state`` is missing or the state does not match.
    """
    cb = _parse_callback_url(callback_url)
    if cb.get("error"):
        raise RuntimeError(f"OAuth 错误: {cb['error']}: {cb.get('error_description', '')}".strip())
    if not cb.get("code"):
        raise ValueError("Callback URL 缺少 ?code=")
    if not cb.get("state"):
        raise ValueError("Callback URL 缺少 ?state=")
    if cb.get("state") != expected_state:
        raise ValueError("State 校验不匹配")

    token_data = {
        "grant_type": "authorization_code",
        "client_id": CLIENT_ID,
        "code": cb["code"],
        "redirect_uri": redirect_uri,
        "code_verifier": code_verifier,
    }
    if session is not None:
        resp = session.post(
            TOKEN_URL,
            data=token_data,
            headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
            timeout=30,
        )
        if resp.status_code != 200:
            raise RuntimeError(f"Token 交换失败: {resp.status_code}: {resp.text[:200]}")
        token_resp = resp.json()
    else:
        token_resp = _post_form(TOKEN_URL, token_data)

    access_token = str(token_resp.get("access_token") or "").strip()
    refresh_token = str(token_resp.get("refresh_token") or "").strip()
    id_token = str(token_resp.get("id_token") or "").strip()
    expires_in = _to_int(token_resp.get("expires_in"))

    # Claims are read without signature verification; they are only used to
    # label the stored credentials.
    claims = _jwt_claims_no_verify(id_token)
    auth_claims = claims.get("https://api.openai.com/auth") or {}
    now = int(time.time())
    config = {
        "id_token": id_token,
        "access_token": access_token,
        "refresh_token": refresh_token,
        "account_id": str(auth_claims.get("chatgpt_account_id") or "").strip(),
        "last_refresh": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
        "email": str(claims.get("email") or "").strip(),
        "type": "codex",
        "expired": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))),
    }
    return json.dumps(config, ensure_ascii=False, indent=2)


# ========== Lightweight CPA maintenance (embedded; no project-package dependency) ==========
DEFAULT_MGMT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"


def _mgmt_headers(token: str) -> dict:
    """Return management-API headers, adding the ``Bearer `` prefix if absent."""
    clean = str(token or "").strip()
    if clean and not clean.lower().startswith("bearer "):
        clean = f"Bearer {clean}"
    return {"Authorization": clean, "Accept": "application/json"}


def _join_mgmt_url(base_url: str, path: str) -> str:
    """Join *path* onto *base_url*, inserting ``/v0`` unless the base ends with it."""
    base = (base_url or "").rstrip("/")
    suffix = path if path.startswith("/") else f"/{path}"
    if base.endswith("/v0"):
        return f"{base}{suffix}"
    return f"{base}/v0{suffix}"


def _safe_json(text: str):
    """Parse *text* as JSON; return ``{}`` when it cannot be parsed."""
    try:
        return json.loads(text)
    except Exception:
        return {}


def _extract_account_id(item: dict):
    """Return the first non-empty account-id field of *item* as ``str``, else ``None``."""
    for key in ("chatgpt_account_id", "chatgptAccountId", "account_id", "accountId"):
        val = item.get(key)
        if val:
            return str(val)
    return None


def _get_item_type(item: dict) -> str:
    """Return the item's ``type`` (falling back to the ``typo`` key) as ``str``."""
    return str(item.get("type") or item.get("typo") or "")


class MiniPoolMaintainer:
    """Minimal client for the CPA management API: upload, list, probe, delete."""

    def __init__(self, base_url: str, token: str, target_type: str = "codex", used_percent_threshold: int = 95, user_agent: str = DEFAULT_MGMT_UA):
        self.base_url = (base_url or "").rstrip("/")
        self.token = token
        self.target_type = target_type
        self.used_percent_threshold = used_percent_threshold
        self.user_agent = user_agent

    def upload_token(self, filename: str, token_data: dict, proxy: str = "") -> bool:
        """Upload *token_data* as a JSON auth file; retry up to 3 times.

        Returns ``True`` on HTTP 200/201/204, ``False`` otherwise (including
        when no base URL or token is configured).
        """
        if not self.base_url or not self.token:
            return False
        content = json.dumps(token_data, ensure_ascii=False).encode("utf-8")
        files = {"file": (filename, content, "application/json")}
        # Shared header builder: avoids "Bearer Bearer ..." when the configured
        # token already carries the prefix.
        headers = _mgmt_headers(self.token)
        proxies = {"http": proxy, "https": proxy} if proxy else None
        for attempt in range(3):
            try:
                # NOTE(review): verify=False disables TLS certificate checks for
                # the upload only; kept as-is, confirm it is still required.
                resp = py_requests.post(
                    _join_mgmt_url(self.base_url, "/management/auth-files"),
                    files=files,
                    headers=headers,
                    timeout=30,
                    verify=False,
                    proxies=proxies,
                )
                if resp.status_code in (200, 201, 204):
                    return True
            except Exception:
                pass
            if attempt < 2:
                time.sleep(2 ** attempt)  # back off 1s, then 2s
        return False

    def fetch_auth_files(self, timeout: int = 15):
        """Return the ``files`` list of the management API (``[]`` if absent).

        Raises whatever ``requests`` raises on transport or HTTP errors.
        """
        resp = py_requests.get(
            _join_mgmt_url(self.base_url, "/management/auth-files"),
            headers=_mgmt_headers(self.token),
            timeout=timeout,
        )
        resp.raise_for_status()
        data = resp.json()
        return (data.get("files") if isinstance(data, dict) else []) or []

    async def probe_and_clean_async(self, workers: int = 20, timeout: int = 10, retries: int = 1):
        """Probe every auth file of the target type and delete the invalid ones.

        A file is invalid when the proxied usage call reports status 401 or a
        ``used_percent`` at or above ``used_percent_threshold``.

        Returns:
            dict with ``total``, ``candidates``, ``invalid_count``,
            ``deleted_ok`` and ``deleted_fail`` counters.

        Raises:
            RuntimeError: aiohttp is not installed.
        """
        if aiohttp is None:
            raise RuntimeError("需要安装 aiohttp: pip install aiohttp")

        files = self.fetch_auth_files(timeout)
        candidates = [f for f in files if _get_item_type(f).lower() == self.target_type.lower()]
        if not candidates:
            return {"total": len(files), "candidates": 0, "invalid_count": 0, "deleted_ok": 0, "deleted_fail": 0}

        semaphore = asyncio.Semaphore(max(1, workers))
        connector = aiohttp.TCPConnector(limit=max(1, workers))
        # Applied to every request made through the session below, so no
        # per-request timeout is passed.
        client_timeout = aiohttp.ClientTimeout(total=max(1, timeout))

        async def probe_one(session, item):
            auth_index = item.get("auth_index")
            name = item.get("name") or item.get("id")
            res = {"name": name, "auth_index": auth_index, "invalid_401": False, "invalid_used_percent": False, "used_percent": None}
            if not auth_index:
                return res

            account_id = _extract_account_id(item)
            header = {"Authorization": "Bearer $TOKEN$", "Content-Type": "application/json", "User-Agent": self.user_agent}
            if account_id:
                header["Chatgpt-Account-Id"] = account_id
            payload = {"authIndex": auth_index, "method": "GET", "url": "https://chatgpt.com/backend-api/wham/usage", "header": header}

            for attempt in range(retries + 1):
                try:
                    async with semaphore:
                        async with session.post(
                            _join_mgmt_url(self.base_url, "/management/api-call"),
                            headers={**_mgmt_headers(self.token), "Content-Type": "application/json"},
                            json=payload,
                        ) as resp:
                            text = await resp.text()
                            if resp.status >= 400:
                                raise RuntimeError(f"HTTP {resp.status}: {text[:200]}")
                    data = _safe_json(text)
                    sc = data.get("status_code")
                    res["invalid_401"] = sc == 401
                    if sc == 200:
                        body = _safe_json(data.get("body", ""))
                        used_pct = body.get("rate_limit", {}).get("primary_window", {}).get("used_percent")
                        if used_pct is not None:
                            res["used_percent"] = used_pct
                            res["invalid_used_percent"] = used_pct >= self.used_percent_threshold
                    return res
                except Exception as e:
                    if attempt >= retries:
                        res["error"] = str(e)
                        return res
            return res

        async def delete_one(session, name: str):
            if not name:
                return False
            encoded = urllib.parse.quote(name, safe="")
            try:
                async with semaphore:
                    async with session.delete(
                        f"{_join_mgmt_url(self.base_url, '/management/auth-files')}?name={encoded}",
                        headers=_mgmt_headers(self.token),
                    ) as resp:
                        text = await resp.text()
                        data = _safe_json(text)
                        return resp.status == 200 and data.get("status") == "ok"
            except Exception:
                return False

        invalid_list = []
        async with aiohttp.ClientSession(connector=connector, timeout=client_timeout, trust_env=True) as session:
            tasks = [asyncio.create_task(probe_one(session, item)) for item in candidates]
            for task in asyncio.as_completed(tasks):
                r = await task
                if r.get("invalid_401") or r.get("invalid_used_percent"):
                    invalid_list.append(r)

            delete_tasks = [asyncio.create_task(delete_one(session, r.get("name"))) for r in invalid_list if r.get("name")]
            deleted_ok = 0
            deleted_fail = 0
            for task in asyncio.as_completed(delete_tasks):
                if await task:
                    deleted_ok += 1
                else:
                    deleted_fail += 1

        return {
            "total": len(files),
            "candidates": len(candidates),
            "invalid_count": len(invalid_list),
            "deleted_ok": deleted_ok,
            "deleted_fail": deleted_fail,
        }

    def probe_and_clean_sync(self, workers: int = 20, timeout: int = 10, retries: int = 1):
        """Blocking wrapper around :meth:`probe_and_clean_async`."""
        return asyncio.run(self.probe_and_clean_async(workers, timeout, retries))


def _build_cpa_maintainer(args):
    """Create a ``MiniPoolMaintainer`` from CLI args / env vars, or ``None``.

    ``None`` is returned (with a log line) when the base URL or token is
    missing, or when construction fails.
    """
    base_url = (args.cpa_base_url or os.getenv("CPA_BASE_URL") or "").strip()
    token = (args.cpa_token or os.getenv("CPA_TOKEN") or "").strip()
    if not base_url or not token:
        print("[CPA] 未提供 cpa_base_url / cpa_token,跳过 CPA 上传/清理")
        return None
    try:
        return MiniPoolMaintainer(
            base_url,
            token,
            target_type="codex",
            used_percent_threshold=args.cpa_used_threshold,
            user_agent=DEFAULT_MGMT_UA,
        )
    except Exception as e:
        print(f"[CPA] 创建维护器失败: {e}")
        return None


def _upload_token_to_cpa(pm, token_json: str, email: str, proxy: str = "") -> bool:
    """Upload *token_json* through *pm*; return whether the upload succeeded."""
    if not pm:
        return False
    try:
        data = json.loads(token_json)
    except Exception as e:
        print(f"[CPA] 解析 token_json 失败: {e}")
        return False
    fname_email = email.replace("@", "_")
    filename = f"token_{fname_email}_{int(time.time())}.json"
    ok = pm.upload_token(filename=filename, token_data=data, proxy=proxy or "")
    if ok:
        print(f"[CPA] 已上传 {filename} 到 CPA")
    else:
        print("[CPA] 上传失败")
    return ok


def _clean_invalid_in_cpa(pm, args):
    """Run probe-and-clean through *pm*; return its counters or ``None`` on failure."""
    if not pm:
        return None
    try:
        res = pm.probe_and_clean_sync(
            workers=max(1, args.cpa_workers),
            timeout=max(5, args.cpa_timeout),
            retries=max(0, args.cpa_retries),
        )
        print(
            f"[CPA] 清理完成: total={res.get('total')} candidates={res.get('candidates')} "
            f"invalid={res.get('invalid_count')} deleted_ok={res.get('deleted_ok')} deleted_fail={res.get('deleted_fail')}"
        )
        return res
    except Exception as e:
        print(f"[CPA] 清理失败: {e}")
        return None


def _count_valid_cpa_tokens(pm, args):
    """Count auth files whose type matches ``pm.target_type``; 0 on any failure.

    No validity probe happens here: every file of the target type is counted.
    """
    if not pm:
        return 0
    try:
        files = pm.fetch_auth_files(timeout=max(5, args.cpa_timeout))
        target = pm.target_type.lower()
        valid = [f for f in files if _get_item_type(f).lower() == target]
        return len(valid)
    except Exception as e:
        print(f"[CPA] 统计 token 失败: {e}")
        return 0


# Account-line cleanup: used after a successful upload when prune_local is on.
# Safe handling: a missing file is a no-op; the rewrite keeps a trailing
# newline so later appends start on a fresh line.
def _remove_account_entry(accounts_path: Path, email: str, real_pwd: str):
    """Remove the ``email----password`` line from *accounts_path* if present."""
    if not accounts_path.exists():
        return
    try:
        lines = accounts_path.read_text(encoding="utf-8").splitlines()
        target = f"{email}----{real_pwd}"
        kept = [ln for ln in lines if ln.strip() != target]
        accounts_path.write_text("\n".join(kept) + ("\n" if kept else ""), encoding="utf-8")
        print(f"[本地清理] 已从 accounts.txt 移除: {email}")
    except Exception as e:
        print(f"[本地清理] 移除账号行失败: {e}")


def _json_headers(referer: Optional[str] = None, sentinel: Optional[str] = None) -> Dict[str, str]:
    """Build the JSON request headers used by the auth API calls in ``run``."""
    headers: Dict[str, str] = {}
    if referer is not None:
        headers["referer"] = referer
    headers["accept"] = "application/json"
    headers["content-type"] = "application/json"
    if sentinel is not None:
        headers["openai-sentinel-token"] = sentinel
    return headers


# ========== Main registration flow (detailed logging and exception capture) ==========
def run(proxy: Optional[str], mail_provider: str = "auto"):
    """Register one account and log in again to obtain tokens.

    Args:
        proxy: proxy URL used for both http and https, or ``None``.
        mail_provider: provider name passed to ``get_email_and_code_fetcher``.

    Returns:
        ``(token_json, email, password)`` on success, ``None`` on any failure
        (every failure is logged; no exception escapes).
    """
    proxies = {"http": proxy, "https": proxy} if proxy else None
    s = requests.Session(proxies=proxies, impersonate="chrome")
    s.headers.update({
        "user-agent": UA,
        "accept": "application/json, text/plain, */*",
    })
    print(f"\n{'='*20} 开启注册流程 {'='*20}")
    try:
        print(f"[步骤1] 正在初始化临时邮箱(provider={mail_provider})...")
        email, password, code_fetcher, extract_all_codes, actual_mail_provider = get_email_and_code_fetcher(proxies, provider=mail_provider)
        print(f"[*] 当前邮箱提供商: {actual_mail_provider}")
        if not email:
            print("[失败] 未能获取邮箱")
            return None
        print(f"[成功] 邮箱: {email} | 临时密码: {password}")

        print("[步骤2] 访问 OpenAI 授权页获取 Device ID...")
        oauth = generate_oauth_url()
        auth_page = s.get(oauth.auth_url, timeout=15)
        did = s.cookies.get("oai-did")
        if not did:
            print("[失败] 未能从 Cookie 获取 oai-did")
            return None
        print(f"[成功] Device ID: {did}")

        print("[步骤3] 获取 Sentinel 载荷并提交注册邮箱...")
        try:
            authorize_continue_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
        except Exception as e:
            print(f"[失败] 获取 authorize_continue Sentinel 失败: {e}")
            return None

        # Best effort: the authorize page may answer with JSON carrying a
        # continue_url that has to be visited first.
        continue_url = ""
        try:
            auth_json = auth_page.json() if hasattr(auth_page, "json") else {}
            continue_url = str((auth_json or {}).get("continue_url") or "").strip()
        except Exception:
            continue_url = ""
        if continue_url:
            try:
                s.get(continue_url, timeout=15)
            except Exception:
                pass

        signup_res = s.post(
            "https://auth.openai.com/api/accounts/authorize/continue",
            headers=_json_headers("https://auth.openai.com/create-account", authorize_continue_sentinel),
            data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"}),
            timeout=15,
        )
        print(f"[日志] 邮箱提交状态: {signup_res.status_code}")
        if signup_res.status_code != 200:
            print(f"[失败] 邮箱提交失败: {signup_res.text[:200]}")
            return None

        print("[步骤4] 设置账户密码...")
        pwd_res = s.post(
            "https://auth.openai.com/api/accounts/user/register",
            headers=_json_headers("https://auth.openai.com/create-account/password"),
            json={"password": password, "username": email},
            timeout=15,
        )
        print(f"[日志] 密码设置状态: {pwd_res.status_code}")
        if pwd_res.status_code != 200:
            print(f"[失败] 密码设置失败: {pwd_res.text[:200]}")
            return None

        print("[步骤5] 触发 OpenAI 发送验证邮件...")
        s.get("https://auth.openai.com/create-account/password", timeout=15)
        otp_send_res = s.get(
            "https://auth.openai.com/api/accounts/email-otp/send",
            headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json"},
            timeout=15,
        )
        print(f"[日志] 发送指令状态: {otp_send_res.status_code}")
        if otp_send_res.status_code != 200:
            print(f"[失败] 发送验证码失败: {otp_send_res.text[:200]}")
            return None

        print("[步骤6] 等待邮箱接收 6 位验证码...")
        code = code_fetcher()
        if not code:
            print("[失败] 邮箱长时间未收到验证码")
            return None
        print(f"[成功] 捕获验证码: {code}")

        print("[步骤7] 提交验证码至 OpenAI...")
        val_res = s.post(
            "https://auth.openai.com/api/accounts/email-otp/validate",
            headers=_json_headers("https://auth.openai.com/email-verification"),
            json={"code": code},
            timeout=15,
        )
        print(f"[日志] 验证码校验状态: {val_res.status_code}")
        if val_res.status_code != 200:
            print(f"[失败] 验证码校验失败: {val_res.text[:200]}")
            return None

        print("[步骤8] 完善账户基本信息...")
        try:
            # NOTE(review): the Sentinel flow requested here is
            # "authorize_continue" although the token is used for
            # create_account; confirm this is intentional.
            create_account_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
        except Exception as e:
            print(f"[失败] 获取 create_account Sentinel 失败: {e}")
            return None
        acc_res = s.post(
            "https://auth.openai.com/api/accounts/create_account",
            headers=_json_headers("https://auth.openai.com/about-you", create_account_sentinel),
            data=json.dumps({"name": _random_name(), "birthdate": _random_birthdate()}),
            timeout=15,
        )
        print(f"[日志] 账户创建状态: {acc_res.status_code}")
        if acc_res.status_code != 200:
            print(f"[失败] 账户创建失败: {acc_res.text[:200]}")
            return None

        print("[步骤9] 注册完成,重新走登录流程获取 Workspace / Token...")
        first_code = code
        for login_attempt in range(3):
            try:
                print(f"[*] 正在通过登录流程获取 Token...{f' (重试 {login_attempt}/3)' if login_attempt else ''}")
                # Fresh session and PKCE pair for every login attempt.
                s2 = requests.Session(proxies=proxies, impersonate="chrome")
                oauth2 = generate_oauth_url()
                s2.get(oauth2.auth_url, timeout=15)
                did2 = s2.cookies.get("oai-did")
                if not did2:
                    print("[失败] 登录会话未能获取 oai-did")
                    continue

                lc = s2.post(
                    "https://auth.openai.com/api/accounts/authorize/continue",
                    headers=_json_headers(
                        "https://auth.openai.com/log-in",
                        _build_sentinel_payload(s2, did2, "authorize_continue"),
                    ),
                    data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "login"}),
                    timeout=15,
                )
                print(f"[日志] 登录邮箱提交状态: {lc.status_code}")
                if lc.status_code != 200:
                    print(f"[失败] 登录邮箱提交失败: {lc.text[:200]}")
                    continue
                s2.get(str((lc.json() or {}).get("continue_url") or ""), timeout=15)

                pw = s2.post(
                    "https://auth.openai.com/api/accounts/password/verify",
                    headers=_json_headers(
                        "https://auth.openai.com/log-in/password",
                        _build_sentinel_payload(s2, did2, "authorize_continue"),
                    ),
                    json={"password": password},
                    timeout=15,
                )
                print(f"[日志] 登录密码验证状态: {pw.status_code}")
                if pw.status_code != 200:
                    print(f"[失败] 登录密码验证失败: {pw.text[:200]}")
                    continue

                # Snapshot codes already in the inbox so only a code that
                # arrives after this point is accepted as the login OTP.
                existing_codes = list(extract_all_codes())
                s2.get(
                    "https://auth.openai.com/email-verification",
                    headers={"referer": "https://auth.openai.com/log-in/password"},
                    timeout=15,
                )
                print("[*] 正在等待登录 OTP...")
                time.sleep(2)
                otp2 = None
                baseline_codes = set(existing_codes)
                baseline_codes.add(first_code)
                for _ in range(40):
                    all_codes = extract_all_codes()
                    new_codes = [c for c in all_codes if c not in baseline_codes]
                    if new_codes:
                        otp2 = new_codes[-1]
                        break
                    time.sleep(2)
                if not otp2:
                    print("[失败] 未收到登录 OTP")
                    continue
                print(f"[成功] 捕获登录 OTP: {otp2}")

                val2 = s2.post(
                    "https://auth.openai.com/api/accounts/email-otp/validate",
                    headers=_json_headers("https://auth.openai.com/email-verification"),
                    json={"code": otp2},
                    timeout=15,
                )
                print(f"[日志] 登录 OTP 校验状态: {val2.status_code}")
                if val2.status_code != 200:
                    print(f"[失败] 登录 OTP 校验失败: {val2.text[:200]}")
                    continue
                val2_data = val2.json() or {}
                print("[成功] 登录 OTP 验证成功")

                consent_url = str(val2_data.get("continue_url") or "").strip()
                if consent_url:
                    s2.get(consent_url, timeout=15)

                auth_cookie = s2.cookies.get("oai-client-auth-session", domain=".auth.openai.com") or s2.cookies.get("oai-client-auth-session")
                if not auth_cookie:
                    print("[失败] 登录后未能获取 oai-client-auth-session")
                    continue
                # The first dot-separated segment of the cookie is decoded as
                # base64url JSON and carries the workspace list.
                auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
                if "workspaces" not in auth_json or not auth_json["workspaces"]:
                    print(f"[失败] Cookie 中无 workspaces: {list(auth_json.keys())}")
                    continue
                workspace_id = auth_json["workspaces"][0]["id"]
                print(f"[成功] Workspace ID: {workspace_id}")

                select_resp = s2.post(
                    "https://auth.openai.com/api/accounts/workspace/select",
                    headers=_json_headers(consent_url),
                    json={"workspace_id": workspace_id},
                    timeout=15,
                )
                print(f"[日志] Workspace 选择状态: {select_resp.status_code}")
                if select_resp.status_code != 200:
                    print(f"[失败] Workspace 选择失败: {select_resp.text[:200]}")
                    continue
                sel_data = select_resp.json() or {}

                if sel_data.get("page", {}).get("type", "") == "organization_select":
                    orgs = sel_data.get("page", {}).get("payload", {}).get("data", {}).get("orgs", [])
                    if orgs:
                        org_sel = s2.post(
                            "https://auth.openai.com/api/accounts/organization/select",
                            headers=_json_headers(),
                            json={
                                "org_id": orgs[0].get("id", ""),
                                "project_id": orgs[0].get("default_project_id", ""),
                            },
                            timeout=15,
                        )
                        print(f"[日志] Organization 选择状态: {org_sel.status_code}")
                        if org_sel.status_code != 200:
                            print(f"[失败] Organization 选择失败: {org_sel.text[:200]}")
                            continue
                        sel_data = org_sel.json() or {}

                if "continue_url" not in sel_data:
                    print(f"[失败] 未能获取 continue_url: {json.dumps(sel_data, ensure_ascii=False)[:500]}")
                    continue

                print("[步骤10] 跟踪重定向并换取 Token...")
                r = s2.get(str(sel_data["continue_url"]), allow_redirects=False, timeout=15)
                cbk = None
                for i in range(20):
                    loc = r.headers.get("Location", "")
                    print(f" -> 重定向 #{i+1} 状态: {r.status_code} | 下一跳: {loc[:80] if loc else '无'}")
                    if loc.startswith("http://localhost"):
                        # The localhost redirect is the OAuth callback; it is
                        # captured, never requested.
                        cbk = loc
                        break
                    # 307/308 are followed as well as 301/302/303.
                    if r.status_code not in (301, 302, 303, 307, 308) or not loc:
                        break
                    r = s2.get(loc, allow_redirects=False, timeout=15)
                if not cbk:
                    print("[失败] 未能获取到 Callback URL")
                    continue

                token_json = submit_callback_url(
                    callback_url=cbk,
                    expected_state=oauth2.state,
                    code_verifier=oauth2.code_verifier,
                    redirect_uri=oauth2.redirect_uri,
                    session=s2,
                )
                print("[大功告成] 账号注册完毕!")
                return token_json, email, password
            except Exception as e:
                print(f"[失败] 登录补全流程异常: {e}")
                time.sleep(2)
                continue

        print("[失败] 登录补全流程 3 次均未完成。")
        return None
    except Exception as e:
        print(f"[致命错误] 流程崩溃: {e}")
        return None


def _random_pause(args) -> None:
    """Sleep a random number of seconds between ``--sleep-min`` and ``--sleep-max``."""
    # Sorting tolerates min > max, which would make random.randint raise.
    low, high = sorted((max(0, args.sleep_min), max(0, args.sleep_max)))
    wait_time = random.randint(low, high)
    print(f"[*] 随机休息 {wait_time} 秒...")
    time.sleep(wait_time)


# ========== Main: keeps the original structure and output format ==========
def main():
    """CLI entry point: register accounts in a loop, optionally syncing with CPA."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--proxy", help="代理地址")
    # "pmail" is accepted by get_email_and_code_fetcher, so it is selectable here.
    parser.add_argument("--mail-provider", choices=["auto", "gptmail", "tempmail", "pmail"], default="auto", help="临时邮箱提供商:auto/gptmail/tempmail/pmail")
    parser.add_argument("--once", action="store_true", help="只运行一次")
    parser.add_argument("--sleep-min", type=int, default=5, help="最小间隔(秒)")
    parser.add_argument("--sleep-max", type=int, default=30, help="最大间隔(秒)")
    parser.add_argument("--cpa-base-url", default=os.getenv("CPA_BASE_URL"), help="CPA 基础地址")
    parser.add_argument("--cpa-token", default=os.getenv("CPA_TOKEN"), help="CPA 管理 token (Bearer)")
    parser.add_argument("--cpa-workers", type=int, default=20, help="CPA 清理并发")
    parser.add_argument("--cpa-timeout", type=int, default=12, help="CPA 请求超时")
    parser.add_argument("--cpa-retries", type=int, default=1, help="CPA 清理重试次数")
    parser.add_argument("--cpa-used-threshold", type=int, default=95, help="CPA used_percent 阈值")
    parser.add_argument("--cpa-clean", action="store_true", help="注册后自动清理 CPA 失效账号")
    parser.add_argument("--cpa-upload", action="store_true", help="注册后自动上传 CPA")
    parser.add_argument("--cpa-target-count", type=int, default=300, help="目标 token 数(有效)")
    parser.add_argument("--cpa-prune-local", action="store_true", help="上传成功后删除本地 token 文件与账号行")
    args = parser.parse_args()

    tokens_dir = OUT_DIR / "tokens"
    tokens_dir.mkdir(parents=True, exist_ok=True)
    pm = _build_cpa_maintainer(args)

    count = 0
    while True:
        count += 1
        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 流程 #{count} <<<")

        if pm:
            if args.cpa_clean:
                _clean_invalid_in_cpa(pm, args)
            current_count = _count_valid_cpa_tokens(pm, args)
            print(f"[CPA] 当前有效 token: {current_count} / {args.cpa_target_count}")
            # Pool is full: skip registration this round.
            if current_count >= args.cpa_target_count:
                if args.once:
                    break
                _random_pause(args)
                continue

        res = run(args.proxy, args.mail_provider)
        if res:
            token_json, email, real_pwd = res
            print(f"[🎉] 成功! {email} ---- {real_pwd}")

            # 1. Append the credentials to tokens/accounts.txt.
            with open(tokens_dir / "accounts.txt", "a", encoding="utf-8") as f:
                f.write(f"{email}----{real_pwd}\n")

            # 2. Save the detailed token JSON.
            fname_email = email.replace("@", "_")
            token_file = tokens_dir / f"token_{fname_email}_{int(time.time())}.json"
            token_file.write_text(token_json, encoding="utf-8")
            print(f"[*] Token 文件已保存: {token_file.name}")

            # 3. Upload to CPA (optional).
            upload_ok = False
            if args.cpa_upload:
                upload_ok = _upload_token_to_cpa(pm, token_json, email, proxy=args.proxy or "")

            # 4. After a successful upload, optionally drop the local copies.
            if upload_ok and args.cpa_prune_local:
                try:
                    if token_file.exists():
                        token_file.unlink()
                        print(f"[本地清理] 已删除 token 文件: {token_file.name}")
                except Exception as e:
                    print(f"[本地清理] 删除 token 文件失败: {e}")
                _remove_account_entry(tokens_dir / "accounts.txt", email, real_pwd)

            # 5. Clean once more after registering (optional).
            if pm and args.cpa_clean:
                _clean_invalid_in_cpa(pm, args)
        else:
            print("[-] 本次注册流程未能完成。")

        if args.once:
            break
        _random_pause(args)


if __name__ == "__main__":
    main()