搜狗微信搜索提供的 item_get 接口(非微信官方原生接口,基于搜狗合规爬虫能力封装),是批量获取微信公众号文章详情的核心工具。该接口支持通过文章 URL、文章 ID 或公众号 + 文章标题组合查询,返回文章正文、作者、发布时间、阅读量、在看数等完整信息,广泛应用于舆情监测、内容分析、公众号运营调研、学术研究等场景。
一、接口核心认知:先明确 “能做什么”“适配什么场景”
1. 接口定位与核心价值
2. 核心参数与返回字段(微信文章场景适配版)
(1)请求参数(必填 + 可选,按优先级排序)
(2)返回核心字段(按业务场景分类,微信文章重点标注)
3. 接口限制与注意事项
二、对接前准备:3 步搞定前置条件
1. 获取接口密钥(核心步骤)
2. 技术环境准备
(1)支持语言与协议
(2)必备工具与依赖
3. 业务需求梳理
三、实操步骤:从调试到落地(Python 示例)
步骤 1:理解请求流程
步骤 2:签名生成规则(关键!不同服务商可能差异,以实际文档为准)
签名示例(参数排序与拼接)
步骤 3:完整代码实现(Python)
(1)依赖安装
(2)完整代码(含签名生成、接口调用、数据解析、保存)
import requests
import hashlib
import time
import json
import pandas as pd
from urllib.parse import urlencode, unquote
from typing import Dict, Optional, List
from bs4 import BeautifulSoup
import logging
# Logging: write every API call and error to both a log file and stdout,
# so usage can be audited / traced for compliance later.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=[logging.FileHandler("sogou_wechat_item_get.log"), logging.StreamHandler()]
)

# Core endpoint configuration — replace the placeholders with the appkey,
# secret and endpoint URL issued by the API provider.
APP_KEY = "你的appkey"
SECRET = "你的secret"
API_URL = "https://api.sogou-wechat.com/item_get"  # provider endpoint (confirm against their docs)
SAVE_PATH = "搜狗微信文章数据.xlsx"  # Excel archive path for fetched article data
def generate_sign(params: Dict, secret: Optional[str] = None) -> str:
    """Build the request signature (generic scheme; adjust if the provider's rules differ).

    Scheme: sort parameters by name (ASCII ascending), urlencode them, append
    ``&secret=<secret>``, then MD5-hash the whole string.

    :param params: request parameters to sign (the "sign" field itself must not
                   be present — callers add it after signing)
    :param secret: signing secret; defaults to the module-level SECRET. Exposed
                   as a parameter so the function can be used/tested with other keys.
    :return: 32-character uppercase hex MD5 digest
    """
    if secret is None:
        secret = SECRET
    # 1. Sort by parameter name, ASCII ascending.
    ordered = sorted(params.items(), key=lambda item: item[0])
    # 2. urlencode handles Chinese text and special characters consistently.
    payload = urlencode(ordered, encoding="utf-8") + f"&secret={secret}"
    # 3. MD5, returned as uppercase hex.
    return hashlib.md5(payload.encode("utf-8")).hexdigest().upper()
def html_to_text(html_content: str) -> str:
    """Reduce an HTML article body to plain text (for text-analysis scenarios).

    Non-content nodes (scripts, styles, ads, embeds, images) are dropped, then
    the remaining text is collapsed to newline-separated, whitespace-trimmed lines.
    """
    if not html_content:
        return ""
    soup = BeautifulSoup(html_content, "lxml")
    # Remove nodes that carry no readable article text.
    for node in soup(["script", "style", "ad", "iframe", "img"]):
        node.decompose()
    raw = soup.get_text(strip=True, separator="\n")
    stripped = (line.strip() for line in raw.split("\n"))
    return "\n".join(line for line in stripped if line)
def _validate_query(
    query_type: str,
    article_url: Optional[str],
    article_id: Optional[str],
    official_account: Optional[str],
    article_title: Optional[str],
) -> Optional[Dict]:
    """Check that the required field(s) for the chosen query_type are present.

    Returns a failure dict (same shape the public API returns) when invalid,
    or None when the parameters are usable.
    """
    if query_type == "url" and not article_url:
        logging.error("query_type=url时,article_url为必填参数")
        return {"success": False, "error_msg": "缺少文章URL", "error_code": -1}
    if query_type == "article_id" and not article_id:
        logging.error("query_type=article_id时,article_id为必填参数")
        return {"success": False, "error_msg": "缺少搜狗文章ID", "error_code": -1}
    if query_type == "name_title" and not (official_account and article_title):
        logging.error("query_type=name_title时,official_account和article_title为必填参数")
        return {"success": False, "error_msg": "缺少公众号名称或文章标题", "error_code": -1}
    return None


