一、接口概览
1.1 接口简介
item_search接口是懂车帝开放平台的核心搜索接口,支持多维度车辆检索,包括新车、二手车、车型对比、资讯搜索等。该接口返回结构化的搜索结果,支持分页、排序、字段筛选等高级功能。1.2 核心功能
- ✅ 多类型搜索:新车、二手车、车型、资讯、视频等
- ✅ 多条件筛选:品牌、价格、级别、排量、配置等
- ✅ 智能排序:按相关性、价格、销量、评分等排序
- ✅ 分页查询:支持大数据量的分页加载
- ✅ 字段选择:可指定返回字段,优化网络传输
二、准备工作
2.1 环境配置
# requirements.txtrequests>=2.28.0python-dotenv>=1.0.0pydantic>=2.0.0aiohttp>=3.8.0redis>=4.5.0
2.2 认证配置
# config.pyimport osfrom dotenv import load_dotenv
load_dotenv()class DongchediConfig: # 懂车帝API配置
DONGCHEDI_APP_KEY = os.getenv('DONGCHEDI_APP_KEY')
DONGCHEDI_APP_SECRET = os.getenv('DONGCHEDI_APP_SECRET')
DONGCHEDI_API_BASE = os.getenv('DONGCHEDI_API_BASE',
'https://openapi.dongchedi.com/api/v1'
)
# 请求配置
REQUEST_TIMEOUT = 30
MAX_RETRIES = 3
DEFAULT_PAGE_SIZE = 20
MAX_PAGE_SIZE = 100
# 缓存配置
CACHE_TTL = 3600 # 1小时
SEARCH_CACHE_TTL = 1800 # 30分钟三、接口详解
3.1 接口地址
GET /search
3.2 请求参数详解
公共参数
参数名 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
app_key | string | 是 | 应用标识 | dcd_app_2024 |
timestamp | int | 是 | 时间戳 | 1706774400 |
sign | string | 是 | 请求签名 | 详见签名算法 |
format | string | 否 | 返回格式 | json(默认) |
version | string | 是 | API版本 | 1.0 |
搜索参数
参数名 | 类型 | 必填 | 说明 | 示例 |
|---|---|---|---|---|
q | string | 否 | 搜索关键词 | "宝马3系" |
search_type | string | 否 | 搜索类型 | new_car/used_car/model/news/video |
brand_id | int | 否 | 品牌ID | 2(宝马) |
series_id | int | 否 | 车系ID | 20(3系) |
price_min | float | 否 | 最低价格(万元) | 20.0 |
price_max | float | 否 | 最高价格(万元) | 50.0 |
level | string | 否 | 车辆级别 | A级/B级/C级/SUV/MPV |
fuel_type | string | 否 | 燃油类型 | gasoline/diesel/hybrid/electric |
transmission | string | 否 | 变速箱 | manual/automatic/cvt/dct |
displacement_min | float | 否 | 最小排量(L) | 1.5 |
displacement_max | float | 否 | 最大排量(L) | 3.0 |
year_min | int | 否 | 最小年份 | 2020 |
year_max | int | 否 | 最大年份 | 2023 |
region | string | 否 | 地区 | 北京/上海/广州 |
sort_by | string | 否 | 排序字段 | relevance/price/sales/rating |
sort_order | string | 否 | 排序方向 | asc/desc(默认desc) |
page | int | 否 | 页码 | 1(默认) |
per_page | int | 否 | 每页条数 | 20(默认) |
fields | string | 否 | 返回字段 | id,title,price,brand,rating |
include_facets | bool | 否 | 是否包含聚合信息 | true |
四、完整代码实现
4.1 Python完整实现
import requestsimport timeimport hashlibimport hmacimport jsonfrom typing import Dict, Any, List, Optionalfrom datetime import datetime, timedeltafrom dataclasses import dataclassfrom urllib.parse import urlencodeimport redis@dataclassclass SearchResultItem: """搜索结果项"""
id: int
type: str # new_car, used_car, model, news, video
title: str
description: str
url: str
image_url: str
price: Optional[float] = None
original_price: Optional[float] = None
brand: Optional[str] = None
series: Optional[str] = None
model: Optional[str] = None
year: Optional[int] = None
fuel_type: Optional[str] = None
transmission: Optional[str] = None
rating: Optional[float] = None
review_count: Optional[int] = None
sales: Optional[int] = None
publish_time: Optional[str] = None
view_count: Optional[int] = None
like_count: Optional[int] = None@dataclassclass SearchFacet: """搜索聚合信息"""
field: str
values: List[Dict[str, Any]] # [{value: "宝马", count: 100}, ...]@dataclassclass SearchResult: """搜索结果"""
success: bool
code: int
message: str
data: Dict[str, Any]
items: List[SearchResultItem]
pagination: Dict[str, Any]
facets: List[SearchFacet]
total_count: int
search_time: floatclass DongchediSearchAPI: """懂车帝搜索API客户端"""
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
def __init__(self, app_key: str, app_secret: str, sandbox: bool = True, redis_client=None): self.app_key = app_key self.app_secret = app_secret self.base_url = "https://sandbox-openapi.dongchedi.com" if sandbox else "https://openapi.dongchedi.com"
self.session = requests.Session() self.session.headers.update({ 'User-Agent': 'Dongchedi-Search-API/1.0', 'Accept': 'application/json'
}) self.redis = redis_client self._access_token = None
self._token_expires = None
def _generate_signature(self, params: Dict[str, Any], timestamp: int) -> str: """生成请求签名"""
# 过滤并排序参数
filtered_params = {
k: v for k, v in params.items()
if v is not None and k != 'sign'
}
sorted_keys = sorted(filtered_params.keys())
# 拼接参数字符串
sign_str = ''
for key in sorted_keys: if isinstance(filtered_params[key], (list, dict)):
value = json.dumps(filtered_params[key], separators=(',', ':')) else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
# 添加app_secret
sign_str += self.app_secret
# 计算HMAC-SHA256签名
signature = hmac.new( self.app_secret.encode('utf-8'),
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest()
return signature
def _get_access_token(self) -> str: """获取访问令牌"""
# 检查token是否有效
if self._access_token and self._token_expires and self._token_expires > datetime.now(): return self._access_token
# 获取新token
timestamp = int(time.time())
params = { 'app_key': self.app_key, 'timestamp': timestamp, 'grant_type': 'client_credentials'
}
# 生成签名
signature = self._generate_signature(params, timestamp)
params['sign'] = signature
# 请求token
url = f"{self.base_url}/oauth/token"
response = self.session.post(url, data=params)
if response.status_code == 200:
data = response.json() self._access_token = data['access_token'] self._token_expires = datetime.now() + timedelta(seconds=data['expires_in'] - 300) # 提前5分钟过期
return self._access_token else: raise Exception(f"获取token失败: {response.status_code} - {response.text}")
def search(
self,
query: Optional[str] = None,
search_type: Optional[str] = None,
brand_id: Optional[int] = None,
series_id: Optional[int] = None,
price_min: Optional[float] = None,
price_max: Optional[float] = None,
level: Optional[str] = None,
fuel_type: Optional[str] = None,
transmission: Optional[str] = None,
displacement_min: Optional[float] = None,
displacement_max: Optional[float] = None,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
region: Optional[str] = None,
sort_by: str = "relevance",
sort_order: str = "desc",
page: int = 1,
per_page: int = 20,
fields: Optional[List[str]] = None,
include_facets: bool = False
) -> SearchResult: """
执行搜索
Args:
query: 搜索关键词
search_type: 搜索类型
brand_id: 品牌ID
series_id: 车系ID
price_min: 最低价格
price_max: 最高价格
level: 车辆级别
fuel_type: 燃油类型
transmission: 变速箱类型
displacement_min: 最小排量
displacement_max: 最大排量
year_min: 最小年份
year_max: 最大年份
region: 地区
sort_by: 排序字段
sort_order: 排序方向
page: 页码
per_page: 每页条数
fields: 返回字段列表
include_facets: 是否包含聚合信息
Returns:
搜索结果
"""
# 获取访问令牌
access_token = self._get_access_token()
# 构建请求参数
params = { 'app_key': self.app_key, 'timestamp': int(time.time()), 'format': 'json', 'version': '1.0', 'sort_by': sort_by, 'sort_order': sort_order, 'page': page, 'per_page': min(per_page, DongchediConfig.MAX_PAGE_SIZE)
}
# 添加可选参数
if query:
params['q'] = query if search_type:
params['search_type'] = search_type if brand_id:
params['brand_id'] = brand_id if series_id:
params['series_id'] = series_id if price_min:
params['price_min'] = price_min if price_max:
params['price_max'] = price_max if level:
params['level'] = level if fuel_type:
params['fuel_type'] = fuel_type if transmission:
params['transmission'] = transmission if displacement_min:
params['displacement_min'] = displacement_min if displacement_max:
params['displacement_max'] = displacement_max if year_min:
params['year_min'] = year_min if year_max:
params['year_max'] = year_max if region:
params['region'] = region if fields:
params['fields'] = ','.join(fields) if include_facets:
params['include_facets'] = 'true'
# 生成签名
signature = self._generate_signature(params, params['timestamp'])
params['sign'] = signature
# 添加认证头
headers = { 'Authorization': f'Bearer {access_token}', 'Content-Type': 'application/json'
}
# 发送请求
url = f"{self.base_url}/api/v1/search"
try:
start_time = time.time()
response = self.session.get(
url,
params=params,
headers=headers,
timeout=DongchediConfig.REQUEST_TIMEOUT
)
search_time = time.time() - start_time
if response.status_code == 200:
result = response.json()
# 解析结果
items = self._parse_search_items(result.get('data', {}).get('items', []))
pagination = result.get('data', {}).get('pagination', {})
facets = self._parse_facets(result.get('data', {}).get('facets', []))
total_count = pagination.get('total_count', 0)
return SearchResult(
success=result.get('success', False),
code=result.get('code', 0),
message=result.get('message', ''),
data=result.get('data', {}),
items=items,
pagination=pagination,
facets=facets,
total_count=total_count,
search_time=search_time
) elif response.status_code == 401: # Token过期,重新获取
self._access_token = None
return self.search(
query=query, search_type=search_type, brand_id=brand_id, series_id=series_id,
price_min=price_min, price_max=price_max, level=level, fuel_type=fuel_type,
transmission=transmission, displacement_min=displacement_min, displacement_max=displacement_max,
year_min=year_min, year_max=year_max, region=region, sort_by=sort_by, sort_order=sort_order,
page=page, per_page=per_page, fields=fields, include_facets=include_facets
) else: return SearchResult(
success=False,
code=response.status_code,
message=f"HTTP {response.status_code}",
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
except requests.exceptions.Timeout: return SearchResult(
success=False,
code=408,
message="请求超时",
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
) except requests.exceptions.RequestException as e: return SearchResult(
success=False,
code=500,
message=f"网络请求异常: {str(e)}",
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
def _parse_search_items(self, items_data: List[Dict[str, Any]]) -> List[SearchResultItem]: """解析搜索结果项"""
items = []
for item_data in items_data: try:
item = SearchResultItem( id=item_data.get('id'), type=item_data.get('type', ''),
title=item_data.get('title', ''),
description=item_data.get('description', ''),
url=item_data.get('url', ''),
image_url=item_data.get('image_url', ''),
price=item_data.get('price'),
original_price=item_data.get('original_price'),
brand=item_data.get('brand'),
series=item_data.get('series'),
model=item_data.get('model'),
year=item_data.get('year'),
fuel_type=item_data.get('fuel_type'),
transmission=item_data.get('transmission'),
rating=item_data.get('rating'),
review_count=item_data.get('review_count'),
sales=item_data.get('sales'),
publish_time=item_data.get('publish_time'),
view_count=item_data.get('view_count'),
like_count=item_data.get('like_count')
)
items.append(item) except Exception as e: print(f"解析搜索结果项失败: {e}, 数据: {item_data}") continue
return items
def _parse_facets(self, facets_data: List[Dict[str, Any]]) -> List[SearchFacet]: """解析聚合信息"""
facets = []
for facet_data in facets_data:
facet = SearchFacet(
field=facet_data.get('field', ''),
values=facet_data.get('values', [])
)
facets.append(facet)
return facets
def search_all(
self,
max_pages: int = 10,
**search_params ) -> List[SearchResultItem]: """
获取所有符合条件的搜索结果(自动处理分页)
Args:
max_pages: 最大页数限制
**search_params: 搜索参数
Returns:
所有搜索结果项
"""
all_items = []
page = 1
while page <= max_pages:
result = self.search(page=page, **search_params)
if not result.success: print(f"第{page}页查询失败: {result.message}") break
# 添加当前页数据
all_items.extend(result.items)
pagination = result.pagination
print(f"已获取第{page}页,共{len(result.items)}条,总计{len(all_items)}条")
# 检查是否还有下一页
has_next = pagination.get('has_next', False)
total_pages = pagination.get('total_pages', 0)
if not has_next or page >= total_pages: break
page += 1
# 避免请求过于频繁
time.sleep(0.5)
return all_items
def search_new_cars(
self,
query: Optional[str] = None,
brand_id: Optional[int] = None,
price_min: Optional[float] = None,
price_max: Optional[float] = None,
level: Optional[str] = None,
**kwargs ) -> SearchResult: """
搜索新车
"""
return self.search(
query=query,
search_type='new_car',
brand_id=brand_id,
price_min=price_min,
price_max=price_max,
level=level,
**kwargs
)
def search_used_cars(
self,
query: Optional[str] = None,
price_min: Optional[float] = None,
price_max: Optional[float] = None,
year_min: Optional[int] = None,
year_max: Optional[int] = None,
region: Optional[str] = None,
**kwargs ) -> SearchResult: """
搜索二手车
"""
return self.search(
query=query,
search_type='used_car',
price_min=price_min,
price_max=price_max,
year_min=year_min,
year_max=year_max,
region=region,
**kwargs
)
def search_models(
self,
query: Optional[str] = None,
brand_id: Optional[int] = None,
series_id: Optional[int] = None,
**kwargs ) -> SearchResult: """
搜索车型
"""
return self.search(
query=query,
search_type='model',
brand_id=brand_id,
series_id=series_id,
**kwargs
)
def search_news(
self,
query: Optional[str] = None,
**kwargs ) -> SearchResult: """
搜索资讯
"""
return self.search(
query=query,
search_type='news',
**kwargs
)
def search_videos(
self,
query: Optional[str] = None,
**kwargs ) -> SearchResult: """
搜索视频
"""
return self.search(
query=query,
search_type='video',
**kwargs
)# 使用示例def demo_search_api(): """搜索API使用演示"""
# 初始化客户端
client = DongchediSearchAPI(
app_key=DongchediConfig.DONGCHEDI_APP_KEY,
app_secret=DongchediConfig.DONGCHEDI_APP_SECRET,
sandbox=True
)
print("=== 示例1:新车搜索 ===")
result = client.search_new_cars(
query="宝马3系",
price_min=25,
price_max=40,
per_page=5
)
if result.success: for item in result.items: print(f"{item.brand} {item.model} - {item.price}万 - 评分{item.rating}")
print("\n=== 示例2:二手车搜索 ===")
result = client.search_used_cars(
query="奥迪A4L",
price_min=15,
price_max=25,
year_min=2018,
year_max=2021,
region="北京"
)
print("\n=== 示例3:车型搜索 ===")
result = client.search_models(
query="SUV",
level="SUV",
price_min=20,
price_max=50,
include_facets=True
)
if result.success: print(f"找到 {result.total_count} 个车型") for facet in result.facets: if facet.field == 'brand': print("品牌分布:") for value in facet.values[:5]: print(f" {value['value']}: {value['count']}个")if __name__ == "__main__":
demo_search_api()4.2 Java实现
import com.fasterxml.jackson.annotation.JsonInclude;import com.fasterxml.jackson.databind.ObjectMapper;import okhttp3.*;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import javax.crypto.Mac;import javax.crypto.spec.SecretKeySpec;import java.io.IOException;import java.nio.charset.StandardCharsets;import java.security.InvalidKeyException;import java.security.NoSuchAlgorithmException;import java.time.LocalDateTime;import java.util.*;import java.util.concurrent.TimeUnit;public class DongchediSearchClient { private static final Logger logger = LoggerFactory.getLogger(DongchediSearchClient.class);
private final String appKey; private final String appSecret; private final String baseUrl; private final OkHttpClient httpClient; private final ObjectMapper objectMapper;
private String accessToken; private LocalDateTime tokenExpires;
public DongchediSearchClient(String appKey, String appSecret, boolean sandbox) { this.appKey = appKey; this.appSecret = appSecret; this.baseUrl = sandbox ?
"https://sandbox-openapi.dongchedi.com" :
"https://openapi.dongchedi.com";
this.httpClient = new OkHttpClient.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.build();
this.objectMapper = new ObjectMapper(); this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
} # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
public SearchResult search(SearchParams params) throws IOException { // 获取访问令牌
String token = getAccessToken();
// 构建请求URL
HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl + "/api/v1/search").newBuilder();
// 添加公共参数
urlBuilder.addQueryParameter("app_key", appKey);
urlBuilder.addQueryParameter("timestamp", String.valueOf(System.currentTimeMillis() / 1000));
urlBuilder.addQueryParameter("format", "json");
urlBuilder.addQueryParameter("version", "1.0");
// 添加搜索参数
if (params.getQuery() != null) {
urlBuilder.addQueryParameter("q", params.getQuery());
} if (params.getSearchType() != null) {
urlBuilder.addQueryParameter("search_type", params.getSearchType());
} if (params.getBrandId() != null) {
urlBuilder.addQueryParameter("brand_id", params.getBrandId().toString());
} // ... 其他参数
// 生成签名
Map<String, String> signParams = new HashMap<>(); for (String name : urlBuilder.build().queryParameterNames()) {
signParams.put(name, urlBuilder.build().queryParameter(name));
} String signature = generateSignature(signParams, Long.parseLong(signParams.get("timestamp")));
urlBuilder.addQueryParameter("sign", signature);
// 构建请求
Request request = new Request.Builder()
.url(urlBuilder.build())
.addHeader("Authorization", "Bearer " + token)
.addHeader("Content-Type", "application/json")
.addHeader("User-Agent", "Dongchedi-Java-Client/1.0")
.build();
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
// 发送请求
try (Response response = httpClient.newCall(request).execute()) { if (response.isSuccessful()) { String responseBody = response.body().string(); return objectMapper.readValue(responseBody, SearchResult.class);
} else { throw new IOException("请求失败: " + response.code());
}
}
}
// 省略其他方法...}class SearchParams { private String query; private String searchType; private Integer brandId; private Integer seriesId; private Double priceMin; private Double priceMax; private String level; private String fuelType; private String transmission; private Double displacementMin; private Double displacementMax; private Integer yearMin; private Integer yearMax; private String region; private String sortBy = "relevance"; private String sortOrder = "desc"; private Integer page = 1; private Integer perPage = 20; private List<String> fields; private Boolean includeFacets = false;
// 省略getter/setter方法}4.3 PHP实现
<?phpclass DongchediSearchService{ private $appKey; private $appSecret; private $baseUrl; private $accessToken; private $tokenExpires;
public function __construct($appKey, $appSecret, $sandbox = true) { $this->appKey = $appKey; $this->appSecret = $appSecret; $this->baseUrl = $sandbox
? 'https://sandbox-openapi.dongchedi.com'
: 'https://openapi.dongchedi.com';
}
public function search($params = []) { // 获取访问令牌
$token = $this->getAccessToken();
// 构建请求参数
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
$requestParams = [ 'app_key' => $this->appKey, 'timestamp' => time(), 'format' => 'json', 'version' => '1.0'
];
// 合并搜索参数
$requestParams = array_merge($requestParams, $params);
// 生成签名
$signature = $this->generateSignature($requestParams); $requestParams['sign'] = $signature;
// 发送请求
$url = $this->baseUrl . '/api/v1/search?' . http_build_query($requestParams);
$ch = curl_init(); curl_setopt_array($ch, [
CURLOPT_URL => $url,
CURLOPT_RETURNTRANSFER => true,
CURLOPT_TIMEOUT => 30,
CURLOPT_HTTPHEADER => [ 'Authorization: Bearer ' . $token, 'User-Agent: Dongchedi-PHP-Client/1.0'
]
]);
$response = curl_exec($ch); $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch);
if ($httpCode === 200) { return json_decode($response, true);
} else { throw new Exception("请求失败: HTTP {$httpCode}");
}
}
public function searchNewCars($query = null, $brandId = null, $priceMin = null, $priceMax = null, $level = null, $page = 1, $perPage = 20) { $params = [ 'search_type' => 'new_car', 'page' => $page, 'per_page' => $perPage
];
if ($query) $params['q'] = $query; if ($brandId) $params['brand_id'] = $brandId; if ($priceMin) $params['price_min'] = $priceMin; if ($priceMax) $params['price_max'] = $priceMax; if ($level) $params['level'] = $level;
return $this->search($params);
}
// 省略其他搜索方法...
private function getAccessToken() { // 检查token是否有效
if ($this->accessToken && $this->tokenExpires && $this->tokenExpires > time()) { return $this->accessToken;
}
// 获取新token
$timestamp = time(); $params = [ 'app_key' => $this->appKey, 'timestamp'] = $timestamp, 'grant_type' => 'client_credentials'
];
$signature = $this->generateSignature($params); $params['sign'] = $signature;
$ch = curl_init(); curl_setopt_array($ch, [
CURLOPT_URL => $this->baseUrl . '/oauth/token',
CURLOPT_RETURNTRANSFER => true,
CURLOPT_POST => true,
CURLOPT_POSTFIELDS => http_build_query($params),
CURLOPT_TIMEOUT => 30
]);
$response = curl_exec($ch); $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE); curl_close($ch);
if ($httpCode === 200) { $result = json_decode($response, true); $this->accessToken = $result['access_token']; $this->tokenExpires = time() + $result['expires_in'] - 300; // 提前5分钟过期
return $this->accessToken;
} else { throw new Exception("获取token失败: HTTP {$httpCode}");
}
}
private function generateSignature($params) { // 移除sign参数并排序
unset($params['sign']); ksort($params);
// 拼接参数字符串
$paramStr = ''; foreach ($params as $key => $value) { $paramStr .= $key . '=' . $value . '&';
} $paramStr = rtrim($paramStr, '&');
// 构建签名字符串
$signStr = $this->appKey . $paramStr . $params['timestamp'] . $this->appSecret;
// 计算HMAC-SHA256
return hash_hmac('sha256', $signStr, $this->appSecret);
}
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
}// 使用示例try { $service = new DongchediSearchService('your_app_key', 'your_app_secret');
$result = $service->searchNewCars('宝马3系', null, 25, 40);
if ($result['success']) { foreach ($result['data']['items'] as $item) { echo "{$item['brand']} {$item['model']} - {$item['price']}万\n";
}
}
} catch (Exception $e) { echo "错误: " . $e->getMessage() . "\n";
}?>五、返回结果解析
5.1 成功响应示例
{
"success": true,
"code": 200,
"message": "成功",
"data": {
"items": [
{
"id": 12345,
"type": "new_car",
"title": "2023款 宝马3系 325Li M运动套装",
"description": "2023款宝马3系325Li M运动套装,搭载2.0T发动机,最大功率184马力...",
"url": "https://www.dongchedi.com/vehicle/12345",
"image_url": "https://img.dongchedi.com/vehicle/12345.jpg",
"price": 34.99,
"original_price": 34.99,
"brand": "宝马",
"series": "3系",
"model": "325Li M运动套装",
"year": 2023,
"fuel_type": "汽油",
"transmission": "自动",
"rating": 4.5,
"review_count": 1250,
"sales": 500,
"publish_time": "2023-01-15 10:00:00"
},
{
"id": 12346,
"type": "new_car",
"title": "2023款 宝马3系 320Li 时尚型",
"description": "2023款宝马3系320Li时尚型,搭载2.0T发动机,最大功率156马力...",
"url": "https://www.dongchedi.com/vehicle/12346",
"image_url": "https://img.dongchedi.com/vehicle/12346.jpg",
"price": 29.99,
"original_price": 29.99,
"brand": "宝马",
"series": "3系",
"model": "320Li 时尚型",
"year": 2023,
"fuel_type": "汽油",
"transmission": "自动",
"rating": 4.3,
"review_count": 850,
"sales": 300,
"publish_time": "2023-01-15 10:00:00"
}
],
"pagination": {
"page": 1,
"per_page": 20,
"total_count": 125,
"total_pages": 7,
"has_next": true,
"has_previous": false
},
"facets": [
{
"field": "brand",
"values": [
{"value": "宝马", "count": 75},
{"value": "奥迪", "count": 30},
{"value": "奔驰", "count": 20}
]
},
{
"field": "price_range",
"values": [
{"value": "20-30万", "count": 40},
{"value": "30-40万", "count": 60},
{"value": "40-50万", "count": 25}
]
}
]
}}5.2 错误响应示例
{
"success": false,
"code": 400,
"message": "参数错误:price_min不能大于price_max",
"data": null}5.3 状态码说明
状态码 | 说明 | 处理建议 |
|---|---|---|
200 | 成功 | - |
400 | 参数错误 | 检查请求参数格式 |
401 | 认证失败 | 检查API密钥和签名 |
403 | 权限不足 | 检查API权限范围 |
404 | 数据不存在 | 检查搜索条件 |
429 | 请求频率超限 | 降低请求频率 |
500 | 服务器错误 | 稍后重试 |
六、高级功能实现
6.1 智能搜索建议
class IntelligentSearchService: """智能搜索服务"""
def __init__(self, api_client): self.client = api_client self.search_history = []
def smart_search(self, query: str, search_type: str = "auto") -> SearchResult: """
智能搜索:自动识别搜索类型和参数
Args:
query: 搜索查询字符串
search_type: 搜索类型(auto自动识别)
Returns:
搜索结果
"""
# 自动识别搜索类型
if search_type == "auto":
detected_type = self._detect_search_type(query) else:
detected_type = search_type
# 解析查询参数
search_params = self._parse_query_params(query, detected_type)
# 执行搜索
result = self.client.search(search_type=detected_type, **search_params)
# 记录搜索历史
self._record_search_history(query, detected_type, result)
return result
def _detect_search_type(self, query: str) -> str: """自动识别搜索类型"""
import re
# 检查是否为价格范围
if re.match(r'^\d+-\d+万$', query): return 'new_car'
# 检查是否为车型关键词
car_keywords = ['SUV', 'MPV', '轿车', '跑车', '皮卡'] for keyword in car_keywords: if keyword in query: return 'model'
# 检查是否为二手车相关
used_car_keywords = ['二手车', '二手', '过户', '里程'] for keyword in used_car_keywords: if keyword in query: return 'used_car'
# 检查是否为资讯关键词
news_keywords = ['新闻', '资讯', '报道', '评测'] for keyword in news_keywords: if keyword in query: return 'news'
# 检查是否为视频关键词
video_keywords = ['视频', '试驾', '测评', 'vlog'] for keyword in video_keywords: if keyword in query: return 'video'
# 默认为新车搜索
return 'new_car'
def _parse_query_params(self, query: str, search_type: str) -> Dict[str, Any]: """解析查询参数"""
params = {'query': query} import re
if search_type == 'new_car': # 解析价格范围
price_match = re.search(r'(\d+)-(\d+)万', query) if price_match:
params['price_min'] = float(price_match.group(1))
params['price_max'] = float(price_match.group(2))
# 解析级别
level_mapping = { 'A级': 'A', 'B级': 'B', 'C级': 'C', 'SUV': 'SUV', 'MPV': 'MPV'
} for keyword, level in level_mapping.items(): if keyword in query:
params['level'] = level break
elif search_type == 'used_car': # 解析年份范围
year_match = re.search(r'(\d+)-(\d+)年', query) if year_match:
params['year_min'] = int(year_match.group(1))
params['year_max'] = int(year_match.group(2))
return params6.2 数据缓存优化
import redisfrom functools import lru_cacheclass CachedDongchediSearchAPI(DongchediSearchAPI): """带缓存的懂车帝搜索API"""
def __init__(self, app_key, app_secret, redis_client, sandbox=True): super().__init__(app_key, app_secret, sandbox) self.redis = redis_client self.cache_prefix = "dongchedi:search:"
def search_cached(self, cache_key: str, **params) -> SearchResult: """
带缓存的搜索
"""
# 检查缓存
if self.redis:
cached = self.redis.get(cache_key) if cached:
data = json.loads(cached) return SearchResult(**data)
# 调用API
result = super().search(**params)
# 缓存结果
if self.redis and result.success: # 根据搜索类型设置缓存时间
ttl = self._calculate_ttl(params.get('search_type')) self.redis.setex(
cache_key,
ttl,
json.dumps(result.__dict__)
)
return result
def _calculate_ttl(self, search_type: str) -> int: """根据搜索类型计算缓存时间"""
ttl_config = { 'new_car': 3600, # 1小时
'used_car': 1800, # 30分钟
'model': 7200, # 2小时
'news': 900, # 15分钟
'video': 1800 # 30分钟
} return ttl_config.get(search_type, 1800) # 默认30分钟
def get_cache_key(self, **params) -> str: """生成缓存键"""
# 移除分页参数
cache_params = params.copy()
cache_params.pop('page', None)
cache_params.pop('per_page', None)
# 生成唯一键
param_str = json.dumps(cache_params, sort_keys=True) return f"{self.cache_prefix}{hashlib.md5(param_str.encode()).hexdigest()}"6.3 批量处理优化
from concurrent.futures import ThreadPoolExecutor, as_completedclass BatchSearchProcessor: """批量搜索处理器"""
def __init__(self, api_client): self.client = api_client
def batch_search_by_queries(
self,
search_queries: List[Dict[str, Any]],
max_workers: int = 3
) -> Dict[str, SearchResult]: """
批量执行多个搜索查询
Args:
search_queries: 搜索查询列表
max_workers: 最大并发数
Returns:
查询标识到结果的映射
"""
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor: # 提交所有搜索任务
future_to_query = {
executor.submit(self.client.search, **query): query.get('query', 'unknown') for query in search_queries
}
# 收集结果
for future in as_completed(future_to_query):
query_id = future_to_query[future] try:
result = future.result(timeout=30)
results[query_id] = result except Exception as e: print(f"查询 {query_id} 失败: {e}")
results[query_id] = SearchResult(
success=False,
code=500,
message=str(e),
data={},
items=[],
pagination={},
facets=[],
total_count=0,
search_time=0
)
return results
def analyze_search_trends(
self,
search_results: Dict[str, SearchResult] ) -> Dict[str, Any]: """
分析搜索趋势
"""
analysis = { 'total_queries': len(search_results), 'success_rate': 0, 'avg_search_time': 0, 'total_items': 0, 'top_brands': [], 'price_distribution': {}
}
successful_searches = 0
total_search_time = 0
brand_counts = {}
price_ranges = {}
for query_id, result in search_results.items(): if result.success:
successful_searches += 1
total_search_time += result.search_time
analysis['total_items'] += result.total_count
# 统计品牌分布
for item in result.items: if item.brand:
brand_counts[item.brand] = brand_counts.get(item.brand, 0) + 1
# 统计价格分布
for item in result.items: if item.price:
price_range = self._get_price_range(item.price)
price_ranges[price_range] = price_ranges.get(price_range, 0) + 1
# 计算指标
analysis['success_rate'] = successful_searches / len(search_results) * 100
analysis['avg_search_time'] = total_search_time / successful_searches if successful_searches > 0 else 0
# 获取热门品牌
analysis['top_brands'] = sorted(brand_counts.items(), key=lambda x: x[1], reverse=True)[:5]
analysis['price_distribution'] = price_ranges
return analysis
def _get_price_range(self, price: float) -> str: """获取价格区间"""
if price < 10: return "10万以下"
elif price < 20: return "10-20万"
elif price < 30: return "20-30万"
elif price < 50: return "30-50万"
elif price < 100: return "50-100万"
else: return "100万以上"七、实战应用场景
7.1 汽车比价平台
class CarPriceComparison: """汽车比价平台"""
def __init__(self, api_client): self.client = api_client self.price_history = {}
def compare_prices_by_model(
self,
model: str,
regions: List[str],
year_range: Tuple[int, int] = (2020, 2023) ) -> Dict[str, Any]: """
按车型比较地区价格差异
"""
search_results = {}
for region in regions:
result = self.client.search_new_cars(
query=model,
region=region,
year_min=year_range[0],
year_max=year_range[1],
sort_by="price",
sort_order="asc"
)
if result.success:
search_results[region] = result else: print(f"地区 {region} 搜索失败: {result.message}")
# 分析价格差异
price_analysis = self._analyze_regional_prices(search_results)
return { 'model': model, 'year_range': year_range, 'regions': regions, 'search_results': search_results, 'price_analysis': price_analysis, 'recommendation': self._generate_recommendation(price_analysis)
}
def _analyze_regional_prices(self, search_results: Dict[str, SearchResult]) -> Dict[str, Any]: """分析地区价格差异"""
analysis = { 'regional_prices': {}, 'avg_prices': {}, 'price_differences': {}
}
all_prices = []
for region, result in search_results.items(): if result.success and result.items:
prices = [item.price for item in result.items if item.price] if prices:
avg_price = sum(prices) / len(prices)
analysis['regional_prices'][region] = prices
analysis['avg_prices'][region] = avg_price
all_prices.extend(prices)
# 计算价格差异
if len(analysis['avg_prices']) > 1:
min_price_region = min(analysis['avg_prices'], key=analysis['avg_prices'].get)
max_price_region = max(analysis['avg_prices'], key=analysis['avg_prices'].get)
min_price = analysis['avg_prices'][min_price_region]
max_price = analysis['avg_prices'][max_price_region]
analysis['price_differences'] = { 'min_price_region': min_price_region, 'min_price': min_price, 'max_price_region': max_price_region, 'max_price': max_price, 'max_difference': max_price - min_price, 'difference_rate': (max_price - min_price) / min_price * 100
}
return analysis7.2 汽车推荐系统
class CarRecommender: """汽车推荐系统"""
def __init__(self, api_client): self.client = api_client self.user_preferences = {}
def recommend_cars_by_preferences(
self,
user_prefs: Dict[str, Any],
max_results: int = 10
) -> List[SearchResultItem]: """
根据用户偏好推荐汽车
Args:
user_prefs: 用户偏好
max_results: 最大推荐数量
Returns:
推荐汽车列表
"""
# 构建搜索参数
search_params = { 'search_type': 'new_car', 'sort_by': 'rating', 'sort_order': 'desc', 'per_page': 50
}
# 应用用户偏好
if 'budget' in user_prefs:
budget = user_prefs['budget']
search_params['price_max'] = budget
if 'preferred_brands' in user_prefs: # 对多个品牌进行搜索
pass
if 'vehicle_type' in user_prefs:
search_params['level'] = user_prefs['vehicle_type']
if 'fuel_type' in user_prefs:
search_params['fuel_type'] = user_prefs['fuel_type']
# 执行搜索
result = self.client.search(**search_params)
if not result.success: return []
# 应用推荐算法
scored_cars = [] for item in result.items:
score = self._calculate_recommendation_score(item, user_prefs)
scored_cars.append((item, score))
# 按得分排序
scored_cars.sort(key=lambda x: x[1], reverse=True)
return [car for car, score in scored_cars[:max_results]]
def _calculate_recommendation_score(
self,
car: SearchResultItem,
user_prefs: Dict[str, Any] ) -> float: """计算推荐得分"""
score = 0
# 价格得分
if 'budget' in user_prefs and car.price:
budget = user_prefs['budget']
price_diff = abs(car.price - budget)
price_score = max(0, 100 - price_diff * 2) # 每差1万减2分
score += price_score * 0.3
# 品牌偏好得分
if 'preferred_brands' in user_prefs and car.brand:
preferred_brands = user_prefs['preferred_brands'] if car.brand in preferred_brands:
score += 50
# 评分得分
if car.rating:
score += car.rating * 10 # 4.5分 = 45分
# 销量得分
if car.sales:
sales_score = min(car.sales / 100, 20) # 每100台销量加1分,最多20分
score += sales_score
return score八、故障排查与优化
8.1 常见问题解决
问题1:签名验证失败
def debug_signature_generation(params, app_secret, timestamp): """调试签名生成过程"""
print("=== 签名调试信息 ===")
# 排序参数
filtered_params = {k: v for k, v in params.items() if v is not None and k != 'sign'}
sorted_keys = sorted(filtered_params.keys())
sign_str = ''
for key in sorted_keys: if isinstance(filtered_params[key], (list, dict)):
value = json.dumps(filtered_params[key], separators=(',', ':')) else:
value = str(filtered_params[key])
sign_str += f"{key}{value}"
print(f" {key}: {value}")
sign_str += app_secret print(f"签名字符串: {sign_str}")
# 计算签名
import hmac
signature = hmac.new(
app_secret.encode('utf-8'),
sign_str.encode('utf-8'),
hashlib.sha256
).hexdigest() print(f"计算签名: {signature}")
return signature问题2:分页数据不准确
def stable_pagination_search(self, **params): """稳定的分页搜索"""
# 确保有稳定的排序字段
if 'sort_by' not in params:
params['sort_by'] = 'id' # 使用唯一字段排序
if 'sort_order' not in params:
params['sort_order'] = 'desc'
# 使用游标分页
all_items = []
last_id = None
while True: if last_id: # 使用游标进行分页
params['cursor'] = last_id
result = self.search(**params)
if not result.success or not result.items: break
all_items.extend(result.items)
last_id = result.items[-1].id
# 检查是否还有下一页
pagination = result.pagination if not pagination.get('has_next', False): break
# 避免频繁请求
time.sleep(0.5)
return all_items8.2 性能优化建议
- 合理使用缓存
# 多级缓存策略class MultiLevelCache: def __init__(self, redis_client): self.memory_cache = {} self.redis = redis_client self.memory_ttl = 300 # 5分钟
self.redis_ttl = 1800 # 30分钟
def get_search_data(self, cache_key): # 1. 检查内存缓存
if cache_key in self.memory_cache:
data, expire_time = self.memory_cache[cache_key] if time.time() < expire_time: return data
# 2. 检查Redis缓存
if self.redis:
cached = self.redis.get(cache_key) if cached:
data = json.loads(cached) # 更新内存缓存
self.memory_cache[cache_key] = (data, time.time() + self.memory_ttl) return data
return None- 连接池管理
import requestsfrom requests.adapters import HTTPAdapterfrom urllib3.util.retry import Retrydef create_optimized_session(): """创建优化的会话"""
session = requests.Session()
# 配置重试策略
retry_strategy = Retry(
total=3,
backoff_factor=0.5,
status_forcelist=[429, 500, 502, 503, 504],
)
adapter = HTTPAdapter(
max_retries=retry_strategy,
pool_connections=10,
pool_maxsize=100,
pool_block=False
)
session.mount("https://", adapter)
session.mount("http://", adapter)
return session九、最佳实践总结
9.1 安全实践
- 密钥管理:使用环境变量存储API密钥
- HTTPS强制:确保所有请求使用HTTPS
- 输入验证:验证所有输入参数
- 错误处理:不暴露敏感错误信息
9.2 性能实践
- 缓存策略:根据搜索类型设置合适的缓存时间
- 批量操作:合并多个请求减少API调用次数
- 分页优化:使用游标分页提高稳定性
- 异步处理:批量操作使用异步方式提高吞吐量
9.3 业务实践
- 搜索建议:提供智能搜索建议功能
- 聚合分析:利用facet数据进行统计分析
- 个性化推荐:基于用户行为优化推荐
- 趋势分析:记录和分析搜索趋势
附录:快速开始模板
# quick_start.pyfrom dongchedi_search import DongchediSearchAPI# 1. 初始化客户端client = DongchediSearchAPI(
app_key="your_app_key",
app_secret="your_app_secret",
sandbox=True)# 2. 简单搜索result = client.search(query="宝马3系")if result.success: for item in result.items: print(f"{item.brand} {item.model} - {item.price}万")# 3. 新车搜索result = client.search_new_cars(
query="SUV",
price_min=20,
price_max=50,
level="SUV")# 4. 获取所有数据all_items = client.search_all(
query="奥迪A4L",
search_type="new_car",
max_pages=5)print(f"共找到 {len(all_items)} 条记录")通过本攻略,您应该能够:
- 理解懂车帝
item_search接口的完整功能 - 实现安全的API认证和请求
- 处理各种搜索条件和分页逻辑
- 构建高性能的汽车搜索应用
- 在实际业务中灵活应用该接口
建议根据实际业务需求选择合适的实现方案,并遵循最佳实践确保系统的稳定性和可维护性。