| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149 |
- import json
- import os
- import re
- import sys
- import time
- import uuid
- import random
- import string
- import secrets
- import hashlib
- import base64
- import argparse
- from pathlib import Path
- from datetime import datetime, timedelta
- from dataclasses import dataclass
- from typing import Any, Dict, Optional, List
- import urllib.parse
- import urllib.request
- import urllib.error
- import asyncio
- import requests as py_requests
- try:
- import aiohttp
- except ImportError:
- aiohttp = None
- from curl_cffi import requests
- OUT_DIR = Path(__file__).parent.resolve()
- UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
- AUTH_URL = "https://auth.openai.com/oauth/authorize"
- TOKEN_URL = "https://auth.openai.com/oauth/token"
- CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
- DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
- DEFAULT_SCOPE = "openid email profile offline_access"
- # ========== 临时邮箱提供商:GPTMail + TempMail.lol ==========
- class GPTMailClient:
- def __init__(self, proxies: Any = None):
- self.session = requests.Session(proxies=proxies, impersonate="chrome")
- self.session.headers.update({
- "User-Agent": UA,
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Referer": "https://mail.chatgpt.org.uk/",
- })
- self.base_url = "https://mail.chatgpt.org.uk"
- def _init_browser_session(self):
- try:
- resp = self.session.get(self.base_url, timeout=15)
- gm_sid = self.session.cookies.get("gm_sid")
- if gm_sid:
- self.session.headers.update({"Cookie": f"gm_sid={gm_sid}"})
- token_match = re.search(r'(eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+)', resp.text)
- if token_match:
- self.session.headers.update({"x-inbox-token": token_match.group(1)})
- except Exception:
- pass
- def generate_email(self) -> str:
- self._init_browser_session()
- resp = self.session.get(f"{self.base_url}/api/generate-email", timeout=15)
- if resp.status_code == 200:
- data = resp.json()
- email = data["data"]["email"]
- self.session.headers.update({"x-inbox-token": data["auth"]["token"]})
- print(f"[+] 生成邮箱: {email} (GPTMail)")
- print("[*] 自动轮询已启动(GPTMail 会话已准备)")
- return email
- raise RuntimeError(f"GPTMail 生成失败: {resp.status_code}")
- def list_emails(self, email: str) -> List[Dict[str, Any]]:
- encoded_email = urllib.parse.quote(email)
- resp = self.session.get(f"{self.base_url}/api/emails?email={encoded_email}", timeout=15)
- if resp.status_code == 200:
- return resp.json().get("data", {}).get("emails", [])
- return []
- class Message:
- def __init__(self, data: dict):
- self.from_addr = data.get("from", "")
- self.subject = data.get("subject", "")
- self.body = data.get("body", "") or ""
- self.html_body = data.get("html", "") or ""
- class EMail:
- def __init__(self, proxies: Any = None):
- self.s = requests.Session(proxies=proxies, impersonate="chrome")
- self.s.headers.update({
- "User-Agent": UA,
- "Accept": "application/json",
- "Content-Type": "application/json",
- })
- r = self.s.post("https://api.tempmail.lol/v2/inbox/create", json={}, timeout=15)
- r.raise_for_status()
- data = r.json()
- self.address = data["address"]
- self.token = data["token"]
- print(f"[+] 生成邮箱: {self.address} (TempMail.lol)")
- print("[*] 自动轮询已启动(token 已保存)")
- def _get_messages(self) -> List[Dict[str, Any]]:
- r = self.s.get(f"https://api.tempmail.lol/v2/inbox?token={self.token}", timeout=15)
- r.raise_for_status()
- return r.json().get("emails", [])
- class PMailClient:
- def __init__(self, proxies: Any = None):
- self.session = requests.Session(proxies=proxies, impersonate="chrome")
- self.session.headers.update({
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6",
- "Cache-Control": "no-cache",
- "Content-Type": "application/json;charset=UTF-8;",
- "Lang": "zhCn",
- "Origin": "https://pmail.030208.xyz",
- "Pragma": "no-cache",
- "Priority": "u=1, i",
- "Referer": "https://pmail.030208.xyz/",
- "Sec-Ch-Ua": '"Chromium";v="146", "Not-A.Brand";v="24", "Microsoft"',
- "Sec-Ch-Ua-Mobile": "?0",
- "Sec-Ch-Ua-Platform": '"Windows"',
- "Sec-Fetch-Dest": "empty",
- "Sec-Fetch-Mode": "cors",
- "Sec-Fetch-Site": "same-origin",
- "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/146.0.0.0 Safari/537.36 Edg/146.0.0.0",
- "Cookie": "session=beG8AdZzfjgHWPwR0mrgoWGvjMU30zwPgvfNgT31sSk"
- })
- def list_emails(self, target_email: str) -> List[Dict[str, Any]]:
- url = "https://pmail.030208.xyz/api/email/list"
- payload = {"tag": {"type": 0, "status": -1, "group_id": 0}, "page_size": 10}
- try:
- resp = self.session.post(url, json=payload, timeout=15)
- if resp.status_code == 200:
- data = resp.json()
- emails_list = data.get("data", {}).get("list", [])
- results = []
- for item in emails_list[:3]:
- to_arr = item.get("to", [])
- if to_arr and len(to_arr) > 0:
- addr = to_arr[0].get("EmailAddress", "")
- if addr == target_email:
- results.append({
- "subject": item.get("title", ""),
- "txt": item.get("title", ""),
- "from": addr
- })
- return results
- except Exception:
- pass
- return []
- def get_email_and_code_fetcher(proxies: Any = None, provider: str = "auto"):
- provider = (provider or "auto").strip().lower()
- if provider not in {"auto", "gptmail", "tempmail", "pmail"}:
- raise ValueError(f"不支持的邮箱提供商: {provider}")
- def _build_tempmail_bundle():
- inbox = EMail(proxies)
- email = inbox.address
- def _extract_all_codes() -> List[str]:
- results: List[str] = []
- try:
- msgs = inbox._get_messages()
- for msg_data in msgs:
- msg = Message(msg_data)
- body = msg.body or msg.html_body or msg.subject or ""
- results.extend(re.findall(r"\b(\d{6})\b", body))
- except Exception:
- pass
- return results
- def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
- exclude = set(exclude_codes or [])
- start = time.monotonic()
- attempt = 0
- while time.monotonic() - start < timeout_sec:
- attempt += 1
- try:
- msgs = inbox._get_messages()
- print(f"[otp][tempmail] 轮询 #{attempt}, 收到 {len(msgs)} 封邮件, 目标: {email}")
- for msg_data in msgs:
- msg = Message(msg_data)
- body = msg.body or msg.html_body or msg.subject or ""
- for code in re.findall(r"\b(\d{6})\b", body):
- if code not in exclude:
- return code
- except Exception:
- pass
- time.sleep(poll)
- return None
- return email, _gen_password(), fetch_code, _extract_all_codes, "tempmail"
- def _build_gptmail_bundle():
- client = GPTMailClient(proxies)
- email = client.generate_email()
- def _extract_all_codes() -> List[str]:
- regex = r"(?<!\d)(\d{6})(?!\d)"
- results: List[str] = []
- try:
- summaries = client.list_emails(email)
- for s in summaries:
- body = " ".join([
- str(s.get("subject", "") or ""),
- str(s.get("text", "") or ""),
- str(s.get("body", "") or ""),
- str(s.get("html", "") or ""),
- json.dumps(s, ensure_ascii=False),
- ])
- results.extend(re.findall(regex, body))
- except Exception:
- pass
- return results
- def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
- exclude = set(exclude_codes or [])
- start = time.monotonic()
- attempt = 0
- while time.monotonic() - start < timeout_sec:
- attempt += 1
- try:
- summaries = client.list_emails(email)
- print(f"[otp][gptmail] 轮询 #{attempt}, 收到 {len(summaries)} 封邮件, 目标: {email}")
- for s in summaries:
- body = " ".join([
- str(s.get("subject", "") or ""),
- str(s.get("text", "") or ""),
- str(s.get("body", "") or ""),
- str(s.get("html", "") or ""),
- json.dumps(s, ensure_ascii=False),
- ])
- for code in re.findall(r"(?<!\d)(\d{6})(?!\d)", body):
- if code not in exclude:
- return code
- except Exception:
- pass
- time.sleep(poll)
- return None
- return email, _gen_password(), fetch_code, _extract_all_codes, "gptmail"
- def _build_pmail_bundle():
- client = PMailClient(proxies)
- email = _random_name().lower() + "@030208.xyz"
- print(f"[+] 分配自建域邮箱别名: {email} (PMail)")
- print("[*] 自动轮询已启动(关联至主信箱 husj@030208.xyz)")
- def _extract_all_codes() -> List[str]:
- results: List[str] = []
- try:
- summaries = client.list_emails(email)
- for s in summaries:
- body = f"{s.get('subject', '')} {s.get('txt', '')}"
- results.extend(re.findall(r"(?<!\d)(\d{6})(?!\d)", body))
- except Exception:
- pass
- return results
- def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
- exclude = set(exclude_codes or [])
- start = time.monotonic()
- attempt = 0
- while time.monotonic() - start < timeout_sec:
- attempt += 1
- try:
- summaries = client.list_emails(email)
- print(f"[otp][pmail] 轮询 #{attempt}, 收到 {len(summaries)} 封目标邮件, 目标: {email}")
- for s in summaries:
- body = f"{s.get('subject', '')} {s.get('txt', '')}"
- for code in re.findall(r"(?<!\d)(\d{6})(?!\d)", body):
- if code not in exclude:
- return code
- except Exception:
- pass
- time.sleep(poll)
- return None
- return email, _gen_password(), fetch_code, _extract_all_codes, "pmail"
- if provider == "pmail":
- return _build_pmail_bundle()
- if provider == "tempmail":
- return _build_tempmail_bundle()
- if provider == "gptmail":
- return _build_gptmail_bundle()
- try:
- return _build_pmail_bundle()
- except Exception as e:
- print(f"[邮箱] PMail 初始化失败,回退 TempMail: {e}")
- try:
- return _build_tempmail_bundle()
- except Exception as e2:
- print(f"[邮箱] TempMail 初始化失败,回退 GPTMail: {e2}")
- return _build_gptmail_bundle()
- # ========== OAuth 核心逻辑 (对齐原版的完美重定向流) ==========
- def _gen_password() -> str:
- alphabet = string.ascii_letters + string.digits
- special = "!@#$%^&*.-"
- base = [random.choice(string.ascii_lowercase), random.choice(string.ascii_uppercase),
- random.choice(string.digits), random.choice(special)]
- base += [random.choice(alphabet + special) for _ in range(12)]
- random.shuffle(base)
- return "".join(base)
- def _random_name() -> str:
- return ''.join(random.choice(string.ascii_lowercase) for _ in range(7)).capitalize()
- def _random_birthdate() -> str:
- start = datetime(1975, 1, 1); end = datetime(1999, 12, 31)
- d = start + timedelta(days=random.randrange((end - start).days + 1))
- return d.strftime('%Y-%m-%d')
- def _b64url_no_pad(raw: bytes) -> str:
- return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
- def _sha256_b64url_no_pad(s: str) -> str:
- return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
- def _pkce_verifier() -> str:
- return secrets.token_urlsafe(64)
- def _parse_callback_url(callback_url: str) -> Dict[str, Any]:
- candidate = (callback_url or "").strip()
- if not candidate:
- return {"code": "", "state": "", "error": "", "error_description": ""}
- if "://" not in candidate:
- if candidate.startswith("?"):
- candidate = f"http://localhost{candidate}"
- elif any(ch in candidate for ch in "/?#") or ":" in candidate:
- candidate = f"http://{candidate}"
- elif "=" in candidate:
- candidate = f"http://localhost/?{candidate}"
- parsed = urllib.parse.urlparse(candidate)
- query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
- fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
- for key, values in fragment.items():
- if key not in query or not query[key] or not (query[key][0] or "").strip():
- query[key] = values
- def get1(k: str) -> str:
- return (query.get(k, [""])[0] or "").strip()
- code = get1("code")
- state = get1("state")
- error = get1("error")
- error_description = get1("error_description")
- if code and not state and "#" in code:
- code, state = code.split("#", 1)
- if not error and error_description:
- error, error_description = error_description, ""
- return {"code": code, "state": state, "error": error, "error_description": error_description}
- def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
- try:
- pad = "=" * ((4 - (len(seg) % 4)) % 4)
- return json.loads(base64.urlsafe_b64decode((seg + pad).encode("ascii")).decode("utf-8"))
- except Exception:
- return {}
- def _jwt_claims_no_verify(token: str) -> Dict[str, Any]:
- if not token or token.count(".") < 2:
- return {}
- return _decode_jwt_segment(token.split(".")[1])
- def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
- body = urllib.parse.urlencode(data).encode("utf-8")
- req = urllib.request.Request(
- url,
- data=body,
- method="POST",
- headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
- )
- try:
- with urllib.request.urlopen(req, timeout=timeout) as resp:
- raw = resp.read()
- if resp.status != 200:
- raise RuntimeError(f"Token 交换失败: {resp.status}: {raw.decode('utf-8', 'replace')}")
- return json.loads(raw.decode("utf-8"))
- except urllib.error.HTTPError as exc:
- raw = exc.read()
- raise RuntimeError(f"Token 交换失败: {exc.code}: {raw.decode('utf-8', 'replace')}") from exc
- def _to_int(v: Any) -> int:
- try:
- return int(v)
- except (TypeError, ValueError):
- return 0
- def _build_sentinel_payload(session, did: str, flow: str) -> str:
- req_body = json.dumps({"p": "", "id": did, "flow": flow})
- resp = session.post(
- "https://sentinel.openai.com/backend-api/sentinel/req",
- headers={
- "origin": "https://sentinel.openai.com",
- "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
- "content-type": "text/plain;charset=UTF-8",
- },
- data=req_body,
- timeout=15,
- )
- if resp.status_code != 200:
- raise RuntimeError(f"Sentinel 验证失败: {resp.status_code}: {resp.text[:200]}")
- token = (resp.json() or {}).get("token", "")
- return json.dumps({"p": "", "t": "", "c": token, "id": did, "flow": flow})
- @dataclass(frozen=True)
- class OAuthStart:
- auth_url: str; state: str; code_verifier: str; redirect_uri: str
- def generate_oauth_url(redirect_uri: str = DEFAULT_REDIRECT_URI) -> OAuthStart:
- state = secrets.token_urlsafe(16)
- verifier = _pkce_verifier()
- challenge = _sha256_b64url_no_pad(verifier)
- params = {
- "client_id": CLIENT_ID,
- "response_type": "code",
- "redirect_uri": redirect_uri,
- "scope": DEFAULT_SCOPE,
- "state": state,
- "code_challenge": challenge,
- "code_challenge_method": "S256",
- "prompt": "login",
- "id_token_add_organizations": "true",
- "codex_cli_simplified_flow": "true",
- }
- return OAuthStart(f"{AUTH_URL}?{urllib.parse.urlencode(params)}", state, verifier, redirect_uri)
- def fetch_sentinel_token(flow: str, did: str, proxies: Any = None) -> Optional[str]:
- try:
- session = requests.Session(proxies=proxies, impersonate="chrome")
- payload = _build_sentinel_payload(session, did, flow)
- return (json.loads(payload) or {}).get("c")
- except Exception:
- return None
- def submit_callback_url(callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str, session=None) -> str:
- cb = _parse_callback_url(callback_url)
- if cb.get("error"):
- raise RuntimeError(f"OAuth 错误: {cb['error']}: {cb.get('error_description', '')}".strip())
- if not cb.get("code"):
- raise ValueError("Callback URL 缺少 ?code=")
- if not cb.get("state"):
- raise ValueError("Callback URL 缺少 ?state=")
- if cb.get("state") != expected_state:
- raise ValueError("State 校验不匹配")
- token_data = {
- "grant_type": "authorization_code",
- "client_id": CLIENT_ID,
- "code": cb["code"],
- "redirect_uri": redirect_uri,
- "code_verifier": code_verifier,
- }
- if session is not None:
- resp = session.post(
- TOKEN_URL,
- data=token_data,
- headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
- timeout=30,
- )
- if resp.status_code != 200:
- raise RuntimeError(f"Token 交换失败: {resp.status_code}: {resp.text[:200]}")
- token_resp = resp.json()
- else:
- token_resp = _post_form(TOKEN_URL, token_data)
- access_token = str(token_resp.get("access_token") or "").strip()
- refresh_token = str(token_resp.get("refresh_token") or "").strip()
- id_token = str(token_resp.get("id_token") or "").strip()
- expires_in = _to_int(token_resp.get("expires_in"))
- claims = _jwt_claims_no_verify(id_token)
- auth_claims = claims.get("https://api.openai.com/auth") or {}
- now = int(time.time())
- config = {
- "id_token": id_token,
- "access_token": access_token,
- "refresh_token": refresh_token,
- "account_id": str(auth_claims.get("chatgpt_account_id") or "").strip(),
- "last_refresh": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
- "email": str(claims.get("email") or "").strip(),
- "type": "codex",
- "expired": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))),
- }
- return json.dumps(config, ensure_ascii=False, indent=2)
- # ========== 轻量版 CPA 维护实现(内嵌,不依赖项目包) ==========
- DEFAULT_MGMT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"
- def _mgmt_headers(token: str) -> dict:
- clean = str(token or "").strip()
- if clean and not clean.lower().startswith("bearer "):
- clean = f"Bearer {clean}"
- return {"Authorization": clean, "Accept": "application/json"}
- def _join_mgmt_url(base_url: str, path: str) -> str:
- base = (base_url or "").rstrip("/")
- suffix = path if path.startswith("/") else f"/{path}"
- if base.endswith("/v0"):
- return f"{base}{suffix}"
- return f"{base}/v0{suffix}"
- def _safe_json(text: str):
- try:
- return json.loads(text)
- except Exception:
- return {}
- def _extract_account_id(item: dict):
- for key in ("chatgpt_account_id", "chatgptAccountId", "account_id", "accountId"):
- val = item.get(key)
- if val:
- return str(val)
- return None
- def _get_item_type(item: dict) -> str:
- return str(item.get("type") or item.get("typo") or "")
- class MiniPoolMaintainer:
- def __init__(self, base_url: str, token: str, target_type: str = "codex", used_percent_threshold: int = 95, user_agent: str = DEFAULT_MGMT_UA):
- self.base_url = (base_url or "").rstrip("/")
- self.token = token
- self.target_type = target_type
- self.used_percent_threshold = used_percent_threshold
- self.user_agent = user_agent
- def upload_token(self, filename: str, token_data: dict, proxy: str = "") -> bool:
- if not self.base_url or not self.token:
- return False
- content = json.dumps(token_data, ensure_ascii=False).encode("utf-8")
- files = {"file": (filename, content, "application/json")}
- headers = {"Authorization": f"Bearer {self.token}"}
- proxies = {"http": proxy, "https": proxy} if proxy else None
- for attempt in range(3):
- try:
- resp = py_requests.post(_join_mgmt_url(self.base_url, "/management/auth-files"), files=files, headers=headers, timeout=30, verify=False, proxies=proxies)
- if resp.status_code in (200, 201, 204):
- return True
- except Exception:
- pass
- if attempt < 2:
- time.sleep(2 ** attempt)
- return False
- def fetch_auth_files(self, timeout: int = 15):
- resp = py_requests.get(_join_mgmt_url(self.base_url, "/management/auth-files"), headers=_mgmt_headers(self.token), timeout=timeout)
- resp.raise_for_status()
- data = resp.json()
- return (data.get("files") if isinstance(data, dict) else []) or []
- async def probe_and_clean_async(self, workers: int = 20, timeout: int = 10, retries: int = 1):
- if aiohttp is None:
- raise RuntimeError("需要安装 aiohttp: pip install aiohttp")
- files = self.fetch_auth_files(timeout)
- candidates = [f for f in files if _get_item_type(f).lower() == self.target_type.lower()]
- if not candidates:
- return {"total": len(files), "candidates": 0, "invalid_count": 0, "deleted_ok": 0, "deleted_fail": 0}
- semaphore = asyncio.Semaphore(max(1, workers))
- connector = aiohttp.TCPConnector(limit=max(1, workers))
- client_timeout = aiohttp.ClientTimeout(total=max(1, timeout))
- async def probe_one(session, item):
- auth_index = item.get("auth_index")
- name = item.get("name") or item.get("id")
- res = {"name": name, "auth_index": auth_index, "invalid_401": False, "invalid_used_percent": False, "used_percent": None}
- if not auth_index:
- res["invalid_401"] = False
- return res
- account_id = _extract_account_id(item)
- header = {"Authorization": "Bearer $TOKEN$", "Content-Type": "application/json", "User-Agent": self.user_agent}
- if account_id:
- header["Chatgpt-Account-Id"] = account_id
- payload = {"authIndex": auth_index, "method": "GET", "url": "https://chatgpt.com/backend-api/wham/usage", "header": header}
- for attempt in range(retries + 1):
- try:
- async with semaphore:
- async with session.post(_join_mgmt_url(self.base_url, "/management/api-call"), headers={**_mgmt_headers(self.token), "Content-Type": "application/json"}, json=payload, timeout=timeout) as resp:
- text = await resp.text()
- if resp.status >= 400:
- raise RuntimeError(f"HTTP {resp.status}: {text[:200]}")
- data = _safe_json(text)
- sc = data.get("status_code")
- res["invalid_401"] = sc == 401
- if sc == 200:
- body = _safe_json(data.get("body", ""))
- used_pct = (body.get("rate_limit", {}).get("primary_window", {}).get("used_percent"))
- if used_pct is not None:
- res["used_percent"] = used_pct
- res["invalid_used_percent"] = used_pct >= self.used_percent_threshold
- return res
- except Exception as e:
- if attempt >= retries:
- res["error"] = str(e)
- return res
- return res
- async def delete_one(session, name: str):
- if not name:
- return False
- from urllib.parse import quote
- encoded = quote(name, safe="")
- try:
- async with semaphore:
- async with session.delete(f"{_join_mgmt_url(self.base_url, '/management/auth-files')}?name={encoded}", headers=_mgmt_headers(self.token), timeout=timeout) as resp:
- text = await resp.text()
- data = _safe_json(text)
- return resp.status == 200 and data.get("status") == "ok"
- except Exception:
- return False
- invalid_list = []
- async with aiohttp.ClientSession(connector=connector, timeout=client_timeout, trust_env=True) as session:
- tasks = [asyncio.create_task(probe_one(session, item)) for item in candidates]
- for task in asyncio.as_completed(tasks):
- r = await task
- if r.get("invalid_401") or r.get("invalid_used_percent"):
- invalid_list.append(r)
- delete_tasks = [asyncio.create_task(delete_one(session, r.get("name"))) for r in invalid_list if r.get("name")]
- deleted_ok = 0
- deleted_fail = 0
- for task in asyncio.as_completed(delete_tasks):
- if await task:
- deleted_ok += 1
- else:
- deleted_fail += 1
- return {
- "total": len(files),
- "candidates": len(candidates),
- "invalid_count": len(invalid_list),
- "deleted_ok": deleted_ok,
- "deleted_fail": deleted_fail,
- }
- def probe_and_clean_sync(self, workers: int = 20, timeout: int = 10, retries: int = 1):
- return asyncio.run(self.probe_and_clean_async(workers, timeout, retries))
- def _build_cpa_maintainer(args):
- base_url = (args.cpa_base_url or os.getenv("CPA_BASE_URL") or "").strip()
- token = (args.cpa_token or os.getenv("CPA_TOKEN") or "").strip()
- if not base_url or not token:
- print("[CPA] 未提供 cpa_base_url / cpa_token,跳过 CPA 上传/清理")
- return None
- try:
- return MiniPoolMaintainer(
- base_url,
- token,
- target_type="codex",
- used_percent_threshold=args.cpa_used_threshold,
- user_agent=DEFAULT_MGMT_UA,
- )
- except Exception as e:
- print(f"[CPA] 创建维护器失败: {e}")
- return None
- def _upload_token_to_cpa(pm, token_json: str, email: str, proxy: str = "") -> bool:
- if not pm:
- return False
- try:
- data = json.loads(token_json)
- except Exception as e:
- print(f"[CPA] 解析 token_json 失败: {e}")
- return False
- fname_email = email.replace("@", "_")
- filename = f"token_{fname_email}_{int(time.time())}.json"
- ok = pm.upload_token(filename=filename, token_data=data, proxy=proxy or "")
- if ok:
- print(f"[CPA] 已上传 {filename} 到 CPA")
- else:
- print("[CPA] 上传失败")
- return ok
- def _clean_invalid_in_cpa(pm, args):
- if not pm:
- return None
- try:
- res = pm.probe_and_clean_sync(
- workers=max(1, args.cpa_workers),
- timeout=max(5, args.cpa_timeout),
- retries=max(0, args.cpa_retries),
- )
- print(
- f"[CPA] 清理完成: total={res.get('total')} candidates={res.get('candidates')} "
- f"invalid={res.get('invalid_count')} deleted_ok={res.get('deleted_ok')} deleted_fail={res.get('deleted_fail')}"
- )
- return res
- except Exception as e:
- print(f"[CPA] 清理失败: {e}")
- return None
- def _count_valid_cpa_tokens(pm, args):
- if not pm:
- return 0
- try:
- files = pm.fetch_auth_files(timeout=max(5, args.cpa_timeout))
- target = pm.target_type.lower()
- valid = [f for f in files if _get_item_type(f).lower() == target]
- return len(valid)
- except Exception as e:
- print(f"[CPA] 统计 token 失败: {e}")
- return 0
- # 账号行清理:上传成功且开启 prune_local 后使用
- # 安全处理:文件不存在直接返回,写入保持末尾换行便于追加
- def _remove_account_entry(accounts_path: Path, email: str, real_pwd: str):
- if not accounts_path.exists():
- return
- try:
- lines = accounts_path.read_text(encoding="utf-8").splitlines()
- target = f"{email}----{real_pwd}"
- kept = [ln for ln in lines if ln.strip() != target]
- accounts_path.write_text("\n".join(kept) + ("\n" if kept else ""), encoding="utf-8")
- print(f"[本地清理] 已从 accounts.txt 移除: {email}")
- except Exception as e:
- print(f"[本地清理] 移除账号行失败: {e}")
- # ========== 主注册流程 (恢复详细日志与异常捕获) ==========
- def run(proxy: Optional[str], mail_provider: str = "auto"):
- proxies = {"http": proxy, "https": proxy} if proxy else None
- s = requests.Session(proxies=proxies, impersonate="chrome")
- s.headers.update({
- "user-agent": UA,
- "accept": "application/json, text/plain, */*",
- })
- print(f"\n{'='*20} 开启注册流程 {'='*20}")
- try:
- print(f"[步骤1] 正在初始化临时邮箱(provider={mail_provider})...")
- email, password, code_fetcher, extract_all_codes, actual_mail_provider = get_email_and_code_fetcher(proxies, provider=mail_provider)
- print(f"[*] 当前邮箱提供商: {actual_mail_provider}")
- if not email:
- print("[失败] 未能获取邮箱")
- return None
- print(f"[成功] 邮箱: {email} | 临时密码: {password}")
- print("[步骤2] 访问 OpenAI 授权页获取 Device ID...")
- oauth = generate_oauth_url()
- auth_page = s.get(oauth.auth_url, timeout=15)
- did = s.cookies.get("oai-did")
- if not did:
- print("[失败] 未能从 Cookie 获取 oai-did")
- return None
- print(f"[成功] Device ID: {did}")
- print("[步骤3] 获取 Sentinel 载荷并提交注册邮箱...")
- try:
- authorize_continue_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
- except Exception as e:
- print(f"[失败] 获取 authorize_continue Sentinel 失败: {e}")
- return None
- continue_url = ""
- try:
- auth_json = auth_page.json() if hasattr(auth_page, "json") else {}
- continue_url = str((auth_json or {}).get("continue_url") or "").strip()
- except Exception:
- continue_url = ""
- if continue_url:
- try:
- s.get(continue_url, timeout=15)
- except Exception:
- pass
- signup_res = s.post(
- "https://auth.openai.com/api/accounts/authorize/continue",
- headers={
- "referer": "https://auth.openai.com/create-account",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": authorize_continue_sentinel,
- },
- data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"}),
- timeout=15,
- )
- print(f"[日志] 邮箱提交状态: {signup_res.status_code}")
- if signup_res.status_code != 200:
- print(f"[失败] 邮箱提交失败: {signup_res.text[:200]}")
- return None
- print("[步骤4] 设置账户密码...")
- pwd_res = s.post(
- "https://auth.openai.com/api/accounts/user/register",
- headers={
- "referer": "https://auth.openai.com/create-account/password",
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"password": password, "username": email},
- timeout=15,
- )
- print(f"[日志] 密码设置状态: {pwd_res.status_code}")
- if pwd_res.status_code != 200:
- print(f"[失败] 密码设置失败: {pwd_res.text[:200]}")
- return None
- print("[步骤5] 触发 OpenAI 发送验证邮件...")
- s.get("https://auth.openai.com/create-account/password", timeout=15)
- otp_send_res = s.get(
- "https://auth.openai.com/api/accounts/email-otp/send",
- headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json"},
- timeout=15,
- )
- print(f"[日志] 发送指令状态: {otp_send_res.status_code}")
- if otp_send_res.status_code != 200:
- print(f"[失败] 发送验证码失败: {otp_send_res.text[:200]}")
- return None
- print("[步骤6] 等待邮箱接收 6 位验证码...")
- code = code_fetcher()
- if not code:
- print("[失败] 邮箱长时间未收到验证码")
- return None
- print(f"[成功] 捕获验证码: {code}")
- print("[步骤7] 提交验证码至 OpenAI...")
- val_res = s.post(
- "https://auth.openai.com/api/accounts/email-otp/validate",
- headers={
- "referer": "https://auth.openai.com/email-verification",
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"code": code},
- timeout=15,
- )
- print(f"[日志] 验证码校验状态: {val_res.status_code}")
- if val_res.status_code != 200:
- print(f"[失败] 验证码校验失败: {val_res.text[:200]}")
- return None
- print("[步骤8] 完善账户基本信息...")
- try:
- create_account_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
- except Exception as e:
- print(f"[失败] 获取 create_account Sentinel 失败: {e}")
- return None
- acc_res = s.post(
- "https://auth.openai.com/api/accounts/create_account",
- headers={
- "referer": "https://auth.openai.com/about-you",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": create_account_sentinel,
- },
- data=json.dumps({"name": _random_name(), "birthdate": _random_birthdate()}),
- timeout=15,
- )
- print(f"[日志] 账户创建状态: {acc_res.status_code}")
- if acc_res.status_code != 200:
- print(f"[失败] 账户创建失败: {acc_res.text[:200]}")
- return None
- print("[步骤9] 注册完成,重新走登录流程获取 Workspace / Token...")
- first_code = code
- for login_attempt in range(3):
- try:
- print(f"[*] 正在通过登录流程获取 Token...{f' (重试 {login_attempt}/3)' if login_attempt else ''}")
- s2 = requests.Session(proxies=proxies, impersonate="chrome")
- oauth2 = generate_oauth_url()
- s2.get(oauth2.auth_url, timeout=15)
- did2 = s2.cookies.get("oai-did")
- if not did2:
- print("[失败] 登录会话未能获取 oai-did")
- continue
- lc = s2.post(
- "https://auth.openai.com/api/accounts/authorize/continue",
- headers={
- "referer": "https://auth.openai.com/log-in",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": _build_sentinel_payload(s2, did2, "authorize_continue"),
- },
- data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "login"}),
- timeout=15,
- )
- print(f"[日志] 登录邮箱提交状态: {lc.status_code}")
- if lc.status_code != 200:
- print(f"[失败] 登录邮箱提交失败: {lc.text[:200]}")
- continue
- s2.get(str((lc.json() or {}).get("continue_url") or ""), timeout=15)
- pw = s2.post(
- "https://auth.openai.com/api/accounts/password/verify",
- headers={
- "referer": "https://auth.openai.com/log-in/password",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": _build_sentinel_payload(s2, did2, "authorize_continue"),
- },
- json={"password": password},
- timeout=15,
- )
- print(f"[日志] 登录密码验证状态: {pw.status_code}")
- if pw.status_code != 200:
- print(f"[失败] 登录密码验证失败: {pw.text[:200]}")
- continue
- existing_codes = list(extract_all_codes())
- s2.get(
- "https://auth.openai.com/email-verification",
- headers={"referer": "https://auth.openai.com/log-in/password"},
- timeout=15,
- )
- print("[*] 正在等待登录 OTP...")
- time.sleep(2)
- otp2 = None
- baseline_codes = set(existing_codes)
- baseline_codes.add(first_code)
- for _ in range(40):
- all_codes = extract_all_codes()
- new_codes = [c for c in all_codes if c not in baseline_codes]
- if new_codes:
- otp2 = new_codes[-1]
- break
- time.sleep(2)
- if not otp2:
- print("[失败] 未收到登录 OTP")
- continue
- print(f"[成功] 捕获登录 OTP: {otp2}")
- val2 = s2.post(
- "https://auth.openai.com/api/accounts/email-otp/validate",
- headers={
- "referer": "https://auth.openai.com/email-verification",
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"code": otp2},
- timeout=15,
- )
- print(f"[日志] 登录 OTP 校验状态: {val2.status_code}")
- if val2.status_code != 200:
- print(f"[失败] 登录 OTP 校验失败: {val2.text[:200]}")
- continue
- val2_data = val2.json() or {}
- print("[成功] 登录 OTP 验证成功")
- consent_url = str(val2_data.get("continue_url") or "").strip()
- if consent_url:
- s2.get(consent_url, timeout=15)
- auth_cookie = s2.cookies.get("oai-client-auth-session", domain=".auth.openai.com") or s2.cookies.get("oai-client-auth-session")
- if not auth_cookie:
- print("[失败] 登录后未能获取 oai-client-auth-session")
- continue
- auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
- if "workspaces" not in auth_json or not auth_json["workspaces"]:
- print(f"[失败] Cookie 中无 workspaces: {list(auth_json.keys())}")
- continue
- workspace_id = auth_json["workspaces"][0]["id"]
- print(f"[成功] Workspace ID: {workspace_id}")
- select_resp = s2.post(
- "https://auth.openai.com/api/accounts/workspace/select",
- headers={
- "referer": consent_url,
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"workspace_id": workspace_id},
- timeout=15,
- )
- print(f"[日志] Workspace 选择状态: {select_resp.status_code}")
- if select_resp.status_code != 200:
- print(f"[失败] Workspace 选择失败: {select_resp.text[:200]}")
- continue
- sel_data = select_resp.json() or {}
- if sel_data.get("page", {}).get("type", "") == "organization_select":
- orgs = sel_data.get("page", {}).get("payload", {}).get("data", {}).get("orgs", [])
- if orgs:
- org_sel = s2.post(
- "https://auth.openai.com/api/accounts/organization/select",
- headers={"accept": "application/json", "content-type": "application/json"},
- json={
- "org_id": orgs[0].get("id", ""),
- "project_id": orgs[0].get("default_project_id", ""),
- },
- timeout=15,
- )
- print(f"[日志] Organization 选择状态: {org_sel.status_code}")
- if org_sel.status_code != 200:
- print(f"[失败] Organization 选择失败: {org_sel.text[:200]}")
- continue
- sel_data = org_sel.json() or {}
- if "continue_url" not in sel_data:
- print(f"[失败] 未能获取 continue_url: {json.dumps(sel_data, ensure_ascii=False)[:500]}")
- continue
- print("[步骤10] 跟踪重定向并换取 Token...")
- r = s2.get(str(sel_data["continue_url"]), allow_redirects=False, timeout=15)
- cbk = None
- for i in range(20):
- loc = r.headers.get("Location", "")
- print(f" -> 重定向 #{i+1} 状态: {r.status_code} | 下一跳: {loc[:80] if loc else '无'}")
- if loc.startswith("http://localhost"):
- cbk = loc
- break
- if r.status_code not in (301, 302, 303) or not loc:
- break
- r = s2.get(loc, allow_redirects=False, timeout=15)
- if not cbk:
- print("[失败] 未能获取到 Callback URL")
- continue
- token_json = submit_callback_url(
- callback_url=cbk,
- expected_state=oauth2.state,
- code_verifier=oauth2.code_verifier,
- redirect_uri=oauth2.redirect_uri,
- session=s2,
- )
- print("[大功告成] 账号注册完毕!")
- return token_json, email, password
- except Exception as e:
- print(f"[失败] 登录补全流程异常: {e}")
- time.sleep(2)
- continue
- print("[失败] 登录补全流程 3 次均未完成。")
- return None
- except Exception as e:
- print(f"[致命错误] 流程崩溃: {e}")
- return None
- # ========== Main 保持原版完整结构与输出格式 ==========
- def main():
- parser = argparse.ArgumentParser()
- parser.add_argument("--proxy", help="代理地址")
- parser.add_argument("--mail-provider", choices=["auto", "gptmail", "tempmail"], default="auto", help="临时邮箱提供商:auto/gptmail/tempmail")
- parser.add_argument("--once", action="store_true", help="只运行一次")
- parser.add_argument("--sleep-min", type=int, default=5, help="最小间隔(秒)")
- parser.add_argument("--sleep-max", type=int, default=30, help="最大间隔(秒)")
- parser.add_argument("--cpa-base-url", default=os.getenv("CPA_BASE_URL"), help="CPA 基础地址")
- parser.add_argument("--cpa-token", default=os.getenv("CPA_TOKEN"), help="CPA 管理 token (Bearer)")
- parser.add_argument("--cpa-workers", type=int, default=20, help="CPA 清理并发")
- parser.add_argument("--cpa-timeout", type=int, default=12, help="CPA 请求超时")
- parser.add_argument("--cpa-retries", type=int, default=1, help="CPA 清理重试次数")
- parser.add_argument("--cpa-used-threshold", type=int, default=95, help="CPA used_percent 阈值")
- parser.add_argument("--cpa-clean", action="store_true", help="注册后自动清理 CPA 失效账号")
- parser.add_argument("--cpa-upload", action="store_true", help="注册后自动上传 CPA")
- parser.add_argument("--cpa-target-count", type=int, default=300, help="目标 token 数(有效)")
- parser.add_argument("--cpa-prune-local", action="store_true", help="上传成功后删除本地 token 文件与账号行")
- args = parser.parse_args()
- tokens_dir = OUT_DIR / "tokens"
- tokens_dir.mkdir(parents=True, exist_ok=True)
- pm = _build_cpa_maintainer(args)
- count = 0
- while True:
- count += 1
- print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 流程 #{count} <<<")
- if pm:
- if args.cpa_clean:
- _clean_invalid_in_cpa(pm, args)
- current_count = _count_valid_cpa_tokens(pm, args)
- print(f"[CPA] 当前有效 token: {current_count} / {args.cpa_target_count}")
- if current_count >= args.cpa_target_count:
- if args.once:
- break
- wait_time = random.randint(args.sleep_min, args.sleep_max)
- print(f"[*] 随机休息 {wait_time} 秒...")
- time.sleep(wait_time)
- continue
- res = run(args.proxy, args.mail_provider)
- if res:
- token_json, email, real_pwd = res
- print(f"[🎉] 成功! {email} ---- {real_pwd}")
- # 1. 保存账号密码到 tokens/accounts.txt
- with open(tokens_dir / "accounts.txt", "a", encoding="utf-8") as f:
- f.write(f"{email}----{real_pwd}\n")
- # 2. 保存详细 Token JSON
- fname_email = email.replace("@", "_")
- token_file = tokens_dir / f"token_{fname_email}_{int(time.time())}.json"
- token_file.write_text(token_json, encoding="utf-8")
- print(f"[*] Token 文件已保存: {token_file.name}")
- # 3. 上传 CPA(可选)
- upload_ok = False
- if args.cpa_upload:
- upload_ok = _upload_token_to_cpa(pm, token_json, email, proxy=args.proxy or "")
- # 4. 上传成功后按需删除本地文件/账号行
- if upload_ok and args.cpa_prune_local:
- try:
- if token_file.exists():
- token_file.unlink()
- print(f"[本地清理] 已删除 token 文件: {token_file.name}")
- except Exception as e:
- print(f"[本地清理] 删除 token 文件失败: {e}")
- _remove_account_entry(tokens_dir / "accounts.txt", email, real_pwd)
- # 5. 注册后再清理一次(可选)
- if pm and args.cpa_clean:
- _clean_invalid_in_cpa(pm, args)
- else:
- print("[-] 本次注册流程未能完成。")
- if args.once:
- break
- wait_time = random.randint(args.sleep_min, args.sleep_max)
- print(f"[*] 随机休息 {wait_time} 秒...")
- time.sleep(wait_time)
- if __name__ == "__main__":
- main()
|