def _build_request_params(
    query_type: str,
    article_url: Optional[str],
    article_id: Optional[str],
    official_account: Optional[str],
    article_title: Optional[str],
    need_full_content: int,
    need_interact: int,
    need_account_info: int,
    format: str,
) -> Dict:
    """Assemble the unsigned request payload for the item_get endpoint."""
    params = {
        "appkey": APP_KEY,
        "query_type": query_type,
        "need_full_content": need_full_content,
        "need_interact": need_interact,
        "need_account_info": need_account_info,
        "format": format,
        # Millisecond timestamp — presumably used by the provider for replay
        # protection; confirm against the provider's signing docs.
        "timestamp": int(time.time() * 1000),
    }
    # Add the field(s) specific to the chosen query type.
    if query_type == "url":
        params["article_url"] = article_url
    elif query_type == "article_id":
        params["article_id"] = article_id
    elif query_type == "name_title":
        params["official_account"] = official_account
        params["article_title"] = article_title
    return params


def _normalize_article(
    raw_data: Dict,
    query_type: str,
    article_url: Optional[str],
    official_account: Optional[str],
    format: str,
) -> Dict:
    """Flatten the provider payload into the Chinese-keyed record used downstream."""
    interact_data = raw_data.get("interact_data", {})
    account_info = raw_data.get("account_info", {})
    # Convert the body to plain text when the caller asked for it.
    content = raw_data.get("content", "")
    if format == "text":
        content = html_to_text(content)
    # pub_time appears to be epoch milliseconds (divided by 1000 before
    # localtime) — TODO confirm against the provider's field reference.
    pub_time = raw_data.get("pub_time", 0)
    pub_time_str = (
        time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(pub_time / 1000))
        if pub_time else ""
    )
    return {
        "请求时间": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
        "查询类型": query_type,
        "文章ID(搜狗)": raw_data.get("article_id", ""),
        "文章URL": raw_data.get("article_url", article_url or ""),
        "标题": raw_data.get("title", ""),
        "摘要": raw_data.get("summary", ""),
        "发布时间": pub_time_str,
        "正文内容": content,
        "原创标识": "是" if raw_data.get("is_original", False) else "否",
        "转载来源": raw_data.get("reprint_source", ""),
        "阅读量": interact_data.get("read_count", 0),
        "在看数": interact_data.get("like_count", 0),
        "留言数": interact_data.get("comment_count", 0),
        "转发量(估算)": interact_data.get("share_count", 0),
        "公众号名称": account_info.get("official_account", official_account or ""),
        "公众号ID": account_info.get("wechat_id", ""),
        "公众号认证类型": account_info.get("verify_type", ""),
        "运营主体": account_info.get("operator", ""),
        "文章状态": "正常" if not raw_data.get("is_illegal", False) else "已删除/违规",
        "版权声明": raw_data.get("copyright", "数据来源:搜狗微信搜索"),
    }


def get_wechat_article(
    query_type: str,
    article_url: Optional[str] = None,
    article_id: Optional[str] = None,
    official_account: Optional[str] = None,
    article_title: Optional[str] = None,
    need_full_content: int = 1,
    need_interact: int = 1,
    need_account_info: int = 0,
    format: str = "html"
) -> Dict:
    """Fetch one Sogou-WeChat article via the item_get endpoint.

    :param query_type: lookup mode ("url" / "article_id" / "name_title")
    :param article_url: article URL (required when query_type="url")
    :param article_id: Sogou article id (required when query_type="article_id")
    :param official_account: account name (required when query_type="name_title")
    :param article_title: article title (required when query_type="name_title")
    :param need_full_content: return the full body (1=yes, 0=no)
    :param need_interact: return interaction metrics (1=yes, 0=no)
    :param need_account_info: return account metadata (1=yes, 0=no)
    :param format: body format ("html" or "text")
    :return: {"success": True, "data": {...}, "error_msg": ""} on success;
             {"success": False, "data": {}, "error_code": ..., "error_msg": ...}
             on any failure (validation, HTTP, provider error, parsing).
    """
    # 1. Validate the query-type-specific required parameters.
    error = _validate_query(query_type, article_url, article_id,
                            official_account, article_title)
    if error is not None:
        return error
    # 2-3. Build the payload, then 4. sign it (sign covers all other fields).
    params = _build_request_params(
        query_type, article_url, article_id, official_account, article_title,
        need_full_content, need_interact, need_account_info, format,
    )
    params["sign"] = generate_sign(params)
    try:
        # 5. POST over HTTPS; 15 s timeout sized to the endpoint's response speed.
        response = requests.post(
            url=API_URL,
            data=json.dumps(params),
            headers={"Content-Type": "application/json"},
            timeout=15,
            verify=True,
        )
        response.raise_for_status()  # surface HTTP errors (404, 500, ...)
        result = response.json()
        # 6. Provider envelope assumed to be {"code":200,"msg":...,"data":{...}};
        #    adjust to the actual provider documentation if it differs.
        if result.get("code") == 200:
            standard_data = _normalize_article(
                result.get("data", {}), query_type, article_url,
                official_account, format,
            )
            return {
                "success": True,
                "data": standard_data,
                "error_msg": ""
            }
        error_msg = result.get("msg", "接口调用失败")
        error_code = result.get("code", -2)
        logging.error(f"接口返回错误:code={error_code}, msg={error_msg}")
        return {
            "success": False,
            "data": {},
            "error_code": error_code,
            "error_msg": error_msg
        }
    except requests.exceptions.RequestException as e:
        # Network-level failure (DNS, timeout, TLS, non-2xx raised above).
        logging.error(f"网络异常:{str(e)}")
        return {
            "success": False,
            "data": {},
            "error_code": -3,
            "error_msg": f"网络异常:{str(e)}"
        }
    except Exception as e:
        # Boundary catch-all: malformed JSON / unexpected payload shapes.
        logging.error(f"数据处理异常:{str(e)}")
        return {
            "success": False,
            "data": {},
            "error_code": -4,
            "error_msg": f"处理异常:{str(e)}"
        }
