As one of the largest content-distribution platforms in China, the WeChat ecosystem hosts a vast body of high-quality official-account articles. The item_search API (a wrapper around Sogou WeChat Search's compliant search capability) is the core tool for fetching WeChat article lists in bulk by keyword. It supports filtering across multiple dimensions, including keyword, publish time, official-account type, and engagement volume, and returns core fields such as article title, URL, publish time, and engagement data. It is widely used for public-opinion monitoring, trend tracking, content aggregation, and vertical-domain article collection.
I. Core Concepts: What the API Can Do and Which Scenarios It Fits
1. API Positioning and Core Value
2. Core Parameters and Return Fields (Keyword-Search Edition)
(1) Request Parameters (Required + Optional, Ordered by Priority)
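For orientation, here is a minimal sketch of a request payload. The field names mirror the params dict assembled in the code of Section III; the enum values (e.g. "30days", "fuzzy") are assumptions taken from that same code and should be confirmed against your provider's documentation.

# Hypothetical request payload, mirroring the params dict built in Section III.
# All field names and enum values are assumptions; verify against the real docs.
example_params = {
    "appkey": "your_appkey",         # issued by the provider (required)
    "keyword": "数字经济 政策解读",   # search keyword(s), combinations supported (required)
    "time_range": "30days",          # e.g. all / 7days / 30days / custom
    "verify_type": "all",            # official-account verification filter
    "sort_type": "relevance",        # e.g. relevance / pub_time / read_count
    "article_type": "all",           # e.g. all / original / reprint
    "page_no": 1,                    # page number
    "page_size": 20,                 # items per page
    "match_type": "fuzzy",           # keyword match mode, e.g. fuzzy / exact
    "timestamp": 1717920000000,      # millisecond timestamp
    "sign": "<MD5 signature, see Step 2 of Section III>"
}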
(2) Core Return Fields (Grouped by Business Scenario; Keyword-Search Fields Highlighted)
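The exact response schema varies by provider. The sketch below reconstructs one raw article entry purely from the fields that the standardize_data function in Section III parses; every name here is an assumption to be checked against the actual docs.

# Hypothetical raw article entry, reverse-engineered from standardize_data below.
example_article = {
    "article_id": "sogou-xxxx",        # Sogou article ID
    "title": "...",                    # article title
    "article_url": "https://...",      # article URL
    "official_account": "...",         # official-account name
    "verify_type": "official",         # account verification type
    "pub_time": 1717920000000,         # publish time, millisecond timestamp
    "is_original": True,               # original vs. reprint
    "reprint_source": "",              # source account if reprinted
    "interact_data": {                 # engagement metrics
        "read_count": "10万+",          # may be a string such as "10万+" ("100k+")
        "like_count": 320,             # WeChat "Wow" (在看) count
        "comment_count": 45,           # comment count
        "share_count": 80              # estimated shares
    },
    "summary": "...",                  # article summary
    "relevance_score": 95,             # relevance score for the keyword
    "tags": ["数字经济", "政策"],       # article tags
    "is_illegal": False                # True if the article was deleted or flagged
}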
3. API Limits and Caveats
II. Pre-Integration Prep: Prerequisites in 3 Steps
1. Obtain API Credentials (Key Step)
2. Prepare the Technical Environment
(1) Supported Languages and Protocols
(2) Required Tools and Dependencies
3. Clarify Business Requirements
III. Hands-On Steps: From Debugging to Production (Python Example)
Step 1: Understand the Request Flow
Step 2: Signature Generation Rules (Important: these may vary between providers; follow your provider's docs)
Signature Example (Parameter Sorting and Concatenation)
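Below is a minimal worked example of the sorting-and-concatenation rule implemented by the generate_sign function in the full code. MD5 is assumed here, as in that code; some providers use SHA256 instead, and the demo key/secret values are placeholders.

# Worked example of the assumed signing rule (MD5 variant; adjust per provider docs).
import hashlib
from urllib.parse import urlencode

params = {"keyword": "数字经济", "appkey": "demo_key", "page_no": 1, "timestamp": 1717920000000}
sorted_params = sorted(params.items())  # 1. sort by parameter name, ASCII ascending
param_str = urlencode(sorted_params) + "&secret=demo_secret"  # 2. concatenate, then append the secret
# -> "appkey=demo_key&keyword=%E6%95%B0%E5%AD%97%E7%BB%8F%E6%B5%8E&page_no=1&timestamp=1717920000000&secret=demo_secret"
sign = hashlib.md5(param_str.encode("utf-8")).hexdigest().upper()  # 3. 32-char uppercase MD5
print(sign)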
Step 3: Full Code Implementation (Python)
(1) Install Dependencies
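The script below relies on the following third-party packages (openpyxl is needed for the Excel export; aiohttp and APScheduler are only required for the async and scheduled-monitoring modes). A minimal install, assuming pip:

pip install requests pandas openpyxl aiohttp apscheduler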
(2) Full Code (Signature Generation, API Calls, Paginated Batch Fetch, Data Persistence)
import requests
import hashlib
import time
import json
import pandas as pd
from urllib.parse import urlencode
from typing import Dict, List, Optional
import logging
from apscheduler.schedulers.blocking import BlockingScheduler
import aiohttp
import asyncio
# Configure logging (record API calls and errors for compliance auditing)
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("sogou_wechat_item_search.log"), logging.StreamHandler()]
)
# Core API configuration (replace with the appkey, secret, and API_URL issued by your provider)
APP_KEY = "your_appkey"
SECRET = "your_secret"
API_URL = "https://api.sogou-wechat.com/item_search"  # provider endpoint (use the actual one)
SAVE_PATH = "wechat_keyword_articles.xlsx"  # output file path
CACHE_KEY_PREFIX = "wechat_search_"  # Redis cache key prefix (enable if caching is needed)
def generate_sign(params: Dict) -> str:
    """Generate the request signature (generic MD5 variant; switch to SHA256 if your provider requires it)"""
    # 1. Sort parameters by name in ASCII ascending order (excluding the sign field itself)
    sorted_params = sorted(
        ((k, v) for k, v in params.items() if k != "sign"), key=lambda x: x[0]
    )
    # 2. Concatenate the parameter string (urlencode handles Chinese and special characters)
    param_str = urlencode(sorted_params, encoding="utf-8") + f"&secret={SECRET}"
    # 3. MD5 digest (32-char uppercase)
    md5 = hashlib.md5()
    md5.update(param_str.encode("utf-8"))
    return md5.hexdigest().upper()
def standardize_data(raw_article: Dict) -> Dict:
    """Standardize one article record (unified field names/formats for display and analysis)"""
    # Parse engagement data (fall back to the top-level object: field layout varies by provider)
    interact_data = raw_article.get("interact_data", {}) or raw_article
    # Normalize the publish time (assumed millisecond timestamp)
    pub_time = raw_article.get("pub_time", 0)
    pub_time_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(pub_time / 1000)) if pub_time else ""
    # Normalize the read count (handle strings like "10万+", i.e. "100k+")
    read_count = interact_data.get("read_count", 0)
    if isinstance(read_count, str) and "万+" in read_count:
        read_count = int(float(read_count.replace("万+", "")) * 10000)
    return {
        "fetch_time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "keyword": raw_article.get("keyword", ""),
        "title": raw_article.get("title", ""),
        "article_url": raw_article.get("article_url", ""),
        "sogou_article_id": raw_article.get("article_id", ""),
        "official_account": raw_article.get("official_account", ""),
        "verify_type": raw_article.get("verify_type", ""),
        "pub_time": pub_time_str,
        "article_type": "original" if raw_article.get("is_original", False) else "reprint",
        "reprint_source": raw_article.get("reprint_source", ""),
        "read_count": read_count,
        "wow_count": interact_data.get("like_count", 0),  # WeChat "Wow" (在看) count
        "comment_count": interact_data.get("comment_count", 0),
        "share_count_est": interact_data.get("share_count", 0),  # estimated shares
        "summary": raw_article.get("summary", ""),
        "relevance_score": raw_article.get("relevance_score", 0),
        "tags": ",".join(raw_article.get("tags", [])) if raw_article.get("tags") else "",
        "status": "normal" if not raw_article.get("is_illegal", False) else "deleted/violating"
    }
def get_article_list_sync(
    keyword: str,
    time_range: str = "all",
    start_date: Optional[str] = None,
    end_date: Optional[str] = None,
    verify_type: str = "all",
    sort_type: str = "relevance",
    article_type: str = "all",
    page_no: int = 1,
    page_size: int = 20,
    match_type: str = "fuzzy"
) -> Dict:
    """
    Call item_search synchronously and fetch one page of WeChat articles.
    :param keyword: search keyword(s), multi-keyword combinations supported
    :param time_range: time-range filter
    :param start_date: custom start date (required when time_range=custom)
    :param end_date: custom end date (required when time_range=custom)
    :param verify_type: official-account verification filter
    :param sort_type: sort order
    :param article_type: article-type filter
    :param page_no: page number
    :param page_size: items per page
    :param match_type: keyword match mode
    :return: standardized article-list result
    """
    # 1. Validate parameters
    if time_range == "custom" and not (start_date and end_date):
        logging.error("start_date and end_date are required when time_range=custom")
        return {"success": False, "error_msg": "missing custom time-range parameters", "error_code": -1}
    # 2. Build the base (required) parameters
    params = {
        "appkey": APP_KEY,
        "keyword": keyword,
        "time_range": time_range,
        "verify_type": verify_type,
        "sort_type": sort_type,
        "article_type": article_type,
        "page_no": page_no,
        "page_size": page_size,
        "match_type": match_type,
        "timestamp": int(time.time() * 1000)
    }
    # 3. Add optional parameters
    if time_range == "custom":
        params["start_date"] = start_date
        params["end_date"] = end_date
    # 4. Generate the signature
    params["sign"] = generate_sign(params)
    try:
        # 5. Send the POST request (HTTPS, 15-second timeout)
        response = requests.post(
            url=API_URL,
            data=json.dumps(params),
            headers={"Content-Type": "application/json"},
            timeout=15,
            verify=True
        )
        response.raise_for_status()  # raise on HTTP errors (e.g. 404, 500)
        result = response.json()
        # 6. Handle the response (formats differ between providers; adjust to the actual docs)
        # Assumed format: {"code":200,"msg":"success","data":{"article_list":[],"total":0,"page_total":1}}
        if result.get("code") == 200:
            raw_data = result.get("data", {})
            article_list = raw_data.get("article_list", [])
            total = raw_data.get("total", 0)  # total number of matched articles
            page_total = raw_data.get("page_total", 1)  # total number of pages
            # Standardize the article records
            standard_articles = [standardize_data(article) for article in article_list]
            for article in standard_articles:
                article["keyword"] = keyword  # tag each row with its keyword for multi-keyword runs
            return {
                "success": True,
                "data": standard_articles,
                "total": total,
                "page_no": page_no,
                "page_total": page_total,
                "error_msg": ""
            }
        else:
            error_msg = result.get("msg", "API call failed")
            error_code = result.get("code", -2)
            logging.error(f"API error: code={error_code}, msg={error_msg} (keyword: {keyword}, page: {page_no})")
            return {
                "success": False,
                "data": [],
                "total": 0,
                "page_no": page_no,
                "page_total": 0,
                "error_code": error_code,
                "error_msg": error_msg
            }
    except requests.exceptions.RequestException as e:
        logging.error(f"Network error (keyword: {keyword}, page: {page_no}): {str(e)}")
        return {
            "success": False,
            "data": [],
            "total": 0,
            "page_no": page_no,
            "page_total": 0,
            "error_code": -3,
            "error_msg": f"network error: {str(e)}"
        }
    except Exception as e:
        logging.error(f"Data-processing error (keyword: {keyword}, page: {page_no}): {str(e)}")
        return {
            "success": False,
            "data": [],
            "total": 0,
            "page_no": page_no,
            "page_total": 0,
            "error_code": -4,
            "error_msg": f"processing error: {str(e)}"
        }
async def get_article_list_async(session: aiohttp.ClientSession, keyword: str, **kwargs) -> Dict:
    """Call item_search asynchronously for higher throughput on batch requests"""
    page_no = kwargs.get("page_no", 1)
    # Build parameters (same logic as the sync version)
    params = {
        "appkey": APP_KEY,
        "keyword": keyword,
        "time_range": kwargs.get("time_range", "all"),
        "verify_type": kwargs.get("verify_type", "all"),
        "sort_type": kwargs.get("sort_type", "relevance"),
        "article_type": kwargs.get("article_type", "all"),
        "page_no": page_no,
        "page_size": kwargs.get("page_size", 20),
        "match_type": kwargs.get("match_type", "fuzzy"),
        "timestamp": int(time.time() * 1000)
    }
    if params["time_range"] == "custom":
        params["start_date"] = kwargs.get("start_date")
        params["end_date"] = kwargs.get("end_date")
    # Generate the signature
    params["sign"] = generate_sign(params)
    try:
        async with session.post(
            API_URL,
            json=params,
            timeout=aiohttp.ClientTimeout(total=15)
        ) as response:
            response.raise_for_status()
            result = await response.json()
            if result.get("code") == 200:
                raw_data = result.get("data", {})
                article_list = raw_data.get("article_list", [])
                standard_articles = [standardize_data(article) for article in article_list]
                for article in standard_articles:
                    article["keyword"] = keyword
                return {
                    "success": True,
                    "data": standard_articles,
                    "total": raw_data.get("total", 0),
                    "page_no": page_no,
                    "page_total": raw_data.get("page_total", 1),
                    "error_msg": ""
                }
            else:
                error_msg = result.get("msg", "API call failed")
                logging.error(f"Async request failed (keyword: {keyword}, page: {page_no}): {error_msg}")
                return {"success": False, "data": [], "error_msg": error_msg}
    except Exception as e:
        logging.error(f"Async request error (keyword: {keyword}, page: {page_no}): {str(e)}")
        return {"success": False, "data": [], "error_msg": str(e)}
def batch_get_article_list_sync(
    keyword: str,
    max_page: int = 5,  # cap on pages fetched (avoid unbounded pagination)
    **kwargs
) -> List[Dict]:
    """Fetch multiple pages synchronously (up to page_total or max_page)"""
    all_articles = []
    page_no = 1
    while True:
        logging.info(f"Fetching page {page_no} for keyword '{keyword}'")
        result = get_article_list_sync(keyword=keyword, page_no=page_no, **kwargs)
        if not result["success"]:
            logging.error(f"Page {page_no} failed: {result['error_msg']}")
            break
        page_articles = result["data"]
        if not page_articles:
            logging.info(f"Page {page_no} has no matching articles; stopping batch fetch")
            break
        all_articles.extend(page_articles)
        logging.info(f"Page {page_no} fetched: {len(page_articles)} new rows ({len(all_articles)} total)")
        # Stop at the last page or the configured cap
        if page_no >= result["page_total"] or page_no >= max_page:
            break
        page_no += 1
        # Throttle the call rate (regular tier: 10 calls/min, 6-second gap; enterprise tier: 50 calls/min, 1-second gap)
        time.sleep(6)
    return all_articles
async def batch_get_article_list_async(
    keyword: str,
    max_page: int = 5,
    **kwargs
) -> List[Dict]:
    """Fetch multiple pages asynchronously (parallel requests for higher throughput)"""
    all_articles = []
    async with aiohttp.ClientSession() as session:
        # Fetch page 1 inside the session first, to learn the total page count
        first_page_result = await get_article_list_async(session, keyword=keyword, page_no=1, **kwargs)
        if not first_page_result["success"]:
            logging.error(f"Page 1 failed for keyword '{keyword}': {first_page_result['error_msg']}")
            return all_articles
        all_articles.extend(first_page_result["data"])
        page_total = min(first_page_result["page_total"], max_page)
        if page_total <= 1:
            logging.info(f"Keyword '{keyword}' has only 1 page of data; batch fetch finished")
            return all_articles
        # Fetch the remaining pages in parallel, bounded by a semaphore to stay under the rate limit
        semaphore = asyncio.Semaphore(5)  # at most 5 concurrent requests

        async def bounded_fetch(page: int) -> Dict:
            async with semaphore:
                return await get_article_list_async(session, keyword=keyword, page_no=page, **kwargs)

        results = await asyncio.gather(*[bounded_fetch(page) for page in range(2, page_total + 1)])
        for result in results:
            if result["success"]:
                all_articles.extend(result["data"])
                logging.info(f"Async page fetched: {len(result['data'])} new rows")
    logging.info(f"Async batch fetch for keyword '{keyword}' finished: {len(all_articles)} rows total")
    return all_articles
def save_article_list(articles: List[Dict], save_path: str = SAVE_PATH):
    """Persist the article list to an Excel file (for archiving/analysis)"""
    if not articles:
        logging.warning("No article data to save")
        return
    df = pd.DataFrame(articles)
    # Keep the commonly used columns, in a readable order
    columns = [
        "fetch_time", "keyword", "title", "official_account", "verify_type",
        "pub_time", "article_type", "read_count", "wow_count", "comment_count", "share_count_est",
        "relevance_score", "tags", "status", "summary", "article_url", "sogou_article_id"
    ]
    df = df[columns].drop_duplicates(subset=["article_url"])  # dedupe by article URL
    # Append to the existing file instead of overwriting history
    try:
        history_df = pd.read_excel(save_path, engine="openpyxl")
        df = pd.concat([history_df, df], ignore_index=True).drop_duplicates(subset=["article_url"])
    except FileNotFoundError:
        pass
    df.to_excel(save_path, index=False, engine="openpyxl")
    logging.info(f"Article list archived to {save_path} ({len(df)} rows total)")
def keyword_monitor_task(keywords: List[str], **kwargs):
    """Scheduled keyword-monitoring task (track new matching articles over time)"""
    logging.info("=== Starting scheduled keyword-monitoring task ===")
    all_articles = []
    for keyword in keywords:
        logging.info(f"=== Monitoring keyword: {keyword} ===")
        # Synchronous batch fetch (switch to the async variant for higher throughput)
        articles = batch_get_article_list_sync(keyword=keyword, **kwargs)
        all_articles.extend(articles)
        logging.info(f"=== Keyword '{keyword}' done: {len(articles)} rows fetched ===")
    # Save the articles for all keywords
    save_article_list(all_articles)
    logging.info("=== Scheduled keyword-monitoring task finished ===")
# Usage examples (single page / batch / async / scheduled monitoring)
if __name__ == "__main__":
    # Mode 1: fetch one page (keyword "数字经济 政策解读", last 30 days, sorted by publish time)
    single_page_result = get_article_list_sync(
        keyword="数字经济 政策解读",
        time_range="30days",
        sort_type="pub_time",
        page_no=1,
        page_size=20
    )
    if single_page_result["success"]:
        print("=" * 80)
        print(f"Page 1 for keyword '数字经济 政策解读' ({len(single_page_result['data'])} articles)")
        print("=" * 80)
        for idx, article in enumerate(single_page_result["data"][:10], 1):  # print the first 10
            print(f"{idx:2d}. Title: {article['title']}")
            print(f"    Account: {article['official_account']} ({article['verify_type']})")
            print(f"    Published: {article['pub_time']} | Reads: {article['read_count']} | Wows: {article['wow_count']}")
            print(f"    Relevance: {article['relevance_score']} | Status: {article['status']}")
            print(f"    URL: {article['article_url']}")
            print("-" * 80)
    else:
        print(f"Single-page fetch failed: {single_page_result['error_msg']} (error code: {single_page_result['error_code']})")
    # Mode 2: synchronous multi-page fetch (keyword "AI大模型", last 7 days, up to 5 pages)
    # batch_articles = batch_get_article_list_sync(
    #     keyword="AI大模型",
    #     time_range="7days",
    #     sort_type="read_count",
    #     max_page=5,
    #     page_size=30
    # )
    # save_article_list(batch_articles)
    # Mode 3: asynchronous multi-page fetch (keyword "乡村振兴", custom range 2024-01-01 to 2024-12-31)
    # asyncio.run(batch_get_article_list_async(
    #     keyword="乡村振兴",
    #     time_range="custom",
    #     start_date="2024-01-01",
    #     end_date="2024-12-31",
    #     verify_type="official",
    #     max_page=10,
    #     page_size=50
    # ))
    # Mode 4: scheduled monitoring (check keywords "数字经济" and "AI大模型" every hour)
    # scheduler = BlockingScheduler()
    # scheduler.add_job(
    #     keyword_monitor_task,
    #     'interval',
    #     hours=1,
    #     args=[["数字经济", "AI大模型"]],
    #     kwargs={"time_range": "1day", "sort_type": "pub_time", "max_page": 3}
    # )
    # logging.info("Scheduled keyword monitoring started; runs every hour...")
    # try:
    #     scheduler.start()
    # except (KeyboardInterrupt, SystemExit):
    #     logging.info("Scheduled keyword monitoring stopped")