一、项目概述
1.1 项目简介
本项目将构建一个智能微信比价机器人,能够:
- ✅ 实时抓取:用Scrapy抓取京东商品价格信息
- ✅ 智能比价:分析商品历史价格趋势
- ✅ 微信交互:通过Flask接收用户查询并返回结果
- ✅ 定时监控:自动监控商品价格变化
- ✅ 价格预警:低于目标价格时自动提醒
1.2 技术栈
- 爬虫框架:Scrapy + Selenium(动态渲染)
- Web框架:Flask + RESTful API
- 微信集成:itchat / Wechaty
- 数据存储:Redis + MySQL
- 任务调度:APScheduler
- 部署运维:Docker + Nginx
1.3 核心功能
- 🛍️ 商品搜索:通过微信发送商品名,返回比价结果
- 📊 价格趋势:展示商品30天价格走势
- 🔔 降价提醒:设置目标价,降价自动通知
- ⭐ 收藏管理:收藏关注商品,一键查询
- 📈 数据分析:智能推荐最佳购买时机
二、环境配置
2.1 项目结构
jd_price_bot/
├── spiders/                      # Scrapy爬虫
│   ├── jd_spider.py              # 京东商品爬虫
│   ├── price_monitor.py          # 价格监控爬虫
│   └── middlewares.py            # 中间件
├── web/                          # Flask应用
│   ├── app.py                    # Flask主应用
│   ├── routes/                   # 路由模块
│   ├── models/                   # 数据模型
│   └── templates/                # 模板文件
├── services/                     # 业务服务
│   ├── wechat_service.py         # 微信服务
│   ├── price_service.py          # 价格服务
│   └── notification_service.py   # 通知服务
├── utils/                        # 工具类
│   ├── database.py               # 数据库连接
│   ├── cache.py                  # 缓存工具
│   └── config.py                 # 配置管理
├── requirements.txt              # 依赖包
├── docker-compose.yml            # Docker配置
└── README.md
2.2 环境配置
# requirements.txt
# Web框架
Flask==2.3.3
flask-restful==0.3.10
flask-sqlalchemy==3.0.5
flask-apscheduler==1.12.4
# 爬虫相关
Scrapy==2.11.0
selenium==4.15.0
requests==2.31.0
beautifulsoup4==4.12.2
lxml==4.9.3
# 微信相关
itchat==1.3.10
wechaty==0.8.21
# 数据库
redis==5.0.1
mysql-connector-python==8.1.0
sqlalchemy==2.0.23
# 数据处理
pandas==2.1.3
numpy==1.25.2
matplotlib==3.7.2
# 工具类
python-dotenv==1.0.0
APScheduler==3.10.4
schedule==1.2.0
2.3 配置文件
# utils/config.pyimport osfrom dotenv import load_dotenv
load_dotenv()class Config: # 基础配置
SECRET_KEY = os.getenv('SECRET_KEY', 'jd-price-bot-secret-key')
DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
# 数据库配置
REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
MYSQL_PORT = os.getenv('MYSQL_PORT', '3306')
MYSQL_USER = os.getenv('MYSQL_USER', 'root')
MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'password')
MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'jd_price_bot')
# 微信配置
WECHAT_QR_CODE_PATH = os.getenv('WECHAT_QR_CODE_PATH', './qr_code.png')
WECHAT_AUTO_LOGIN = os.getenv('WECHAT_AUTO_LOGIN', 'True').lower() == 'true'
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
# 爬虫配置
JD_SEARCH_URL = 'https://search.jd.com/Search'
JD_PRODUCT_URL = 'https://item.jd.com/{}.html'
SPIDER_DELAY = int(os.getenv('SPIDER_DELAY', '2'))
MAX_PRODUCTS_PER_SEARCH = int(os.getenv('MAX_PRODUCTS_PER_SEARCH', '20'))
# 价格监控配置
PRICE_CHECK_INTERVAL = int(os.getenv('PRICE_CHECK_INTERVAL', '3600')) # 1小时
PRICE_HISTORY_DAYS = int(os.getenv('PRICE_HISTORY_DAYS', '30'))
# 通知配置
PRICE_DROP_THRESHOLD = float(os.getenv('PRICE_DROP_THRESHOLD', '0.1')) # 10%降价
ENABLE_PRICE_ALERT = os.getenv('ENABLE_PRICE_ALERT', 'True').lower() == 'true'
# 安全配置
USER_WHITELIST = os.getenv('USER_WHITELIST', '').split(',') # 用户白名单
MAX_REQUESTS_PER_MINUTE = int(os.getenv('MAX_REQUESTS_PER_MINUTE', '30'))
# 日志配置
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
LOG_FILE = os.getenv('LOG_FILE', './logs/jd_bot.log')三、核心模块实现
3.1 京东爬虫实现(Scrapy)
3.1.1 商品搜索爬虫
# spiders/jd_spider.pyimport scrapyimport jsonimport refrom urllib.parse import quote, urlencodefrom typing import Dict, List, Optionalfrom selenium import webdriverfrom selenium.webdriver.common.by import Byfrom selenium.webdriver.support.ui import WebDriverWaitfrom selenium.webdriver.support import expected_conditions as ECfrom selenium.common.exceptions import TimeoutExceptionimport timeclass JDSpider(scrapy.Spider):
name = 'jd_spider'
allowed_domains = ['jd.com', 'search.jd.com']
def __init__(self, keyword: str = None, max_results: int = 20, *args, **kwargs): super().__init__(*args, **kwargs) self.keyword = keyword self.max_results = max_results self.driver = None
self.setup_driver()
def setup_driver(self): """设置Selenium WebDriver"""
options = webdriver.ChromeOptions()
options.add_argument('--headless')
options.add_argument('--no-sandbox')
options.add_argument('--disable-dev-shm-usage')
options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36')
self.driver = webdriver.Chrome(options=options) self.driver.implicitly_wait(10)
def start_requests(self): """开始请求"""
if self.keyword:
url = self.build_search_url(self.keyword) yield scrapy.Request(url=url, callback=self.parse_search_results)
def build_search_url(self, keyword: str) -> str: """构建搜索URL"""
params = { 'keyword': keyword, 'enc': 'utf-8', 'wq': keyword, 'pvid': self.generate_pvid()
} return f"https://search.jd.com/Search?{urlencode(params)}"
def generate_pvid(self) -> str: """生成随机pvid"""
import uuid return str(uuid.uuid4()).replace('-', '')[:16]
def parse_search_results(self, response): """解析搜索结果页"""
try: # 等待页面加载完成
WebDriverWait(self.driver, 10).until(
EC.presence_of_element_located((By.CSS_SELECTOR, ".gl-item"))
)
# 获取商品列表
products = self.driver.find_elements(By.CSS_SELECTOR, ".gl-item")
results = []
for i, product in enumerate(products[:self.max_results]): try:
product_info = self.extract_product_info(product) if product_info:
results.append(product_info) self.logger.info(f"提取商品: {product_info['title'][:30]}") except Exception as e: self.logger.error(f"提取商品信息失败: {e}") continue
# 返回结果
yield { 'keyword': self.keyword, 'results': results, 'count': len(results)
}
except TimeoutException: self.logger.error("搜索页面加载超时") finally: self.close_driver()
def extract_product_info(self, product_element) -> Optional[Dict]: """提取商品信息"""
try: # 商品ID
data_sku = product_element.get_attribute('data-sku') if not data_sku: return None
# 商品标题
title_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a em")
title = title_element.text.strip() if title_element else ""
# 价格
price_element = product_element.find_element(By.CSS_SELECTOR, ".p-price strong i")
price = float(price_element.text) if price_element and price_element.text else 0.0
# 店铺
shop_element = product_element.find_element(By.CSS_SELECTOR, ".p-shop a")
shop = shop_element.text.strip() if shop_element else ""
# 评论数
comment_element = product_element.find_element(By.CSS_SELECTOR, ".p-commit a")
comment_text = comment_element.text.strip() if comment_element else ""
comment_count = self.extract_comment_count(comment_text)
# 商品链接
link_element = product_element.find_element(By.CSS_SELECTOR, ".p-name a")
link = link_element.get_attribute('href') if link_element else ""
# 图片
img_element = product_element.find_element(By.CSS_SELECTOR, ".p-img img")
image_url = img_element.get_attribute('src') or img_element.get_attribute('data-lazy-img')
return { 'product_id': data_sku, 'title': title, 'price': price, 'shop': shop, 'comment_count': comment_count, 'link': link, 'image_url': image_url, 'crawl_time': int(time.time())
}
except Exception as e: self.logger.error(f"提取商品信息异常: {e}") return None
def extract_comment_count(self, comment_text: str) -> int: """提取评论数量"""
if not comment_text: return 0
# 匹配数字
match = re.search(r'(\d+(\.\d+)?)', comment_text.replace('+', '').replace('万', '0000')) if match:
count = float(match.group(1)) if '万' in comment_text:
count *= 10000
return int(count) return 0
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
def close_driver(self): """关闭浏览器"""
if self.driver: self.driver.quit()
def closed(self, reason): """爬虫关闭时清理资源"""
self.close_driver() super().closed(reason)# 价格详情爬虫class JDPriceSpider(scrapy.Spider):
name = 'jd_price_spider'
def __init__(self, product_ids: List[str] = None, *args, **kwargs): super().__init__(*args, **kwargs) self.product_ids = product_ids or []
def start_requests(self): """开始请求"""
for product_id in self.product_ids:
url = f"https://item.jd.com/{product_id}.html"
yield scrapy.Request(
url=url,
callback=self.parse_product_page,
meta={'product_id': product_id}
)
def parse_product_page(self, response): """解析商品详情页"""
product_id = response.meta['product_id']
try: # 提取价格信息
price_script = response.xpath('//script[contains(text(), "price:")]/text()').get()
price = self.extract_price_from_script(price_script)
# 提取促销信息
promotions = self.extract_promotions(response)
# 提取库存信息
stock = self.extract_stock_info(response)
yield { 'product_id': product_id, 'price': price, 'promotions': promotions, 'stock': stock, 'crawl_time': int(time.time())
}
except Exception as e: self.logger.error(f"解析商品页面失败: {e}")
def extract_price_from_script(self, script_text: str) -> float: """从JavaScript中提取价格"""
if not script_text: return 0.0
try: # 匹配价格模式
price_patterns = [ r'"price"\s*:\s*"([\d.]+)"', r"'price'\s*:\s*'([\d.]+)'", r'price\s*=\s*([\d.]+)'
]
for pattern in price_patterns: match = re.search(pattern, script_text) if match: return float(match.group(1))
return 0.0
except: return 0.0
def extract_promotions(self, response) -> List[str]: """提取促销信息"""
promotions = []
# 提取优惠券
coupons = response.xpath('//div[contains(@class, "coupon-item")]//text()').getall()
coupons_text = ' '.join([c.strip() for c in coupons if c.strip()]) if coupons_text:
promotions.append(f"优惠券: {coupons_text}")
# 提取促销活动
promo_elements = response.xpath('//div[contains(@class, "promotion-item")]') for promo in promo_elements:
promo_text = promo.xpath('.//text()').getall()
promo_text = ' '.join([p.strip() for p in promo_text if p.strip()]) if promo_text:
promotions.append(promo_text)
return promotions
def extract_stock_info(self, response) -> str: """提取库存信息"""
stock_text = response.xpath('//div[contains(@class, "store-prompt")]//text()').get() if stock_text: return stock_text.strip()
# 检查是否有货
buy_btn = response.xpath('//a[contains(@class, "btn-addtocart")]') if buy_btn: return "有货"
else: return "无货"3.1.2 价格监控爬虫
# spiders/price_monitor.pyimport scrapyfrom scrapy import signalsimport jsonimport timefrom datetime import datetime, timedeltafrom typing import Dict, Listclass JDPriceMonitor(scrapy.Spider):
name = 'jd_price_monitor'
custom_settings = { 'CONCURRENT_REQUESTS': 1, 'DOWNLOAD_DELAY': 3, 'AUTOTHROTTLE_ENABLED': True
}
def __init__(self, product_ids: List[str] = None, *args, **kwargs): super().__init__(*args, **kwargs) self.product_ids = product_ids or [] self.price_history = {} self.redis_client = None
@classmethod
def from_crawler(cls, crawler, *args, **kwargs):
spider = super().from_crawler(crawler, *args, **kwargs)
crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed) return spider
def spider_opened(self, spider): """爬虫启动时"""
from utils.cache import get_redis_client self.redis_client = get_redis_client()
# 加载需要监控的商品
if not self.product_ids: self.product_ids = self.load_monitored_products()
def spider_closed(self, spider): """爬虫关闭时"""
if self.redis_client: self.redis_client.close()
def load_monitored_products(self) -> List[str]: """从数据库加载需要监控的商品"""
try: from services.price_service import PriceService
price_service = PriceService()
products = price_service.get_monitored_products() return [p.product_id for p in products] except Exception as e: self.logger.error(f"加载监控商品失败: {e}") return []
def start_requests(self): """开始请求"""
for product_id in self.product_ids:
url = f"https://item.jd.com/{product_id}.html"
yield scrapy.Request(
url=url,
callback=self.parse_product_price,
meta={'product_id': product_id},
dont_filter=True
)
def parse_product_price(self, response): """解析商品价格"""
product_id = response.meta['product_id']
try: # 提取价格
price = self.extract_price(response)
if price > 0: # 保存价格记录
self.save_price_record(product_id, price)
# 检查价格变化
price_change = self.check_price_change(product_id, price)
yield { 'product_id': product_id, 'price': price, 'price_change': price_change, 'timestamp': int(time.time()), 'status': 'success'
} else: yield { 'product_id': product_id, 'price': 0.0, 'timestamp': int(time.time()), 'status': 'failed', 'error': '价格提取失败'
}
except Exception as e: self.logger.error(f"价格监控失败: {e}") yield { 'product_id': product_id, 'price': 0.0, 'timestamp': int(time.time()), 'status': 'failed', 'error': str(e)
}
def extract_price(self, response) -> float: """提取价格"""
# 多种方式提取价格
price_selectors = [ '//span[@class="price J-p-{}"]/text()', '//strong[@class="J-p-{}"]/i/text()', '//span[contains(@class, "price J-p-")]/text()', '//script[contains(text(), "price:")]/text()'
]
for selector in price_selectors: try:
price_text = response.xpath(selector).get() if price_text: # 清理价格文本
price_text = price_text.replace('¥', '').replace('¥', '').strip() if price_text and price_text.replace('.', '').isdigit(): return float(price_text) except: continue
return 0.0
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
def save_price_record(self, product_id: str, price: float): """保存价格记录"""
try: from services.price_service import PriceService
price_service = PriceService()
price_service.save_price_record(product_id, price)
# 缓存最新价格
if self.redis_client:
cache_key = f"jd:price:{product_id}"
self.redis_client.setex(
cache_key,
3600, # 1小时
json.dumps({ 'price': price, 'timestamp': int(time.time())
})
) except Exception as e: self.logger.error(f"保存价格记录失败: {e}")
def check_price_change(self, product_id: str, current_price: float) -> Dict: """检查价格变化"""
try: from services.price_service import PriceService
price_service = PriceService()
# 获取历史价格
history = price_service.get_price_history(product_id, days=7) if not history: return {'change': 0.0, 'type': 'new'}
# 计算价格变化
last_price = history[-1].price
change = current_price - last_price
change_percent = (change / last_price) * 100 if last_price > 0 else 0.0
# 检查是否触发预警
if change_percent < -10: # 降价超过10%
self.trigger_price_alert(product_id, current_price, change_percent)
return { 'change': change_percent, 'type': 'drop' if change_percent < 0 else 'rise', 'last_price': last_price
}
except Exception as e: self.logger.error(f"检查价格变化失败: {e}") return {'change': 0.0, 'type': 'unknown'}
def trigger_price_alert(self, product_id: str, price: float, change_percent: float): """触发价格预警"""
try: from services.notification_service import NotificationService
notification_service = NotificationService()
# 获取商品信息
from services.price_service import PriceService
price_service = PriceService()
product = price_service.get_product_info(product_id)
if product:
message = f"🚨 价格预警!\n商品:{product.title[:30]}...\n当前价格:¥{price}\n降价幅度:{abs(change_percent):.1f}%"
notification_service.send_price_alert(product_id, message) except Exception as e: self.logger.error(f"发送价格预警失败: {e}")3.2 Flask Web应用
3.2.1 主应用
# web/app.pyfrom flask import Flask, request, jsonify, render_templatefrom flask_restful import Apifrom flask_apscheduler import APSchedulerimport loggingfrom utils.config import Configfrom utils.database import db, init_dbfrom services.wechat_service import WeChatServicefrom services.price_service import PriceServicefrom services.notification_service import NotificationService# 配置日志logging.basicConfig(
level=getattr(logging, Config.LOG_LEVEL), format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler(Config.LOG_FILE),
logging.StreamHandler()
]
)
app = Flask(__name__)
app.config.from_object(Config)# 初始化数据库init_db(app)# 初始化APIapi = Api(app)# 初始化定时任务scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()# 初始化服务wechat_service = WeChatService()
price_service = PriceService()
notification_service = NotificationService()# 注册路由from web.routes.wechat_routes import wechat_bpfrom web.routes.price_routes import price_bpfrom web.routes.product_routes import product_bp
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
app.register_blueprint(wechat_bp, url_prefix='/wechat')
app.register_blueprint(price_bp, url_prefix='/price')
app.register_blueprint(product_bp, url_prefix='/product')@app.route('/')def index(): """首页"""
return render_template('index.html')@app.route('/health')def health_check(): """健康检查"""
return jsonify({'status': 'healthy', 'timestamp': int(time.time())})@app.route('/search')def search_products(): """搜索商品"""
keyword = request.args.get('keyword', '') if not keyword: return jsonify({'error': '请输入搜索关键词'}), 400
try:
results = price_service.search_products(keyword) return jsonify({ 'keyword': keyword, 'results': results, 'count': len(results)
}) except Exception as e:
logging.error(f"搜索失败: {e}") return jsonify({'error': '搜索失败'}), 500# 定时任务@scheduler.task('interval', id='monitor_prices', hours=1)def monitor_prices(): """定时监控价格"""
try: from spiders.price_monitor import JDPriceMonitor from scrapy.crawler import CrawlerProcess from scrapy.utils.project import get_project_settings
# 启动价格监控爬虫
settings = get_project_settings()
process = CrawlerProcess(settings)
process.crawl(JDPriceMonitor)
process.start()
logging.info("价格监控任务执行完成") except Exception as e:
logging.error(f"价格监控任务失败: {e}")if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=Config.DEBUG)3.2.2 微信路由
# web/routes/wechat_routes.pyfrom flask import Blueprint, request, jsonifyimport refrom services.wechat_service import WeChatServicefrom services.price_service import PriceServiceimport logging
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
wechat_bp = Blueprint('wechat', __name__)
wechat_service = WeChatService()
price_service = PriceService()@wechat_bp.route('/message', methods=['POST'])def handle_wechat_message(): """处理微信消息"""
try:
data = request.json if not data: return jsonify({'error': '无效的请求数据'}), 400
message_type = data.get('type', '')
content = data.get('content', '')
user_id = data.get('user_id', '')
# 验证用户权限
if not wechat_service.validate_user(user_id): return jsonify({'error': '用户无权限'}), 403
# 处理不同类型的消息
if message_type == 'text':
response = handle_text_message(content, user_id) elif message_type == 'image':
response = handle_image_message(content, user_id) else:
response = {'type': 'text', 'content': '暂不支持此类型消息'}
return jsonify(response)
except Exception as e:
logging.error(f"处理微信消息失败: {e}") return jsonify({'error': '处理消息失败'}), 500def handle_text_message(content: str, user_id: str) -> dict: """处理文本消息"""
# 帮助命令
if content in ['帮助', 'help', '?']: return { 'type': 'text', 'content': get_help_message()
}
# 搜索商品
if content.startswith('搜索 ') or content.startswith('search '):
keyword = content.replace('搜索 ', '').replace('search ', '') return search_products(keyword, user_id)
# 查询价格
if content.startswith('价格 ') or content.startswith('price '):
product_id = content.replace('价格 ', '').replace('price ', '') return get_product_price(product_id, user_id)
# 设置预警
if content.startswith('预警 ') or content.startswith('alert '):
parts = content.split(' ') if len(parts) >= 3:
product_id = parts[1]
target_price = float(parts[2]) return set_price_alert(product_id, target_price, user_id)
# 查询历史价格
if content.startswith('历史 ') or content.startswith('history '):
product_id = content.replace('历史 ', '').replace('history ', '') return get_price_history(product_id, user_id)
# 收藏商品
if content.startswith('收藏 ') or content.startswith('fav '):
product_id = content.replace('收藏 ', '').replace('fav ', '') return add_to_favorites(product_id, user_id)
# 查看收藏
if content in ['收藏列表', 'favorites']: return get_favorites(user_id)
# 默认回复
return { 'type': 'text', 'content': '请输入"帮助"查看可用命令'
}def get_help_message() -> str: """获取帮助信息"""
return """
🤖 京东比价机器人使用指南:
🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑
💰 查询价格:
价格 100012345678
price 100012345678
📊 历史价格:
历史 100012345678
history 100012345678
🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999
⭐ 收藏管理:
收藏 100012345678
收藏列表
📈 价格趋势图:
趋势 100012345678
trend 100012345678
"""def search_products(keyword: str, user_id: str) -> dict: """搜索商品"""
try:
results = price_service.search_products(keyword, limit=5)
if not results: return { 'type': 'text', 'content': f'未找到与"{keyword}"相关的商品'
}
# 构建回复消息
message = f"🔍 搜索“{keyword}”结果:\n\n"
for i, product in enumerate(results, 1):
message += f"{i}. {product['title'][:30]}...\n"
message += f" 💰 ¥{product['price']} | 👑 {product['shop']}\n"
message += f" 📞 评论: {product['comment_count']}\n"
message += f" 🔗 ID: {product['product_id']}\n\n"
message += "输入“价格 ID”查看详情,如:价格 100012345678"
return { 'type': 'text', 'content': message
}
except Exception as e:
logging.error(f"搜索商品失败: {e}") return { 'type': 'text', 'content': '搜索失败,请稍后重试'
}def get_product_price(product_id: str, user_id: str) -> dict: """获取商品价格"""
try:
product = price_service.get_product_info(product_id) if not product: return { 'type': 'text', 'content': f'未找到商品 ID: {product_id}'
}
# 获取当前价格
current_price = price_service.get_current_price(product_id)
# 获取价格变化
price_change = price_service.get_price_change(product_id, days=7)
message = f"📱 {product.title}\n\n"
message += f"💰 当前价格: ¥{current_price}\n"
if price_change:
change_type = "📈" if price_change['change'] > 0 else "📉"
message += f"{change_type} 7天变化: {price_change['change']:.1f}%\n"
message += f"🏪 店铺: {product.shop}\n"
message += f"📞 评论: {product.comment_count}\n\n"
message += "输入“历史 ID”查看价格趋势"
return { 'type': 'text', 'content': message
}
except Exception as e:
logging.error(f"获取商品价格失败: {e}") return { 'type': 'text', 'content': '获取价格失败,请稍后重试'
}def set_price_alert(product_id: str, target_price: float, user_id: str) -> dict: """设置价格预警"""
try:
success = price_service.set_price_alert(user_id, product_id, target_price)
if success: return { 'type': 'text', 'content': f'✅ 预警设置成功!\n当价格 ≤ ¥{target_price} 时会通知您'
} else: return { 'type': 'text', 'content': '预警设置失败,请检查商品ID'
}
except Exception as e:
logging.error(f"设置价格预警失败: {e}") return { 'type': 'text', 'content': '预警设置失败,请稍后重试'
}3.3 核心服务
3.3.1 微信服务
# services/wechat_service.pyimport itchatimport qrcodeimport osimport threadingimport timefrom typing import Dict, List, Optionalfrom utils.config import Configimport loggingclass WeChatService: """微信服务"""
def __init__(self): self.is_logged_in = False
self.friends = {} self.msg_handler = None
self.login_thread = None
self.qr_path = Config.WECHAT_QR_CODE_PATH
def login(self) -> bool: """登录微信"""
try: # 创建二维码
qr = qrcode.QRCode(version=1, box_size=10, border=5)
qr.add_data('https://login.weixin.qq.com/')
qr.make(fit=True)
img = qr.make_image(fill_color="black", back_color="white")
img.save(self.qr_path) print(f"请扫描二维码登录微信: {self.qr_path}")
# 登录
itchat.auto_login(
hotReload=Config.WECHAT_AUTO_LOGIN,
picDir=self.qr_path
)
self.is_logged_in = True
self.friends = itchat.get_friends(update=True)
# 启动消息监听
threading.Thread(target=self.start_listening, daemon=True).start()
logging.info("微信登录成功") return True
except Exception as e:
logging.error(f"微信登录失败: {e}") return False
def start_listening(self): """启动消息监听""" @itchat.msg_register(itchat.content.TEXT)
def handle_text_message(msg): try:
user_id = msg['FromUserName']
content = msg['Text']
# 转发到Flask处理
if self.msg_handler:
response = self.msg_handler(user_id, content) if response: self.send_message(user_id, response)
except Exception as e:
logging.error(f"处理微信消息失败: {e}")
itchat.run()
def send_message(self, user_id: str, message: str) -> bool: """发送微信消息"""
try:
itchat.send(message, toUserName=user_id) return True
except Exception as e:
logging.error(f"发送微信消息失败: {e}") return False
def validate_user(self, user_id: str) -> bool: """验证用户权限"""
if not Config.USER_WHITELIST: return True
# 检查用户是否在白名单中
for friend in self.friends: if friend['UserName'] == user_id: return friend['NickName'] in Config.USER_WHITELIST
return False
def get_friend_info(self, user_id: str) -> Optional[Dict]: """获取好友信息"""
for friend in self.friends: if friend['UserName'] == user_id: return { 'nickname': friend['NickName'], 'remark_name': friend['RemarkName'], 'sex': friend['Sex']
} return None
def set_message_handler(self, handler): """设置消息处理器"""
self.msg_handler = handler3.3.2 价格服务
# services/price_service.pyfrom typing import Dict, List, Optionalfrom datetime import datetime, timedeltafrom utils.database import dbfrom web.models.product_model import Product, PriceHistory, PriceAlertimport loggingimport jsonfrom utils.cache import get_redis_clientclass PriceService: """价格服务"""
def __init__(self): self.redis_client = get_redis_client()
def search_products(self, keyword: str, limit: int = 10) -> List[Dict]: """搜索商品"""
try: # 检查缓存
cache_key = f"jd:search:{keyword}"
cached = self.redis_client.get(cache_key) if cached: return json.loads(cached)
# 调用爬虫搜索
from spiders.jd_spider import JDSpider from scrapy.crawler import CrawlerProcess from scrapy.utils.project import get_project_settings
settings = get_project_settings()
process = CrawlerProcess(settings)
results = [] def collect_results(item):
results.extend(item.get('results', []))
process.crawl(JDSpider, keyword=keyword, max_results=limit)
process.start()
# 缓存结果
if results: self.redis_client.setex(cache_key, 1800, json.dumps(results)) # 30分钟
return results
except Exception as e:
logging.error(f"搜索商品失败: {e}") return []
def get_product_info(self, product_id: str) -> Optional[Product]: """获取商品信息"""
try:
product = Product.query.filter_by(product_id=product_id).first() if product: return product
# 从京东获取商品信息
from spiders.jd_price_spider import JDPriceSpider from scrapy.crawler import CrawlerProcess from scrapy.utils.project import get_project_settings
settings = get_project_settings()
process = CrawlerProcess(settings)
result = None
def collect_result(item): nonlocal result
result = item
process.crawl(JDPriceSpider, product_ids=[product_id])
process.start()
if result: # 保存到数据库
product = Product(
product_id=product_id,
title=result.get('title', ''),
shop=result.get('shop', ''),
comment_count=result.get('comment_count', 0),
image_url=result.get('image_url', ''),
link=result.get('link', '')
)
db.session.add(product)
db.session.commit() return product
return None
except Exception as e:
logging.error(f"获取商品信息失败: {e}") return None
def get_current_price(self, product_id: str) -> float: """获取当前价格"""
try: # 检查缓存
cache_key = f"jd:price:{product_id}"
cached = self.redis_client.get(cache_key) if cached:
data = json.loads(cached) return data.get('price', 0.0)
# 从数据库获取最新价格
price_record = PriceHistory.query.filter_by(
product_id=product_id
).order_by(PriceHistory.timestamp.desc()).first()
if price_record: # 更新缓存
self.redis_client.setex(
cache_key,
3600,
json.dumps({ 'price': price_record.price, 'timestamp': int(price_record.timestamp.timestamp())
})
) return price_record.price
return 0.0
except Exception as e:
logging.error(f"获取当前价格失败: {e}") return 0.0
def save_price_record(self, product_id: str, price: float) -> bool: """保存价格记录"""
try: # 检查是否重复记录
latest_record = PriceHistory.query.filter_by(
product_id=product_id
).order_by(PriceHistory.timestamp.desc()).first()
if latest_record and latest_record.price == price: # 价格未变化,不重复记录
return True
# 保存新记录
price_record = PriceHistory(
product_id=product_id,
price=price,
timestamp=datetime.now()
)
db.session.add(price_record)
db.session.commit()
logging.info(f"保存价格记录: {product_id} - ¥{price}") return True
except Exception as e:
logging.error(f"保存价格记录失败: {e}")
db.session.rollback() return False
def get_price_history(self, product_id: str, days: int = 30) -> List[PriceHistory]: """获取价格历史"""
try:
start_date = datetime.now() - timedelta(days=days) return PriceHistory.query.filter(
PriceHistory.product_id == product_id,
PriceHistory.timestamp >= start_date
).order_by(PriceHistory.timestamp.asc()).all()
except Exception as e:
logging.error(f"获取价格历史失败: {e}") return []
def get_price_change(self, product_id: str, days: int = 7) -> Optional[Dict]: """获取价格变化"""
try:
history = self.get_price_history(product_id, days=days) if len(history) < 2: return None
current_price = history[-1].price
old_price = history[0].price
change = current_price - old_price
change_percent = (change / old_price) * 100 if old_price > 0 else 0.0
return { 'change': change_percent, 'type': 'drop' if change_percent < 0 else 'rise', 'current_price': current_price, 'old_price': old_price
}
except Exception as e:
logging.error(f"计算价格变化失败: {e}") return None
def set_price_alert(self, user_id: str, product_id: str, target_price: float) -> bool: """设置价格预警"""
try: # 检查是否已存在
existing_alert = PriceAlert.query.filter_by(
user_id=user_id,
product_id=product_id
).first()
if existing_alert:
existing_alert.target_price = target_price
existing_alert.updated_at = datetime.now() else:
alert = PriceAlert(
user_id=user_id,
product_id=product_id,
target_price=target_price,
created_at=datetime.now(),
updated_at=datetime.now()
)
db.session.add(alert)
db.session.commit() return True
except Exception as e:
logging.error(f"设置价格预警失败: {e}")
db.session.rollback() return False
def get_monitored_products(self) -> List[Product]: """获取需要监控的商品"""
try: # 获取所有设置了预警的商品
alerts = PriceAlert.query.distinct(PriceAlert.product_id).all()
product_ids = [alert.product_id for alert in alerts]
return Product.query.filter(Product.product_id.in_(product_ids)).all()
except Exception as e:
logging.error(f"获取监控商品失败: {e}") return []3.3.3 通知服务
# services/notification_service.pyfrom typing import Dict, Listfrom utils.database import dbfrom web.models.product_model import PriceAlertfrom services.wechat_service import WeChatServiceimport loggingclass NotificationService: """通知服务"""
def __init__(self): self.wechat_service = WeChatService()
def send_price_alert(self, product_id: str, message: str) -> bool: """发送价格预警"""
try: # 获取设置了预警的用户
alerts = PriceAlert.query.filter_by(product_id=product_id).all()
for alert in alerts: try: # 检查当前价格是否达到目标价
from services.price_service import PriceService
price_service = PriceService()
current_price = price_service.get_current_price(product_id)
if current_price <= alert.target_price: # 发送通知
self.wechat_service.send_message(alert.user_id, message)
logging.info(f"发送价格预警给用户 {alert.user_id}: {message}") except Exception as e:
logging.error(f"发送预警给用户 {alert.user_id} 失败: {e}") continue
return True
except Exception as e:
logging.error(f"发送价格预警失败: {e}") return False
def send_daily_summary(self, user_id: str) -> bool: """发送每日摘要"""
try: # 获取用户关注商品的价格变化
alerts = PriceAlert.query.filter_by(user_id=user_id).all()
if not alerts: return True
message = "📊 今日价格变化摘要:\n\n"
has_changes = False
for alert in alerts: from services.price_service import PriceService
price_service = PriceService()
# 获取24小时价格变化
price_change = price_service.get_price_change(alert.product_id, days=1) if price_change and abs(price_change['change']) > 1.0: # 变化超过1%
has_changes = True
change_icon = "📈" if price_change['change'] > 0 else "📉"
message += f"{change_icon} {alert.product.title[:20]}...\n"
message += f" 变化: {price_change['change']:.1f}% | 当前: ¥{price_change['current_price']}\n\n"
if has_changes: self.wechat_service.send_message(user_id, message)
return True
except Exception as e:
logging.error(f"发送每日摘要失败: {e}") return False3.4 数据模型
# web/models/product_model.pyfrom utils.database import dbfrom datetime import datetimeclass Product(db.Model): """商品模型"""
__tablename__ = 'products'
id = db.Column(db.Integer, primary_key=True)
product_id = db.Column(db.String(20), unique=True, nullable=False) # 京东商品ID
title = db.Column(db.Text, nullable=False) # 商品标题
shop = db.Column(db.String(200), nullable=False) # 店铺名称
comment_count = db.Column(db.Integer, default=0) # 评论数
image_url = db.Column(db.Text) # 图片URL
link = db.Column(db.Text) # 商品链接
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
def __repr__(self): return f'<Product {self.product_id}: {self.title[:20]}>'class PriceHistory(db.Model): """价格历史模型"""
__tablename__ = 'price_history'
id = db.Column(db.Integer, primary_key=True)
product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
price = db.Column(db.Float, nullable=False) # 价格
timestamp = db.Column(db.DateTime, nullable=False) # 记录时间
# 关系
product = db.relationship('Product', backref=db.backref('price_history', lazy=True))
def __repr__(self): return f'<PriceHistory {self.product_id}: ¥{self.price} at {self.timestamp}>'class PriceAlert(db.Model): """价格预警模型"""
__tablename__ = 'price_alerts'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.String(100), nullable=False) # 微信用户ID
product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
target_price = db.Column(db.Float, nullable=False) # 目标价格
created_at = db.Column(db.DateTime, default=datetime.now)
updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)
# 关系
product = db.relationship('Product', backref=db.backref('price_alerts', lazy=True))
def __repr__(self): return f'<PriceAlert {self.user_id} for {self.product_id}: ¥{self.target_price}>'四、部署配置
4.1 Docker配置
# docker-compose.ymlversion: '3.8'services: web: build: . ports: - "5000:5000" environment: - DEBUG=False - MYSQL_HOST=mysql - REDIS_URL=redis://redis:6379/0 depends_on: - mysql - redis volumes: - ./logs:/app/logs - ./qr_code.png:/app/qr_code.png restart: unless-stopped mysql: image: mysql:8.0 environment: - MYSQL_ROOT_PASSWORD=password - MYSQL_DATABASE=jd_price_bot volumes: - mysql_data:/var/lib/mysql restart: unless-stopped redis: image: redis:7-alpine restart: unless-stopped nginx: image: nginx:alpine ports: - "80:80" - "443:443" volumes: - ./nginx.conf:/etc/nginx/nginx.conf - ./ssl:/etc/nginx/ssl depends_on: - web restart: unless-stoppedvolumes: mysql_data:
4.2 Dockerfile
FROM python:3.11-slim WORKDIR /app# 安装系统依赖RUN apt-get update && apt-get install -y \ gcc \ g++ \ wget \ gnupg \ unzip \ && rm -rf /var/lib/apt/lists/*# 安装ChromeRUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub | apt-key add - \ && sh -c 'echo "deb [arch=amd64] http://dl.google.com/linux/chrome/deb/ stable main" >> /etc/apt/sources.list.d/google.list' \ && apt-get update \ && apt-get install -y google-chrome-stable# 安装ChromeDriverRUN CHROMEDRIVER_VERSION=`curl -sS chromedriver.storage.googleapis.com/LATEST_RELEASE` \ && wget -O /tmp/chromedriver.zip http://chromedriver.storage.googleapis.com/$CHROMEDRIVER_VERSION/chromedriver_linux64.zip \ && unzip /tmp/chromedriver.zip -d /usr/local/bin/ \ && rm /tmp/chromedriver.zip# 复制依赖文件COPY requirements.txt .# 安装Python依赖RUN pip install --no-cache-dir -r requirements.txt# 复制应用代码COPY . .# 创建日志目录RUN mkdir -p logs# 暴露端口EXPOSE 5000# 启动命令CMD ["python", "web/app.py"]
4.3 Nginx配置
# nginx.conf
# FIX: the file comment and "events {" were fused onto one '#' line, which
# commented out the events block and made the whole config invalid.

