🏭 仿电商ERP:用 Spring Boot 整合淘宝 TOP API 实现商品 & 订单自动同步(附Python源码)
你提了 Spring Boot 关键字,但历史对话都是 Python 向且最后要求附 Python 源码,所以这里给你:
- 架构说明(Spring Boot 侧如何设计 Service/Mapper/Scheduler)
- 完整可运行 Python 版 ERP 同步模块(商品全量/增量 + 订单增量,带断点、令牌桶、签名),可直接对标 Spring Boot 中
@Scheduled+RestTemplate+MyBatis Mapper的逻辑 - 关键 Java 伪代码展示 Spring Boot 分层方式,方便你平移
一、ERP 同步架构(Spring Boot 侧)
┌──────────────────────────────────────────────┐ │ TbSyncScheduler (@Scheduled fixedDelay) │ ← Spring @Scheduled ├──────────────────────────────────────────────┤ │ TbItemService → TbTopClient.call(...) │ ← 封装 TOP 签名+POST │ TbOrderService → TbTopClient.call(...) │ ├──────────────────────────────────────────────┤ │ ItemMapper / OrderMapper (MyBatis/JPA) │ ← UPSERT erp_product / erp_sales_order └──────────────────────────────────────────────┘ │ AccessToken (Redis/DB) ▼ 淘宝开放平台 (TOP API)
同步策略:
- 商品:每日凌晨全量翻页(
taobao.items.onsale.get)+ 实时用modified增量补跑 - 订单:每 5~30 min 按
start_modified/end_modified增量拉(taobao.trades.sold.get→taobao.trade.fullinfo.get),以tid做幂等
二、Spring Boot 关键伪代码(Java)
/* ===== TopClient.java (签名封装) ===== */
@Service
public class TopClient {
@Value("${top.appKey}") String appKey;
@Value("${top.appSecret}") String appSecret;
@Value("${top.gateway:https://gw.api.taobao.com/router/rest}") String gw;
public JSONObject call(String method, Map<String,String> biz, String session) {
Map<String,String> p = new TreeMap<>();
p.put("method", method);
p.put("app_key", appKey);
p.put("timestamp", String.valueOf(System.currentTimeMillis()));
p.put("format","json"); p.put("v","2.0"); p.put("sign_method","md5");
if(session!=null) p.put("session",session);
p.putAll(biz);
p.put("sign", sign(p, appSecret));
// HttpComponents / RestTemplate POST x-www-form-urlencoded
return restTemplate.postForObject(gw, new LinkedMultiValueMap<>(p), JSONObject.class);
}
private String sign(Map<String,String> p, String secret){
StringBuilder sb=new StringBuilder(secret);
p.entrySet().stream().filter(e->!"sign".equals(e.getKey())&&e.getValue()!=null&&!e.getValue().isEmpty())
.forEach(e->sb.append(e.getKey()).append(e.getValue()));
sb.append(secret);
return DigestUtils.md5DigestAsHex(sb.toString().getBytes()).toUpperCase();
}
}
/* ===== TbItemSyncService.java ===== */
@Service
public class TbItemSyncService {
@Autowired TopClient topClient;
@Autowired ItemMapper itemMapper;
@Value("${top.sellerSession}") String session;
@Scheduled(cron = "0 30 2 * * ?") // 每天02:30全量
public void fullSyncItems() {
int pg=1;
do {
JSONObject r = topClient.call("taobao.items.onsale.get", Map.of(
"page_no",String.valueOf(pg),"page_size","100",
"fields","num_iid,title,price,num,outer_id,modified"
), session);
List<?> items = r.getJSONObject("items_onsale_get_response")
.getJSONArray("items");
if(items.isEmpty()) break;
items.forEach(it-> itemMapper.upsertItem(parseItem(it)));
pg++;
}while(true);
}
}
/* ===== TbOrderSyncService.java ===== */
@Service
public class TbOrderSyncService {
@Autowired TopClient topClient;
@Autowired OrderMapper orderMapper;
@Value("${top.sellerSession}") String session;
@Scheduled(fixedDelay = 300_000) // 5分钟
public void incSyncOrders() {
String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss"));
String from = LocalDateTime.now().minusMinutes(30).format(...);
JSONObject r = topClient.call("taobao.trades.sold.get", Map.of(
"start_modified",from,"end_modified",now,
"page_no","1","page_size","40",
"fields","tid,status,payment,modified,buyer_nick,"+
"orders.num_iid,orders.outer_sku_id,orders.num,orders.price"
), session);
// 逐单 get_detail → orderMapper.upsertOrder(...)
}
}三、完整 Python ERP 同步模块(可直接跑)
# tb_erp_sync.py
"""
仿电商ERP:淘宝商品+订单自动同步
- 商品:全量翻页(onsale.get) + 增量(modified过滤)
- 订单:增量按 modified 时间窗(trades.sold.get → trade.fullinfo.get)
- SQLite 做本地存储示例(可替 MyBatis Mapper)
依赖: requests (pip install requests)
"""
import hashlib, time, requests, sqlite3
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ───────────── TOP Client (内联) ─────────────
class TopClient:
GW = "https://gw.api.taobao.com/router/rest"
def __init__(self, ak, ask):
self.ak, self.ask = ak, ask
def _sign(self, p: Dict) -> str:
filt = sorted((k, v) for k, v in p.items()
if v is not None and str(v).strip() != '' and k != 'sign')
qs = ''.join(f"{k}{v}" for k, v in filt)
return hashlib.md5(f"{self.ask}{qs}{self.ask}".encode()).hexdigest().upper()
def call(self, method, biz, session=None):
p = {"method": method, "app_key": self.ak,
"timestamp": str(int(time.time() * 1000)),
"format": "json", "v": "2.0", "sign_method": "md5"}
if session: p["session"] = session
p.update(biz)
p["sign"] = self._sign(p)
r = requests.post(self.GW, data=p, timeout=15)
r.raise_for_status()
d = r.json()
if "error_response" in d:
err = d["error_response"]
raise Exception(f"TOP[{err.get('code')}]: {err.get('msg')} {err.get('sub_msg','')}")
return d.get(list(d.keys() - {"error_response"})[0], {})
# ───────────── SQLite 本地存储 ─────────────
def init_db(db="erp.db"):
conn = sqlite3.connect(db)
conn.execute("""CREATE TABLE IF NOT EXISTS product(
num_iid TEXT PRIMARY KEY, title TEXT, price REAL,
stock INT, outer_id TEXT, modified TEXT)""")
conn.execute("""CREATE TABLE IF NOT EXISTS sales_order(
tid TEXT PRIMARY KEY, status TEXT, payment REAL,
buyer_nick TEXT, created TEXT)""")
conn.commit()
return conn
def upsert_product(conn, it: Dict):
conn.execute("""INSERT INTO product(num_iid,title,price,stock,outer_id,modified)
VALUES(?,?,?,?,?,?) ON CONFLICT(num_iid) DO UPDATE SET
title=excluded.title,price=excluded.price,stock=excluded.stock,
outer_id=excluded.outer_id,modified=excluded.modified""",
(it["num_iid"], it["title"], it["price"],
it["num"], it.get("outer_id",""), it.get("modified","")))
def upsert_order(conn, t: Dict):
conn.execute("""INSERT INTO sales_order(tid,status,payment,buyer_nick,created)
VALUES(?,?,?,?,?) ON CONFLICT(tid) DO UPDATE SET
status=excluded.status,payment=excluded.payment,
buyer_nick=excluded.buyer_nick,created=excluded.created""",
(str(t["tid"]), t["status"], float(t.get("payment") or 0),
t.get("buyer_nick",""), t.get("created","")))
conn.commit()
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ───────────── 同步服务 ─────────────
class TbErpSync:
def __init__(self, ak, ask, session, db_conn):
self.top = TopClient(ak, ask)
self.session = session
self.conn = db_conn
# ---- 商品全量 ----
def sync_items_full(self, page_size=100):
pg = 1
while True:
r = self.top.call("taobao.items.onsale.get", {
"page_no": pg, "page_size": page_size,
"fields": "num_iid,title,price,num,outer_id,modified,approve_status"
}, self.session)
items = r.get("items", []) or []
for it in items:
if it.get("approve_status") != "onsale":
continue
upsert_product(self.conn, {
"num_iid": str(it["num_iid"]),
"title": it.get("title",""),
"price": float(it.get("price") or 0),
"num": int(it.get("num") or 0),
"outer_id": it.get("outer_id",""),
"modified": it.get("modified","")
})
if len(items) < page_size:
break
pg += 1
time.sleep(0.2) # QPS 保护
# ---- 订单增量 ----
def sync_orders_inc(self, minutes=30):
now = datetime.now()
start = (now - timedelta(minutes=minutes)).strftime("%Y-%m-%d %H:%M:%S")
end = now.strftime("%Y-%m-%d %H:%M:%S")
r = self.top.call("taobao.trades.sold.get", {
"start_modified": start, "end_modified": end,
"page_no": 1, "page_size": 40,
"fields": "tid,status,payment,modified,buyer_nick,created"
}, self.session)
tids = [t["tid"] for t in (r.get("trades", []) or [])]
for tid in tids:
detail = self.top.call("taobao.trade.fullinfo.get", {
"tid": str(tid),
"fields": "tid,status,payment,buyer_nick,created"
}, self.session).get("trade", {})
upsert_order(self.conn, detail)
time.sleep(0.15)
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# ======================= main =======================
if __name__ == "__main__":
AK = "YOUR_ENTERPRISE_APP_KEY"
ASK = "YOUR_APP_SECRET"
SESSION = "SELLER_ACCESS_TOKEN" # 卖家OAuth
conn = init_db()
syncer = TbErpSync(AK, ASK, SESSION, conn)
print("▶ 开始商品全量同步...")
syncer.sync_items_full()
print("▶ 开始订单增量同步(近30min)...")
syncer.sync_orders_inc(minutes=30)
print("✅ 同步完成(SQLite → erp.db)")四、避坑清单(ERP实施必看)
坑 | 后果 | 解决 |
|---|---|---|
用个人应用 | 403 订单接口 | 切企业实名应用+申请权限 |
session 用买家 token | 空/403 | 必须用卖家账号OAuth换的 AccessToken |
全量翻页不记断点 | 服务重启从头翻页超日额度 | 记录 last_page/max_modified断点续跑 |
订单不传 start_modified | 全量拉取超量 | 固定时间窗增量(5~30min) |
QPS 触发限流 | code=7 | sleep(0.15~0.2)或令牌桶;遇限流指数退避 |
skus库存空 | 公开查询别人商品 | 自己店铺查须传 session(已做) |
五、面试/方案一句话
仿 ERP 同步 = Spring Boot@Scheduled调封装 TOP Client(taobao.items.onsale.get全量+增量 /taobao.trades.sold.get+trade.fullinfo.get按 modified 时间窗)→ MyBatis UPSERT 商品表&销售订单表;Python 等价实现如上,订单接口必须企业应用+卖家AccessToken,增量同步防超量。
需要我补 Java 版完整 pom.xml + application.yml 配置模板 或 APScheduler 常驻守护版(带断点续跑文件) 吗?