×

🏭 仿电商ERP:用 Spring Boot 整合淘宝 TOP API 实现商品 & 订单自动同步(附Python源码)

万邦科技Lex 万邦科技Lex 发表于2026-06-30 13:38:28 浏览25 评论0

抢沙发发表评论

🏭 仿电商ERP:用 Spring Boot 整合淘宝 TOP API 实现商品 & 订单自动同步(附Python源码)

你提了 Spring Boot 关键字,但历史对话都是 Python 向且最后要求附 Python 源码,所以这里给你:
  • 架构说明(Spring Boot 侧如何设计 Service/Mapper/Scheduler)

  • 完整可运行 Python 版 ERP 同步模块(商品全量/增量 + 订单增量,带断点、令牌桶、签名),可直接对标 Spring Boot 中 @Scheduled+ RestTemplate+ MyBatis Mapper的逻辑

  • 关键 Java 伪代码展示 Spring Boot 分层方式,方便你平移


一、ERP 同步架构(Spring Boot 侧)

┌──────────────────────────────────────────────┐
│  TbSyncScheduler  (@Scheduled fixedDelay)   │  ← Spring @Scheduled
├──────────────────────────────────────────────┤
│  TbItemService   → TbTopClient.call(...)     │  ← 封装 TOP 签名+POST
│  TbOrderService  → TbTopClient.call(...)     │
├──────────────────────────────────────────────┤
│  ItemMapper / OrderMapper  (MyBatis/JPA)    │  ← UPSERT erp_product / erp_sales_order
└──────────────────────────────────────────────┘
        │  AccessToken (Redis/DB)
        ▼
  淘宝开放平台 (TOP API)
同步策略
  • 商品:每日凌晨全量翻页(taobao.items.onsale.get)+ 实时用 modified增量补跑

  • 订单:每 5~30 min 按 start_modified/end_modified增量拉(taobao.trades.sold.gettaobao.trade.fullinfo.get),以 tid做幂等


二、Spring Boot 关键伪代码(Java)

/* ===== TopClient.java (签名封装) ===== */
@Service
public class TopClient {
    @Value("${top.appKey}") String appKey;
    @Value("${top.appSecret}") String appSecret;
    @Value("${top.gateway:https://gw.api.taobao.com/router/rest}") String gw;

    public JSONObject call(String method, Map<String,String> biz, String session) {
        Map<String,String> p = new TreeMap<>();
        p.put("method", method);
        p.put("app_key", appKey);
        p.put("timestamp", String.valueOf(System.currentTimeMillis()));
        p.put("format","json"); p.put("v","2.0"); p.put("sign_method","md5");
        if(session!=null) p.put("session",session);
        p.putAll(biz);
        p.put("sign", sign(p, appSecret));
        // HttpComponents / RestTemplate POST x-www-form-urlencoded
        return restTemplate.postForObject(gw, new LinkedMultiValueMap<>(p), JSONObject.class);
    }

    private String sign(Map<String,String> p, String secret){
        StringBuilder sb=new StringBuilder(secret);
        p.entrySet().stream().filter(e->!"sign".equals(e.getKey())&&e.getValue()!=null&&!e.getValue().isEmpty())
         .forEach(e->sb.append(e.getKey()).append(e.getValue()));
        sb.append(secret);
        return DigestUtils.md5DigestAsHex(sb.toString().getBytes()).toUpperCase();
    }
}

/* ===== TbItemSyncService.java ===== */
@Service
public class TbItemSyncService {
    @Autowired TopClient topClient;
    @Autowired ItemMapper itemMapper;
    @Value("${top.sellerSession}") String session;

    @Scheduled(cron = "0 30 2 * * ?")   // 每天02:30全量
    public void fullSyncItems() {
        int pg=1;
        do {
            JSONObject r = topClient.call("taobao.items.onsale.get", Map.of(
                "page_no",String.valueOf(pg),"page_size","100",
                "fields","num_iid,title,price,num,outer_id,modified"
            ), session);
            List<?> items = r.getJSONObject("items_onsale_get_response")
                               .getJSONArray("items");
            if(items.isEmpty()) break;
            items.forEach(it-> itemMapper.upsertItem(parseItem(it)));
            pg++;
        }while(true);
    }
}