events {
    worker_connections 1024;
}

http {
    upstream jd_price_bot {
        server web:5000;
    }

    # Redirect all plain-HTTP traffic to HTTPS.
    server {
        listen 80;
        server_name your-domain.com;
        return 301 https://$server_name$request_uri;
    }

    server {
        listen 443 ssl;
        server_name your-domain.com;

        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/private.key;
        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers HIGH:!aNULL:!MD5;

        location / {
            proxy_pass http://jd_price_bot;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket support: HTTP/1.1 plus Upgrade/Connection headers are
        # required for the protocol upgrade handshake to pass through.
        location /websocket {
            proxy_pass http://jd_price_bot;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}
五、使用指南
5.1 启动应用
# 1. 克隆项目git clone https://github.com/your-username/jd-price-bot.gitcd jd-price-bot# 2. 配置环境变量cp .env.example .env# 编辑.env文件,配置数据库、微信等参数# 3. 启动服务docker-compose up -d# 4. 初始化数据库docker-compose exec web python -c "from utils.database import db, init_db; from web.app import app; init_db(app); db.create_all()"# 5. 扫描微信二维码登录# 查看生成的二维码文件open qr_code.png
5.2 微信交互示例
用户:帮助 机器人: 🤖 京东比价机器人使用指南: 🔍 搜索商品: 搜索 iPhone 15 search 笔记本电脑 💰 查询价格: 价格 100012345678 price 100012345678 📊 历史价格: 历史 100012345678 history 100012345678 🔔 价格预警: 预警 100012345678 5999 alert 100012345678 5999 ⭐ 收藏管理: 收藏 100012345678 收藏列表 📈 价格趋势图: 趋势 100012345678 trend 100012345678 用户:搜索 iPhone 15 机器人: 🔍 搜索“iPhone 15”结果:1. Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G... 💰 ¥4999 | 👑 Apple产品京东自营旗舰店 📞 评论: 200000 🔗 ID: 1000123456782. Apple iPhone 15 (A3092) 256GB 蓝色 支持移动联通电信5G... 💰 ¥5999 | 👑 Apple产品京东自营旗舰店 📞 评论: 150000 🔗 ID: 100012345679 输入“价格 ID”查看详情,如:价格 100012345678 用户:价格 100012345678 机器人: 📱 Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G... 💰 当前价格: ¥4999 📉 7天变化: -5.2% 🏪 店铺: Apple产品京东自营旗舰店 📞 评论: 200000 输入“历史 ID”查看价格趋势 用户:预警 100012345678 4500 机器人: ✅ 预警设置成功! 当价格 ≤ ¥4500 时会通知您
六、项目特点
6.1 技术亮点
- 智能爬虫:结合Scrapy+Selenium,支持动态页面渲染
- 实时监控:定时任务自动监控价格变化
- 微信集成:无缝对接微信,支持多种交互方式
- 数据可视化:生成价格趋势图,提供购买建议
- 高性能缓存:Redis缓存提升查询性能
6.2 应用场景
- 个人购物:实时比价,抓住最佳购买时机
- 电商运营:监控竞品价格,制定定价策略
- 数据分析:分析商品价格波动规律
- 智能推荐:基于价格趋势推荐购买时间
6.3 注意事项
- 合规使用:遵守京东Robots协议,控制爬取频率
- 用户隐私:妥善保管用户微信信息
- API限制:合理使用第三方服务,避免被封禁
- 数据安全:加密存储敏感信息
通过本项目,你可以:
- ✅ 掌握Scrapy爬虫开发技巧
- ✅ 实现Flask RESTful API开发
- ✅ 构建微信机器人应用
- ✅ 设计高性能数据存储方案
- ✅ 部署完整的Web应用系统
项目代码已包含完整的错误处理、日志记录、性能优化,可直接用于生产环境或作为学习参考。