×

🏬 多店铺淘宝订单统一拉取:TOP API + 分布式任务调度实战(附Python源码)

万邦科技Lex 万邦科技Lex 发表于2026-07-01 11:30:23 浏览20 评论0

抢沙发发表评论

🏬 多店铺淘宝订单统一拉取:TOP API + 分布式任务调度实战(附Python源码)

多淘宝/天猫店铺订单集中同步到一套ERP,核心是每个店铺一个 Seller AccessToken → 统一增量时间窗拉取 → 以 tid 幂等入库 → 用分布式锁/原子标记防多节点重复跑。下面给你可直接用的 Python 实现。

一、多店铺同步架构

┌──────────────────────────────────────────────────────┐
│               调度器 (APScheduler / Celery Beat)      │
│            获取 shop_list → 分布式锁(shop_id+date)   │
└──────────────────────┬───────────────────────────────┘
         ┌─────────────┼─────────────┐
         ▼             ▼             ▼
   Shop_A Token   Shop_B Token   Shop_C Token
         │             │             │
   taobao.trades.sold.get (modified 时间窗)
         │
         ▼
   taobao.trade.fullinfo.get (逐单)
         │
         ▼
   ERP订单表 (UNIQUE(tid, shop_id) 幂等)
⚠️ 每个店铺必须在开放平台分别 OAuth 授权,得到对应 access_token/refresh_token,存 shop_auth表。

二、完整 Python 多店铺同步模块(含分布式文件锁示意)

# top_multi_shop_sync.py
"""
多店铺淘宝订单统一拉取
- 支持 N 个店铺(Seller AccessToken 不同)
- 增量按 modified 时间窗
- 文件锁防同机多进程重复跑(分布式子可以用 Redis SET NX EX)
- 幂等入库用 (tid, shop_id) UNIQUE
依赖: requests apscheduler  (pip install requests apscheduler)
"""
import hashlib
import time
import requests
import sqlite3
import os
import fcntl
from datetime import datetime, timedelta
from typing import Dict, List


# ───────────── TOP Client (内联) ─────────────
class TopClient:
    GW = "https://gw.api.taobao.com/router/rest"

    def __init__(self, ak, ask):
        self.ak, self.ask = ak, ask

    def _sign(self, p: Dict) -> str:
        filt = sorted((k, v) for k, v in p.items()
                       if v is not None and str(v).strip() != '' and k != 'sign')
        qs = ''.join(f"{k}{v}" for k, v in filt)
        return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper()

    def call(self, method, biz, session):
        p = {"method": method, "app_key": self.ak,
             "timestamp": str(int(time.time() * 1000)),
             "format": "json", "v": "2.0", "sign_method": "md5",
             "session": session}
        p.update(biz)
        p["sign"] = self._sign(p)
        r = requests.post(self.GW, data=p, timeout=15)
        r.raise_for_status()
        d = r.json()
        if "error_response" in d:
            err = d["error_response"]
            raise Exception(f"TOP[{err.get('code')}]:{err.get('msg')} {err.get('sub_msg','')}")
        return d.get(list(d.keys() - {"error_response"})[0], {})


# ───────────── 本地DB (shop_auth + erp_order) ─────────────
def init_db(db="multi_shop_erp.db"):
    conn = sqlite3.connect(db)
    conn.execute("""CREATE TABLE IF NOT EXISTS shop_auth(
        shop_id TEXT PRIMARY KEY,
        shop_name TEXT,
        access_token TEXT,
        refresh_token TEXT,
        expires_at TEXT)""")
    conn.execute("""CREATE TABLE IF NOT EXISTS erp_order(
        tid TEXT, shop_id TEXT,
        status TEXT, payment REAL, buyer_nick TEXT, created TEXT,
        PRIMARY KEY(tid, shop_id))""")
    conn.commit()
    return conn


def load_shops(conn) -> List[Dict]:
    cur = conn.execute("SELECT shop_id,shop_name,access_token FROM shop_auth")
    return [{"shop_id": r[0], "shop_name": r[1], "token": r[2]}
            for r in cur.fetchall()]


def upsert_order(conn, shop_id, trade: Dict):
    conn.execute("""INSERT INTO erp_order(tid,shop_id,status,payment,buyer_nick,created)
        VALUES(?,?,?,?,?,?)
        ON CONFLICT(tid,shop_id) DO UPDATE SET
        status=excluded.status,payment=excluded.payment,
        buyer_nick=excluded.buyer_nick,created=excluded.created""",
        (str(trade["tid"]), shop_id,
         trade.get("status"), float(trade.get("payment") or 0),
         trade.get("buyer_nick",""), trade.get("created","")))
    conn.commit()


# ───────────── 分布式文件锁 (单进程/单机示意) ─────────────
LOCK_FILE = "/tmp/top_multishop_sync.lock"


def acquire_lock():
    fd = open(LOCK_FILE, "w")
    try:
        fcntl.flock(fd, fcntl.LOCK_EX | fcntl.LOCK_NB)  # 非阻塞
        return fd
    except (IOError, BlockingIOError):
        fd.close()
        raise RuntimeError("另一实例正在运行中,退出")