def batch_get_wechat_articles(article_urls: List[str]) -> List[Dict]:
    """Fetch article details for a list of WeChat article URLs, sequentially.

    Failed lookups are logged and skipped; only successful records are returned.

    :param article_urls: WeChat article URLs to query (query_type="url", text body)
    :return: list of normalized article dicts (one per successful fetch)
    """
    all_articles: List[Dict] = []
    total = len(article_urls)
    for idx, url in enumerate(article_urls, 1):
        logging.info(f"正在获取第{idx}篇文章:{url}")
        result = get_wechat_article(query_type="url", article_url=url, format="text")
        if result["success"]:
            all_articles.append(result["data"])
            logging.info(f"第{idx}篇文章获取成功:{result['data']['标题']}")
        else:
            logging.error(f"第{idx}篇文章获取失败:{result['error_msg']}(错误码:{result['error_code']})")
        # Throttle: free tier allows 10 calls/min -> 6 s gap (enterprise tier:
        # 50 calls/min -> ~1 s). Fix: skip the pointless sleep after the last URL.
        if idx < total:
            time.sleep(6)
    return all_articles
def save_articles(articles: List[Dict], save_path: str = SAVE_PATH):
    """Append article records to an Excel archive, de-duplicated by article URL.

    Existing rows in the archive are preserved (incremental save); when the same
    URL appears more than once across history + new batch, the earliest row wins.

    :param articles: normalized article dicts from get_wechat_article/batch
    :param save_path: Excel file path (created if missing)
    """
    if not articles:
        logging.warning("无文章数据可保存")
        return
    df = pd.DataFrame(articles)
    # Restrict to the common fields, in a reader-friendly column order.
    columns = [
        "请求时间", "查询类型", "文章URL", "标题", "发布时间", "原创标识", "转载来源",
        "阅读量", "在看数", "留言数", "转发量(估算)", "公众号名称", "公众号认证类型",
        "运营主体", "文章状态", "版权声明", "摘要", "正文内容"
    ]
    df = df[columns]
    # Incremental save: merge with any existing archive instead of overwriting.
    try:
        history_df = pd.read_excel(save_path, engine="openpyxl")
        df = pd.concat([history_df, df], ignore_index=True)
    except FileNotFoundError:
        pass
    # Fix: de-duplicate AFTER merging with history. The original deduped only the
    # new batch before the concat, so re-running on already-archived URLs kept
    # appending duplicate rows to the file.
    df = df.drop_duplicates(subset=["文章URL"], keep="first").reset_index(drop=True)
    df.to_excel(save_path, index=False, engine="openpyxl")
    logging.info(f"文章数据已归档至:{save_path}(累计{len(df)}条数据)")
# Usage examples: single-article lookup, plus a commented-out batch mode.
if __name__ == "__main__":
    # Mode 1: fetch a single article by its URL.
    test_url = "https://mp.weixin.qq.com/s/abc123xyz"  # replace with a real WeChat article URL
    single_article = get_wechat_article(
        query_type="url",
        article_url=test_url,
        need_full_content=1,
        need_interact=1,
        need_account_info=1,
        format="text"
    )
    if not single_article["success"]:
        print(f"单篇文章获取失败:{single_article['error_msg']}(错误码:{single_article['error_code']})")
    else:
        art = single_article["data"]
        divider = "=" * 80
        print(divider)
        print(f"文章标题:{art['标题']}")
        print(divider)
        print(f"公众号名称:{art['公众号名称']}")
        print(f"认证类型:{art['公众号认证类型']}")
        print(f"发布时间:{art['发布时间']}")
        print(f"原创标识:{art['原创标识']}")
        print(f"互动数据:阅读{art['阅读量']} | 在看{art['在看数']} | 留言{art['留言数']}")
        print(f"摘要:{art['摘要']}")
        print(f"正文前500字:{art['正文内容'][:500]}...")
        print(f"文章状态:{art['文章状态']}")
        print(divider)
    # Mode 2: batch-fetch a list of article URLs, then archive to Excel.
    # article_urls = [
    #     "https://mp.weixin.qq.com/s/xyz123",
    #     "https://mp.weixin.qq.com/s/def456",
    #     "https://mp.weixin.qq.com/s/ghi789"
    # ]
    # batch_articles = batch_get_wechat_articles(article_urls)
    # save_articles(batch_articles)