| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057 |
- import json
- import os
- import re
- import sys
- import time
- import uuid
- import random
- import string
- import secrets
- import hashlib
- import base64
- import argparse
- from pathlib import Path
- from datetime import datetime, timedelta
- from dataclasses import dataclass
- from typing import Any, Dict, Optional, List
- import urllib.parse
- import urllib.request
- import urllib.error
- import asyncio
- import requests as py_requests
- try:
- import aiohttp
- except ImportError:
- aiohttp = None
- from curl_cffi import requests
- OUT_DIR = Path(__file__).parent.resolve()
- UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/134.0.0.0 Safari/537.36"
- AUTH_URL = "https://auth.openai.com/oauth/authorize"
- TOKEN_URL = "https://auth.openai.com/oauth/token"
- CLIENT_ID = "app_EMoamEEZ73f0CkXaXp7hrann"
- DEFAULT_REDIRECT_URI = "http://localhost:1455/auth/callback"
- DEFAULT_SCOPE = "openid email profile offline_access"
- # ========== 临时邮箱提供商:GPTMail + TempMail.lol ==========
- class GPTMailClient:
- def __init__(self, proxies: Any = None):
- self.session = requests.Session(proxies=proxies, impersonate="chrome")
- self.session.headers.update({
- "User-Agent": UA,
- "Accept": "application/json, text/plain, */*",
- "Accept-Language": "zh-CN,zh;q=0.9",
- "Referer": "https://mail.chatgpt.org.uk/",
- })
- self.base_url = "https://mail.chatgpt.org.uk"
- def _init_browser_session(self):
- try:
- resp = self.session.get(self.base_url, timeout=15)
- gm_sid = self.session.cookies.get("gm_sid")
- if gm_sid:
- self.session.headers.update({"Cookie": f"gm_sid={gm_sid}"})
- token_match = re.search(r'(eyJ[a-zA-Z0-9_-]+\.[a-zA-Z0-9_-]+)', resp.text)
- if token_match:
- self.session.headers.update({"x-inbox-token": token_match.group(1)})
- except Exception:
- pass
- def generate_email(self) -> str:
- self._init_browser_session()
- resp = self.session.get(f"{self.base_url}/api/generate-email", timeout=15)
- if resp.status_code == 200:
- data = resp.json()
- email = data["data"]["email"]
- self.session.headers.update({"x-inbox-token": data["auth"]["token"]})
- print(f"[+] 生成邮箱: {email} (GPTMail)")
- print("[*] 自动轮询已启动(GPTMail 会话已准备)")
- return email
- raise RuntimeError(f"GPTMail 生成失败: {resp.status_code}")
- def list_emails(self, email: str) -> List[Dict[str, Any]]:
- encoded_email = urllib.parse.quote(email)
- resp = self.session.get(f"{self.base_url}/api/emails?email={encoded_email}", timeout=15)
- if resp.status_code == 200:
- return resp.json().get("data", {}).get("emails", [])
- return []
- class Message:
- def __init__(self, data: dict):
- self.from_addr = data.get("from", "")
- self.subject = data.get("subject", "")
- self.body = data.get("body", "") or ""
- self.html_body = data.get("html", "") or ""
- class EMail:
- def __init__(self, proxies: Any = None):
- self.s = requests.Session(proxies=proxies, impersonate="chrome")
- self.s.headers.update({
- "User-Agent": UA,
- "Accept": "application/json",
- "Content-Type": "application/json",
- })
- r = self.s.post("https://api.tempmail.lol/v2/inbox/create", json={}, timeout=15)
- r.raise_for_status()
- data = r.json()
- self.address = data["address"]
- self.token = data["token"]
- print(f"[+] 生成邮箱: {self.address} (TempMail.lol)")
- print("[*] 自动轮询已启动(token 已保存)")
- def _get_messages(self) -> List[Dict[str, Any]]:
- r = self.s.get(f"https://api.tempmail.lol/v2/inbox?token={self.token}", timeout=15)
- r.raise_for_status()
- return r.json().get("emails", [])
- def get_email_and_code_fetcher(proxies: Any = None, provider: str = "auto"):
- provider = (provider or "auto").strip().lower()
- if provider not in {"auto", "gptmail", "tempmail"}:
- raise ValueError(f"不支持的邮箱提供商: {provider}")
- def _build_tempmail_bundle():
- inbox = EMail(proxies)
- email = inbox.address
- def _extract_all_codes() -> List[str]:
- results: List[str] = []
- try:
- msgs = inbox._get_messages()
- for msg_data in msgs:
- msg = Message(msg_data)
- body = msg.body or msg.html_body or msg.subject or ""
- results.extend(re.findall(r"\b(\d{6})\b", body))
- except Exception:
- pass
- return results
- def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
- exclude = set(exclude_codes or [])
- start = time.monotonic()
- attempt = 0
- while time.monotonic() - start < timeout_sec:
- attempt += 1
- try:
- msgs = inbox._get_messages()
- print(f"[otp][tempmail] 轮询 #{attempt}, 收到 {len(msgs)} 封邮件, 目标: {email}")
- for msg_data in msgs:
- msg = Message(msg_data)
- body = msg.body or msg.html_body or msg.subject or ""
- for code in re.findall(r"\b(\d{6})\b", body):
- if code not in exclude:
- return code
- except Exception:
- pass
- time.sleep(poll)
- return None
- return email, _gen_password(), fetch_code, _extract_all_codes, "tempmail"
- def _build_gptmail_bundle():
- client = GPTMailClient(proxies)
- email = client.generate_email()
- def _extract_all_codes() -> List[str]:
- regex = r"(?<!\d)(\d{6})(?!\d)"
- results: List[str] = []
- try:
- summaries = client.list_emails(email)
- for s in summaries:
- body = " ".join([
- str(s.get("subject", "") or ""),
- str(s.get("text", "") or ""),
- str(s.get("body", "") or ""),
- str(s.get("html", "") or ""),
- json.dumps(s, ensure_ascii=False),
- ])
- results.extend(re.findall(regex, body))
- except Exception:
- pass
- return results
- def fetch_code(timeout_sec: int = 180, poll: float = 6.0, exclude_codes: Optional[List[str]] = None) -> str | None:
- exclude = set(exclude_codes or [])
- start = time.monotonic()
- attempt = 0
- while time.monotonic() - start < timeout_sec:
- attempt += 1
- try:
- summaries = client.list_emails(email)
- print(f"[otp][gptmail] 轮询 #{attempt}, 收到 {len(summaries)} 封邮件, 目标: {email}")
- for s in summaries:
- body = " ".join([
- str(s.get("subject", "") or ""),
- str(s.get("text", "") or ""),
- str(s.get("body", "") or ""),
- str(s.get("html", "") or ""),
- json.dumps(s, ensure_ascii=False),
- ])
- for code in re.findall(r"(?<!\d)(\d{6})(?!\d)", body):
- if code not in exclude:
- return code
- except Exception:
- pass
- time.sleep(poll)
- return None
- return email, _gen_password(), fetch_code, _extract_all_codes, "gptmail"
- if provider == "tempmail":
- return _build_tempmail_bundle()
- if provider == "gptmail":
- return _build_gptmail_bundle()
- try:
- return _build_tempmail_bundle()
- except Exception as e:
- print(f"[邮箱] TempMail.lol 初始化失败,回退 GPTMail: {e}")
- return _build_gptmail_bundle()
- # ========== OAuth 核心逻辑 (对齐原版的完美重定向流) ==========
- def _gen_password() -> str:
- alphabet = string.ascii_letters + string.digits
- special = "!@#$%^&*.-"
- base = [random.choice(string.ascii_lowercase), random.choice(string.ascii_uppercase),
- random.choice(string.digits), random.choice(special)]
- base += [random.choice(alphabet + special) for _ in range(12)]
- random.shuffle(base)
- return "".join(base)
- def _random_name() -> str:
- return ''.join(random.choice(string.ascii_lowercase) for _ in range(7)).capitalize()
- def _random_birthdate() -> str:
- start = datetime(1975, 1, 1); end = datetime(1999, 12, 31)
- d = start + timedelta(days=random.randrange((end - start).days + 1))
- return d.strftime('%Y-%m-%d')
- def _b64url_no_pad(raw: bytes) -> str:
- return base64.urlsafe_b64encode(raw).decode("ascii").rstrip("=")
- def _sha256_b64url_no_pad(s: str) -> str:
- return _b64url_no_pad(hashlib.sha256(s.encode("ascii")).digest())
- def _pkce_verifier() -> str:
- return secrets.token_urlsafe(64)
- def _parse_callback_url(callback_url: str) -> Dict[str, Any]:
- candidate = (callback_url or "").strip()
- if not candidate:
- return {"code": "", "state": "", "error": "", "error_description": ""}
- if "://" not in candidate:
- if candidate.startswith("?"):
- candidate = f"http://localhost{candidate}"
- elif any(ch in candidate for ch in "/?#") or ":" in candidate:
- candidate = f"http://{candidate}"
- elif "=" in candidate:
- candidate = f"http://localhost/?{candidate}"
- parsed = urllib.parse.urlparse(candidate)
- query = urllib.parse.parse_qs(parsed.query, keep_blank_values=True)
- fragment = urllib.parse.parse_qs(parsed.fragment, keep_blank_values=True)
- for key, values in fragment.items():
- if key not in query or not query[key] or not (query[key][0] or "").strip():
- query[key] = values
- def get1(k: str) -> str:
- return (query.get(k, [""])[0] or "").strip()
- code = get1("code")
- state = get1("state")
- error = get1("error")
- error_description = get1("error_description")
- if code and not state and "#" in code:
- code, state = code.split("#", 1)
- if not error and error_description:
- error, error_description = error_description, ""
- return {"code": code, "state": state, "error": error, "error_description": error_description}
- def _decode_jwt_segment(seg: str) -> Dict[str, Any]:
- try:
- pad = "=" * ((4 - (len(seg) % 4)) % 4)
- return json.loads(base64.urlsafe_b64decode((seg + pad).encode("ascii")).decode("utf-8"))
- except Exception:
- return {}
- def _jwt_claims_no_verify(token: str) -> Dict[str, Any]:
- if not token or token.count(".") < 2:
- return {}
- return _decode_jwt_segment(token.split(".")[1])
- def _post_form(url: str, data: Dict[str, str], timeout: int = 30) -> Dict[str, Any]:
- body = urllib.parse.urlencode(data).encode("utf-8")
- req = urllib.request.Request(
- url,
- data=body,
- method="POST",
- headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
- )
- try:
- with urllib.request.urlopen(req, timeout=timeout) as resp:
- raw = resp.read()
- if resp.status != 200:
- raise RuntimeError(f"Token 交换失败: {resp.status}: {raw.decode('utf-8', 'replace')}")
- return json.loads(raw.decode("utf-8"))
- except urllib.error.HTTPError as exc:
- raw = exc.read()
- raise RuntimeError(f"Token 交换失败: {exc.code}: {raw.decode('utf-8', 'replace')}") from exc
- def _to_int(v: Any) -> int:
- try:
- return int(v)
- except (TypeError, ValueError):
- return 0
- def _build_sentinel_payload(session, did: str, flow: str) -> str:
- req_body = json.dumps({"p": "", "id": did, "flow": flow})
- resp = session.post(
- "https://sentinel.openai.com/backend-api/sentinel/req",
- headers={
- "origin": "https://sentinel.openai.com",
- "referer": "https://sentinel.openai.com/backend-api/sentinel/frame.html?sv=20260219f9f6",
- "content-type": "text/plain;charset=UTF-8",
- },
- data=req_body,
- timeout=15,
- )
- if resp.status_code != 200:
- raise RuntimeError(f"Sentinel 验证失败: {resp.status_code}: {resp.text[:200]}")
- token = (resp.json() or {}).get("token", "")
- return json.dumps({"p": "", "t": "", "c": token, "id": did, "flow": flow})
- @dataclass(frozen=True)
- class OAuthStart:
- auth_url: str; state: str; code_verifier: str; redirect_uri: str
- def generate_oauth_url(redirect_uri: str = DEFAULT_REDIRECT_URI) -> OAuthStart:
- state = secrets.token_urlsafe(16)
- verifier = _pkce_verifier()
- challenge = _sha256_b64url_no_pad(verifier)
- params = {
- "client_id": CLIENT_ID,
- "response_type": "code",
- "redirect_uri": redirect_uri,
- "scope": DEFAULT_SCOPE,
- "state": state,
- "code_challenge": challenge,
- "code_challenge_method": "S256",
- "prompt": "login",
- "id_token_add_organizations": "true",
- "codex_cli_simplified_flow": "true",
- }
- return OAuthStart(f"{AUTH_URL}?{urllib.parse.urlencode(params)}", state, verifier, redirect_uri)
- def fetch_sentinel_token(flow: str, did: str, proxies: Any = None) -> Optional[str]:
- try:
- session = requests.Session(proxies=proxies, impersonate="chrome")
- payload = _build_sentinel_payload(session, did, flow)
- return (json.loads(payload) or {}).get("c")
- except Exception:
- return None
- def submit_callback_url(callback_url: str, expected_state: str, code_verifier: str, redirect_uri: str, session=None) -> str:
- cb = _parse_callback_url(callback_url)
- if cb.get("error"):
- raise RuntimeError(f"OAuth 错误: {cb['error']}: {cb.get('error_description', '')}".strip())
- if not cb.get("code"):
- raise ValueError("Callback URL 缺少 ?code=")
- if not cb.get("state"):
- raise ValueError("Callback URL 缺少 ?state=")
- if cb.get("state") != expected_state:
- raise ValueError("State 校验不匹配")
- token_data = {
- "grant_type": "authorization_code",
- "client_id": CLIENT_ID,
- "code": cb["code"],
- "redirect_uri": redirect_uri,
- "code_verifier": code_verifier,
- }
- if session is not None:
- resp = session.post(
- TOKEN_URL,
- data=token_data,
- headers={"Content-Type": "application/x-www-form-urlencoded", "Accept": "application/json"},
- timeout=30,
- )
- if resp.status_code != 200:
- raise RuntimeError(f"Token 交换失败: {resp.status_code}: {resp.text[:200]}")
- token_resp = resp.json()
- else:
- token_resp = _post_form(TOKEN_URL, token_data)
- access_token = str(token_resp.get("access_token") or "").strip()
- refresh_token = str(token_resp.get("refresh_token") or "").strip()
- id_token = str(token_resp.get("id_token") or "").strip()
- expires_in = _to_int(token_resp.get("expires_in"))
- claims = _jwt_claims_no_verify(id_token)
- auth_claims = claims.get("https://api.openai.com/auth") or {}
- now = int(time.time())
- config = {
- "id_token": id_token,
- "access_token": access_token,
- "refresh_token": refresh_token,
- "account_id": str(auth_claims.get("chatgpt_account_id") or "").strip(),
- "last_refresh": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now)),
- "email": str(claims.get("email") or "").strip(),
- "type": "codex",
- "expired": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(now + max(expires_in, 0))),
- }
- return json.dumps(config, ensure_ascii=False, indent=2)
- # ========== 轻量版 CPA 维护实现(内嵌,不依赖项目包) ==========
- DEFAULT_MGMT_UA = "codex_cli_rs/0.76.0 (Debian 13.0.0; x86_64) WindowsTerminal"
- def _mgmt_headers(token: str) -> dict:
- clean = str(token or "").strip()
- if clean and not clean.lower().startswith("bearer "):
- clean = f"Bearer {clean}"
- return {"Authorization": clean, "Accept": "application/json"}
- def _join_mgmt_url(base_url: str, path: str) -> str:
- base = (base_url or "").rstrip("/")
- suffix = path if path.startswith("/") else f"/{path}"
- if base.endswith("/v0"):
- return f"{base}{suffix}"
- return f"{base}/v0{suffix}"
- def _safe_json(text: str):
- try:
- return json.loads(text)
- except Exception:
- return {}
- def _extract_account_id(item: dict):
- for key in ("chatgpt_account_id", "chatgptAccountId", "account_id", "accountId"):
- val = item.get(key)
- if val:
- return str(val)
- return None
- def _get_item_type(item: dict) -> str:
- return str(item.get("type") or item.get("typo") or "")
- class MiniPoolMaintainer:
- def __init__(self, base_url: str, token: str, target_type: str = "codex", used_percent_threshold: int = 95, user_agent: str = DEFAULT_MGMT_UA):
- self.base_url = (base_url or "").rstrip("/")
- self.token = token
- self.target_type = target_type
- self.used_percent_threshold = used_percent_threshold
- self.user_agent = user_agent
- def upload_token(self, filename: str, token_data: dict, proxy: str = "") -> bool:
- if not self.base_url or not self.token:
- return False
- content = json.dumps(token_data, ensure_ascii=False).encode("utf-8")
- files = {"file": (filename, content, "application/json")}
- headers = {"Authorization": f"Bearer {self.token}"}
- proxies = {"http": proxy, "https": proxy} if proxy else None
- for attempt in range(3):
- try:
- resp = py_requests.post(_join_mgmt_url(self.base_url, "/management/auth-files"), files=files, headers=headers, timeout=30, verify=False, proxies=proxies)
- if resp.status_code in (200, 201, 204):
- return True
- except Exception:
- pass
- if attempt < 2:
- time.sleep(2 ** attempt)
- return False
- def fetch_auth_files(self, timeout: int = 15):
- resp = py_requests.get(_join_mgmt_url(self.base_url, "/management/auth-files"), headers=_mgmt_headers(self.token), timeout=timeout)
- resp.raise_for_status()
- data = resp.json()
- return (data.get("files") if isinstance(data, dict) else []) or []
- async def probe_and_clean_async(self, workers: int = 20, timeout: int = 10, retries: int = 1):
- if aiohttp is None:
- raise RuntimeError("需要安装 aiohttp: pip install aiohttp")
- files = self.fetch_auth_files(timeout)
- candidates = [f for f in files if _get_item_type(f).lower() == self.target_type.lower()]
- if not candidates:
- return {"total": len(files), "candidates": 0, "invalid_count": 0, "deleted_ok": 0, "deleted_fail": 0}
- semaphore = asyncio.Semaphore(max(1, workers))
- connector = aiohttp.TCPConnector(limit=max(1, workers))
- client_timeout = aiohttp.ClientTimeout(total=max(1, timeout))
- async def probe_one(session, item):
- auth_index = item.get("auth_index")
- name = item.get("name") or item.get("id")
- res = {"name": name, "auth_index": auth_index, "invalid_401": False, "invalid_used_percent": False, "used_percent": None}
- if not auth_index:
- res["invalid_401"] = False
- return res
- account_id = _extract_account_id(item)
- header = {"Authorization": "Bearer $TOKEN$", "Content-Type": "application/json", "User-Agent": self.user_agent}
- if account_id:
- header["Chatgpt-Account-Id"] = account_id
- payload = {"authIndex": auth_index, "method": "GET", "url": "https://chatgpt.com/backend-api/wham/usage", "header": header}
- for attempt in range(retries + 1):
- try:
- async with semaphore:
- async with session.post(_join_mgmt_url(self.base_url, "/management/api-call"), headers={**_mgmt_headers(self.token), "Content-Type": "application/json"}, json=payload, timeout=timeout) as resp:
- text = await resp.text()
- if resp.status >= 400:
- raise RuntimeError(f"HTTP {resp.status}: {text[:200]}")
- data = _safe_json(text)
- sc = data.get("status_code")
- res["invalid_401"] = sc == 401
- if sc == 200:
- body = _safe_json(data.get("body", ""))
- used_pct = (body.get("rate_limit", {}).get("primary_window", {}).get("used_percent"))
- if used_pct is not None:
- res["used_percent"] = used_pct
- res["invalid_used_percent"] = used_pct >= self.used_percent_threshold
- return res
- except Exception as e:
- if attempt >= retries:
- res["error"] = str(e)
- return res
- return res
- async def delete_one(session, name: str):
- if not name:
- return False
- from urllib.parse import quote
- encoded = quote(name, safe="")
- try:
- async with semaphore:
- async with session.delete(f"{_join_mgmt_url(self.base_url, '/management/auth-files')}?name={encoded}", headers=_mgmt_headers(self.token), timeout=timeout) as resp:
- text = await resp.text()
- data = _safe_json(text)
- return resp.status == 200 and data.get("status") == "ok"
- except Exception:
- return False
- invalid_list = []
- async with aiohttp.ClientSession(connector=connector, timeout=client_timeout, trust_env=True) as session:
- tasks = [asyncio.create_task(probe_one(session, item)) for item in candidates]
- for task in asyncio.as_completed(tasks):
- r = await task
- if r.get("invalid_401") or r.get("invalid_used_percent"):
- invalid_list.append(r)
- delete_tasks = [asyncio.create_task(delete_one(session, r.get("name"))) for r in invalid_list if r.get("name")]
- deleted_ok = 0
- deleted_fail = 0
- for task in asyncio.as_completed(delete_tasks):
- if await task:
- deleted_ok += 1
- else:
- deleted_fail += 1
- return {
- "total": len(files),
- "candidates": len(candidates),
- "invalid_count": len(invalid_list),
- "deleted_ok": deleted_ok,
- "deleted_fail": deleted_fail,
- }
- def probe_and_clean_sync(self, workers: int = 20, timeout: int = 10, retries: int = 1):
- return asyncio.run(self.probe_and_clean_async(workers, timeout, retries))
- def _build_cpa_maintainer(args):
- base_url = (args.cpa_base_url or os.getenv("CPA_BASE_URL") or "").strip()
- token = (args.cpa_token or os.getenv("CPA_TOKEN") or "").strip()
- if not base_url or not token:
- print("[CPA] 未提供 cpa_base_url / cpa_token,跳过 CPA 上传/清理")
- return None
- try:
- return MiniPoolMaintainer(
- base_url,
- token,
- target_type="codex",
- used_percent_threshold=args.cpa_used_threshold,
- user_agent=DEFAULT_MGMT_UA,
- )
- except Exception as e:
- print(f"[CPA] 创建维护器失败: {e}")
- return None
- def _upload_token_to_cpa(pm, token_json: str, email: str, proxy: str = "") -> bool:
- if not pm:
- return False
- try:
- data = json.loads(token_json)
- except Exception as e:
- print(f"[CPA] 解析 token_json 失败: {e}")
- return False
- fname_email = email.replace("@", "_")
- filename = f"token_{fname_email}_{int(time.time())}.json"
- ok = pm.upload_token(filename=filename, token_data=data, proxy=proxy or "")
- if ok:
- print(f"[CPA] 已上传 {filename} 到 CPA")
- else:
- print("[CPA] 上传失败")
- return ok
- def _clean_invalid_in_cpa(pm, args):
- if not pm:
- return None
- try:
- res = pm.probe_and_clean_sync(
- workers=max(1, args.cpa_workers),
- timeout=max(5, args.cpa_timeout),
- retries=max(0, args.cpa_retries),
- )
- print(
- f"[CPA] 清理完成: total={res.get('total')} candidates={res.get('candidates')} "
- f"invalid={res.get('invalid_count')} deleted_ok={res.get('deleted_ok')} deleted_fail={res.get('deleted_fail')}"
- )
- return res
- except Exception as e:
- print(f"[CPA] 清理失败: {e}")
- return None
- def _count_valid_cpa_tokens(pm, args):
- if not pm:
- return 0
- try:
- files = pm.fetch_auth_files(timeout=max(5, args.cpa_timeout))
- target = pm.target_type.lower()
- valid = [f for f in files if _get_item_type(f).lower() == target]
- return len(valid)
- except Exception as e:
- print(f"[CPA] 统计 token 失败: {e}")
- return 0
- # 账号行清理:上传成功且开启 prune_local 后使用
- # 安全处理:文件不存在直接返回,写入保持末尾换行便于追加
- def _remove_account_entry(accounts_path: Path, email: str, real_pwd: str):
- if not accounts_path.exists():
- return
- try:
- lines = accounts_path.read_text(encoding="utf-8").splitlines()
- target = f"{email}----{real_pwd}"
- kept = [ln for ln in lines if ln.strip() != target]
- accounts_path.write_text("\n".join(kept) + ("\n" if kept else ""), encoding="utf-8")
- print(f"[本地清理] 已从 accounts.txt 移除: {email}")
- except Exception as e:
- print(f"[本地清理] 移除账号行失败: {e}")
- # ========== 主注册流程 (恢复详细日志与异常捕获) ==========
- def run(proxy: Optional[str], mail_provider: str = "auto"):
- proxies = {"http": proxy, "https": proxy} if proxy else None
- s = requests.Session(proxies=proxies, impersonate="chrome")
- s.headers.update({
- "user-agent": UA,
- "accept": "application/json, text/plain, */*",
- })
- print(f"\n{'='*20} 开启注册流程 {'='*20}")
- try:
- print(f"[步骤1] 正在初始化临时邮箱(provider={mail_provider})...")
- email, password, code_fetcher, extract_all_codes, actual_mail_provider = get_email_and_code_fetcher(proxies, provider=mail_provider)
- print(f"[*] 当前邮箱提供商: {actual_mail_provider}")
- if not email:
- print("[失败] 未能获取邮箱")
- return None
- print(f"[成功] 邮箱: {email} | 临时密码: {password}")
- print("[步骤2] 访问 OpenAI 授权页获取 Device ID...")
- oauth = generate_oauth_url()
- auth_page = s.get(oauth.auth_url, timeout=15)
- did = s.cookies.get("oai-did")
- if not did:
- print("[失败] 未能从 Cookie 获取 oai-did")
- return None
- print(f"[成功] Device ID: {did}")
- print("[步骤3] 获取 Sentinel 载荷并提交注册邮箱...")
- try:
- authorize_continue_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
- except Exception as e:
- print(f"[失败] 获取 authorize_continue Sentinel 失败: {e}")
- return None
- continue_url = ""
- try:
- auth_json = auth_page.json() if hasattr(auth_page, "json") else {}
- continue_url = str((auth_json or {}).get("continue_url") or "").strip()
- except Exception:
- continue_url = ""
- if continue_url:
- try:
- s.get(continue_url, timeout=15)
- except Exception:
- pass
- signup_res = s.post(
- "https://auth.openai.com/api/accounts/authorize/continue",
- headers={
- "referer": "https://auth.openai.com/create-account",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": authorize_continue_sentinel,
- },
- data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "signup"}),
- timeout=15,
- )
- print(f"[日志] 邮箱提交状态: {signup_res.status_code}")
- if signup_res.status_code != 200:
- print(f"[失败] 邮箱提交失败: {signup_res.text[:200]}")
- return None
- print("[步骤4] 设置账户密码...")
- pwd_res = s.post(
- "https://auth.openai.com/api/accounts/user/register",
- headers={
- "referer": "https://auth.openai.com/create-account/password",
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"password": password, "username": email},
- timeout=15,
- )
- print(f"[日志] 密码设置状态: {pwd_res.status_code}")
- if pwd_res.status_code != 200:
- print(f"[失败] 密码设置失败: {pwd_res.text[:200]}")
- return None
- print("[步骤5] 触发 OpenAI 发送验证邮件...")
- s.get("https://auth.openai.com/create-account/password", timeout=15)
- otp_send_res = s.get(
- "https://auth.openai.com/api/accounts/email-otp/send",
- headers={"referer": "https://auth.openai.com/create-account/password", "accept": "application/json"},
- timeout=15,
- )
- print(f"[日志] 发送指令状态: {otp_send_res.status_code}")
- if otp_send_res.status_code != 200:
- print(f"[失败] 发送验证码失败: {otp_send_res.text[:200]}")
- return None
- print("[步骤6] 等待邮箱接收 6 位验证码...")
- code = code_fetcher()
- if not code:
- print("[失败] 邮箱长时间未收到验证码")
- return None
- print(f"[成功] 捕获验证码: {code}")
- print("[步骤7] 提交验证码至 OpenAI...")
- val_res = s.post(
- "https://auth.openai.com/api/accounts/email-otp/validate",
- headers={
- "referer": "https://auth.openai.com/email-verification",
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"code": code},
- timeout=15,
- )
- print(f"[日志] 验证码校验状态: {val_res.status_code}")
- if val_res.status_code != 200:
- print(f"[失败] 验证码校验失败: {val_res.text[:200]}")
- return None
- print("[步骤8] 完善账户基本信息...")
- try:
- create_account_sentinel = _build_sentinel_payload(s, did, "authorize_continue")
- except Exception as e:
- print(f"[失败] 获取 create_account Sentinel 失败: {e}")
- return None
- acc_res = s.post(
- "https://auth.openai.com/api/accounts/create_account",
- headers={
- "referer": "https://auth.openai.com/about-you",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": create_account_sentinel,
- },
- data=json.dumps({"name": _random_name(), "birthdate": _random_birthdate()}),
- timeout=15,
- )
- print(f"[日志] 账户创建状态: {acc_res.status_code}")
- if acc_res.status_code != 200:
- print(f"[失败] 账户创建失败: {acc_res.text[:200]}")
- return None
- print("[步骤9] 注册完成,重新走登录流程获取 Workspace / Token...")
- first_code = code
- for login_attempt in range(3):
- try:
- print(f"[*] 正在通过登录流程获取 Token...{f' (重试 {login_attempt}/3)' if login_attempt else ''}")
- s2 = requests.Session(proxies=proxies, impersonate="chrome")
- oauth2 = generate_oauth_url()
- s2.get(oauth2.auth_url, timeout=15)
- did2 = s2.cookies.get("oai-did")
- if not did2:
- print("[失败] 登录会话未能获取 oai-did")
- continue
- lc = s2.post(
- "https://auth.openai.com/api/accounts/authorize/continue",
- headers={
- "referer": "https://auth.openai.com/log-in",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": _build_sentinel_payload(s2, did2, "authorize_continue"),
- },
- data=json.dumps({"username": {"value": email, "kind": "email"}, "screen_hint": "login"}),
- timeout=15,
- )
- print(f"[日志] 登录邮箱提交状态: {lc.status_code}")
- if lc.status_code != 200:
- print(f"[失败] 登录邮箱提交失败: {lc.text[:200]}")
- continue
- s2.get(str((lc.json() or {}).get("continue_url") or ""), timeout=15)
- pw = s2.post(
- "https://auth.openai.com/api/accounts/password/verify",
- headers={
- "referer": "https://auth.openai.com/log-in/password",
- "accept": "application/json",
- "content-type": "application/json",
- "openai-sentinel-token": _build_sentinel_payload(s2, did2, "authorize_continue"),
- },
- json={"password": password},
- timeout=15,
- )
- print(f"[日志] 登录密码验证状态: {pw.status_code}")
- if pw.status_code != 200:
- print(f"[失败] 登录密码验证失败: {pw.text[:200]}")
- continue
- existing_codes = list(extract_all_codes())
- s2.get(
- "https://auth.openai.com/email-verification",
- headers={"referer": "https://auth.openai.com/log-in/password"},
- timeout=15,
- )
- print("[*] 正在等待登录 OTP...")
- time.sleep(2)
- otp2 = None
- baseline_codes = set(existing_codes)
- baseline_codes.add(first_code)
- for _ in range(40):
- all_codes = extract_all_codes()
- new_codes = [c for c in all_codes if c not in baseline_codes]
- if new_codes:
- otp2 = new_codes[-1]
- break
- time.sleep(2)
- if not otp2:
- print("[失败] 未收到登录 OTP")
- continue
- print(f"[成功] 捕获登录 OTP: {otp2}")
- val2 = s2.post(
- "https://auth.openai.com/api/accounts/email-otp/validate",
- headers={
- "referer": "https://auth.openai.com/email-verification",
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"code": otp2},
- timeout=15,
- )
- print(f"[日志] 登录 OTP 校验状态: {val2.status_code}")
- if val2.status_code != 200:
- print(f"[失败] 登录 OTP 校验失败: {val2.text[:200]}")
- continue
- val2_data = val2.json() or {}
- print("[成功] 登录 OTP 验证成功")
- consent_url = str(val2_data.get("continue_url") or "").strip()
- if consent_url:
- s2.get(consent_url, timeout=15)
- auth_cookie = s2.cookies.get("oai-client-auth-session", domain=".auth.openai.com") or s2.cookies.get("oai-client-auth-session")
- if not auth_cookie:
- print("[失败] 登录后未能获取 oai-client-auth-session")
- continue
- auth_json = _decode_jwt_segment(auth_cookie.split(".")[0])
- if "workspaces" not in auth_json or not auth_json["workspaces"]:
- print(f"[失败] Cookie 中无 workspaces: {list(auth_json.keys())}")
- continue
- workspace_id = auth_json["workspaces"][0]["id"]
- print(f"[成功] Workspace ID: {workspace_id}")
- select_resp = s2.post(
- "https://auth.openai.com/api/accounts/workspace/select",
- headers={
- "referer": consent_url,
- "accept": "application/json",
- "content-type": "application/json",
- },
- json={"workspace_id": workspace_id},
- timeout=15,
- )
- print(f"[日志] Workspace 选择状态: {select_resp.status_code}")
- if select_resp.status_code != 200:
- print(f"[失败] Workspace 选择失败: {select_resp.text[:200]}")
- continue
- sel_data = select_resp.json() or {}
- if sel_data.get("page", {}).get("type", "") == "organization_select":
- orgs = sel_data.get("page", {}).get("payload", {}).get("data", {}).get("orgs", [])
- if orgs:
- org_sel = s2.post(
- "https://auth.openai.com/api/accounts/organization/select",
- headers={"accept": "application/json", "content-type": "application/json"},
- json={
- "org_id": orgs[0].get("id", ""),
- "project_id": orgs[0].get("default_project_id", ""),
- },
- timeout=15,
- )
- print(f"[日志] Organization 选择状态: {org_sel.status_code}")
- if org_sel.status_code != 200:
- print(f"[失败] Organization 选择失败: {org_sel.text[:200]}")
- continue
- sel_data = org_sel.json() or {}
- if "continue_url" not in sel_data:
- print(f"[失败] 未能获取 continue_url: {json.dumps(sel_data, ensure_ascii=False)[:500]}")
- continue
- print("[步骤10] 跟踪重定向并换取 Token...")
- r = s2.get(str(sel_data["continue_url"]), allow_redirects=False, timeout=15)
- cbk = None
- for i in range(20):
- loc = r.headers.get("Location", "")
- print(f" -> 重定向 #{i+1} 状态: {r.status_code} | 下一跳: {loc[:80] if loc else '无'}")
- if loc.startswith("http://localhost"):
- cbk = loc
- break
- if r.status_code not in (301, 302, 303) or not loc:
- break
- r = s2.get(loc, allow_redirects=False, timeout=15)
- if not cbk:
- print("[失败] 未能获取到 Callback URL")
- continue
- token_json = submit_callback_url(
- callback_url=cbk,
- expected_state=oauth2.state,
- code_verifier=oauth2.code_verifier,
- redirect_uri=oauth2.redirect_uri,
- session=s2,
- )
- print("[大功告成] 账号注册完毕!")
- return token_json, email, password
- except Exception as e:
- print(f"[失败] 登录补全流程异常: {e}")
- time.sleep(2)
- continue
- print("[失败] 登录补全流程 3 次均未完成。")
- return None
- except Exception as e:
- print(f"[致命错误] 流程崩溃: {e}")
- return None
- # ========== Main 保持原版完整结构与输出格式 ==========
- def main():
- parser = argparse.ArgumentParser()
- parser.add_argument("--proxy", help="代理地址")
- parser.add_argument("--mail-provider", choices=["auto", "gptmail", "tempmail"], default="auto", help="临时邮箱提供商:auto/gptmail/tempmail")
- parser.add_argument("--once", action="store_true", help="只运行一次")
- parser.add_argument("--sleep-min", type=int, default=5, help="最小间隔(秒)")
- parser.add_argument("--sleep-max", type=int, default=30, help="最大间隔(秒)")
- parser.add_argument("--cpa-base-url", default=os.getenv("CPA_BASE_URL"), help="CPA 基础地址")
- parser.add_argument("--cpa-token", default=os.getenv("CPA_TOKEN"), help="CPA 管理 token (Bearer)")
- parser.add_argument("--cpa-workers", type=int, default=20, help="CPA 清理并发")
- parser.add_argument("--cpa-timeout", type=int, default=12, help="CPA 请求超时")
- parser.add_argument("--cpa-retries", type=int, default=1, help="CPA 清理重试次数")
- parser.add_argument("--cpa-used-threshold", type=int, default=95, help="CPA used_percent 阈值")
- parser.add_argument("--cpa-clean", action="store_true", help="注册后自动清理 CPA 失效账号")
- parser.add_argument("--cpa-upload", action="store_true", help="注册后自动上传 CPA")
- parser.add_argument("--cpa-target-count", type=int, default=300, help="目标 token 数(有效)")
- parser.add_argument("--cpa-prune-local", action="store_true", help="上传成功后删除本地 token 文件与账号行")
- args = parser.parse_args()
- tokens_dir = OUT_DIR / "tokens"
- tokens_dir.mkdir(parents=True, exist_ok=True)
- pm = _build_cpa_maintainer(args)
- count = 0
- while True:
- count += 1
- print(f"\n[{datetime.now().strftime('%H:%M:%S')}] >>> 流程 #{count} <<<")
- if pm:
- if args.cpa_clean:
- _clean_invalid_in_cpa(pm, args)
- current_count = _count_valid_cpa_tokens(pm, args)
- print(f"[CPA] 当前有效 token: {current_count} / {args.cpa_target_count}")
- if current_count >= args.cpa_target_count:
- if args.once:
- break
- wait_time = random.randint(args.sleep_min, args.sleep_max)
- print(f"[*] 随机休息 {wait_time} 秒...")
- time.sleep(wait_time)
- continue
- res = run(args.proxy, args.mail_provider)
- if res:
- token_json, email, real_pwd = res
- print(f"[🎉] 成功! {email} ---- {real_pwd}")
- # 1. 保存账号密码到 tokens/accounts.txt
- with open(tokens_dir / "accounts.txt", "a", encoding="utf-8") as f:
- f.write(f"{email}----{real_pwd}\n")
- # 2. 保存详细 Token JSON
- fname_email = email.replace("@", "_")
- token_file = tokens_dir / f"token_{fname_email}_{int(time.time())}.json"
- token_file.write_text(token_json, encoding="utf-8")
- print(f"[*] Token 文件已保存: {token_file.name}")
- # 3. 上传 CPA(可选)
- upload_ok = False
- if args.cpa_upload:
- upload_ok = _upload_token_to_cpa(pm, token_json, email, proxy=args.proxy or "")
- # 4. 上传成功后按需删除本地文件/账号行
- if upload_ok and args.cpa_prune_local:
- try:
- if token_file.exists():
- token_file.unlink()
- print(f"[本地清理] 已删除 token 文件: {token_file.name}")
- except Exception as e:
- print(f"[本地清理] 删除 token 文件失败: {e}")
- _remove_account_entry(tokens_dir / "accounts.txt", email, real_pwd)
- # 5. 注册后再清理一次(可选)
- if pm and args.cpa_clean:
- _clean_invalid_in_cpa(pm, args)
- else:
- print("[-] 本次注册流程未能完成。")
- if args.once:
- break
- wait_time = random.randint(args.sleep_min, args.sleep_max)
- print(f"[*] 随机休息 {wait_time} 秒...")
- time.sleep(wait_time)
- if __name__ == "__main__":
- main()
|