husj 1 ヶ月 前
コミット
e9b0ed230a
1 ファイル変更1149 行追加0 行削除
  1. 1149 0
      openai_register.py

+ 1149 - 0
openai_register.py

@@ -0,0 +1,1149 @@
+import json
+import os
+import re
+import sys
+import time
+import uuid
+import random
+import string
+import secrets
+import hashlib
+import base64
+import argparse
+from pathlib import Path
+from datetime import datetime, timedelta
+from dataclasses import dataclass
+from typing import Any, Dict, Optional, List
+import urllib.parse
+import urllib.request
+import urllib.error
+
+import asyncio
+import requests as py_requests
+try:
+    import aiohttp
+except ImportError:
+    aiohttp = None
+
+from curl_cffi import requests
+
+OUT_DIR = Path(__file__).parent.resolve()
+UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
+AUTH_URL = "https://auth.openai.com/oauth/authorize"
+TOKEN_URL = "https://auth.openai.com/oauth/token"
+CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
+DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
+DEFAULT_SCOPE = "openid email profile offline_access"
+
+# ========== 临时邮箱提供商:GPTMail + TempMail.lol ==========
+
+class GPTMailClient:
+    def __init__(self, proxies: Any = None):
+        self.session = requests.Session(proxies=proxies, impersonate="chrome")
+        self.session.headers.update({
+            "User-Agent": UA,
+            "Accept": "application/json, text/plain, */*",
+            "Accept-Language": "zh-CN,zh;q=0.9",
+            "Referer": "https://mail.chatgpt.org.uk/",
+        })
+        self.base_url = "https://mail.chatgpt.org.uk"
+
+    def _init_browser_session(self):
+        try:
+            resp = self.session.get(self.base_url, timeout=15)
+            gm_sid = self.session.cookies.get("gm_sid")
+            if gm_sid:
+                self.session.headers.update({"Cookie": f"gm_sid={gm_sid}"})
+            token_match = re.search(r'(eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+)', resp.text)
+            if token_match:
+                self.session.headers.update({"x-inbox-token": token_match.group(1)})
+        except Exception:
+            pass
+
+    def generate_email(self) -> str:
+        self._init_browser_session()
+        resp = self.session.get(f"{self.base_url}/api/generate-email", timeout=15)
+        if resp.status_code == 200:
+            data = resp.json()
+            email = data["data"]["email"]
+            self.session.headers.update({"x-inbox-token": data["auth"]["token"]})
+            print(f"[+] 生成邮箱: {email} (GPTMail)")
+            print("[*] 自动轮询已启动(GPTMail 会话已准备)")
+            return email
+        raise RuntimeError(f"GPTMail 生成失败: {resp.status_code}")
+
+    def list_emails(self, email: str) -> List[Dict[str, Any]]:
+        encoded_email = urllib.parse.quote(email)
+        resp = self.session.get(f"{self.base_url}/api/emails?email={encoded_email}", timeout=15)
+        if resp.status_code == 200:
+            return resp.json().get("data", {}).get("emails", [])
+        return []
+
+
+class Message:
+    def __init__(self, data: dict):
+        self.from_addr = data.get("from", "")
+        self.subject = data.get("subject", "")
+        self.body = data.get("body", "") or ""
+        self.html_body = data.get("html", "") or ""
+
+
+class EMail:
+    def __init__(self, proxies: Any = None):
+        self.s = requests.Session(proxies=proxies, impersonate="chrome")
+        self.s.headers.update({
+            "User-Agent": UA,
+            "Accept": "application/json",
+            "Content-Type": "application/json",
+        })
+        r = self.s.post("https://api.tempmail.lol/v2/inbox/create", json={}, timeout=15)
+        r.raise_for_status()
+        data = r.json()
+        self.address = data["address"]
+        self.token = data["token"]
+        print(f"[+] 生成邮箱: {self.address} (TempMail.lol)")
+        print("[*] 自动轮询已启动(token 已保存)")
+
+    def _get_messages(self) -> List[Dict[str, Any]]:
+        r = self.s.get(f"https://api.tempmail.lol/v2/inbox?token={self.token}", timeout=15)
+        r.raise_for_status()
+        return r.json().get("emails", [])
+
+
+class PMailClient:
+    def __init__(self, proxies: Any = None):
+        self.session = requests.Session(proxies=proxies, impersonate="chrome")
+        self.session.headers.update({
+            "Accept": "application/json, text/plain, */*",
+            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
+            "Cache-Control": "no-cache",
+            "Content-Type": "application/json;charset=UTF-8;",
+            "Lang": "zhCn",
+            "Origin": "https://pmail.030208.xyz",
+            "Pragma": "no-cache",
+            "Priority": "u=1, i",
+            "Referer": "https://pmail.030208.xyz/",
+            "Sec-Ch-Ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Microsoft"',
+            "Sec-Ch-Ua-Mobile": "?0",
+            "Sec-Ch-Ua-Platform": '"Windows"',
+            "Sec-Fetch-Dest": "empty",
+            "Sec-Fetch-Mode": "cors",
+            "Sec-Fetch-Site": "same-origin",
+            "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0",
+            "Cookie": "session=beG8AdZzfjgHWPwR0mrgoWGvjMU30zwPgvfNgT31sSk"
+        })
+
+    def list_emails(self, target_email: str) -> List[Dict[str, Any]]:
+        url = "https://pmail.030208.xyz/api/email/list"
+        payload = {"tag": {"type": 0, "status": -1, "group_id": 0}, "page_size": 10}
+        try:
+            resp = self.session.post(url, json=payload, timeout=15)
+            if resp.status_code == 200:
+                data = resp.json()
+                emails_list = data.get("data", {}).get("list", [])
+                results = []
+                for item in emails_list[:3]:
+                    to_arr = item.get("to", [])
+                    if to_arr and len(to_arr) > 0:
+                        addr = to_arr[0].get("EmailAddress", "")
+                        if addr == target_email:
+                            results.append({
+                                "subject": item.get("title", ""),
+                                "txt": item.get("title", ""),
+                                "from": addr
+                            })
+                return results
+        except Exception:
+            pass
+        return []
+
+
+def get_email_and_code_fetcher(proxies: Any = None, provider: str = "auto"):
+    provider = (provider or "auto").strip().lower()
+    if provider not in {"auto", "gptmail", "tempmail", "pmail"}:
+        raise ValueError(f"不支持的邮箱提供商: {provider}")
+
+    def _build_tempmail_bundle():
+        inbox = EMail(proxies)
+        email = inbox.address
+
+        def _extract_all_codes() -> List[str]:
+            results: List[str] = []
+            try:
+                msgs = inbox._get_messages()
+                for msg_data in msgs:
+                    msg = Message(msg_data)
+                    body = msg.body or msg.html_body or msg.subject or ""
+                    results.extend(re.findall(r"\b(\d{6})\b", body))
+            except Exception:
+                pass
+            return results
+
+        def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
+            exclude = set(exclude_codes or [])
+            start = time.monotonic()
+            attempt = 0
+            while time.monotonic() - start < timeout_sec:
+                attempt += 1
+                try:
+                    msgs = inbox._get_messages()
+                    print(f"[otp][tempmail] 轮询 #{attempt}, 收到 {len(msgs)} 封邮件, 目标: {email}")
+                    for msg_data in msgs:
+                        msg = Message(msg_data)
+                        body = msg.body or msg.html_body or msg.subject or ""
+                        for code in re.findall(r"\b(\d{6})\b", body):
+                            if code not in exclude:
+                                return code
+                except Exception:
+                    pass
+                time.sleep(poll)
+            return None
+
+        return email, _gen_password(), fetch_code, _extract_all_codes, "tempmail"
+
+    def _build_gptmail_bundle():
+        client = GPTMailClient(proxies)
+        email = client.generate_email()
+
+        def _extract_all_codes() -> List[str]:
+            regex = r"(?<!\d)(\d{6})(?!\d)"
+            results: List[str] = []
+            try:
+                summaries = client.list_emails(email)
+                for s in summaries:
+                    body = " ".join([
+                        str(s.get("subject", "") or ""),
+                        str(s.get("text", "") or ""),
+                        str(s.get("body", "") or ""),
+                        str(s.get("html", "") or ""),
+                        json.dumps(s, ensure_ascii=False),
+                    ])
+                    results.extend(re.findall(regex, body))
+            except Exception:
+                pass
+            return results
+
+        def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
+            exclude = set(exclude_codes or [])
+            start = time.monotonic()
+            attempt = 0
+            while time.monotonic() - start < timeout_sec:
+                attempt += 1
+                try:
+                    summaries = client.list_emails(email)
+                    print(f"[otp][gptmail] 轮询 #{attempt}, 收到 {len(summaries)} 封邮件, 目标: {email}")
+                    for s in summaries:
+                        body = " ".join([
+                            str(s.get("subject", "") or ""),
+                            str(s.get("text", "") or ""),
+                            str(s.get("body", "") or ""),
+                            str(s.get("html", "") or ""),
+                            json.dumps(s, ensure_ascii=False),
+                        ])
+                        for code in re.findall(r"(?<!\d)(\d{6})(?!\d)", body):
+                            if code not in exclude:
+                                return code
+                except Exception:
+                    pass
+                time.sleep(poll)
+            return None
+
+        return email, _gen_password(), fetch_code, _extract_all_codes, "gptmail"
+
+    def _build_pmail_bundle():
+        client = PMailClient(proxies)
+        email = _random_name().lower() + "@030208.xyz"
+        print(f"[+] 分配自建域邮箱别名: {email} (PMail)")
+        print("[*] 自动轮询已启动(关联至主信箱 husj@030208.xyz)")
+
+        def _extract_all_codes() -> List[str]:
+            results: List[str] = []
+            try:
+                summaries = client.list_emails(email)
+                for s in summaries:
+                    body = f"{s.get('subject', '')} {s.get('txt', '')}"
+                    results.extend(re.findall(r"(?<!\d)(\d{6})(?!\d)", body))
+            except Exception:
+                pass
+            return results
+
+        def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
+            exclude = set(exclude_codes or [])
+            start = time.monotonic()
+            attempt = 0
+            while time.monotonic() - start < timeout_sec:
+                attempt += 1
+                try:
+                    summaries = client.list_emails(email)
+                    print(f"[otp][pmail] 轮询 #{attempt}, 收到 {len(summaries)} 封目标邮件, 目标: {email}")
+                    for s in summaries:
+                        body = f"{s.get('subject', '')} {s.get('txt', '')}"
+                        for code in re.findall(r"(?<!\d)(\d{6})(?!\d)", body):
+                            if code not in exclude:
+                                return code
+                except Exception:
+                    pass
+                time.sleep(poll)
+            return None
+
+        return email, _gen_password(), fetch_code, _extract_all_codes, "pmail"
+
+    if provider == "pmail":
+        return _build_pmail_bundle()
+    if provider == "tempmail":
+        return _build_tempmail_bundle()
+    if provider == "gptmail":
+        return _build_gptmail_bundle()
+
+    try:
+        return _build_pmail_bundle()
+    except Exception as e:
+        print(f"[邮箱] PMail 初始化失败,回退 TempMail: {e}")
+        try:
+            return _build_tempmail_bundle()
+        except Exception as e2:
+            print(f"[邮箱] TempMail 初始化失败,回退 GPTMail: {e2}")
+            return _build_gptmail_bundle()
+
+# ========== OAuth 核心逻辑 (对齐原版的完美重定向流) ==========
+
+def _gen_password() -> str:
+    alphabet = string.ascii_letters + string.digits
+    special = "!@#$%^&*.-"
+    base = [random.choice(string.ascii_lowercase), random.choice(string.ascii_uppercase),
+            random.choice(string.digits), random.choice(special)]
+    base += [random.choice(alphabet + special) for _ in range(12)]
+    random.shuffle(base)
+    return "".join(base)
+
+def _random_name() -> str:
+    return ''.join(random.choice(string.ascii_lowercase) for _ in range(7)).capitalize()
+
+def _random_birthdate() -> str:
+    start = datetime(1975, 1, 1); end = datetime(1999, 12, 31)
+    d = start + timedelta(days=random.randrange((end - start).days + 1))
+    return d.strftime('%Y-%m-%d')
+
+def _b64url_no_pad(raw: bytes) -> str:
+    return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
+
+def _sha256_b64url_no_pad(s: str) -> str:
+    return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
+
+def _pkce_verifier() -> str:
+    return secrets.token_urlsafe(64)
+
+def _parse_callback_url(callback_url: str) -> Dict[str, Any]:
+    candidate = (callback_url or "").strip()
+    if not candidate:
+        return {"code": "", "state": "", "error": "", "error_description": ""}
+    if "://" not in candidate:
+        if candidate.startswith("?"):
+            candidate = f"http://localhost{candidate}"
+        elif any(ch in candidate for ch in "/?#") or ":" in candidate:
+            candidate = f"http://{candidate}"
+        elif "=" in candidate:
+            candidate = f"http://localhost/?{candidate}"
+    parsed = urllib.parse.urlparse(candidate)
+    query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
+    fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
+    for key, values in fragment.items():
+        if key not in query or not query[key] or not (query[key][0] or "").strip():
+            query[key] = values
+    def get1(k: str) -> str:
+        return (query.get(k, [""])[0] or "").strip()
+    code = get1("code")
+    state = get1("state")
+    error = get1("error")
+    error_description = get1("error_description")
+    if code and not state and "#" in code:
+        code, state = code.split("#", 1)
+    if not error and error_description:
+        error, error_description = error_description, ""
+    return {"code": code, "state": state, "error": error, "error_description": error_description}
+
+def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
+    try:
+        pad = "=" * ((4 - (len(seg) % 4)) % 4)
+        return json.loads(base64.urlsafe_b64decode((seg + pad).encode("ascii")).decode("utf-8"))
+    except Exception:
+        return {}
+
+def _jwt_claims_no_verify(token: str) -> Dict[str, Any]:
+    if not token or token.count(".") < 2:
+        return {}
+    return _decode_jwt_segment(token.split(".")[1])
+
+def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
+    body = urllib.parse.urlencode(data).encode("utf-8")
+    req = urllib.request.Request(
+        url,
+        data=body,
+        method="POST",
+        headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
+    )
+    try:
+        with urllib.request.urlopen(req, timeout=timeout) as resp:
+            raw = resp.read()
+            if resp.status != 200:
+                raise RuntimeError(f"Token 交换失败: {resp.status}: {raw.decode('utf-8', 'replace')}")
+            return json.loads(raw.decode("utf-8"))
+    except urllib.error.HTTPError as exc:
+        raw = exc.read()
+        raise RuntimeError(f"Token 交换失败: {exc.code}: {raw.decode('utf-8', 'replace')}") from exc
+
+def _to_int(v: Any) -> int:
+    try:
+        return int(v)
+    except (TypeError, ValueError):
+        return 0
+
+def _build_sentinel_payload(session, did: str, flow: str) -> str:
+    req_body = json.dumps({"p": "", "id": did, "flow": flow})
+    resp = session.post(
+        "https://sentinel.openai.com/backend-api/sentinel/req",
+        headers={
+            "origin": "https://sentinel.openai.com",
+            "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
+            "content-type": "text/plain;charset=UTF-8",
+        },
+        data=req_body,
+        timeout=15,
+    )
+    if resp.status_code != 200:
+        raise RuntimeError(f"Sentinel 验证失败: {resp.status_code}: {resp.text[:200]}")
+    token = (resp.json() or {}).get("token", "")
+    return json.dumps({"p": "", "t": "", "c": token, "id": did, "flow": flow})
+
+@dataclass(frozen=True)
+class OAuthStart:
+    auth_url: str; state: str; code_verifier: str; redirect_uri: str
+
+def generate_oauth_url(redirect_uri: str = DEFAULT_REDIRECT_URI) -> OAuthStart:
+    state = secrets.token_urlsafe(16)
+    verifier = _pkce_verifier()
+    challenge = _sha256_b64url_no_pad(verifier)
+
+    params = {
+        "client_id": CLIENT_ID,
+        "response_type": "code",
+        "redirect_uri": redirect_uri,
+        "scope": DEFAULT_SCOPE,
+        "state": state,
+        "code_challenge": challenge,
+        "code_challenge_method": "S256",
+        "prompt": "login",
+        "id_token_add_organizations": "true",
+        "codex_cli_simplified_flow": "true",
+    }
+    return OAuthStart(f"{AUTH_URL}?{urllib.parse.urlencode(params)}", state, verifier, redirect_uri)
+
+def fetch_sentinel_token(flow: str, did: str, proxies: Any = None) -> Optional[str]:
+    try:
+        session = requests.Session(proxies=proxies, impersonate="chrome")
+        payload = _build_sentinel_payload(session, did, flow)
+        return (json.loads(payload) or {}).get("c")
+    except Exception:
+        return None
+
+def submit_callback_url(callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str, session=None) -> str:
+    cb = _parse_callback_url(callback_url)
+    if cb.get("error"):
+        raise RuntimeError(f"OAuth 错误: {cb['error']}: {cb.get('error_description', '')}".strip())
+    if not cb.get("code"):
+        raise ValueError("Callback URL 缺少 ?code=")
+    if not cb.get("state"):
+        raise ValueError("Callback URL 缺少 ?state=")
+    if cb.get("state") != expected_state:
+        raise ValueError("State 校验不匹配")
+    token_data = {
+        "grant_type": "authorization_code",
+        "client_id": CLIENT_ID,
+        "code": cb["code"],
+        "redirect_uri": redirect_uri,
+        "code_verifier": code_verifier,
+    }
+    if session is not None:
+        resp = session.post(
+            TOKEN_URL,
+            data=token_data,
+            headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
+            timeout=30,
+        )
+        if resp.status_code != 200:
+            raise RuntimeError(f"Token 交换失败: {resp.status_code}: {resp.text[:200]}")
+        token_resp = resp.json()
+    else:
+        token_resp = _post_form(TOKEN_URL, token_data)
+
+    access_token = str(token_resp.get("access_token") or "").strip()
+    refresh_token = str(token_resp.get("refresh_token") or "").strip()
+    id_token = str(token_resp.get("id_token") or "").strip()
+    expires_in = _to_int(token_resp.get("expires_in"))
+    claims = _jwt_claims_no_verify(id_token)
+    auth_claims = claims.get("https://api.openai.com/auth") or {}
+
+    now = int(time.time())
+    config = {
+        "id_token": id_token,
+        "access_token": access_token,
+        "refresh_token": refresh_token,
+        "account_id": str(auth_claims.get("chatgpt_account_id") or "").strip(),
+        "last_refresh": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
+        "email": str(claims.get("email") or "").strip(),
+        "type": "codex",
+        "expired": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))),
+    }
+    return json.dumps(config, ensure_ascii=False, indent=2)
+
+
+# ========== 轻量版 CPA 维护实现(内嵌,不依赖项目包) ==========
+DEFAULT_MGMT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"
+
+def _mgmt_headers(token: str) -> dict:
+    clean = str(token or "").strip()
+    if clean and not clean.lower().startswith("bearer "):
+        clean = f"Bearer {clean}"
+    return {"Authorization": clean, "Accept": "application/json"}
+
+
+def _join_mgmt_url(base_url: str, path: str) -> str:
+    base = (base_url or "").rstrip("/")
+    suffix = path if path.startswith("/") else f"/{path}"
+    if base.endswith("/v0"):
+        return f"{base}{suffix}"
+    return f"{base}/v0{suffix}"
+
+
+def _safe_json(text: str):
+    try:
+        return json.loads(text)
+    except Exception:
+        return {}
+
+
+def _extract_account_id(item: dict):
+    for key in ("chatgpt_account_id", "chatgptAccountId", "account_id", "accountId"):
+        val = item.get(key)
+        if val:
+            return str(val)
+    return None
+
+
+def _get_item_type(item: dict) -> str:
+    return str(item.get("type") or item.get("typo") or "")
+
+
+class MiniPoolMaintainer:
+    def __init__(self, base_url: str, token: str, target_type: str = "codex", used_percent_threshold: int = 95, user_agent: str = DEFAULT_MGMT_UA):
+        self.base_url = (base_url or "").rstrip("/")
+        self.token = token
+        self.target_type = target_type
+        self.used_percent_threshold = used_percent_threshold
+        self.user_agent = user_agent
+
+    def upload_token(self, filename: str, token_data: dict, proxy: str = "") -> bool:
+        if not self.base_url or not self.token:
+            return False
+        content = json.dumps(token_data, ensure_ascii=False).encode("utf-8")
+        files = {"file": (filename, content, "application/json")}
+        headers = {"Authorization": f"Bearer {self.token}"}
+        proxies = {"http": proxy, "https": proxy} if proxy else None
+        for attempt in range(3):
+            try:
+                resp = py_requests.post(_join_mgmt_url(self.base_url, "/management/auth-files"), files=files, headers=headers, timeout=30, verify=False, proxies=proxies)
+                if resp.status_code in (200, 201, 204):
+                    return True
+            except Exception:
+                pass
+            if attempt < 2:
+                time.sleep(2 ** attempt)
+        return False
+
+    def fetch_auth_files(self, timeout: int = 15):
+        resp = py_requests.get(_join_mgmt_url(self.base_url, "/management/auth-files"), headers=_mgmt_headers(self.token), timeout=timeout)
+        resp.raise_for_status()
+        data = resp.json()
+        return (data.get("files") if isinstance(data, dict) else []) or []
+
+    async def probe_and_clean_async(self, workers: int = 20, timeout: int = 10, retries: int = 1):
+        if aiohttp is None:
+            raise RuntimeError("需要安装 aiohttp: pip install aiohttp")
+        files = self.fetch_auth_files(timeout)
+        candidates = [f for f in files if _get_item_type(f).lower() == self.target_type.lower()]
+        if not candidates:
+            return {"total": len(files), "candidates": 0, "invalid_count": 0, "deleted_ok": 0, "deleted_fail": 0}
+
+        semaphore = asyncio.Semaphore(max(1, workers))
+        connector = aiohttp.TCPConnector(limit=max(1, workers))
+        client_timeout = aiohttp.ClientTimeout(total=max(1, timeout))
+
+        async def probe_one(session, item):
+            auth_index = item.get("auth_index")
+            name = item.get("name") or item.get("id")
+            res = {"name": name, "auth_index": auth_index, "invalid_401": False, "invalid_used_percent": False, "used_percent": None}
+            if not auth_index:
+                res["invalid_401"] = False
+                return res
+            account_id = _extract_account_id(item)
+            header = {"Authorization": "Bearer $TOKEN$", "Content-Type": "application/json", "User-Agent": self.user_agent}
+            if account_id:
+                header["Chatgpt-Account-Id"] = account_id
+            payload = {"authIndex": auth_index, "method": "GET", "url": "https://chatgpt.com/backend-api/wham/usage", "header": header}
+            for attempt in range(retries + 1):
+                try:
+                    async with semaphore:
+                        async with session.post(_join_mgmt_url(self.base_url, "/management/api-call"), headers={**_mgmt_headers(self.token), "Content-Type": "application/json"}, json=payload, timeout=timeout) as resp:
+                            text = await resp.text()
+                            if resp.status >= 400:
+                                raise RuntimeError(f"HTTP {resp.status}: {text[:200]}")
+                            data = _safe_json(text)
+                            sc = data.get("status_code")
+                            res["invalid_401"] = sc == 401
+                            if sc == 200:
+                                body = _safe_json(data.get("body", ""))
+                                used_pct = (body.get("rate_limit", {}).get("primary_window", {}).get("used_percent"))
+                                if used_pct is not None:
+                                    res["used_percent"] = used_pct
+                                    res["invalid_used_percent"] = used_pct >= self.used_percent_threshold
+                            return res
+                except Exception as e:
+                    if attempt >= retries:
+                        res["error"] = str(e)
+                        return res
+            return res
+
+        async def delete_one(session, name: str):
+            if not name:
+                return False
+            from urllib.parse import quote
+            encoded = quote(name, safe="")
+            try:
+                async with semaphore:
+                    async with session.delete(f"{_join_mgmt_url(self.base_url, '/management/auth-files')}?name={encoded}", headers=_mgmt_headers(self.token), timeout=timeout) as resp:
+                        text = await resp.text()
+                        data = _safe_json(text)
+                        return resp.status == 200 and data.get("status") == "ok"
+            except Exception:
+                return False
+
+        invalid_list = []
+        async with aiohttp.ClientSession(connector=connector, timeout=client_timeout, trust_env=True) as session:
+            tasks = [asyncio.create_task(probe_one(session, item)) for item in candidates]
+            for task in asyncio.as_completed(tasks):
+                r = await task
+                if r.get("invalid_401") or r.get("invalid_used_percent"):
+                    invalid_list.append(r)
+
+            delete_tasks = [asyncio.create_task(delete_one(session, r.get("name"))) for r in invalid_list if r.get("name")]
+            deleted_ok = 0
+            deleted_fail = 0
+            for task in asyncio.as_completed(delete_tasks):
+                if await task:
+                    deleted_ok += 1
+                else:
+                    deleted_fail += 1
+
+        return {
+            "total": len(files),
+            "candidates": len(candidates),
+            "invalid_count": len(invalid_list),
+            "deleted_ok": deleted_ok,
+            "deleted_fail": deleted_fail,
+        }
+
+    def probe_and_clean_sync(self, workers: int = 20, timeout: int = 10, retries: int = 1):
+        return asyncio.run(self.probe_and_clean_async(workers, timeout, retries))
+
+
+def _build_cpa_maintainer(args):
+    base_url = (args.cpa_base_url or os.getenv("CPA_BASE_URL") or "").strip()
+    token = (args.cpa_token or os.getenv("CPA_TOKEN") or "").strip()
+    if not base_url or not token:
+        print("[CPA] 未提供 cpa_base_url / cpa_token,跳过 CPA 上传/清理")
+        return None
+    try:
+        return MiniPoolMaintainer(
+            base_url,
+            token,
+            target_type="codex",
+            used_percent_threshold=args.cpa_used_threshold,
+            user_agent=DEFAULT_MGMT_UA,
+        )
+    except Exception as e:
+        print(f"[CPA] 创建维护器失败: {e}")
+        return None
+
+
+def _upload_token_to_cpa(pm, token_json: str, email: str, proxy: str = "") -> bool:
+    if not pm:
+        return False
+    try:
+        data = json.loads(token_json)
+    except Exception as e:
+        print(f"[CPA] 解析 token_json 失败: {e}")
+        return False
+    fname_email = email.replace("@", "_")
+    filename = f"token_{fname_email}_{int(time.time())}.json"
+    ok = pm.upload_token(filename=filename, token_data=data, proxy=proxy or "")
+    if ok:
+        print(f"[CPA] 已上传 {filename} 到 CPA")
+    else:
+        print("[CPA] 上传失败")
+    return ok
+
+
+def _clean_invalid_in_cpa(pm, args):
+    if not pm:
+        return None
+    try:
+        res = pm.probe_and_clean_sync(
+            workers=max(1, args.cpa_workers),
+            timeout=max(5, args.cpa_timeout),
+            retries=max(0, args.cpa_retries),
+        )
+        print(
+            f"[CPA] 清理完成: total={res.get('total')} candidates={res.get('candidates')} "
+            f"invalid={res.get('invalid_count')} deleted_ok={res.get('deleted_ok')} deleted_fail={res.get('deleted_fail')}"
+        )
+        return res
+    except Exception as e:
+        print(f"[CPA] 清理失败: {e}")
+        return None
+
+
+def _count_valid_cpa_tokens(pm, args):
+    if not pm:
+        return 0
+    try:
+        files = pm.fetch_auth_files(timeout=max(5, args.cpa_timeout))
+        target = pm.target_type.lower()
+        valid = [f for f in files if _get_item_type(f).lower() == target]
+        return len(valid)
+    except Exception as e:
+        print(f"[CPA] 统计 token 失败: {e}")
+        return 0
+
+
+# 账号行清理:上传成功且开启 prune_local 后使用
+# 安全处理:文件不存在直接返回,写入保持末尾换行便于追加
+
+def _remove_account_entry(accounts_path: Path, email: str, real_pwd: str):
+    if not accounts_path.exists():
+        return
+    try:
+        lines = accounts_path.read_text(encoding="utf-8").splitlines()
+        target = f"{email}----{real_pwd}"
+        kept = [ln for ln in lines if ln.strip() != target]
+        accounts_path.write_text("\n".join(kept) + ("\n" if kept else ""), encoding="utf-8")
+        print(f"[本地清理] 已从 accounts.txt 移除: {email}")
+    except Exception as e:
+        print(f"[本地清理] 移除账号行失败: {e}")
+
+# ========== 主注册流程 (恢复详细日志与异常捕获) ==========
+
+def run(proxy: Optional[str], mail_provider: str = "auto"):
+    proxies = {"http": proxy, "https": proxy} if proxy else None
+    s = requests.Session(proxies=proxies, impersonate="chrome")
+    s.headers.update({
+        "user-agent": UA,
+        "accept": "application/json, text/plain, */*",
+    })
+
+    print(f"\n{'='*20} 开启注册流程 {'='*20}")
+    try:
+        print(f"[步骤1] 正在初始化临时邮箱(provider={mail_provider})...")
+        email, password, code_fetcher, extract_all_codes, actual_mail_provider = get_email_and_code_fetcher(proxies, provider=mail_provider)
+        print(f"[*] 当前邮箱提供商: {actual_mail_provider}")
+        if not email:
+            print("[失败] 未能获取邮箱")
+            return None
+        print(f"[成功] 邮箱: {email} | 临时密码: {password}")
+
+        print("[步骤2] 访问 OpenAI 授权页获取 Device ID...")
+        oauth = generate_oauth_url()
+        auth_page = s.get(oauth.auth_url, timeout=15)
+        did = s.cookies.get("oai-did")
+        if not did:
+            print("[失败] 未能从 Cookie 获取 oai-did")
+            return None
+        print(f"[成功] Device ID: {did}")
+
+        print("[步骤3] 获取 Sentinel 载荷并提交注册邮箱...")
+        try:
+            authorize_continue_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
+        except Exception as e:
+            print(f"[失败] 获取 authorize_continue Sentinel 失败: {e}")
+            return None
+
+        continue_url = ""
+        try:
+            auth_json = auth_page.json() if hasattr(auth_page, "json") else {}
+            continue_url = str((auth_json or {}).get("continue_url") or "").strip()
+        except Exception:
+            continue_url = ""
+        if continue_url:
+            try:
+                s.get(continue_url, timeout=15)
+            except Exception:
+                pass
+
+        signup_res = s.post(
+            "https://auth.openai.com/api/accounts/authorize/continue",
+            headers={
+                "referer": "https://auth.openai.com/create-account",
+                "accept": "application/json",
+                "content-type": "application/json",
+                "openai-sentinel-token": authorize_continue_sentinel,
+            },
+            data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"}),
+            timeout=15,
+        )
+        print(f"[日志] 邮箱提交状态: {signup_res.status_code}")
+        if signup_res.status_code != 200:
+            print(f"[失败] 邮箱提交失败: {signup_res.text[:200]}")
+            return None
+
+        print("[步骤4] 设置账户密码...")
+        pwd_res = s.post(
+            "https://auth.openai.com/api/accounts/user/register",
+            headers={
+                "referer": "https://auth.openai.com/create-account/password",
+                "accept": "application/json",
+                "content-type": "application/json",
+            },
+            json={"password": password, "username": email},
+            timeout=15,
+        )
+        print(f"[日志] 密码设置状态: {pwd_res.status_code}")
+        if pwd_res.status_code != 200:
+            print(f"[失败] 密码设置失败: {pwd_res.text[:200]}")
+            return None
+
+        print("[步骤5] 触发 OpenAI 发送验证邮件...")
+        s.get("https://auth.openai.com/create-account/password", timeout=15)
+        otp_send_res = s.get(
+            "https://auth.openai.com/api/accounts/email-otp/send",
+            headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json"},
+            timeout=15,
+        )
+        print(f"[日志] 发送指令状态: {otp_send_res.status_code}")
+        if otp_send_res.status_code != 200:
+            print(f"[失败] 发送验证码失败: {otp_send_res.text[:200]}")
+            return None
+
+        print("[步骤6] 等待邮箱接收 6 位验证码...")
+        code = code_fetcher()
+        if not code:
+            print("[失败] 邮箱长时间未收到验证码")
+            return None
+        print(f"[成功] 捕获验证码: {code}")
+
+        print("[步骤7] 提交验证码至 OpenAI...")
+        val_res = s.post(
+            "https://auth.openai.com/api/accounts/email-otp/validate",
+            headers={
+                "referer": "https://auth.openai.com/email-verification",
+                "accept": "application/json",
+                "content-type": "application/json",
+            },
+            json={"code": code},
+            timeout=15,
+        )
+        print(f"[日志] 验证码校验状态: {val_res.status_code}")
+        if val_res.status_code != 200:
+            print(f"[失败] 验证码校验失败: {val_res.text[:200]}")
+            return None
+
+        print("[步骤8] 完善账户基本信息...")
+        try:
+            create_account_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
+        except Exception as e:
+            print(f"[失败] 获取 create_account Sentinel 失败: {e}")
+            return None
+
+        acc_res = s.post(
+            "https://auth.openai.com/api/accounts/create_account",
+            headers={
+                "referer": "https://auth.openai.com/about-you",
+                "accept": "application/json",
+                "content-type": "application/json",
+                "openai-sentinel-token": create_account_sentinel,
+            },
+            data=json.dumps({"name": _random_name(), "birthdate": _random_birthdate()}),
+            timeout=15,
+        )
+        print(f"[日志] 账户创建状态: {acc_res.status_code}")
+        if acc_res.status_code != 200:
+            print(f"[失败] 账户创建失败: {acc_res.text[:200]}")
+            return None
+
+        print("[步骤9] 注册完成,重新走登录流程获取 Workspace / Token...")
+        first_code = code
+        for login_attempt in range(3):
+            try:
+                print(f"[*] 正在通过登录流程获取 Token...{f' (重试 {login_attempt}/3)' if login_attempt else ''}")
+                s2 = requests.Session(proxies=proxies, impersonate="chrome")
+                oauth2 = generate_oauth_url()
+                s2.get(oauth2.auth_url, timeout=15)
+                did2 = s2.cookies.get("oai-did")
+                if not did2:
+                    print("[失败] 登录会话未能获取 oai-did")
+                    continue
+
+                lc = s2.post(
+                    "https://auth.openai.com/api/accounts/authorize/continue",
+                    headers={
+                        "referer": "https://auth.openai.com/log-in",
+                        "accept": "application/json",
+                        "content-type": "application/json",
+                        "openai-sentinel-token": _build_sentinel_payload(s2, did2, "authorize_continue"),
+                    },
+                    data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "login"}),
+                    timeout=15,
+                )
+                print(f"[日志] 登录邮箱提交状态: {lc.status_code}")
+                if lc.status_code != 200:
+                    print(f"[失败] 登录邮箱提交失败: {lc.text[:200]}")
+                    continue
+                s2.get(str((lc.json() or {}).get("continue_url") or ""), timeout=15)
+
+                pw = s2.post(
+                    "https://auth.openai.com/api/accounts/password/verify",
+                    headers={
+                        "referer": "https://auth.openai.com/log-in/password",
+                        "accept": "application/json",
+                        "content-type": "application/json",
+                        "openai-sentinel-token": _build_sentinel_payload(s2, did2, "authorize_continue"),
+                    },
+                    json={"password": password},
+                    timeout=15,
+                )
+                print(f"[日志] 登录密码验证状态: {pw.status_code}")
+                if pw.status_code != 200:
+                    print(f"[失败] 登录密码验证失败: {pw.text[:200]}")
+                    continue
+
+                existing_codes = list(extract_all_codes())
+                s2.get(
+                    "https://auth.openai.com/email-verification",
+                    headers={"referer": "https://auth.openai.com/log-in/password"},
+                    timeout=15,
+                )
+                print("[*] 正在等待登录 OTP...")
+                time.sleep(2)
+
+                otp2 = None
+                baseline_codes = set(existing_codes)
+                baseline_codes.add(first_code)
+                for _ in range(40):
+                    all_codes = extract_all_codes()
+                    new_codes = [c for c in all_codes if c not in baseline_codes]
+                    if new_codes:
+                        otp2 = new_codes[-1]
+                        break
+                    time.sleep(2)
+
+                if not otp2:
+                    print("[失败] 未收到登录 OTP")
+                    continue
+                print(f"[成功] 捕获登录 OTP: {otp2}")
+
+                val2 = s2.post(
+                    "https://auth.openai.com/api/accounts/email-otp/validate",
+                    headers={
+                        "referer": "https://auth.openai.com/email-verification",
+                        "accept": "application/json",
+                        "content-type": "application/json",
+                    },
+                    json={"code": otp2},
+                    timeout=15,
+                )
+                print(f"[日志] 登录 OTP 校验状态: {val2.status_code}")
+                if val2.status_code != 200:
+                    print(f"[失败] 登录 OTP 校验失败: {val2.text[:200]}")
+                    continue
+                val2_data = val2.json() or {}
+                print("[成功] 登录 OTP 验证成功")
+
+                consent_url = str(val2_data.get("continue_url") or "").strip()
+                if consent_url:
+                    s2.get(consent_url, timeout=15)
+
+                auth_cookie = s2.cookies.get("oai-client-auth-session", domain=".auth.openai.com") or s2.cookies.get("oai-client-auth-session")
+                if not auth_cookie:
+                    print("[失败] 登录后未能获取 oai-client-auth-session")
+                    continue
+                auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
+
+                if "workspaces" not in auth_json or not auth_json["workspaces"]:
+                    print(f"[失败] Cookie 中无 workspaces: {list(auth_json.keys())}")
+                    continue
+                workspace_id = auth_json["workspaces"][0]["id"]
+                print(f"[成功] Workspace ID: {workspace_id}")
+
+                select_resp = s2.post(
+                    "https://auth.openai.com/api/accounts/workspace/select",
+                    headers={
+                        "referer": consent_url,
+                        "accept": "application/json",
+                        "content-type": "application/json",
+                    },
+                    json={"workspace_id": workspace_id},
+                    timeout=15,
+                )
+                print(f"[日志] Workspace 选择状态: {select_resp.status_code}")
+                if select_resp.status_code != 200:
+                    print(f"[失败] Workspace 选择失败: {select_resp.text[:200]}")
+                    continue
+                sel_data = select_resp.json() or {}
+
+                if sel_data.get("page", {}).get("type", "") == "organization_select":
+                    orgs = sel_data.get("page", {}).get("payload", {}).get("data", {}).get("orgs", [])
+                    if orgs:
+                        org_sel = s2.post(
+                            "https://auth.openai.com/api/accounts/organization/select",
+                            headers={"accept": "application/json", "content-type": "application/json"},
+                            json={
+                                "org_id": orgs[0].get("id", ""),
+                                "project_id": orgs[0].get("default_project_id", ""),
+                            },
+                            timeout=15,
+                        )
+                        print(f"[日志] Organization 选择状态: {org_sel.status_code}")
+                        if org_sel.status_code != 200:
+                            print(f"[失败] Organization 选择失败: {org_sel.text[:200]}")
+                            continue
+                        sel_data = org_sel.json() or {}
+
+                if "continue_url" not in sel_data:
+                    print(f"[失败] 未能获取 continue_url: {json.dumps(sel_data, ensure_ascii=False)[:500]}")
+                    continue
+
+                print("[步骤10] 跟踪重定向并换取 Token...")
+                r = s2.get(str(sel_data["continue_url"]), allow_redirects=False, timeout=15)
+                cbk = None
+                for i in range(20):
+                    loc = r.headers.get("Location", "")
+                    print(f"  -> 重定向 #{i+1} 状态: {r.status_code} | 下一跳: {loc[:80] if loc else '无'}")
+                    if loc.startswith("http://localhost"):
+                        cbk = loc
+                        break
+                    if r.status_code not in (301, 302, 303) or not loc:
+                        break
+                    r = s2.get(loc, allow_redirects=False, timeout=15)
+
+                if not cbk:
+                    print("[失败] 未能获取到 Callback URL")
+                    continue
+
+                token_json = submit_callback_url(
+                    callback_url=cbk,
+                    expected_state=oauth2.state,
+                    code_verifier=oauth2.code_verifier,
+                    redirect_uri=oauth2.redirect_uri,
+                    session=s2,
+                )
+                print("[大功告成] 账号注册完毕!")
+                return token_json, email, password
+            except Exception as e:
+                print(f"[失败] 登录补全流程异常: {e}")
+                time.sleep(2)
+                continue
+
+        print("[失败] 登录补全流程 3 次均未完成。")
+        return None
+    except Exception as e:
+        print(f"[致命错误] 流程崩溃: {e}")
+        return None
+
+# ========== Main 保持原版完整结构与输出格式 ==========
+
+def main():
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--proxy", help="代理地址")
+    parser.add_argument("--mail-provider", choices=["auto", "gptmail", "tempmail"], default="auto", help="临时邮箱提供商:auto/gptmail/tempmail")
+    parser.add_argument("--once", action="store_true", help="只运行一次")
+    parser.add_argument("--sleep-min", type=int, default=5, help="最小间隔(秒)")
+    parser.add_argument("--sleep-max", type=int, default=30, help="最大间隔(秒)")
+
+    parser.add_argument("--cpa-base-url", default=os.getenv("CPA_BASE_URL"), help="CPA 基础地址")
+    parser.add_argument("--cpa-token", default=os.getenv("CPA_TOKEN"), help="CPA 管理 token (Bearer)")
+    parser.add_argument("--cpa-workers", type=int, default=20, help="CPA 清理并发")
+    parser.add_argument("--cpa-timeout", type=int, default=12, help="CPA 请求超时")
+    parser.add_argument("--cpa-retries", type=int, default=1, help="CPA 清理重试次数")
+    parser.add_argument("--cpa-used-threshold", type=int, default=95, help="CPA used_percent 阈值")
+    parser.add_argument("--cpa-clean", action="store_true", help="注册后自动清理 CPA 失效账号")
+    parser.add_argument("--cpa-upload", action="store_true", help="注册后自动上传 CPA")
+    parser.add_argument("--cpa-target-count", type=int, default=300, help="目标 token 数(有效)")
+    parser.add_argument("--cpa-prune-local", action="store_true", help="上传成功后删除本地 token 文件与账号行")
+    args = parser.parse_args()
+
+    tokens_dir = OUT_DIR / "tokens"
+    tokens_dir.mkdir(parents=True, exist_ok=True)
+
+    pm = _build_cpa_maintainer(args)
+
+    count = 0
+    while True:
+        count += 1
+        print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 流程 #{count} <<<")
+
+        if pm:
+            if args.cpa_clean:
+                _clean_invalid_in_cpa(pm, args)
+            current_count = _count_valid_cpa_tokens(pm, args)
+            print(f"[CPA] 当前有效 token: {current_count} / {args.cpa_target_count}")
+            if current_count >= args.cpa_target_count:
+                if args.once:
+                    break
+                wait_time = random.randint(args.sleep_min, args.sleep_max)
+                print(f"[*] 随机休息 {wait_time} 秒...")
+                time.sleep(wait_time)
+                continue
+
+        res = run(args.proxy, args.mail_provider)
+        if res:
+            token_json, email, real_pwd = res
+            print(f"[🎉] 成功! {email} ---- {real_pwd}")
+
+            # 1. 保存账号密码到 tokens/accounts.txt
+            with open(tokens_dir / "accounts.txt", "a", encoding="utf-8") as f:
+                f.write(f"{email}----{real_pwd}\n")
+
+            # 2. 保存详细 Token JSON
+            fname_email = email.replace("@", "_")
+            token_file = tokens_dir / f"token_{fname_email}_{int(time.time())}.json"
+            token_file.write_text(token_json, encoding="utf-8")
+            print(f"[*] Token 文件已保存: {token_file.name}")
+
+            # 3. 上传 CPA(可选)
+            upload_ok = False
+            if args.cpa_upload:
+                upload_ok = _upload_token_to_cpa(pm, token_json, email, proxy=args.proxy or "")
+
+            # 4. 上传成功后按需删除本地文件/账号行
+            if upload_ok and args.cpa_prune_local:
+                try:
+                    if token_file.exists():
+                        token_file.unlink()
+                        print(f"[本地清理] 已删除 token 文件: {token_file.name}")
+                except Exception as e:
+                    print(f"[本地清理] 删除 token 文件失败: {e}")
+                _remove_account_entry(tokens_dir / "accounts.txt", email, real_pwd)
+
+            # 5. 注册后再清理一次(可选)
+            if pm and args.cpa_clean:
+                _clean_invalid_in_cpa(pm, args)
+        else:
+            print("[-] 本次注册流程未能完成。")
+
+        if args.once:
+            break
+
+        wait_time = random.randint(args.sleep_min, args.sleep_max)
+        print(f"[*] 随机休息 {wait_time} 秒...")
+        time.sleep(wait_time)
+
+if __name__ == "__main__":
+    main()