一、接口概览
1.1 接口简介
item_search接口是锦程物流开放平台提供的物流订单搜索接口,支持根据多种条件查询物流订单列表,实现订单的模糊搜索、状态筛选、时间范围查询等复杂检索功能。1.2 核心功能
- ✅ 关键词搜索:订单号、运单号、收发货人姓名/电话模糊匹配
- ✅ 多条件筛选:按状态、时间、物流产品等维度筛选
- ✅ 分页查询:支持大数据量的分页加载
- ✅ 排序功能:支持按创建时间、更新时间等字段排序
- ✅ 字段选择:可指定返回字段,优化网络传输
二、准备工作
2.1 环境准备
# Python环境pip install requests python-dotenv# Java环境(Maven)<dependency> <groupId>com.squareup.okhttp3</groupId> <artifactId>okhttp</artifactId> <version>4.10.0</version> </dependency># PHP环境composer require guzzlehttp/guzzle
2.2 配置管理
# config.pyimport osfrom dotenv import load_dotenv
load_dotenv()class Config: # API配置
JC_APP_KEY = os.getenv('JC_APP_KEY', '')
JC_APP_SECRET = os.getenv('JC_APP_SECRET', '')
JC_API_ENDPOINT = os.getenv('JC_API_ENDPOINT',
'https://sandbox-api.jc56.com' # 默认沙箱环境
)
# 业务配置
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100
REQUEST_TIMEOUT = 30三、接口详解
3.1 接口地址
POST /v1/item/search
3.2 请求参数详解
公共参数
参数名 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
app_key | string | 是 | 应用标识 | jc_app_2024 |
timestamp | string | 是 | 请求时间戳 | 2026-02-01 10:00:00 |
sign | string | 是 | 请求签名 | 详见签名算法 |
sign_method | string | 是 | 签名方法 | md5 |
format | string | 否 | 返回格式 | json(默认) |
version | string | 是 | API版本 | 1.0 |
业务参数
参数名 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
keyword | string | 否 | 搜索关键词 | JC20260201 |
order_no | string | 否 | 精确订单号 | JC202602010001 |
waybill_no | string | 否 | 运单号 | SF123456789 |
shipper_name | string | 否 | 发货人姓名 | 张三 |
shipper_phone | string | 否 | 发货人电话 | 13800138000 |
consignee_name | string | 否 | 收货人姓名 | 李四 |
consignee_phone | string | 否 | 收货人电话 | 13900139000 |
status | string | 否 | 订单状态 | CREATED, PICKED_UP, IN_TRANSIT, DELIVERED |
start_time | string | 否 | 开始时间 | 2026-01-01 00:00:00 |
end_time | string | 否 | 结束时间 | 2026-02-01 23:59:59 |
product_code | string | 否 | 产品编码 | JC_EXPRESS |
page_no | int | 否 | 页码 | 1(默认) |
page_size | int | 否 | 每页条数 | 20(默认) |
sort_field | string | 否 | 排序字段 | create_time, update_time |
sort_order | string | 否 | 排序方向 | desc(默认), asc |
fields | string | 否 | 返回字段 | order_no,status,create_time |
3.3 签名算法
import hashlibfrom typing import Dict, Anydef generate_sign(params: Dict[str, Any], app_secret: str) -> str: """
生成API请求签名
Args:
params: 请求参数字典
app_secret: 应用密钥
Returns:
32位大写MD5签名
"""
# 1. 过滤空值和sign参数
filtered_params = {
k: v for k, v in params.items()
if v is not None and k != 'sign'
}
# 2. 按键名ASCII升序排序
sorted_keys = sorted(filtered_params.keys())
# 3. 拼接键值对
sign_str = ''
for key in sorted_keys: # 处理数组和对象类型参数
if isinstance(filtered_params[key], (list, dict)):
value = str(filtered_params[key]) else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
# 4. 拼接app_secret
sign_str += app_secret
# 5. 计算MD5并转为大写
md5_hash = hashlib.md5()
md5_hash.update(sign_str.encode('utf-8')) return md5_hash.hexdigest().upper()# 测试用例test_params = { "app_key": "test_app", "timestamp": "2026-02-01 10:00:00", "keyword": "测试", "page_no": 1, "page_size": 20}
app_secret = "test_secret"signature = generate_sign(test_params, app_secret)print(f"生成的签名: {signature}")四、完整代码实现
4.1 Python完整实现
import requestsimport jsonimport hashlibimport timefrom typing import Dict, Any, List, Optionalfrom datetime import datetimefrom urllib.parse import urljoinclass JCLogisticsSearchAPI: """锦程物流搜索API客户端"""
def __init__(self, app_key: str, app_secret: str, sandbox: bool = True): """
初始化API客户端
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
Args:
app_key: 应用密钥
app_secret: 应用密钥
sandbox: 是否使用沙箱环境
"""
self.app_key = app_key self.app_secret = app_secret self.base_url = "https://sandbox-api.jc56.com" if sandbox else "https://api.jc56.com"
self.session = requests.Session() self.session.headers.update({ "User-Agent": "JCLogistics-Search-Client/1.0", "Accept": "application/json", "Content-Type": "application/json; charset=utf-8"
})
def _generate_timestamp(self) -> str: """生成标准格式时间戳"""
return datetime.now().strftime("%Y-%m-%d %H:%M:%S")
def _generate_sign(self, params: Dict[str, Any]) -> str: """生成请求签名"""
# 过滤空值和sign参数
filtered_params = {
k: v for k, v in params.items()
if v is not None and k != 'sign'
}
# 排序并拼接
sorted_keys = sorted(filtered_params.keys())
sign_str = ''
for key in sorted_keys: if isinstance(filtered_params[key], (list, dict)):
value = json.dumps(filtered_params[key], separators=(',', ':')) else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
# 添加密钥并计算MD5
sign_str += self.app_secret return hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper()
def search_orders(
self,
keyword: Optional[str] = None,
order_no: Optional[str] = None,
status_list: Optional[List[str]] = None,
start_time: Optional[str] = None,
end_time: Optional[str] = None,
page_no: int = 1,
page_size: int = 20,
sort_field: str = "create_time",
sort_order: str = "desc",
fields: Optional[List[str]] = None,
**extra_params ) -> Dict[str, Any]: """
搜索物流订单
Args:
keyword: 搜索关键词(支持模糊匹配)
order_no: 精确订单号
status_list: 状态筛选列表
start_time: 开始时间(格式:YYYY-MM-DD HH:MM:SS)
end_time: 结束时间(格式:YYYY-MM-DD HH:MM:SS)
page_no: 页码(从1开始)
page_size: 每页数量(1-100)
sort_field: 排序字段
sort_order: 排序方向(desc降序/asc升序)
fields: 指定返回字段列表
**extra_params: 其他查询参数
Returns:
API响应数据
"""
# 构建请求参数
params = { "app_key": self.app_key, "timestamp": self._generate_timestamp(), "sign_method": "md5", "format": "json", "version": "1.0", "page_no": max(1, page_no), "page_size": min(max(1, page_size), 100), "sort_field": sort_field, "sort_order": sort_order,
}
# 添加可选参数
if keyword:
params["keyword"] = keyword if order_no:
params["order_no"] = order_no if status_list:
params["status"] = ",".join(status_list) if start_time:
params["start_time"] = start_time if end_time:
params["end_time"] = end_time if fields:
params["fields"] = ",".join(fields)
# 添加额外参数
params.update(extra_params)
# 生成签名
params["sign"] = self._generate_sign(params)
# 发送请求
url = urljoin(self.base_url, "/v1/item/search")
try:
response = self.session.post(
url,
json=params,
timeout=30
)
response.raise_for_status()
result = response.json()
# 验证返回签名(如果返回中有sign字段)
if "sign" in result: # 注意:这里需要根据实际API返回的签名验证规则实现
pass
return result
except requests.exceptions.Timeout: return { "success": False, "code": "TIMEOUT", "message": "请求超时", "data": None
} except requests.exceptions.RequestException as e: return { "success": False, "code": "REQUEST_ERROR", "message": f"请求失败: {str(e)}", "data": None
} except json.JSONDecodeError: return { "success": False, "code": "INVALID_RESPONSE", "message": "响应解析失败", "data": None
}
def search_all_orders(
self,
**search_params ) -> List[Dict[str, Any]]: """
获取所有符合条件的订单(自动处理分页)
Args:
**search_params: 搜索参数
Returns:
订单列表
"""
all_orders = []
page_no = 1
while True: # 获取当前页数据
result = self.search_orders(
page_no=page_no,
page_size=100, # 使用最大页数减少请求次数
**search_params
)
if not result.get("success"): print(f"第{page_no}页查询失败: {result.get('message')}") break
data = result.get("data", {})
orders = data.get("list", [])
pagination = data.get("pagination", {})
# 添加当前页数据
all_orders.extend(orders)
# 检查是否还有下一页
current_page = pagination.get("page_no", page_no)
total_pages = pagination.get("total_pages", 0)
has_next = pagination.get("has_next", False)
print(f"已获取第{current_page}页,共{len(orders)}条,总计{len(all_orders)}条")
if not has_next or current_page >= total_pages: break
page_no += 1
# 避免请求过于频繁
time.sleep(0.5)
return all_orders# 使用示例def demo_search(): """搜索接口使用演示"""
# 初始化客户端
client = JCLogisticsSearchAPI(
app_key="your_app_key",
app_secret="your_app_secret",
sandbox=True
)
print("=== 示例1:关键词搜索 ===")
result1 = client.search_orders(
keyword="北京",
page_no=1,
page_size=10
) print(json.dumps(result1, ensure_ascii=False, indent=2))
print("\n=== 示例2:状态筛选 ===")
result2 = client.search_orders(
status_list=["DELIVERED", "IN_TRANSIT"],
start_time="2026-01-01 00:00:00",
end_time="2026-02-01 23:59:59",
sort_field="create_time",
sort_order="desc"
)
print("\n=== 示例3:获取所有待发货订单 ===")
all_pending_orders = client.search_all_orders(
status_list=["CREATED"],
start_time="2026-01-01 00:00:00"
) print(f"共找到 {len(all_pending_orders)} 个待发货订单")if __name__ == "__main__":
demo_search()4.2 Java实现
import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.databind.ObjectMapper;import okhttp3.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.crypto.Mac;import javax.crypto.spec.SecretKeySpec;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.security.MessageDigest;import java.time.LocalDateTime;import java.time.format.DateTimeFormatter;import java.util.*;import java.util.concurrent.TimeUnit;public class JCLogisticsSearchClient { private static final Logger logger = LoggerFactory.getLogger(JCLogisticsSearchClient.class);
private final String appKey; private final String appSecret; private final String baseUrl; private final OkHttpClient httpClient; private final ObjectMapper objectMapper;
public JCLogisticsSearchClient(String appKey, String appSecret, boolean sandbox) { this.appKey = appKey; this.appSecret = appSecret; this.baseUrl = sandbox ? "https://sandbox-api.jc56.com" : "https://api.jc56.com";
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.addInterceptor(new LoggingInterceptor())
.build();
this.objectMapper = new ObjectMapper(); this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
}
// 搜索参数构建器
public static class SearchParams { private String keyword; private String orderNo; private List<String> statusList; private String startTime; private String endTime; private Integer pageNo = 1; private Integer pageSize = 20; private String sortField = "create_time"; private String sortOrder = "desc"; private List<String> fields;
// 省略getter/setter和builder方法
}
public SearchResult searchOrders(SearchParams params) throws IOException { // 构建请求参数
Map<String, Object> requestParams = new HashMap<>();
requestParams.put("app_key", appKey);
requestParams.put("timestamp", LocalDateTime.now()
.format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")));
requestParams.put("sign_method", "md5");
requestParams.put("format", "json");
requestParams.put("version", "1.0");
// 添加业务参数
if (params.getKeyword() != null) {
requestParams.put("keyword", params.getKeyword());
} if (params.getStatusList() != null && !params.getStatusList().isEmpty()) {
requestParams.put("status", String.join(",", params.getStatusList()));
} // ... 其他参数
// 生成签名
String sign = generateSign(requestParams);
requestParams.put("sign", sign);
// 构建请求
String jsonBody = objectMapper.writeValueAsString(requestParams); Request request = new Request.Builder()
.url(baseUrl + "/v1/item/search")
.post(RequestBody.create(jsonBody, MediaType.parse("application/json")))
.addHeader("User-Agent", "JCLogistics-Java-Client/1.0")
.build();
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
// 发送请求
try (Response response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { throw new IOException("Unexpected code " + response);
}
String responseBody = response.body().string(); return objectMapper.readValue(responseBody, SearchResult.class);
}
}
// 省略其他辅助方法...
static class SearchResult { private Boolean success; private String code; private String message; private SearchData data;
// 省略getter/setter
}
static class SearchData { private List<OrderInfo> list; private Pagination pagination;
// 省略getter/setter
}
static class Pagination { private Integer pageNo; private Integer pageSize; private Integer totalCount; private Integer totalPages; private Boolean hasNext;
// 省略getter/setter
}
}4.3 PHP实现(Laravel示例)
<?phpnamespace App\Services\Logistics;use GuzzleHttp\Client;use GuzzleHttp\Exception\RequestException;use Illuminate\Support\Facades\Cache;use Illuminate\Support\Facades\Log;class JCLogisticsSearchService{ private $appKey; private $appSecret; private $baseUrl; private $client; private $timeout = 30;
public function __construct($sandbox = true) { $this->appKey = config('services.jc_logistics.app_key'); $this->appSecret = config('services.jc_logistics.app_secret'); $this->baseUrl = $sandbox
? 'https://sandbox-api.jc56.com'
: 'https://api.jc56.com';
$this->client = new Client([ 'base_uri' => $this->baseUrl, 'timeout' => $this->timeout, 'headers' => [ 'User-Agent' => 'JCLogistics-Laravel-Client/1.0', 'Accept' => 'application/json',
]
]);
}
/**
* 搜索订单
*/
public function search(array $params = []): array
{ try { // 基础参数
$requestParams = [ 'app_key' => $this->appKey, 'timestamp' => date('Y-m-d H:i:s'), 'sign_method' => 'md5', 'format' => 'json', 'version' => '1.0',
];
// 合并业务参数
$requestParams = array_merge($requestParams, $params);
// 生成签名
$requestParams['sign'] = $this->generateSign($requestParams);
// 发送请求
$response = $this->client->post('/v1/item/search', [ 'json' => $requestParams, 'http_errors' => false
]);
$statusCode = $response->getStatusCode(); $body = $response->getBody()->getContents(); $result = json_decode($body, true);
if ($statusCode !== 200) { Log::error('锦程物流搜索接口请求失败', [ 'status' => $statusCode, 'response' => $result
]);
return [ 'success' => false, 'code' => 'HTTP_ERROR', 'message' => '接口请求失败', 'data' => null
];
}
return $result;
} catch (RequestException $e) { Log::error('锦程物流搜索接口异常', [ 'error' => $e->getMessage()
]);
return [ 'success' => false, 'code' => 'REQUEST_EXCEPTION', 'message' => $e->getMessage(), 'data' => null
];
}
}
/**
* 带缓存搜索
*/
public function searchWithCache(array $params, int $ttl = 300): array
{ $cacheKey = 'jc_search:' . md5(serialize($params));
return Cache::remember($cacheKey, $ttl, function () use ($params) { return $this->search($params);
});
}
/**
* 生成签名
*/
private function generateSign(array $params): string
{ // 移除sign参数
unset($params['sign']);
// 过滤空值并按键名排序
$params = array_filter($params, function ($value) { return $value !== null && $value !== '';
}); ksort($params);
// 拼接字符串
$signStr = ''; foreach ($params as $key => $value) { if (is_array($value) || is_object($value)) { $value = json_encode($value, JSON_UNESCAPED_UNICODE);
} $signStr .= $key . $value;
}
// 添加密钥并计算MD5
$signStr .= $this->appSecret; return strtoupper(md5($signStr));
}
/**
* 批量搜索所有订单
*/
public function searchAll(array $params = []): array
{ $allOrders = []; $pageNo = 1;
do { $params['page_no'] = $pageNo; $params['page_size'] = 100;
$result = $this->search($params);
if (!$result['success']) { break;
}
$orders = $result['data']['list'] ?? []; $pagination = $result['data']['pagination'] ?? [];
$allOrders = array_merge($allOrders, $orders);
$hasNext = $pagination['has_next'] ?? false; $totalPages = $pagination['total_pages'] ?? 0;
if (!$hasNext || $pageNo >= $totalPages) { break;
}
$pageNo++; usleep(500000); // 延迟0.5秒
} while (true);
return $allOrders;
}
}五、返回结果处理
5.1 成功响应格式
{
"success": true,
"code": "10000",
"message": "成功",
"data": {
"list": [
{
"order_no": "JC202602010001",
"waybill_no": "SF123456789",
"status": "DELIVERED",
"status_desc": "已签收",
"shipper_name": "张三",
"shipper_phone": "13800138000",
"consignee_name": "李四",
"consignee_phone": "13900139000",
"create_time": "2026-02-01 09:00:00",
"update_time": "2026-02-01 16:30:00",
"product_name": "锦程快递",
"weight": 2.5,
"volume": 0.02,
"amount": 25.50
},
{
"order_no": "JC202602010002",
"waybill_no": "SF123456790",
"status": "IN_TRANSIT",
"status_desc": "运输中",
"shipper_name": "王五",
"shipper_phone": "13800138001",
"consignee_name": "赵六",
"consignee_phone": "13900139001",
"create_time": "2026-02-01 10:00:00",
"update_time": "2026-02-01 14:20:00",
"product_name": "锦程快运",
"weight": 15.8,
"volume": 0.15,
"amount": 120.00
}
],
"pagination": {
"page_no": 1,
"page_size": 20,
"total_count": 125,
"total_pages": 7,
"has_next": true,
"has_previous": false
},
"summary": {
"total_amount": 2845.50,
"total_weight": 158.3,
"order_count": 125,
"status_distribution": {
"CREATED": 12,
"PICKED_UP": 28,
"IN_TRANSIT": 45,
"DELIVERED": 40
}
}
}}5.2 响应字段说明
订单信息字段
字段名 | 类型 | 说明 |
|---|---|---|
order_no | string | 订单号(唯一标识) |
waybill_no | string | 运单号 |
status | string | 状态编码 |
status_desc | string | 状态描述 |
shipper_* | string | 发货人信息 |
consignee_* | string | 收货人信息 |
create_time | string | 创建时间 |
update_time | string | 更新时间 |
product_name | string | 产品名称 |
weight | float | 重量(kg) |
volume | float | 体积(m³) |
amount | float | 金额(元) |
分页信息字段
字段名 | 类型 | 说明 |
|---|---|---|
page_no | int | 当前页码 |
page_size | int | 每页数量 |
total_count | int | 总记录数 |
total_pages | int | 总页数 |
has_next | bool | 是否有下一页 |
has_previous | bool | 是否有上一页 |
5.3 错误响应处理
class JCLogisticsError(Exception): """锦程物流API异常基类"""
passclass AuthenticationError(JCLogisticsError): """认证错误"""
passclass ValidationError(JCLogisticsError): """参数验证错误"""
passclass RateLimitError(JCLogisticsError): """频率限制错误"""
passclass APIError(JCLogisticsError): """API错误"""
def __init__(self, code, message, response=None): self.code = code self.message = message self.response = response super().__init__(f"[{code}] {message}")def handle_api_response(response: Dict[str, Any]) -> Dict[str, Any]: """
统一处理API响应
Args:
response: API响应数据
Returns:
处理后的数据
Raises:
各种业务异常
"""
if not response.get("success"):
code = response.get("code")
message = response.get("message", "未知错误")
# 根据错误码抛出特定异常
if code in ["20001", "20002", "20003"]: raise AuthenticationError(message) elif code in ["20004", "20005"]: raise ValidationError(message) elif code == "20006": raise RateLimitError(message) else: raise APIError(code, message, response)
return response.get("data", {})# 使用示例try:
result = client.search_orders(keyword="测试")
data = handle_api_response(result)
orders = data.get("list", [])
except AuthenticationError as e: print(f"认证失败: {e}") # 重新获取token或提醒用户except RateLimitError as e: print(f"请求频率超限: {e}") # 等待后重试
time.sleep(60)except APIError as e: print(f"API错误: {e}") # 记录日志并通知管理员
logger.error(f"锦程物流API错误: {e.code} - {e.message}")except Exception as e: print(f"未知错误: {e}")六、高级功能实现
6.1 智能搜索建议
class IntelligentSearchService: """智能搜索服务"""
def __init__(self, api_client): self.client = api_client self.search_history = [] # 搜索历史缓存
def smart_search(self, query: str) -> Dict[str, Any]: """
智能搜索:自动识别搜索类型
Args:
query: 搜索查询字符串
Returns:
搜索结果
"""
# 识别查询类型
search_type = self._detect_search_type(query)
# 构建搜索参数
params = self._build_search_params(query, search_type)
# 执行搜索
result = self.client.search_orders(**params)
# 记录搜索历史
self._record_search_history(query, search_type, result)
return result
def _detect_search_type(self, query: str) -> str: """
自动识别搜索类型
"""
# 检查是否为订单号(特定格式)
if re.match(r'^JC\d{12}$', query): return 'order_no'
# 检查是否为运单号
if re.match(r'^[A-Z]{2}\d{9}$', query): return 'waybill_no'
# 检查是否为手机号
if re.match(r'^1[3-9]\d{9}$', query): return 'phone'
# 检查是否为日期
if re.match(r'^\d{4}-\d{2}-\d{2}$', query): return 'date'
# 默认为关键词搜索
return 'keyword'
def _build_search_params(self, query: str, search_type: str) -> Dict[str, Any]: """
根据搜索类型构建参数
"""
params = {}
if search_type == 'order_no':
params['order_no'] = query elif search_type == 'waybill_no':
params['waybill_no'] = query elif search_type == 'phone': # 尝试同时搜索发货人和收货人电话
params['shipper_phone'] = query
params['consignee_phone'] = query elif search_type == 'date':
params['start_time'] = f"{query} 00:00:00"
params['end_time'] = f"{query} 23:59:59"
else:
params['keyword'] = query
return params6.2 实时搜索提示
// 前端实现示例(Vue.js)<template> <div class="logistics-search">
<div class="search-box">
<input
v-model="searchQuery"
@input="onSearchInput"
@keyup.enter="doSearch"
placeholder="搜索订单号、运单号、手机号..."
/>
<button @click="doSearch">搜索</button>
</div>
<!-- 搜索建议 -->
<div v-if="showSuggestions" class="suggestions">
<div
v-for="suggestion in suggestions"
:key="suggestion.value"
@click="selectSuggestion(suggestion)"
class="suggestion-item"
>
<div class="suggestion-type">{{ suggestion.type }}</div>
<div class="suggestion-value">{{ suggestion.value }}</div>
</div>
</div>
<!-- 搜索结果 -->
<div v-if="searchResults" class="results">
<div class="result-summary">
找到 {{ searchResults.pagination.total_count }} 条记录 </div>
<table class="result-table">
<!-- 表格内容 -->
</table>
<div class="pagination">
<!-- 分页组件 -->
</div>
</div>
</div></template><script>export default { data() { return { searchQuery: '', suggestions: [], showSuggestions: false, searchResults: null, searchHistory: JSON.parse(localStorage.getItem('jc_search_history')) || []
}
}, methods: { async onSearchInput() { if (this.searchQuery.length < 2) { this.suggestions = []; this.showSuggestions = false; return;
}
// 防抖处理
clearTimeout(this.suggestTimer); this.suggestTimer = setTimeout(async () => { try { const response = await this.$http.post('/api/jc-logistics/suggest', { query: this.searchQuery
}); this.suggestions = response.data; this.showSuggestions = this.suggestions.length > 0;
} catch (error) { console.error('获取搜索建议失败:', error);
}
}, 300);
},
async doSearch() { try { const response = await this.$http.post('/api/jc-logistics/search', { query: this.searchQuery, page: 1, size: 20
});
this.searchResults = response.data; this.showSuggestions = false;
// 保存搜索历史
this.saveSearchHistory(this.searchQuery);
} catch (error) { this.$message.error('搜索失败: ' + error.message);
}
},
saveSearchHistory(query) { // 去重并限制历史记录数量
const index = this.searchHistory.findIndex(item => item === query); if (index !== -1) { this.searchHistory.splice(index, 1);
}
this.searchHistory.unshift(query); this.searchHistory = this.searchHistory.slice(0, 10);
localStorage.setItem('jc_search_history',
JSON.stringify(this.searchHistory));
}
}
}</script>6.3 搜索性能优化
import redisfrom functools import lru_cachefrom datetime import datetime, timedeltaclass OptimizedSearchService: """优化搜索服务"""
def __init__(self, api_client, redis_client): self.api_client = api_client self.redis = redis_client self.cache_prefix = "jc_search:"
@lru_cache(maxsize=1000)
def search_with_cache(self, cache_key: str, **params): """
使用内存缓存和Redis缓存的搜索
"""
# 先尝试从Redis获取
cached = self.redis.get(cache_key) if cached: return json.loads(cached)
# 调用API
result = self.api_client.search_orders(**params)
# 缓存到Redis(根据数据量设置不同过期时间)
if result.get("success"):
data = result.get("data", {})
total_count = data.get("pagination", {}).get("total_count", 0)
# 根据数据量设置缓存时间
if total_count == 0:
ttl = 300 # 5分钟
elif total_count <= 100:
ttl = 1800 # 30分钟
else:
ttl = 3600 # 1小时
self.redis.setex(
cache_key,
ttl,
json.dumps(result)
)
return result
def smart_search_optimized(self, **params) -> Dict[str, Any]: """
智能优化搜索
"""
# 生成缓存键
cache_key = self._generate_cache_key(params)
# 检查是否可以使用缓存
if self._can_use_cache(params): return self.search_with_cache(cache_key, **params)
# 实时搜索
return self.api_client.search_orders(**params)
def _generate_cache_key(self, params: Dict[str, Any]) -> str: """生成缓存键"""
# 移除分页参数,因为它们不影响数据本身
cache_params = params.copy()
cache_params.pop('page_no', None)
cache_params.pop('page_size', None)
# 生成唯一键
param_str = json.dumps(cache_params, sort_keys=True) return f"{self.cache_prefix}{hashlib.md5(param_str.encode()).hexdigest()}"
def _can_use_cache(self, params: Dict[str, Any]) -> bool: """判断是否可以使用缓存"""
# 实时性要求高的查询不使用缓存
if params.get('real_time', False): return False
# 特定状态不使用缓存
real_time_statuses = ['PICKED_UP', 'IN_TRANSIT', 'DELIVERING'] if params.get('status') in real_time_statuses: return False
return True
def batch_search(self, search_queries: List[Dict[str, Any]]) -> List[Dict[str, Any]]: """
批量搜索优化
"""
from concurrent.futures import ThreadPoolExecutor, as_completed
results = []
with ThreadPoolExecutor(max_workers=5) as executor: # 提交所有搜索任务
future_to_query = {
executor.submit(self.smart_search_optimized, **query): query
for query in search_queries
}
# 收集结果
for future in as_completed(future_to_query): try:
result = future.result(timeout=10)
results.append(result) except Exception as e:
query = future_to_query[future] print(f"查询失败 {query}: {e}")
results.append({ "success": False, "error": str(e)
})
return results七、实战应用场景
7.1 物流管理系统集成
class LogisticsManagementSystem: """物流管理系统集成示例"""
def __init__(self, jc_client): self.jc_client = jc_client self.order_cache = {}
def search_orders_by_criteria(self, criteria: Dict[str, Any]) -> List[Dict[str, Any]]: """
根据综合条件搜索订单
Args:
criteria: 搜索条件字典,包含:
- date_range: 日期范围
- status: 状态
- customer_info: 客户信息
- product_type: 产品类型
- amount_range: 金额范围
Returns:
订单列表
"""
# 构建API搜索参数
search_params = {}
# 处理日期范围
if 'date_range' in criteria:
start_date, end_date = criteria['date_range']
search_params['start_time'] = f"{start_date} 00:00:00"
search_params['end_time'] = f"{end_date} 23:59:59"
# 处理状态
if 'status' in criteria:
search_params['status'] = criteria['status']
# 处理客户信息
if 'customer_info' in criteria:
customer = criteria['customer_info'] # 尝试多种搜索方式
if 'phone' in customer:
search_params['shipper_phone'] = customer['phone']
search_params['consignee_phone'] = customer['phone'] elif 'name' in customer:
search_params['keyword'] = customer['name']
# 执行搜索
result = self.jc_client.search_orders(**search_params)
if not result.get('success'): raise Exception(f"搜索失败: {result.get('message')}")
orders = result.get('data', {}).get('list', [])
# 应用额外的过滤条件
filtered_orders = self._apply_filters(orders, criteria)
return filtered_orders
def get_dashboard_stats(self, start_date: str, end_date: str) -> Dict[str, Any]: """
获取仪表板统计信息
"""
# 搜索所有订单
all_orders = self.jc_client.search_all_orders(
start_time=f"{start_date} 00:00:00",
end_time=f"{end_date} 23:59:59"
)
# 计算统计信息
stats = { 'total_orders': len(all_orders), 'total_amount': sum(order.get('amount', 0) for order in all_orders), 'total_weight': sum(order.get('weight', 0) for order in all_orders), 'status_distribution': {}, 'daily_trend': self._calculate_daily_trend(all_orders)
}
# 统计状态分布
for order in all_orders:
status = order.get('status', 'UNKNOWN')
stats['status_distribution'][status] = \
stats['status_distribution'].get(status, 0) + 1
return stats
def export_orders_to_excel(self, search_params: Dict[str, Any],
filename: str) -> str: """
导出订单到Excel
"""
import pandas as pd
# 获取所有订单
all_orders = self.jc_client.search_all_orders(**search_params)
# 转换为DataFrame
df = pd.DataFrame(all_orders)
# 数据清洗和转换
if not df.empty: # 选择需要的列
columns = ['order_no', 'waybill_no', 'status_desc',
'shipper_name', 'shipper_phone', 'consignee_name', 'consignee_phone', 'create_time', 'amount', 'weight']
df = df[columns]
# 重命名列
column_mapping = { 'order_no': '订单号', 'waybill_no': '运单号', 'status_desc': '状态', 'shipper_name': '发货人', 'shipper_phone': '发货人电话', 'consignee_name': '收货人', 'consignee_phone': '收货人电话', 'create_time': '创建时间', 'amount': '金额', 'weight': '重量'
}
df = df.rename(columns=column_mapping)
# 导出到Excel
df.to_excel(filename, index=False)
return filename7.2 订单状态监控系统
class OrderMonitor: """订单状态监控系统"""
def __init__(self, jc_client, alert_service): self.jc_client = jc_client self.alert_service = alert_service self.monitored_statuses = { 'DELAYED': 24 * 3600, # 24小时未更新
'EXCEPTION': 0, # 异常状态立即通知
}
def monitor_abnormal_orders(self): """
监控异常订单
"""
# 搜索特定状态的订单
abnormal_orders = []
for status in ['EXCEPTION', 'PROBLEM']:
result = self.jc_client.search_orders(
status=status,
page_size=100
)
if result.get('success'):
orders = result.get('data', {}).get('list', [])
abnormal_orders.extend(orders)
# 处理异常订单
for order in abnormal_orders: self._handle_abnormal_order(order)
def monitor_delayed_orders(self): """
监控延迟订单
"""
# 搜索24小时未更新的运输中订单
import datetime
cutoff_time = (datetime.datetime.now() -
datetime.timedelta(hours=24)).strftime("%Y-%m-%d %H:%M:%S")
result = self.jc_client.search_orders(
status='IN_TRANSIT',
end_time=cutoff_time,
sort_field='update_time',
sort_order='asc'
)
if result.get('success'):
delayed_orders = result.get('data', {}).get('list', [])
for order in delayed_orders: self._handle_delayed_order(order)
def _handle_abnormal_order(self, order: Dict[str, Any]): """处理异常订单"""
order_no = order.get('order_no')
status_desc = order.get('status_desc', '未知状态')
# 发送告警
alert_message = f"""
订单异常告警!
订单号:{order_no}
状态:{status_desc}
发货人:{order.get('shipper_name')}
收货人:{order.get('consignee_name')}
更新时间:{order.get('update_time')}
"""
self.alert_service.send_alert(
title="物流订单异常",
message=alert_message,
level="HIGH",
recipients=["logistics_manager@company.com"]
)
# 记录到数据库
self._log_abnormal_order(order)
def _handle_delayed_order(self, order: Dict[str, Any]): """处理延迟订单"""
order_no = order.get('order_no')
last_update = order.get('update_time')
# 发送通知
notification = f"""
订单运输延迟提醒
订单号:{order_no}
最后更新:{last_update}
已超过24小时未更新状态
请及时跟进处理
"""
self.alert_service.send_alert(
title="订单运输延迟",
message=notification,
level="MEDIUM",
recipients=["customer_service@company.com"]
)八、故障排查与优化
8.1 常见问题解决
问题1:签名验证失败
症状:返回"签名错误"或"签名验证失败"
# 解决方案:调试签名生成过程def debug_signature(params, app_secret): """调试签名生成"""
print("=== 签名调试信息 ===") print(f"原始参数: {params}")
# 1. 过滤并排序参数
filtered = {k: v for k, v in params.items() if v is not None and k != 'sign'}
sorted_keys = sorted(filtered.keys())
print(f"排序后的键: {sorted_keys}")
# 2. 拼接字符串
sign_str = ''
for key in sorted_keys:
value = str(filtered[key])
sign_str += f"{key}{value}"
print(f" {key}: {value}")
sign_str += app_secret print(f"拼接结果: {sign_str}")
# 3. 计算签名
import hashlib
signature = hashlib.md5(sign_str.encode('utf-8')).hexdigest().upper() print(f"计算签名: {signature}")
return signature问题2:分页数据不准确
症状:分页数据重复或缺失
# 解决方案:使用稳定的排序字段def stable_pagination_search(self, **params): """
稳定的分页搜索
"""
# 确保有排序字段
if 'sort_field' not in params:
params['sort_field'] = 'order_no' # 使用唯一字段排序
if 'sort_order' not in params:
params['sort_order'] = 'asc'
# 处理游标分页
last_order_no = None
all_orders = []
while True: if last_order_no: # 使用游标进行分页
params['cursor'] = last_order_no
result = self.search_orders(**params)
if not result.get('success'): break
orders = result.get('data', {}).get('list', []) if not orders: break
all_orders.extend(orders)
last_order_no = orders[-1].get('order_no')
# 检查是否还有更多数据
pagination = result.get('data', {}).get('pagination', {}) if not pagination.get('has_next', False): break
# 避免频繁请求
time.sleep(0.1)
return all_orders问题3:API限流处理
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exceptionclass RateLimitedClient: """带限流处理的客户端"""
def __init__(self, api_client): self.api_client = api_client self.rate_limiter = RateLimiter(max_calls=100, period=60) # 60秒100次
def is_rate_limit_error(self, result): """判断是否为限流错误"""
return result.get('code') == '20005' # 频率限制错误码
@retry(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=2, max=10),
retry=retry_if_exception(lambda x: isinstance(x, dict) and
x.get('code') == '20005') )
def search_with_retry(self, **params): """带重试的搜索"""
with self.rate_limiter:
result = self.api_client.search_orders(**params)
if self.is_rate_limit_error(result): raise result # 触发重试
return result8.2 性能优化建议
- 合理使用缓存
# 多级缓存策略class MultiLevelCache: def __init__(self): self.memory_cache = {} # 内存缓存
self.redis_cache = redis.Redis() # Redis缓存
self.local_ttl = 60 # 内存缓存60秒
self.redis_ttl = 300 # Redis缓存5分钟
def get(self, key): # 1. 检查内存缓存
if key in self.memory_cache:
item = self.memory_cache[key] if time.time() < item['expire']: return item['data']
# 2. 检查Redis缓存
cached = self.redis_cache.get(key) if cached:
data = json.loads(cached) # 更新内存缓存
self.memory_cache[key] = { 'data': data, 'expire': time.time() + self.local_ttl
} return data
return None- 连接池管理
import requestsfrom requests.adapters import HTTPAdapterfrom urllib3.util.retry import Retrydef create_session_with_pool(): """创建带连接池的会话"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10, # 连接池大小
pool_maxsize=100,
pool_block=False
)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session- 批量请求优化
from concurrent.futures import ThreadPoolExecutor, as_completedimport asyncioasync def batch_search_async(search_queries): """异步批量搜索""" async with aiohttp.ClientSession() as session: tasks = [] for query in search_queries: task = self._async_search(session, query) tasks.append(task) results = await asyncio.gather(*tasks, return_exceptions=True) return results
九、监控与告警
9.1 监控指标
class APIMonitor: """API监控"""
def __init__(self): self.metrics = { 'total_requests': 0, 'successful_requests': 0, 'failed_requests': 0, 'average_response_time': 0, 'error_codes': {}, 'last_error_time': None
}
def record_request(self, success, response_time, error_code=None): """记录请求指标"""
self.metrics['total_requests'] += 1
if success: self.metrics['successful_requests'] += 1
else: self.metrics['failed_requests'] += 1
if error_code: self.metrics['error_codes'][error_code] = \ self.metrics['error_codes'].get(error_code, 0) + 1
self.metrics['last_error_time'] = time.time()
# 更新平均响应时间(移动平均)
alpha = 0.1 # 平滑因子
old_avg = self.metrics['average_response_time'] self.metrics['average_response_time'] = \
alpha * response_time + (1 - alpha) * old_avg
def get_health_status(self): """获取健康状态"""
total = self.metrics['total_requests'] if total == 0: return "UNKNOWN"
success_rate = self.metrics['successful_requests'] / total
if success_rate > 0.95: return "HEALTHY"
elif success_rate > 0.8: return "DEGRADED"
else: return "UNHEALTHY"9.2 告警配置
# alert_rules.yamlalert_rules: - name: "api_error_rate_high" condition: "error_rate > 0.1" duration: "5m" severity: "warning" message: "API错误率超过10%" - name: "api_response_time_slow" condition: "response_time_p95 > 5000" duration: "10m" severity: "warning" message: "API响应时间P95超过5秒" - name: "api_unavailable" condition: "success_rate == 0" duration: "2m" severity: "critical" message: "API完全不可用"
十、最佳实践总结
10.1 安全实践
- 密钥管理:使用环境变量或密钥管理服务,不要硬编码在代码中
- 访问控制:为不同环境使用不同的API密钥
- 请求验证:验证所有输入参数,防止注入攻击
- HTTPS强制:确保所有请求都使用HTTPS
10.2 性能实践
- 合理分页:根据实际需求设置合适的page_size
- 缓存策略:对不常变的数据使用缓存
- 连接复用:使用连接池减少连接建立开销
- 异步处理:批量操作使用异步方式提高吞吐量
10.3 代码质量
- 错误处理:完善的异常处理和重试机制
- 日志记录:详细记录请求和响应信息
- 单元测试:编写测试用例覆盖主要功能
- 代码审查:定期进行代码审查和安全检查
10.4 运维实践
- 监控告警:实时监控API调用情况
- 容量规划:根据业务量预估API调用频率
- 版本管理:记录API版本变更,制定升级计划
- 文档维护:保持接口文档的及时更新
附录:快速开始模板
# quick_start.pyfrom jc_logistics import JCLogisticsSearchAPI# 1. 初始化客户端client = JCLogisticsSearchAPI(
app_key="your_app_key",
app_secret="your_app_secret",
sandbox=True # 测试环境)# 2. 简单搜索result = client.search_orders(keyword="测试订单")print(f"找到 {len(result['data']['list'])} 条记录")# 3. 条件搜索result = client.search_orders(
status_list=["DELIVERED"],
start_time="2026-01-01 00:00:00",
end_time="2026-02-01 23:59:59",
page_size=50)# 4. 获取所有数据all_orders = client.search_all_orders(
keyword="重要客户",
status_list=["IN_TRANSIT", "DELIVERED"]
)print(f"总共找到 {len(all_orders)} 条订单")通过本攻略,您应该能够全面掌握锦程物流
item_search接口的对接和使用。建议根据实际业务需求选择合适的实现方案,并遵循最佳实践确保系统的稳定性和可维护性。