
Dongchedi item_search: A Complete Guide to Integrating the Dongchedi (懂车帝) Search Data API, from Basics to Mastery

万邦科技Lex, published 2026-02-03 09:23:42


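1. API Overview

1.1 Introduction

The item_search API is the core search interface of the Dongchedi open platform. It supports multi-dimensional vehicle search covering new cars, used cars, model comparison, and news. Results are returned in a structured form, with support for pagination, sorting, and field filtering.

1.2 Core Features

  • Multi-type search: new cars, used cars, models, news, videos

  • Multi-condition filtering: brand, price, level, displacement, configuration

  • Sorting: by relevance, price, sales, or rating

  • Pagination: page-by-page loading of large result sets

  • Field selection: request only the fields you need to reduce the amount of data transferred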

2. Preparation

2.1 Environment Setup

# requirements.txt
requests>=2.28.0
python-dotenv>=1.0.0
pydantic>=2.0.0
aiohttp>=3.8.0
redis>=4.5.0

2.2 Authentication Settings

# config.py
import os

from dotenv import load_dotenv

load_dotenv()


class DongchediConfig:
    # Dongchedi API settings
    DONGCHEDI_APP_KEY = os.getenv('DONGCHEDI_APP_KEY')
    DONGCHEDI_APP_SECRET = os.getenv('DONGCHEDI_APP_SECRET')
    DONGCHEDI_API_BASE = os.getenv(
        'DONGCHEDI_API_BASE',
        'https://openapi.dongchedi.com/api/v1'
    )

    # Request settings
    REQUEST_TIMEOUT = 30
    MAX_RETRIES = 3
    DEFAULT_PAGE_SIZE = 20
    MAX_PAGE_SIZE = 100

    # Cache settings
    CACHE_TTL = 3600  # 1 hour
    SEARCH_CACHE_TTL = 1800  # 30 minutes
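
Both credentials are read from environment variables, so a missing value would otherwise only show up later as a failed signature or an authentication error. The following is a minimal sketch of a fail-fast check, assuming the variable names used in config.py; the helper name `missing_settings` is hypothetical and not part of any Dongchedi SDK.

```python
from typing import List, Mapping

# Variable names taken from config.py; the helper itself is illustrative only.
REQUIRED_VARS = ("DONGCHEDI_APP_KEY", "DONGCHEDI_APP_SECRET")


def missing_settings(env: Mapping[str, str]) -> List[str]:
    """Return the names of required variables that are unset or blank."""
    return [name for name in REQUIRED_VARS if not env.get(name, "").strip()]
```

Calling `missing_settings(os.environ)` at startup and aborting when the list is non-empty keeps configuration problems separate from API errors.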

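3. API Details

3.1 Endpoint

GET /search

3.2 Request Parameters

Common parameters

| Parameter | Type | Required | Description | Example |
|---|---|---|---|---|
| app_key | string | | Application identifier | dcd_app_2024 |
| timestamp | int | | Timestamp | 1706774400 |
| sign | string | | Request signature | See the signature algorithm |
| format | string | | Response format | json (default) |
| version | string | | API version | 1.0 |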
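
The `sign` parameter is derived from the other request parameters. The sketch below mirrors the scheme used by the Python client in section 4.1: drop `None` values and `sign` itself, sort by key, concatenate each key and value, append the app secret, then compute HMAC-SHA256 keyed with the same secret. The function name `sign_params` and the secret `demo_secret` are illustrative assumptions, and the exact algorithm should be confirmed against the platform's own signature documentation.

```python
import hashlib
import hmac
import json
from typing import Any, Dict


def sign_params(params: Dict[str, Any], app_secret: str) -> str:
    """Sign request parameters following the scheme shown in section 4.1."""
    # Exclude unset values and any existing signature
    filtered = {k: v for k, v in params.items() if v is not None and k != "sign"}
    parts = []
    for key in sorted(filtered):
        value = filtered[key]
        if isinstance(value, (list, dict)):
            # Compact JSON so that whitespace cannot change the signature
            value = json.dumps(value, separators=(",", ":"))
        parts.append(f"{key}{value}")
    message = "".join(parts) + app_secret
    return hmac.new(
        app_secret.encode("utf-8"),
        message.encode("utf-8"),
        hashlib.sha256,
    ).hexdigest()


# Example values from the common parameter table; the secret is a placeholder.
common = {
    "app_key": "dcd_app_2024",
    "timestamp": 1706774400,
    "format": "json",
    "version": "1.0",
}
signature = sign_params(common, "demo_secret")
```

Because the keys are sorted before concatenation, the order in which parameters are added to the dictionary does not affect the result.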

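Search parameters

| Parameter | Type | Required | Description | Example |
|---|---|---|---|---|
| q | string | | Search keyword | "宝马3系" |
| search_type | string | | Search type | new_car/used_car/model/news/video |
| brand_id | int | | Brand ID | 2 (宝马) |
| series_id | int | | Series ID | 20 (3系) |
| price_min | float | | Minimum price (10,000 CNY) | 20.0 |
| price_max | float | | Maximum price (10,000 CNY) | 50.0 |
| level | string | | Vehicle level | A级/B级/C级/SUV/MPV |
| fuel_type | string | | Fuel type | gasoline/diesel/hybrid/electric |
| transmission | string | | Transmission | manual/automatic/cvt/dct |
| displacement_min | float | | Minimum displacement (L) | 1.5 |
| displacement_max | float | | Maximum displacement (L) | 3.0 |
| year_min | int | | Earliest year | 2020 |
| year_max | int | | Latest year | 2023 |
| region | string | | Region | 北京/上海/广州 |
| sort_by | string | | Sort field | relevance/price/sales/rating |
| sort_order | string | | Sort direction | asc/desc (default desc) |
| page | int | | Page number | 1 (default) |
| per_page | int | | Items per page | 20 (default) |
| fields | string | | Fields to return | id,title,price,brand,rating |
| include_facets | bool | | Whether to include facet (aggregation) data | true |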
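
Several of these parameters interact: `per_page` is capped (config.py sets `MAX_PAGE_SIZE = 100`), `fields` is sent as a comma-separated string, and the error example in section 5.2 shows that the server rejects `price_min` greater than `price_max`. The sketch below checks these rules on the client before a request is sent. The function `build_search_params` is a hypothetical helper covering only a subset of the parameters, not part of the API.

```python
from typing import Any, Dict, List, Optional

MAX_PAGE_SIZE = 100  # mirrors MAX_PAGE_SIZE in config.py


def build_search_params(
    q: Optional[str] = None,
    search_type: Optional[str] = None,
    price_min: Optional[float] = None,
    price_max: Optional[float] = None,
    page: int = 1,
    per_page: int = 20,
    fields: Optional[List[str]] = None,
    include_facets: bool = False,
) -> Dict[str, Any]:
    """Validate and assemble query parameters, omitting unset values."""
    if price_min is not None and price_max is not None and price_min > price_max:
        raise ValueError("price_min must not exceed price_max")
    if page < 1:
        raise ValueError("page must be >= 1")

    params: Dict[str, Any] = {
        "q": q,
        "search_type": search_type,
        "price_min": price_min,
        "price_max": price_max,
        "page": page,
        # Clamp to the range 1..MAX_PAGE_SIZE
        "per_page": max(1, min(per_page, MAX_PAGE_SIZE)),
    }
    if fields:
        params["fields"] = ",".join(fields)
    if include_facets:
        params["include_facets"] = "true"

    # Compare against None so that a legitimate 0 is kept
    return {k: v for k, v in params.items() if v is not None}
```

Filtering on `is not None` rather than truthiness matters here: a filter such as `price_min=0` is a valid value and should still be sent.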

4. Complete Code Implementation

4.1 Complete Python Implementation

import hashlib
import hmac
import json
import time
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from urllib.parse import urlencode

import redis
import requests

from config import DongchediConfig  # settings class from section 2.2


@dataclass
class SearchResultItem:
    """A single search result item."""
    id: int
    type: str  # new_car, used_car, model, news, video
    title: str
    description: str
    url: str
    image_url: str
    price: Optional[float] = None
    original_price: Optional[float] = None
    brand: Optional[str] = None
    series: Optional[str] = None
    model: Optional[str] = None
    year: Optional[int] = None
    fuel_type: Optional[str] = None
    transmission: Optional[str] = None
    rating: Optional[float] = None
    review_count: Optional[int] = None
    sales: Optional[int] = None
    publish_time: Optional[str] = None
    view_count: Optional[int] = None
    like_count: Optional[int] = None


@dataclass
class SearchFacet:
    """Facet (aggregation) information for one field."""
    field: str
    values: List[Dict[str, Any]]  # [{value: "宝马", count: 100}, ...]


@dataclass
class SearchResult:
    """Search result."""
    success: bool
    code: int
    message: str
    data: Dict[str, Any]
    items: List[SearchResultItem]
    pagination: Dict[str, Any]
    facets: List[SearchFacet]
    total_count: int
    search_time: float


class DongchediSearchAPI:
    """Dongchedi search API client."""
    # Packaged API provider demo: url=https://console.open.onebound.cn/console/?i=Lex
    def __init__(self, app_key: str, app_secret: str, sandbox: bool = True, redis_client=None):
        self.app_key = app_key
        self.app_secret = app_secret
        self.base_url = "https://sandbox-openapi.dongchedi.com" if sandbox else "https://openapi.dongchedi.com"
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Dongchedi-Search-API/1.0',
            'Accept': 'application/json'
        })
        self.redis = redis_client
        self._access_token = None
        self._token_expires = None
    
    def _generate_signature(self, params: Dict[str, Any], timestamp: int) -> str:
        """Generate the request signature."""
        # Drop None values and the sign field itself, then sort by key
        filtered_params = {
            k: v for k, v in params.items()
            if v is not None and k != 'sign'
        }
        sorted_keys = sorted(filtered_params.keys())

        # Concatenate the key/value pairs into the string to sign
        sign_str = ''
        for key in sorted_keys:
            if isinstance(filtered_params[key], (list, dict)):
                value = json.dumps(filtered_params[key], separators=(',', ':'))
            else:
                value = str(filtered_params[key])
            sign_str += f"{key}{value}"

        # Append app_secret
        sign_str += self.app_secret

        # Compute the HMAC-SHA256 signature
        signature = hmac.new(
            self.app_secret.encode('utf-8'),
            sign_str.encode('utf-8'),
            hashlib.sha256
        ).hexdigest()

        return signature

    def _get_access_token(self) -> str:
        """Obtain an access token."""
        # Reuse the cached token while it is still valid
        if self._access_token and self._token_expires and self._token_expires > datetime.now():
            return self._access_token

        # Request a new token
        timestamp = int(time.time())
        params = {
            'app_key': self.app_key,
            'timestamp': timestamp,
            'grant_type': 'client_credentials'
        }

        # Generate the signature
        signature = self._generate_signature(params, timestamp)
        params['sign'] = signature

        # Request the token (with a timeout so a stalled connection cannot hang the client)
        url = f"{self.base_url}/oauth/token"
        response = self.session.post(url, data=params, timeout=DongchediConfig.REQUEST_TIMEOUT)

        if response.status_code == 200:
            data = response.json()
            self._access_token = data['access_token']
            self._token_expires = datetime.now() + timedelta(seconds=data['expires_in'] - 300)  # expire 5 minutes early
            return self._access_token
        else:
            raise Exception(f"Failed to obtain token: {response.status_code} - {response.text}")

    def search(
        self,
        query: Optional[str] = None,
        search_type: Optional[str] = None,
        brand_id: Optional[int] = None,
        series_id: Optional[int] = None,
        price_min: Optional[float] = None,
        price_max: Optional[float] = None,
        level: Optional[str] = None,
        fuel_type: Optional[str] = None,
        transmission: Optional[str] = None,
        displacement_min: Optional[float] = None,
        displacement_max: Optional[float] = None,
        year_min: Optional[int] = None,
        year_max: Optional[int] = None,
        region: Optional[str] = None,
        sort_by: str = "relevance",
        sort_order: str = "desc",
        page: int = 1,
        per_page: int = 20,
        fields: Optional[List[str]] = None,
        include_facets: bool = False
    ) -> SearchResult:
        """
        Run a search.

        Args:
            query: Search keyword
            search_type: Search type
            brand_id: Brand ID
            series_id: Series ID
            price_min: Minimum price
            price_max: Maximum price
            level: Vehicle level
            fuel_type: Fuel type
            transmission: Transmission type
            displacement_min: Minimum displacement
            displacement_max: Maximum displacement
            year_min: Earliest year
            year_max: Latest year
            region: Region
            sort_by: Sort field
            sort_order: Sort direction
            page: Page number
            per_page: Items per page
            fields: List of fields to return
            include_facets: Whether to include facet (aggregation) data

        Returns:
            Search result
        """
        # Obtain the access token
        access_token = self._get_access_token()

        # Build the request parameters
        params = {
            'app_key': self.app_key,
            'timestamp': int(time.time()),
            'format': 'json',
            'version': '1.0',
            'sort_by': sort_by,
            'sort_order': sort_order,
            'page': page,
            'per_page': min(per_page, DongchediConfig.MAX_PAGE_SIZE)
        }

        # Add optional parameters. Values are compared against None so that
        # a legitimate 0 (for example price_min=0) is not silently dropped.
        optional_params = {
            'q': query,
            'search_type': search_type,
            'brand_id': brand_id,
            'series_id': series_id,
            'price_min': price_min,
            'price_max': price_max,
            'level': level,
            'fuel_type': fuel_type,
            'transmission': transmission,
            'displacement_min': displacement_min,
            'displacement_max': displacement_max,
            'year_min': year_min,
            'year_max': year_max,
            'region': region,
        }
        for key, value in optional_params.items():
            if value is not None and value != '':
                params[key] = value
        if fields:
            params['fields'] = ','.join(fields)
        if include_facets:
            params['include_facets'] = 'true'

        # Generate the signature
        signature = self._generate_signature(params, params['timestamp'])
        params['sign'] = signature

        # Add authentication headers
        headers = {
            'Authorization': f'Bearer {access_token}',
            'Content-Type': 'application/json'
        }

        # Send the request
        url = f"{self.base_url}/api/v1/search"
        
        try:
            start_time = time.time()
            response = self.session.get(
                url,
                params=params,
                headers=headers,
                timeout=DongchediConfig.REQUEST_TIMEOUT
            )
            search_time = time.time() - start_time

            if response.status_code == 200:
                result = response.json()
                data = result.get('data') or {}  # tolerate "data": null

                # Parse the result
                items = self._parse_search_items(data.get('items', []))
                pagination = data.get('pagination', {})
                facets = self._parse_facets(data.get('facets', []))
                total_count = pagination.get('total_count', 0)

                return SearchResult(
                    success=result.get('success', False),
                    code=result.get('code', 0),
                    message=result.get('message', ''),
                    data=data,
                    items=items,
                    pagination=pagination,
                    facets=facets,
                    total_count=total_count,
                    search_time=search_time
                )
            elif response.status_code == 401 and not getattr(self, '_auth_retrying', False):
                # Token expired: clear it and retry once. The flag stops the
                # recursion if the server keeps answering 401.
                self._access_token = None
                self._auth_retrying = True
                try:
                    return self.search(
                        query=query, search_type=search_type, brand_id=brand_id, series_id=series_id,
                        price_min=price_min, price_max=price_max, level=level, fuel_type=fuel_type,
                        transmission=transmission, displacement_min=displacement_min, displacement_max=displacement_max,
                        year_min=year_min, year_max=year_max, region=region, sort_by=sort_by, sort_order=sort_order,
                        page=page, per_page=per_page, fields=fields, include_facets=include_facets
                    )
                finally:
                    self._auth_retrying = False
            else:
                return SearchResult(
                    success=False,
                    code=response.status_code,
                    message=f"HTTP {response.status_code}",
                    data={},
                    items=[],
                    pagination={},
                    facets=[],
                    total_count=0,
                    search_time=0
                )

        except requests.exceptions.Timeout:
            return SearchResult(
                success=False,
                code=408,
                message="Request timed out",
                data={},
                items=[],
                pagination={},
                facets=[],
                total_count=0,
                search_time=0
            )
        except requests.exceptions.RequestException as e:
            return SearchResult(
                success=False,
                code=500,
                message=f"Network request error: {str(e)}",
                data={},
                items=[],
                pagination={},
                facets=[],
                total_count=0,
                search_time=0
            )

    def _parse_search_items(self, items_data: List[Dict[str, Any]]) -> List[SearchResultItem]:
        """Parse search result items."""
        items = []

        for item_data in items_data:
            try:
                item = SearchResultItem(
                    id=item_data.get('id'),
                    type=item_data.get('type', ''),
                    title=item_data.get('title', ''),
                    description=item_data.get('description', ''),
                    url=item_data.get('url', ''),
                    image_url=item_data.get('image_url', ''),
                    price=item_data.get('price'),
                    original_price=item_data.get('original_price'),
                    brand=item_data.get('brand'),
                    series=item_data.get('series'),
                    model=item_data.get('model'),
                    year=item_data.get('year'),
                    fuel_type=item_data.get('fuel_type'),
                    transmission=item_data.get('transmission'),
                    rating=item_data.get('rating'),
                    review_count=item_data.get('review_count'),
                    sales=item_data.get('sales'),
                    publish_time=item_data.get('publish_time'),
                    view_count=item_data.get('view_count'),
                    like_count=item_data.get('like_count')
                )
                items.append(item)
            except Exception as e:
                print(f"Failed to parse search result item: {e}, data: {item_data}")
                continue

        return items

    def _parse_facets(self, facets_data: List[Dict[str, Any]]) -> List[SearchFacet]:
        """Parse facet (aggregation) information."""
        facets = []

        for facet_data in facets_data:
            facet = SearchFacet(
                field=facet_data.get('field', ''),
                values=facet_data.get('values', [])
            )
            facets.append(facet)

        return facets

    def search_all(
        self,
        max_pages: int = 10,
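        **search_params
    ) -> List[SearchResultItem]:
        """
        Fetch all matching search results (pagination is handled automatically).

        Args:
            max_pages: Maximum number of pages to fetch
            **search_params: Search parameters

        Returns:
            All search result items
        """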
        all_items = []
        page = 1
        
        while page <= max_pages:
            result = self.search(page=page, **search_params)

            if not result.success:
                print(f"Query failed on page {page}: {result.message}")
                break

            # Append the current page's items
            all_items.extend(result.items)
            pagination = result.pagination

            print(f"Fetched page {page}: {len(result.items)} items, {len(all_items)} in total")

            # Check whether there is a next page
            has_next = pagination.get('has_next', False)
            total_pages = pagination.get('total_pages', 0)

            if not has_next or page >= total_pages:
                break

            page += 1

            # Avoid sending requests too frequently
            time.sleep(0.5)

        return all_items

    def search_new_cars(
        self,
        query: Optional[str] = None,
        brand_id: Optional[int] = None,
        price_min: Optional[float] = None,
        price_max: Optional[float] = None,
        level: Optional[str] = None,
        **kwargs
    ) -> SearchResult:
        """
        Search new cars.
        """
        return self.search(
            query=query,
            search_type='new_car',
            brand_id=brand_id,
            price_min=price_min,
            price_max=price_max,
            level=level,
            **kwargs
        )    
    def search_used_cars(
        self,
        query: Optional[str] = None,
        price_min: Optional[float] = None,
        price_max: Optional[float] = None,
        year_min: Optional[int] = None,
        year_max: Optional[int] = None,
        region: Optional[str] = None,
        **kwargs
    ) -> SearchResult:
        """
        Search used cars.
        """
        return self.search(
            query=query,
            search_type='used_car',
            price_min=price_min,
            price_max=price_max,
            year_min=year_min,
            year_max=year_max,
            region=region,
            **kwargs
        )    
    def search_models(
        self,
        query: Optional[str] = None,
        brand_id: Optional[int] = None,
        series_id: Optional[int] = None,
        **kwargs
    ) -> SearchResult:
        """
        Search vehicle models.
        """
        return self.search(
            query=query,
            search_type='model',
            brand_id=brand_id,
            series_id=series_id,
            **kwargs
        )    
    def search_news(
        self,
        query: Optional[str] = None,
        **kwargs
    ) -> SearchResult:
        """
        Search news.
        """
        return self.search(
            query=query,
            search_type='news',
            **kwargs
        )    
    def search_videos(
        self,
        query: Optional[str] = None,
        **kwargs
    ) -> SearchResult:
        """
        Search videos.
        """
        return self.search(
            query=query,
            search_type='video',
            **kwargs
        )


# Usage example
def demo_search_api():
    """Demonstrate the search API."""

    # Initialize the client
    client = DongchediSearchAPI(
        app_key=DongchediConfig.DONGCHEDI_APP_KEY,
        app_secret=DongchediConfig.DONGCHEDI_APP_SECRET,
        sandbox=True
    )

    print("=== Example 1: new car search ===")
    result = client.search_new_cars(
        query="宝马3系",
        price_min=25,
        price_max=40,
        per_page=5
    )

    if result.success:
        for item in result.items:
            print(f"{item.brand} {item.model} - {item.price} (10,000 CNY) - rating {item.rating}")

    print("\n=== Example 2: used car search ===")
    result = client.search_used_cars(
        query="奥迪A4L",
        price_min=15,
        price_max=25,
        year_min=2018,
        year_max=2021,
        region="北京"
    )

    print("\n=== Example 3: model search ===")
    result = client.search_models(
        query="SUV",
        level="SUV",
        price_min=20,
        price_max=50,
        include_facets=True
    )

    if result.success:
        print(f"Found {result.total_count} models")
        for facet in result.facets:
            if facet.field == 'brand':
                print("Brand distribution:")
                for value in facet.values[:5]:
                    print(f"  {value['value']}: {value['count']}")


if __name__ == "__main__":
    demo_search_api()

4.2 Java Implementation

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.security.InvalidKeyException;
import java.security.NoSuchAlgorithmException;
import java.time.LocalDateTime;
import java.util.*;
import java.util.concurrent.TimeUnit;

public class DongchediSearchClient {
    private static final Logger logger = LoggerFactory.getLogger(DongchediSearchClient.class);

    private final String appKey;
    private final String appSecret;
    private final String baseUrl;
    private final OkHttpClient httpClient;
    private final ObjectMapper objectMapper;

    private String accessToken;
    private LocalDateTime tokenExpires;

    public DongchediSearchClient(String appKey, String appSecret, boolean sandbox) {
        this.appKey = appKey;
        this.appSecret = appSecret;
        this.baseUrl = sandbox ?
            "https://sandbox-openapi.dongchedi.com" :
            "https://openapi.dongchedi.com";

        this.httpClient = new OkHttpClient.Builder()
                .connectTimeout(30, TimeUnit.SECONDS)
                .readTimeout(30, TimeUnit.SECONDS)
                .writeTimeout(30, TimeUnit.SECONDS)
                .build();

        this.objectMapper = new ObjectMapper();
        this.objectMapper.setSerializationInclusion(JsonInclude.Include.NON_NULL);
    }

    public SearchResult search(SearchParams params) throws IOException {
        // Obtain the access token
        String token = getAccessToken();

        // Build the request URL
        HttpUrl.Builder urlBuilder = HttpUrl.parse(baseUrl + "/api/v1/search").newBuilder();

        // Add common parameters
        urlBuilder.addQueryParameter("app_key", appKey);
        urlBuilder.addQueryParameter("timestamp", String.valueOf(System.currentTimeMillis() / 1000));
        urlBuilder.addQueryParameter("format", "json");
        urlBuilder.addQueryParameter("version", "1.0");

        // Add search parameters
        if (params.getQuery() != null) {
            urlBuilder.addQueryParameter("q", params.getQuery());
        }
        if (params.getSearchType() != null) {
            urlBuilder.addQueryParameter("search_type", params.getSearchType());
        }
        if (params.getBrandId() != null) {
            urlBuilder.addQueryParameter("brand_id", params.getBrandId().toString());
        }
        // ... other parameters

        // Generate the signature from the parameters added so far
        HttpUrl unsignedUrl = urlBuilder.build();
        Map<String, String> signParams = new HashMap<>();
        for (String name : unsignedUrl.queryParameterNames()) {
            signParams.put(name, unsignedUrl.queryParameter(name));
        }
        String signature = generateSignature(signParams, Long.parseLong(signParams.get("timestamp")));
        urlBuilder.addQueryParameter("sign", signature);

        // Build the request
        Request request = new Request.Builder()
                .url(urlBuilder.build())
                .addHeader("Authorization", "Bearer " + token)
                .addHeader("Content-Type", "application/json")
                .addHeader("User-Agent", "Dongchedi-Java-Client/1.0")
                .build();

        // Send the request
        try (Response response = httpClient.newCall(request).execute()) {
            if (response.isSuccessful()) {
                String responseBody = response.body().string();
                return objectMapper.readValue(responseBody, SearchResult.class);
            } else {
                throw new IOException("Request failed: " + response.code());
            }
        }
    }

    // Other methods omitted...
}

class SearchParams {
    private String query;
    private String searchType;
    private Integer brandId;
    private Integer seriesId;
    private Double priceMin;
    private Double priceMax;
    private String level;
    private String fuelType;
    private String transmission;
    private Double displacementMin;
    private Double displacementMax;
    private Integer yearMin;
    private Integer yearMax;
    private String region;
    private String sortBy = "relevance";
    private String sortOrder = "desc";
    private Integer page = 1;
    private Integer perPage = 20;
    private List<String> fields;
    private Boolean includeFacets = false;

    // Getters/setters omitted
}

4.3 PHP Implementation

<?php
class DongchediSearchService
{
    private $appKey;
    private $appSecret;
    private $baseUrl;
    private $accessToken;
    private $tokenExpires;

    public function __construct($appKey, $appSecret, $sandbox = true)
    {
        $this->appKey = $appKey;
        $this->appSecret = $appSecret;
        $this->baseUrl = $sandbox
            ? 'https://sandbox-openapi.dongchedi.com'
            : 'https://openapi.dongchedi.com';
    }

    public function search($params = [])
    {
        // Obtain the access token
        $token = $this->getAccessToken();

        // Build the request parameters
        $requestParams = [
            'app_key' => $this->appKey,
            'timestamp' => time(),
            'format' => 'json',
            'version' => '1.0'
        ];

        // Merge in the search parameters
        $requestParams = array_merge($requestParams, $params);

        // Generate the signature
        $signature = $this->generateSignature($requestParams);
        $requestParams['sign'] = $signature;

        // Send the request
        $url = $this->baseUrl . '/api/v1/search?' . http_build_query($requestParams);

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $url,
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_TIMEOUT => 30,
            CURLOPT_HTTPHEADER => [
                'Authorization: Bearer ' . $token,
                'User-Agent: Dongchedi-PHP-Client/1.0'
            ]
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode === 200) {
            return json_decode($response, true);
        } else {
            throw new Exception("Request failed: HTTP {$httpCode}");
        }
    }

    public function searchNewCars($query = null, $brandId = null, $priceMin = null, $priceMax = null, $level = null, $page = 1, $perPage = 20)
    {
        $params = [
            'search_type' => 'new_car',
            'page' => $page,
            'per_page' => $perPage
        ];

        if ($query) $params['q'] = $query;
        if ($brandId) $params['brand_id'] = $brandId;
        // Compare against null so that a price of 0 is still sent
        if ($priceMin !== null) $params['price_min'] = $priceMin;
        if ($priceMax !== null) $params['price_max'] = $priceMax;
        if ($level) $params['level'] = $level;

        return $this->search($params);
    }

    // Other search methods omitted...

    private function getAccessToken()
    {
        // Reuse the cached token while it is still valid
        if ($this->accessToken && $this->tokenExpires && $this->tokenExpires > time()) {
            return $this->accessToken;
        }

        // Request a new token
        $timestamp = time();
        $params = [
            'app_key' => $this->appKey,
            'timestamp' => $timestamp,
            'grant_type' => 'client_credentials'
        ];

        $signature = $this->generateSignature($params);
        $params['sign'] = $signature;

        $ch = curl_init();
        curl_setopt_array($ch, [
            CURLOPT_URL => $this->baseUrl . '/oauth/token',
            CURLOPT_RETURNTRANSFER => true,
            CURLOPT_POST => true,
            CURLOPT_POSTFIELDS => http_build_query($params),
            CURLOPT_TIMEOUT => 30
        ]);

        $response = curl_exec($ch);
        $httpCode = curl_getinfo($ch, CURLINFO_HTTP_CODE);
        curl_close($ch);

        if ($httpCode === 200) {
            $result = json_decode($response, true);
            $this->accessToken = $result['access_token'];
            $this->tokenExpires = time() + $result['expires_in'] - 300; // expire 5 minutes early
            return $this->accessToken;
        } else {
            throw new Exception("Failed to obtain token: HTTP {$httpCode}");
        }
    }

    private function generateSignature($params)
    {
        // Remove the sign parameter and sort by key
        unset($params['sign']);
        ksort($params);

        // Concatenate the parameters
        $paramStr = '';
        foreach ($params as $key => $value) {
            $paramStr .= $key . '=' . $value . '&';
        }
        $paramStr = rtrim($paramStr, '&');

        // Build the string to sign.
        // Note: this layout differs from the Python version in section 4.1
        // (key=value pairs joined by '&', wrapped with app key and timestamp);
        // confirm which form the platform expects before relying on either.
        $signStr = $this->appKey . $paramStr . $params['timestamp'] . $this->appSecret;

        // Compute HMAC-SHA256
        return hash_hmac('sha256', $signStr, $this->appSecret);
    }
}

// Usage example
try {
    $service = new DongchediSearchService('your_app_key', 'your_app_secret');

    $result = $service->searchNewCars('宝马3系', null, 25, 40);

    if ($result['success']) {
        foreach ($result['data']['items'] as $item) {
            echo "{$item['brand']} {$item['model']} - {$item['price']} (10,000 CNY)\n";
        }
    }
} catch (Exception $e) {
    echo "Error: " . $e->getMessage() . "\n";
}
?>

5. Parsing the Response

5.1 Successful Response Example

{
  "success": true,
  "code": 200,
  "message": "成功",
  "data": {
    "items": [
      {
        "id": 12345,
        "type": "new_car",
        "title": "2023款 宝马3系 325Li M运动套装",
        "description": "2023款宝马3系325Li M运动套装,搭载2.0T发动机,最大功率184马力...",
        "url": "https://www.dongchedi.com/vehicle/12345",
        "image_url": "https://img.dongchedi.com/vehicle/12345.jpg",
        "price": 34.99,
        "original_price": 34.99,
        "brand": "宝马",
        "series": "3系",
        "model": "325Li M运动套装",
        "year": 2023,
        "fuel_type": "汽油",
        "transmission": "自动",
        "rating": 4.5,
        "review_count": 1250,
        "sales": 500,
        "publish_time": "2023-01-15 10:00:00"
      },
      {
        "id": 12346,
        "type": "new_car",
        "title": "2023款 宝马3系 320Li 时尚型",
        "description": "2023款宝马3系320Li时尚型,搭载2.0T发动机,最大功率156马力...",
        "url": "https://www.dongchedi.com/vehicle/12346",
        "image_url": "https://img.dongchedi.com/vehicle/12346.jpg",
        "price": 29.99,
        "original_price": 29.99,
        "brand": "宝马",
        "series": "3系",
        "model": "320Li 时尚型",
        "year": 2023,
        "fuel_type": "汽油",
        "transmission": "自动",
        "rating": 4.3,
        "review_count": 850,
        "sales": 300,
        "publish_time": "2023-01-15 10:00:00"
      }
    ],
    "pagination": {
      "page": 1,
      "per_page": 20,
      "total_count": 125,
      "total_pages": 7,
      "has_next": true,
      "has_previous": false
    },
    "facets": [
      {
        "field": "brand",
        "values": [
          {"value": "宝马", "count": 75},
          {"value": "奥迪", "count": 30},
          {"value": "奔驰", "count": 20}
        ]
      },
      {
        "field": "price_range",
        "values": [
          {"value": "20-30万", "count": 40},
          {"value": "30-40万", "count": 60},
          {"value": "40-50万", "count": 25}
        ]
      }
    ]
  }
}
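
A minimal sketch of reading the `pagination` and `facets` blocks from a response shaped like the one above. The helper names `facet_counts` and `has_next_page` are hypothetical, and `sample` is a trimmed copy of the example response rather than live API output.

```python
from typing import Any, Dict, List, Tuple


def facet_counts(payload: Dict[str, Any], field: str) -> List[Tuple[str, int]]:
    """Return (value, count) pairs for one facet field, largest count first."""
    data = payload.get("data") or {}  # error responses carry "data": null
    for facet in data.get("facets", []):
        if facet.get("field") == field:
            pairs = [(v["value"], v["count"]) for v in facet.get("values", [])]
            return sorted(pairs, key=lambda pair: pair[1], reverse=True)
    return []


def has_next_page(payload: Dict[str, Any]) -> bool:
    """Report whether the pagination block announces another page."""
    data = payload.get("data") or {}
    return bool(data.get("pagination", {}).get("has_next", False))


# Trimmed copy of the example response
sample = {
    "success": True,
    "code": 200,
    "data": {
        "items": [],
        "pagination": {"page": 1, "per_page": 20, "total_count": 125,
                       "total_pages": 7, "has_next": True},
        "facets": [
            {"field": "brand", "values": [
                {"value": "宝马", "count": 75},
                {"value": "奥迪", "count": 30},
                {"value": "奔驰", "count": 20},
            ]},
        ],
    },
}

brand_distribution = facet_counts(sample, "brand")
more_pages = has_next_page(sample)
```

Using `payload.get("data") or {}` keeps both helpers safe on error responses, where `data` is `null` rather than an object.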

5.2 Error Response Example

{
  "success": false,
  "code": 400,
  "message": "参数错误:price_min不能大于price_max",
  "data": null
}

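5.3 Status Codes

| Status code | Meaning | Suggested handling |
|---|---|---|
| 200 | Success | - |
| 400 | Invalid parameters | Check the request parameter format |
| 401 | Authentication failed | Check the API key and signature |
| 403 | Insufficient permissions | Check the API permission scope |
| 404 | Data not found | Check the search conditions |
| 429 | Rate limit exceeded | Reduce the request frequency |
| 500 | Server error | Retry later |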
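
Of these codes, only 429 and 500 are described as worth retrying after a pause; the others point to a problem with the request itself. One common way to act on that is exponential backoff with a little jitter, sketched below. The function `retry_delay`, the base delay, and the cap are assumptions chosen for illustration; the source does not state the platform's actual rate limits.

```python
import random
from typing import Optional

RETRYABLE_STATUS = {429, 500}  # per the status code table
MAX_RETRIES = 3                # mirrors MAX_RETRIES in config.py


def retry_delay(status_code: int, attempt: int,
                base: float = 1.0, cap: float = 30.0) -> Optional[float]:
    """Return seconds to wait before retry number `attempt` (1-based).

    Returns None when the request should not be retried.
    """
    if status_code not in RETRYABLE_STATUS or attempt > MAX_RETRIES:
        return None
    delay = min(cap, base * 2 ** (attempt - 1))  # 1s, 2s, 4s, ...
    # Up to 10% jitter so that many clients do not retry in lockstep
    return delay + random.uniform(0, delay * 0.1)
```

A caller would loop over attempts, sleep for the returned delay, and stop as soon as the function returns `None`.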

六、高级功能实现

6.1 智能搜索建议

import re
from typing import Any, Dict


class IntelligentSearchService:
    """Intelligent search service."""

    def __init__(self, api_client):
        self.client = api_client
        self.search_history = []

    def smart_search(self, query: str, search_type: str = "auto") -> SearchResult:
        """
        Smart search: detect the search type and filters automatically.

        Args:
            query: Search query string.
            search_type: Search type ("auto" enables detection).

        Returns:
            The search result.
        """
        # Detect the search type automatically
        if search_type == "auto":
            detected_type = self._detect_search_type(query)
        else:
            detected_type = search_type

        # Parse filters out of the query text
        search_params = self._parse_query_params(query, detected_type)

        # Run the search
        result = self.client.search(search_type=detected_type, **search_params)

        # Record the search history
        self._record_search_history(query, detected_type, result)

        return result

    def _record_search_history(self, query: str, search_type: str, result) -> None:
        """Minimal stand-in: the original calls this method without defining it."""
        self.search_history.append({
            'query': query,
            'search_type': search_type,
            'success': result.success,
        })

    def _detect_search_type(self, query: str) -> str:
        """Detect the search type from keywords in the query."""
        # A bare price range such as "20-30万" is a new-car search
        if re.match(r'^\d+-\d+万$', query):
            return 'new_car'

        # Rules are checked in order, so the first matching group wins
        keyword_rules = [
            ('model', ['SUV', 'MPV', '轿车', '跑车', '皮卡']),      # body styles
            ('used_car', ['二手车', '二手', '过户', '里程']),      # used-car terms
            ('news', ['新闻', '资讯', '报道', '评测']),            # news terms
            ('video', ['视频', '试驾', '测评', 'vlog']),           # video terms
        ]
        for search_type, keywords in keyword_rules:
            if any(keyword in query for keyword in keywords):
                return search_type

        # Default to a new-car search
        return 'new_car'

    def _parse_query_params(self, query: str, search_type: str) -> Dict[str, Any]:
        """Extract filter parameters from the query text."""
        params: Dict[str, Any] = {'query': query}

        if search_type == 'new_car':
            # Parse a price range (unit: 万元, i.e. 10,000 CNY)
            price_match = re.search(r'(\d+)-(\d+)万', query)
            if price_match:
                params['price_min'] = float(price_match.group(1))
                params['price_max'] = float(price_match.group(2))

            # Parse the vehicle level.
            # NOTE: the parameter table in section 3.2 lists A级/B级/C级 as
            # example values; confirm which form the API expects.
            level_mapping = {
                'A级': 'A',
                'B级': 'B',
                'C级': 'C',
                'SUV': 'SUV',
                'MPV': 'MPV',
            }
            for keyword, level in level_mapping.items():
                if keyword in query:
                    params['level'] = level
                    break

        elif search_type == 'used_car':
            # Parse a year range such as "2020-2023年"
            year_match = re.search(r'(\d+)-(\d+)年', query)
            if year_match:
                params['year_min'] = int(year_match.group(1))
                params['year_max'] = int(year_match.group(2))

        return params
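
Keyword detection of this kind is order-sensitive: whichever keyword group is tested first wins. The sketch below shows one way to make that order explicit with a rule table. The function names and the chosen order (used-car terms before body styles, so that "二手SUV" is treated as a used-car search) are assumptions for illustration, not behavior defined by the API.

```python
import re
from typing import List, Optional, Tuple

# Ordered (search_type, keywords) rules; earlier entries win.
# The ordering is an illustrative assumption.
RULES: List[Tuple[str, Tuple[str, ...]]] = [
    ("used_car", ("二手车", "二手", "过户", "里程")),
    ("news", ("新闻", "资讯", "报道", "评测")),
    ("video", ("视频", "试驾", "测评", "vlog")),
    ("model", ("SUV", "MPV", "轿车", "跑车", "皮卡")),
]

PRICE_RANGE = re.compile(r"(\d+)-(\d+)万")


def detect_search_type(query: str) -> str:
    """Return the first rule whose keywords appear in the query."""
    for search_type, keywords in RULES:
        if any(keyword in query for keyword in keywords):
            return search_type
    return "new_car"  # default when nothing matches


def parse_price_range(query: str) -> Optional[Tuple[float, float]]:
    """Extract (min, max) in units of 10,000 CNY, or None if absent."""
    match = PRICE_RANGE.search(query)
    if not match:
        return None
    low, high = float(match.group(1)), float(match.group(2))
    # Normalize reversed input such as "50-20万"
    return (min(low, high), max(low, high))
```

Normalizing the range before sending it also avoids the `price_min` greater than `price_max` error shown in section 5.2.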

6.2 Caching search data

import hashlib
import json
from dataclasses import asdict


class CachedDongchediSearchAPI(DongchediSearchAPI):
    """Dongchedi search API with caching."""

    def __init__(self, app_key, app_secret, redis_client, sandbox=True):
        super().__init__(app_key, app_secret, sandbox)
        self.redis = redis_client
        self.cache_prefix = "dongchedi:search:"

    def search_cached(self, cache_key: str, **params) -> SearchResult:
        """Search with a Redis cache in front of the API."""
        # Check the cache
        if self.redis:
            cached = self.redis.get(cache_key)
            if cached:
                data = json.loads(cached)
                # Rebuild the item objects (assumes SearchResult and
                # SearchResultItem are dataclasses with matching fields)
                data['items'] = [SearchResultItem(**item) for item in data.get('items', [])]
                return SearchResult(**data)

        # Call the API
        result = super().search(**params)

        # Cache successful results
        if self.redis and result.success:
            # The TTL depends on the search type
            ttl = self._calculate_ttl(params.get('search_type'))
            # asdict() also converts the nested item dataclasses, which
            # json.dumps(result.__dict__) cannot serialize
            self.redis.setex(
                cache_key,
                ttl,
                json.dumps(asdict(result), ensure_ascii=False)
            )

        return result

    def _calculate_ttl(self, search_type: str) -> int:
        """Pick a cache lifetime based on the search type."""
        ttl_config = {
            'new_car': 3600,   # 1 hour
            'used_car': 1800,  # 30 minutes
            'model': 7200,     # 2 hours
            'news': 900,       # 15 minutes
            'video': 1800      # 30 minutes
        }
        return ttl_config.get(search_type, 1800)  # default: 30 minutes

    def get_cache_key(self, **params) -> str:
        """Build the cache key from the search parameters."""
        # page and per_page stay in the key: search_cached() stores one page
        # per entry, so dropping them would serve page 1 for every page
        param_str = json.dumps(params, sort_keys=True, ensure_ascii=False)
        return f"{self.cache_prefix}{hashlib.md5(param_str.encode('utf-8')).hexdigest()}"
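
To see why the key is built from a sorted JSON dump, here is a small standalone sketch. `make_cache_key` is a hypothetical free function that mirrors the key scheme above; dropping `None` values is an added assumption so that an omitted filter and an explicit `None` share one entry.

```python
import hashlib
import json

CACHE_PREFIX = "dongchedi:search:"


def make_cache_key(**params) -> str:
    """Build a deterministic cache key from search parameters."""
    # Drop None values so omitted and explicit-None filters share a key
    cleaned = {k: v for k, v in params.items() if v is not None}
    # sort_keys makes the key independent of argument order
    payload = json.dumps(cleaned, sort_keys=True, ensure_ascii=False)
    digest = hashlib.md5(payload.encode("utf-8")).hexdigest()
    return f"{CACHE_PREFIX}{digest}"


key_a = make_cache_key(query="宝马3系", search_type="new_car", page=1)
key_b = make_cache_key(page=1, search_type="new_car", query="宝马3系")
key_c = make_cache_key(query="宝马3系", search_type="new_car", page=2)
```

Here `key_a` and `key_b` are identical because only the argument order differs, while `key_c` differs because it asks for another page. MD5 is used only as a compact fingerprint, not for security.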

6.3 Batch processing

from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Dict, List


class BatchSearchProcessor:
    """Batch search processor."""

    def __init__(self, api_client):
        self.client = api_client

    def batch_search_by_queries(
        self,
        search_queries: List[Dict[str, Any]],
        max_workers: int = 3
    ) -> Dict[str, SearchResult]:
        """
        Run several search queries concurrently.

        Args:
            search_queries: List of search parameter dicts.
            max_workers: Maximum number of concurrent requests.

        Returns:
            Mapping from query identifier to result.
        """
        results = {}

        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit every search task. Results are keyed by the query text,
            # so two entries with the same 'query' overwrite each other.
            future_to_query = {
                executor.submit(self.client.search, **query): query.get('query', 'unknown')
                for query in search_queries
            }

            # Collect the results as they complete
            for future in as_completed(future_to_query):
                query_id = future_to_query[future]
                try:
                    result = future.result(timeout=30)
                    results[query_id] = result
                except Exception as e:
                    print(f"Query {query_id} failed: {e}")
                    results[query_id] = SearchResult(
                        success=False,
                        code=500,
                        message=str(e),
                        data={},
                        items=[],
                        pagination={},
                        facets=[],
                        total_count=0,
                        search_time=0
                    )

        return results

    def analyze_search_trends(
        self,
        search_results: Dict[str, SearchResult]
    ) -> Dict[str, Any]:
        """Summarize a batch of search results."""
        analysis = {
            'total_queries': len(search_results),
            'success_rate': 0,
            'avg_search_time': 0,
            'total_items': 0,
            'top_brands': [],
            'price_distribution': {}
        }

        successful_searches = 0
        total_search_time = 0
        brand_counts = {}
        price_ranges = {}

        for query_id, result in search_results.items():
            if result.success:
                successful_searches += 1
                total_search_time += result.search_time
                analysis['total_items'] += result.total_count

                # Count brands and price buckets in a single pass
                for item in result.items:
                    if item.brand:
                        brand_counts[item.brand] = brand_counts.get(item.brand, 0) + 1
                    if item.price:
                        price_range = self._get_price_range(item.price)
                        price_ranges[price_range] = price_ranges.get(price_range, 0) + 1

        # Compute the metrics (guarding against an empty batch)
        if search_results:
            analysis['success_rate'] = successful_searches / len(search_results) * 100
        if successful_searches > 0:
            analysis['avg_search_time'] = total_search_time / successful_searches

        # Five most frequent brands
        analysis['top_brands'] = sorted(brand_counts.items(), key=lambda x: x[1], reverse=True)[:5]
        analysis['price_distribution'] = price_ranges

        return analysis

    def _get_price_range(self, price: float) -> str:
        """Return the price bucket label (prices in 万元, i.e. 10,000 CNY)."""
        # Labels follow the style of the API's facet values, e.g. "40-50万"
        if price < 10:
            return "10万以下"
        elif price < 20:
            return "10-20万"
        elif price < 30:
            return "20-30万"
        elif price < 50:
            return "30-50万"
        elif price < 100:
            return "50-100万"
        else:
            return "100万以上"
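
The thread-pool pattern can be tried without network access. The sketch below is a simplified illustration with a hypothetical `fake_search` standing in for `client.search`. It keys results by position rather than by query text, an assumption made here so that two queries with the same keyword but different filters do not overwrite each other.

```python
from concurrent.futures import ThreadPoolExecutor, as_completed
from typing import Any, Callable, Dict, List


def fake_search(query: str, **filters: Any) -> Dict[str, Any]:
    """Stand-in for client.search(); fails for an empty query."""
    if not query:
        raise ValueError("query must not be empty")
    return {"success": True, "query": query, "filters": filters}


def run_batch(
    search: Callable[..., Dict[str, Any]],
    queries: List[Dict[str, Any]],
    max_workers: int = 3,
) -> List[Dict[str, Any]]:
    """Run queries concurrently; results come back in input order."""
    results: List[Dict[str, Any]] = [{} for _ in queries]
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_index = {
            executor.submit(search, **q): i for i, q in enumerate(queries)
        }
        for future in as_completed(future_to_index):
            i = future_to_index[future]
            try:
                results[i] = future.result()
            except Exception as exc:
                # One failed query must not abort the whole batch
                results[i] = {"success": False, "message": str(exc)}
    return results


batch = run_batch(fake_search, [
    {"query": "宝马3系", "price_max": 50},
    {"query": ""},
    {"query": "宝马3系", "price_max": 30},
])
```

The second entry fails inside the worker thread, yet the other two results are still returned.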

7. Practical application scenarios

7.1 Car price comparison platform

from typing import Any, Dict, List, Tuple


class CarPriceComparison:
    """Car price comparison platform."""

    def __init__(self, api_client):
        self.client = api_client
        self.price_history = {}

    def compare_prices_by_model(
        self,
        model: str,
        regions: List[str],
        year_range: Tuple[int, int] = (2020, 2023)
    ) -> Dict[str, Any]:
        """Compare prices of one model across regions."""
        search_results = {}

        for region in regions:
            result = self.client.search_new_cars(
                query=model,
                region=region,
                year_min=year_range[0],
                year_max=year_range[1],
                sort_by="price",
                sort_order="asc"
            )

            if result.success:
                search_results[region] = result
            else:
                print(f"Search failed for region {region}: {result.message}")

        # Analyze the price differences
        price_analysis = self._analyze_regional_prices(search_results)

        return {
            'model': model,
            'year_range': year_range,
            'regions': regions,
            'search_results': search_results,
            'price_analysis': price_analysis,
            'recommendation': self._generate_recommendation(price_analysis)
        }

    def _analyze_regional_prices(self, search_results: Dict[str, SearchResult]) -> Dict[str, Any]:
        """Compute per-region average prices and the spread between them."""
        analysis = {
            'regional_prices': {},
            'avg_prices': {},
            'price_differences': {}
        }

        for region, result in search_results.items():
            if result.success and result.items:
                prices = [item.price for item in result.items if item.price]
                if prices:
                    analysis['regional_prices'][region] = prices
                    analysis['avg_prices'][region] = sum(prices) / len(prices)

        # A comparison needs at least two regions with price data
        if len(analysis['avg_prices']) > 1:
            min_price_region = min(analysis['avg_prices'], key=analysis['avg_prices'].get)
            max_price_region = max(analysis['avg_prices'], key=analysis['avg_prices'].get)
            min_price = analysis['avg_prices'][min_price_region]
            max_price = analysis['avg_prices'][max_price_region]

            analysis['price_differences'] = {
                'min_price_region': min_price_region,
                'min_price': min_price,
                'max_price_region': max_price_region,
                'max_price': max_price,
                'max_difference': max_price - min_price,
                # Spread as a percentage of the cheapest region's average
                'difference_rate': (max_price - min_price) / min_price * 100
            }

        return analysis

    def _generate_recommendation(self, price_analysis: Dict[str, Any]) -> str:
        """Minimal stand-in: the original calls this method without defining it."""
        diff = price_analysis.get('price_differences')
        if not diff:
            return "Not enough regional data to compare prices."
        return (
            f"Lowest average price is in {diff['min_price_region']}: "
            f"{diff['min_price']:.2f} (unit: 10,000 CNY)"
        )
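
A worked example makes the `max_difference` and `difference_rate` arithmetic concrete. The prices below are invented for illustration (unit: 10,000 CNY) and are not real market data.

```python
from statistics import mean

# Illustrative prices only, in units of 10,000 CNY
regional_prices = {
    "北京": [30.0, 32.0],
    "上海": [29.0, 30.0],
    "广州": [28.0, 28.0],
}

# Average price per region
avg_prices = {region: mean(prices) for region, prices in regional_prices.items()}

cheapest = min(avg_prices, key=avg_prices.get)
priciest = max(avg_prices, key=avg_prices.get)

# Absolute spread, and spread relative to the cheapest region
max_difference = avg_prices[priciest] - avg_prices[cheapest]
difference_rate = max_difference / avg_prices[cheapest] * 100
```

The averages are 31.0, 29.5 and 28.0, so the spread is 3.0 (30,000 CNY), roughly 10.71% of the cheapest region's average.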

7.2 Car recommendation system

from typing import Any, Dict, List


class CarRecommender:
    """Car recommendation system."""

    def __init__(self, api_client):
        self.client = api_client
        self.user_preferences = {}

    def recommend_cars_by_preferences(
        self,
        user_prefs: Dict[str, Any],
        max_results: int = 10
    ) -> List[SearchResultItem]:
        """
        Recommend cars based on user preferences.

        Args:
            user_prefs: User preferences.
            max_results: Maximum number of recommendations.

        Returns:
            List of recommended cars.
        """
        # Build the search parameters
        search_params = {
            'search_type': 'new_car',
            'sort_by': 'rating',
            'sort_order': 'desc',
            'per_page': 50
        }

        # Apply the user preferences
        if 'budget' in user_prefs:
            search_params['price_max'] = user_prefs['budget']

        # Preferred brands are not sent as a search filter (the original
        # leaves this branch as a stub); they are rewarded in
        # _calculate_recommendation_score instead.

        if 'vehicle_type' in user_prefs:
            search_params['level'] = user_prefs['vehicle_type']

        if 'fuel_type' in user_prefs:
            search_params['fuel_type'] = user_prefs['fuel_type']

        # Run the search
        result = self.client.search(**search_params)

        if not result.success:
            return []

        # Score every candidate
        scored_cars = []
        for item in result.items:
            score = self._calculate_recommendation_score(item, user_prefs)
            scored_cars.append((item, score))

        # Sort by score, best first
        scored_cars.sort(key=lambda x: x[1], reverse=True)

        return [car for car, score in scored_cars[:max_results]]

    def _calculate_recommendation_score(
        self,
        car: SearchResultItem,
        user_prefs: Dict[str, Any]
    ) -> float:
        """Compute the recommendation score of one car."""
        score = 0

        # Price score, weighted 0.3
        if 'budget' in user_prefs and car.price:
            budget = user_prefs['budget']
            price_diff = abs(car.price - budget)
            price_score = max(0, 100 - price_diff * 2)  # 2 points off per 10,000 CNY of difference
            score += price_score * 0.3

        # Brand preference score
        if 'preferred_brands' in user_prefs and car.brand:
            preferred_brands = user_prefs['preferred_brands']
            if car.brand in preferred_brands:
                score += 50

        # Rating score
        if car.rating:
            score += car.rating * 10  # a 4.5 rating adds 45 points

        # Sales score
        if car.sales:
            sales_score = min(car.sales / 100, 20)  # 1 point per 100 units sold, capped at 20
            score += sales_score

        return score
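
To make the weighting concrete, the scoring rule can be reproduced on a single invented car. `Car` is a hypothetical stand-in for `SearchResultItem`, and all numbers are illustrative.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Car:
    """Hypothetical stand-in for SearchResultItem."""
    brand: str
    price: Optional[float]   # unit: 10,000 CNY
    rating: Optional[float]
    sales: Optional[int]


def score(car: Car, budget: float, preferred_brands: Sequence[str]) -> float:
    """Same rule as above: price (weight 0.3) + brand + rating + sales."""
    total = 0.0
    if car.price:
        total += max(0, 100 - abs(car.price - budget) * 2) * 0.3
    if car.brand in preferred_brands:
        total += 50
    if car.rating:
        total += car.rating * 10
    if car.sales:
        total += min(car.sales / 100, 20)
    return total


example = Car(brand="宝马", price=28.0, rating=4.5, sales=1500)
example_score = score(example, budget=30.0, preferred_brands=["宝马"])
```

The components are 96 × 0.3 = 28.8 for price, 50 for brand, 45 for rating and 15 for sales, about 138.8 in total. The brand bonus and the rating dominate; the price term can contribute at most 30 points.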

8. Troubleshooting and optimization

8.1 Common problems

Problem 1: signature verification fails

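import hashlib
import hmac
import json


def debug_signature_generation(params, app_secret, timestamp):
    """Print every step of signature generation for debugging."""
    # WARNING: the output contains the app secret; use only for local debugging
    print("=== Signature debug info ===")
    print(f"timestamp: {timestamp}")

    # Drop empty values and the signature itself, then sort by key
    filtered_params = {k: v for k, v in params.items() if v is not None and k != 'sign'}
    sorted_keys = sorted(filtered_params.keys())

    sign_str = ''
    for key in sorted_keys:
        if isinstance(filtered_params[key], (list, dict)):
            # Compact JSON so that whitespace cannot change the signature
            value = json.dumps(filtered_params[key], separators=(',', ':'))
        else:
            value = str(filtered_params[key])
        sign_str += f"{key}{value}"
        print(f"  {key}: {value}")

    sign_str += app_secret
    print(f"String to sign: {sign_str}")

    # Compute the signature
    signature = hmac.new(
        app_secret.encode('utf-8'),
        sign_str.encode('utf-8'),
        hashlib.sha256
    ).hexdigest()
    print(f"Computed signature: {signature}")

    return signature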
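
When a signature is rejected, the usual causes are parameter order, stray `None` values, or a stale `sign` field being included. The sketch below reproduces the concatenation scheme used by the debug helper (sorted `key` + `value` pairs, secret appended, HMAC-SHA256); whether this matches the platform's actual algorithm is an assumption that should be checked against the signature section of the official documentation. The key and secret are placeholders.

```python
import hashlib
import hmac
from typing import Any, Dict


def build_sign_string(params: Dict[str, Any], app_secret: str) -> str:
    """Concatenate key/value pairs in key order, then append the secret."""
    filtered = {k: v for k, v in params.items() if v is not None and k != "sign"}
    return "".join(f"{k}{filtered[k]}" for k in sorted(filtered)) + app_secret


def sign(params: Dict[str, Any], app_secret: str) -> str:
    """HMAC-SHA256 over the string to sign, hex encoded."""
    message = build_sign_string(params, app_secret)
    return hmac.new(
        app_secret.encode("utf-8"), message.encode("utf-8"), hashlib.sha256
    ).hexdigest()


params = {"q": "宝马3系", "timestamp": 1706774400, "app_key": "demo_key", "page": None}
secret = "demo_secret"  # placeholder, not a real credential

sign_string = build_sign_string(params, secret)
signature = sign(params, secret)

# The same parameters in a different insertion order give the same signature
shuffled = dict(reversed(list(params.items())))
same = hmac.compare_digest(signature, sign(shuffled, secret))
```

`hmac.compare_digest` is the standard-library way to compare signatures in constant time.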

Problem 2: inconsistent pagination results

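import time


def stable_pagination_search(self, **params):
    """Stable paginated search (intended as a method of DongchediSearchAPI)."""
    # Make sure there is a stable sort field
    if 'sort_by' not in params:
        params['sort_by'] = 'id'  # sort by a unique field

    if 'sort_order' not in params:
        params['sort_order'] = 'desc'

    # Use cursor pagination.
    # NOTE: sort_by='id', the 'cursor' parameter and the 'has_next' field are
    # not listed in the parameter table of section 3.2; confirm that the API
    # supports them before relying on this.
    all_items = []
    last_id = None

    while True:
        if last_id is not None:
            # Continue after the last item already fetched
            params['cursor'] = last_id

        result = self.search(**params)

        if not result.success or not result.items:
            break

        all_items.extend(result.items)
        last_id = result.items[-1].id

        # Stop when there is no next page
        pagination = result.pagination
        if not pagination.get('has_next', False):
            break

        # Avoid hammering the API
        time.sleep(0.5)

    return all_items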
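
The cursor loop can be exercised against an in-memory data set. In the sketch below, `fake_search` is a hypothetical stand-in that returns rows with an id below the cursor, and the `max_pages` cap is an added safeguard against endless loops, not something the API requires.

```python
from typing import Dict, List, Optional

# Ten rows sorted by id in descending order: 10, 9, ..., 1
DATA = [{"id": i} for i in range(10, 0, -1)]


def fake_search(cursor: Optional[int] = None, per_page: int = 4) -> Dict:
    """Return the next page of rows whose id is below the cursor."""
    remaining = [row for row in DATA if cursor is None or row["id"] < cursor]
    page = remaining[:per_page]
    return {"items": page, "has_next": len(remaining) > per_page}


def fetch_all(max_pages: int = 100) -> List[Dict]:
    """Follow the cursor until the data is exhausted or max_pages is hit."""
    items: List[Dict] = []
    cursor: Optional[int] = None
    for _ in range(max_pages):  # hard cap guards against endless loops
        result = fake_search(cursor=cursor)
        if not result["items"]:
            break
        items.extend(result["items"])
        cursor = result["items"][-1]["id"]
        if not result["has_next"]:
            break
    return items


all_rows = fetch_all()
```

Because each page is defined by "id below the last one seen" rather than by an offset, rows inserted or removed between requests cannot shift items across page boundaries.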

8.2 Performance tips

  1. Use caching sensibly

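import json
import time


# Multi-level cache strategy
class MultiLevelCache:
    def __init__(self, redis_client):
        self.memory_cache = {}
        self.redis = redis_client
        self.memory_ttl = 300  # 5 minutes
        self.redis_ttl = 1800  # 30 minutes

    def get_search_data(self, cache_key):
        # 1. Check the in-memory cache
        if cache_key in self.memory_cache:
            data, expire_time = self.memory_cache[cache_key]
            if time.time() < expire_time:
                return data
            del self.memory_cache[cache_key]  # drop the expired entry

        # 2. Check the Redis cache
        if self.redis:
            cached = self.redis.get(cache_key)
            if cached:
                data = json.loads(cached)
                # Refresh the in-memory cache
                self.memory_cache[cache_key] = (data, time.time() + self.memory_ttl)
                return data

        return None

    def set_search_data(self, cache_key, data):
        # Write path, not in the original snippet: a minimal counterpart
        # to get_search_data so that both levels get populated
        self.memory_cache[cache_key] = (data, time.time() + self.memory_ttl)
        if self.redis:
            self.redis.setex(cache_key, self.redis_ttl, json.dumps(data))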
  2. Connection pool management

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def create_optimized_session():
    """Create a session with retries and connection pooling."""
    session = requests.Session()

    # Retry policy: up to 3 retries with backoff on rate limiting and server errors
    retry_strategy = Retry(
        total=3,
        backoff_factor=0.5,
        status_forcelist=[429, 500, 502, 503, 504],
    )

    adapter = HTTPAdapter(
        max_retries=retry_strategy,
        pool_connections=10,
        pool_maxsize=100,
        pool_block=False
    )

    session.mount("https://", adapter)
    session.mount("http://", adapter)

    return session

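9. Best practices

9.1 Security

  1. Key management: store API keys in environment variables

  2. Enforce HTTPS: make sure every request uses HTTPS

  3. Input validation: validate all input parameters

  4. Error handling: do not expose sensitive error details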
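
Input validation can catch most 400 responses before a request is sent. The sketch below is one possible validator: the function name is hypothetical, the page-size limit comes from `MAX_PAGE_SIZE` in section 2.2, and the allowed sort fields come from the parameter table in section 3.2. Clamping `per_page` instead of rejecting it is a design choice made here, not an API rule.

```python
from typing import Any, Dict, Optional

MAX_PAGE_SIZE = 100  # from the configuration in section 2.2
ALLOWED_SORT_FIELDS = {"relevance", "price", "sales", "rating"}


def validate_search_params(
    price_min: Optional[float] = None,
    price_max: Optional[float] = None,
    sort_by: str = "relevance",
    page: int = 1,
    per_page: int = 20,
) -> Dict[str, Any]:
    """Validate and normalize search parameters before sending them."""
    for name, value in (("price_min", price_min), ("price_max", price_max)):
        if value is not None and value < 0:
            raise ValueError(f"{name} must not be negative")
    if price_min is not None and price_max is not None and price_min > price_max:
        raise ValueError("price_min must not be greater than price_max")
    if sort_by not in ALLOWED_SORT_FIELDS:
        raise ValueError(f"unsupported sort_by: {sort_by!r}")
    if page < 1:
        raise ValueError("page must be at least 1")

    # Clamp the page size into the allowed range instead of failing
    per_page = max(1, min(per_page, MAX_PAGE_SIZE))

    params = {
        "price_min": price_min,
        "price_max": price_max,
        "sort_by": sort_by,
        "page": page,
        "per_page": per_page,
    }
    # Omit unset filters so they are not sent at all
    return {k: v for k, v in params.items() if v is not None}
```

The error messages name only the offending parameter, which keeps them safe to show to end users.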

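9.2 Performance

  1. Caching: choose a cache lifetime that suits each search type

  2. Batching: combine requests to reduce the number of API calls

  3. Pagination: use cursor pagination for more stable results

  4. Asynchronous processing: run batch operations asynchronously to raise throughput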
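
As an illustration of the asynchronous approach, the sketch below limits concurrency with a semaphore. `fake_search` is a hypothetical coroutine that only simulates latency; in a real client it would be replaced by an HTTP call made with a library such as aiohttp, which is listed in the requirements.

```python
import asyncio
from typing import Any, Dict, List


async def fake_search(query: str) -> Dict[str, Any]:
    """Stand-in for an asynchronous API call."""
    await asyncio.sleep(0.01)  # simulate network latency
    return {"query": query, "success": True}


async def search_many(queries: List[str], max_concurrency: int = 3) -> List[Dict[str, Any]]:
    """Run all queries with at most max_concurrency in flight at once."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def limited(query: str) -> Dict[str, Any]:
        async with semaphore:
            return await fake_search(query)

    # gather() returns results in the order of its arguments
    return await asyncio.gather(*(limited(q) for q in queries))


async_results = asyncio.run(search_many(["宝马3系", "奥迪A4L", "奔驰C级", "SUV"]))
```

The semaphore keeps the request rate within whatever limit the account has, which helps avoid 429 responses.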

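9.3 Business

  1. Search suggestions: offer smart search suggestions

  2. Aggregation: use facet data for statistical analysis

  3. Personalized recommendations: refine recommendations based on user behavior

  4. Trend analysis: record and analyze search trends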


Appendix: quick-start template

# quick_start.py
from dongchedi_search import DongchediSearchAPI

# 1. Initialize the client
client = DongchediSearchAPI(
    app_key="your_app_key",
    app_secret="your_app_secret",
    sandbox=True
)

# 2. Simple search
result = client.search(query="宝马3系")
if result.success:
    for item in result.items:
        # Prices are in 万元 (10,000 CNY)
        print(f"{item.brand} {item.model} - {item.price}万")

# 3. New-car search
result = client.search_new_cars(
    query="SUV",
    price_min=20,
    price_max=50,
    level="SUV"
)

# 4. Fetch all pages
all_items = client.search_all(
    query="奥迪A4L",
    search_type="new_car",
    max_pages=5
)
print(f"Found {len(all_items)} records")
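
After working through this guide, you should be able to:
  • Understand the full feature set of the Dongchedi item_search interface

  • Implement secure API authentication and requests

  • Handle the various search filters and pagination logic

  • Build a high-performance car search application

  • Apply the interface flexibly in real business scenarios

Choose the implementation that fits your business requirements, and follow the best practices above to keep the system stable and maintainable.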