/* ===== TbOrderSyncService.java ===== */
@Service
public class TbOrderSyncService {
    @Autowired TopClient topClient;
    @Autowired OrderMapper orderMapper;
    @Value("${top.sellerSession}") String session;

    @Scheduled(fixedDelay = 300_000)  // 5分钟
    public void incSyncOrders() {
        String now  = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
        String from = LocalDateTime.now().minusMinutes(30).format(...);
        JSONObject r = topClient.call("taobao.trades.sold.get", Map.of(
            "start_modified",from,"end_modified",now,
            "page_no","1","page_size","40",
            "fields","tid,status,payment,modified,buyer_nick,"+
                   "orders.num_iid,orders.outer_sku_id,orders.num,orders.price"
        ), session);
        // 逐单 get_detail → orderMapper.upsertOrder(...)
    }
}

三、完整 Python ERP 同步模块(可直接跑)

# tb_erp_sync.py
"""
仿电商ERP:淘宝商品+订单自动同步
- 商品:全量翻页(onsale.get) + 增量(modified过滤)
- 订单:增量按 modified 时间窗(trades.sold.get → trade.fullinfo.get)
- SQLite 做本地存储示例(可替 MyBatis Mapper)
依赖: requests  (pip install requests)
"""
import hashlib, time, requests, sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional

# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ───────────── TOP Client (内联) ─────────────
class TopClient:
    GW = "https://gw.api.taobao.com/router/rest"

    def __init__(self, ak, ask):
        self.ak, self.ask = ak, ask

    def _sign(self, p: Dict) -> str:
        filt = sorted((k, v) for k, v in p.items()
                       if v is not None and str(v).strip() != '' and k != 'sign')
        qs = ''.join(f"{k}{v}" for k, v in filt)
        return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper()

    def call(self, method, biz, session=None):
        p = {"method": method, "app_key": self.ak,
             "timestamp": str(int(time.time() * 1000)),
             "format": "json", "v": "2.0", "sign_method": "md5"}
        if session: p["session"] = session
        p.update(biz)
        p["sign"] = self._sign(p)
        r = requests.post(self.GW, data=p, timeout=15)
        r.raise_for_status()
        d = r.json()
        if "error_response" in d:
            err = d["error_response"]
            raise Exception(f"TOP[{err.get('code')}]: {err.get('msg')} {err.get('sub_msg','')}")
        return d.get(list(d.keys() - {"error_response"})[0], {})


# ───────────── SQLite 本地存储 ─────────────
def init_db(db="erp.db"):
    conn = sqlite3.connect(db)
    conn.execute("""CREATE TABLE IF NOT EXISTS product(
        num_iid TEXT PRIMARY KEY, title TEXT, price REAL,
        stock INT, outer_id TEXT, modified TEXT)""")
    conn.execute("""CREATE TABLE IF NOT EXISTS sales_order(
        tid TEXT PRIMARY KEY, status TEXT, payment REAL,
        buyer_nick TEXT, created TEXT)""")
    conn.commit()
    return conn


def upsert_product(conn, it: Dict):
    conn.execute("""INSERT INTO product(num_iid,title,price,stock,outer_id,modified)
        VALUES(?,?,?,?,?,?) ON CONFLICT(num_iid) DO UPDATE SET
        title=excluded.title,price=excluded.price,stock=excluded.stock,
        outer_id=excluded.outer_id,modified=excluded.modified""",
        (it["num_iid"], it["title"], it["price"],
         it["num"], it.get("outer_id",""), it.get("modified","")))