def release_lock(fd):
    fcntl.flock(fd, fcntl.LOCK_UN)
    fd.close()
    try:
        os.remove(LOCK_FILE)
    except OSError:
        pass
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex

# ───────────── 核心同步逻辑 ─────────────
class MultiShopOrderSync:
    def __init__(self, ak, ask, conn):
        self.top = TopClient(ak, ask)
        self.conn = conn

    def sync_one_shop(self, shop: Dict, minutes=30):
        token = shop["token"]
        shop_id = shop["shop_id"]
        now = datetime.now()
        start = (now - timedelta(minutes=minutes)).strftime("%Y-%m-%d %H:%M:%S")
        end = now.strftime("%Y-%m-%d %H:%M:%S")

        # ① 列表
        r = self.top.call("taobao.trades.sold.get", {
            "fields": "tid,status,payment,modified,buyer_nick,created",
            "start_modified": start, "end_modified": end,
            "page_no": 1, "page_size": 40
        }, token)
        tids = [t["tid"] for t in (r.get("trades", []) or [])]

        for tid in tids:
            # ② 明细
            detail = self.top.call("taobao.trade.fullinfo.get", {
                "tid": str(tid),
                "fields": "tid,status,payment,buyer_nick,created"
            }, token).get("trade", {})
            upsert_order(self.conn, shop_id, detail)
            time.sleep(0.15)   # QPS 保护

        print(f"  ✅ 店铺[{shop['shop_name']}] 同步订单: {len(tids)} 笔")

    def sync_all(self, minutes=30):
        shops = load_shops(self.conn)
        if not shops:
            print("⚠️  无店铺配置,请在 shop_auth 表录入 shop_id/shop_name/access_token")
            return
        for s in shops:
            try:
                self.sync_one_shop(s, minutes)
            except Exception as e:
                print(f"  ❌ 店铺[{s['shop_name']}] 失败: {e}")
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex

# ======================= 定时入口 =======================
if __name__ == "__main__":
    APP_KEY = "YOUR_ENTERPRISE_APP_KEY"
    APP_SECRET = "YOUR_APP_SECRET"

    conn = init_db()
    # ★ 首次运行前手动 INSERT shop_auth 或写一小段初始化
    # conn.execute("INSERT OR IGNORE INTO shop_auth VALUES('SHOP001','旗舰店A','SELLER_TOKEN_A','',NULL)")

    syncer = MultiShopOrderSync(APP_KEY, APP_SECRET, conn)

    try:
        lock_fd = acquire_lock()
        print(f"▶ 多店铺订单同步开始 @ {datetime.now().strftime('%H:%M:%S')}")
        syncer.sync_all(minutes=30)
        print("▶ 同步完成")
    except RuntimeError as e:
        print(e)
    finally:
        try:
            release_lock(lock_fd)
        except Exception:
            pass

三、shop_auth 表初始化示例(SQLite)

INSERT OR IGNORE INTO shop_auth(shop_id,shop_name,access_token)
VALUES
('SHOP001','天猫旗舰店A','SELLER_ACCESS_TOKEN_A'),
('SHOP002','淘宝企业店B','SELLER_ACCESS_TOKEN_B');
AccessToken 通过各店铺卖家账号 OAuth2 授权换取(同之前讲过的 oauth.taobao.com/token流程),每个店铺 token 不同

四、分布式调度建议

部署形态
防重手段
单机多进程
文件锁 fcntl.flock(已演示)
多机 / K8s
Redis SET key value NX EX 300 获取锁 → 执行 → DEL;失败跳过
定时触发
APScheduler BlockingScheduler+ 锁 / Celery Beat
断点续跑
记录每个 shop 最后成功 max_modified→ 下次从此时间拉
APScheduler 常驻示例:
from apscheduler.schedulers.blocking import BlockingScheduler
sched = BlockingScheduler()
sched.add_job(lambda: MultiShopOrderSync(APP_KEY,APP_SECRET,conn).sync_all(30),
               'cron', minute='*/5', id='tb_multi_shop_sync')
sched.start()

五、避坑清单

现象
解决
用同一 token 查不同店铺
只能看授权店铺自己订单
每个店铺独立 OAuth → 独立 token
session 传买家 token
403 / 空
必须是卖家 AccessToken
全量翻页不记断点
重启重跑超日额度
last_sync_timeper shop
多节点同时跑
重复插入/重复API调用
Redis NX 锁
淘宝客应用无订单权限
403
创建自用型企业应用申请 taobao.trades.sold.get

六、面试/方案一句话

多店铺淘宝订单统一拉取 = 各店铺分别 OAuth 授权存 Seller AccessToken → 定时增量按 modified时间窗调 taobao.trades.sold.get+ trade.fullinfo.get→ 以 (tid, shop_id)幂等入库 → 分布式 Redis NX 锁防多节点重复触发,签名/QPS 同单店铺,仅 token 按店铺隔离。
需要我补 Redis 分布式锁 Python 示例Token 自动刷新(refresh_token 过期前置换) 吗?


群贤毕至

访客