def upsert_order(conn, t: Dict):
    conn.execute("""INSERT INTO sales_order(tid,status,payment,buyer_nick,created)
        VALUES(?,?,?,?,?) ON CONFLICT(tid) DO UPDATE SET
        status=excluded.status,payment=excluded.payment,
        buyer_nick=excluded.buyer_nick,created=excluded.created""",
        (str(t["tid"]), t["status"], float(t.get("payment") or 0),
         t.get("buyer_nick",""), t.get("created","")))
    conn.commit()

# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ───────────── 同步服务 ─────────────
class TbErpSync:
    def __init__(self, ak, ask, session, db_conn):
        self.top = TopClient(ak, ask)
        self.session = session
        self.conn = db_conn

    # ---- 商品全量 ----
    def sync_items_full(self, page_size=100):
        pg = 1
        while True:
            r = self.top.call("taobao.items.onsale.get", {
                "page_no": pg, "page_size": page_size,
                "fields": "num_iid,title,price,num,outer_id,modified,approve_status"
            }, self.session)
            items = r.get("items", []) or []
            for it in items:
                if it.get("approve_status") != "onsale":
                    continue
                upsert_product(self.conn, {
                    "num_iid": str(it["num_iid"]),
                    "title": it.get("title",""),
                    "price": float(it.get("price") or 0),
                    "num": int(it.get("num") or 0),
                    "outer_id": it.get("outer_id",""),
                    "modified": it.get("modified","")
                })
            if len(items) < page_size:
                break
            pg += 1
            time.sleep(0.2)   # QPS 保护

    # ---- 订单增量 ----
    def sync_orders_inc(self, minutes=30):
        now = datetime.now()
        start = (now - timedelta(minutes=minutes)).strftime("%Y-%m-%d %H:%M:%S")
        end = now.strftime("%Y-%m-%d %H:%M:%S")

        r = self.top.call("taobao.trades.sold.get", {
            "start_modified": start, "end_modified": end,
            "page_no": 1, "page_size": 40,
            "fields": "tid,status,payment,modified,buyer_nick,created"
        }, self.session)
        tids = [t["tid"] for t in (r.get("trades", []) or [])]

        for tid in tids:
            detail = self.top.call("taobao.trade.fullinfo.get", {
                "tid": str(tid),
                "fields": "tid,status,payment,buyer_nick,created"
            }, self.session).get("trade", {})
            upsert_order(self.conn, detail)
            time.sleep(0.15)

# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ======================= main =======================
if __name__ == "__main__":
    AK = "YOUR_ENTERPRISE_APP_KEY"
    ASK = "YOUR_APP_SECRET"
    SESSION = "SELLER_ACCESS_TOKEN"   # 卖家OAuth

    conn = init_db()
    syncer = TbErpSync(AK, ASK, SESSION, conn)

    print("▶ 开始商品全量同步...")
    syncer.sync_items_full()

    print("▶ 开始订单增量同步(近30min)...")
    syncer.sync_orders_inc(minutes=30)

    print("✅ 同步完成(SQLite → erp.db)")

四、避坑清单(ERP实施必看)

后果
解决
用个人应用
403 订单接口
切企业实名应用+申请权限
session 用买家 token
空/403
必须用卖家账号OAuth换的 AccessToken
全量翻页不记断点
服务重启从头翻页超日额度
记录 last_page/max_modified断点续跑
订单不传 start_modified
全量拉取超量
固定时间窗增量(5~30min)
QPS 触发限流
code=7
sleep(0.15~0.2)或令牌桶;遇限流指数退避
skus库存空
公开查询别人商品
自己店铺查须传 session(已做)

五、面试/方案一句话

仿 ERP 同步 = Spring Boot @Scheduled调封装 TOP Client(taobao.items.onsale.get全量+增量 / taobao.trades.sold.get+trade.fullinfo.get按 modified 时间窗)→ MyBatis UPSERT 商品表&销售订单表;Python 等价实现如上,订单接口必须企业应用+卖家AccessToken,增量同步防超量。
需要我补 Java 版完整 pom.xml + application.yml 配置模板APScheduler 常驻守护版(带断点续跑文件) 吗?


群贤毕至

访客