×

实战!用Scrapy+Flask构建京东商品比价微信机器人

万邦科技Lex 万邦科技Lex 发表于2026-02-06 09:00:52 浏览22 评论0

抢沙发发表评论

一、项目概述

1.1 项目简介

本项目将构建一个智能微信比价机器人,能够:
  • 实时抓取:用Scrapy抓取京东商品价格信息

  • 智能比价:分析商品历史价格趋势

  • 微信交互:通过Flask接收用户查询并返回结果

  • 定时监控:自动监控商品价格变化

  • 价格预警:低于目标价格时自动提醒

1.2 技术栈

  • 爬虫框架:Scrapy + Selenium(动态渲染)

  • Web框架:Flask + RESTful API

  • 微信集成:itchat / Wechaty

  • 数据存储:Redis + MySQL

  • 任务调度:APScheduler

  • 部署运维:Docker + Nginx

1.3 核心功能

  • 🛍️ 商品搜索:通过微信发送商品名,返回比价结果

  • 📊 价格趋势:展示商品30天价格走势

  • 🔔 降价提醒:设置目标价,降价自动通知

  • ⭐ 收藏管理:收藏关注商品,一键查询

  • 📈 数据分析:智能推荐最佳购买时机

二、环境配置

2.1 项目结构

jd_price_bot/
├── spiders/               # Scrapy爬虫
│   ├── jd_spider.py      # 京东商品爬虫
│   ├── price_monitor.py  # 价格监控爬虫
│   └── middlewares.py    # 中间件
├── web/                   # Flask应用
│   ├── app.py            # Flask主应用
│   ├── routes/           # 路由模块
│   ├── models/           # 数据模型
│   └── templates/        # 模板文件
├── services/             # 业务服务
│   ├── wechat_service.py # 微信服务
│   ├── price_service.py  # 价格服务
│   └── notification_service.py # 通知服务
├── utils/                # 工具类
│   ├── database.py       # 数据库连接
│   ├── cache.py          # 缓存工具
│   └── config.py         # 配置管理
├── requirements.txt      # 依赖包
├── docker-compose.yml    # Docker配置
└── README.md

2.2 环境配置

# requirements.txt
# Web框架
Flask==2.3.3
flask-restful==0.3.10
flask-sqlalchemy==3.0.5
flask-apscheduler==1.12.4
# 爬虫相关
Scrapy==2.11.0
selenium==4.15.0
requests==2.31.0
beautifulsoup4==4.12.2
lxml==4.9.3
# 微信相关
itchat==1.3.10
wechaty==0.8.21
# 数据库
redis==5.0.1
mysql-connector-python==8.1.0
sqlalchemy==2.0.23
# 数据处理
pandas==2.1.3
numpy==1.25.2
matplotlib==3.7.2
# 工具类
python-dotenv==1.0.0
APScheduler==3.10.4
schedule==1.2.0

2.3 配置文件

# utils/config.py
"""Application configuration, read once from the environment (and ``.env``)."""
import os

from dotenv import load_dotenv

# Populate os.environ from a local .env file before any value is read.
load_dotenv()


class Config:
    """Static settings shared by the spiders, the Flask app and the services.

    Every attribute is evaluated once, at import time. Values come from
    environment variables and fall back to the defaults shown here.
    """

    # --- Basics ---
    # NOTE(review): the fallback secret is public; set SECRET_KEY in production.
    SECRET_KEY = os.getenv('SECRET_KEY', 'jd-price-bot-secret-key')
    DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'

    # --- Databases ---
    REDIS_URL = os.getenv('REDIS_URL', 'redis://localhost:6379/0')
    MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
    MYSQL_PORT = os.getenv('MYSQL_PORT', '3306')  # kept as a string
    MYSQL_USER = os.getenv('MYSQL_USER', 'root')
    MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'password')
    MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'jd_price_bot')

    # --- WeChat ---
    WECHAT_QR_CODE_PATH = os.getenv('WECHAT_QR_CODE_PATH', './qr_code.png')
    WECHAT_AUTO_LOGIN = os.getenv('WECHAT_AUTO_LOGIN', 'True').lower() == 'true'
    # Packaged API provider demo: url=https://console.open.onebound.cn/console/?i=Lex

    # --- Spiders ---
    JD_SEARCH_URL = 'https://search.jd.com/Search'
    JD_PRODUCT_URL = 'https://item.jd.com/{}.html'
    SPIDER_DELAY = int(os.getenv('SPIDER_DELAY', '2'))
    MAX_PRODUCTS_PER_SEARCH = int(os.getenv('MAX_PRODUCTS_PER_SEARCH', '20'))

    # --- Price monitoring ---
    PRICE_CHECK_INTERVAL = int(os.getenv('PRICE_CHECK_INTERVAL', '3600'))  # seconds (1 hour)
    PRICE_HISTORY_DAYS = int(os.getenv('PRICE_HISTORY_DAYS', '30'))

    # --- Notifications ---
    PRICE_DROP_THRESHOLD = float(os.getenv('PRICE_DROP_THRESHOLD', '0.1'))  # 0.1 == 10 % drop
    ENABLE_PRICE_ALERT = os.getenv('ENABLE_PRICE_ALERT', 'True').lower() == 'true'

    # --- Security ---
    # Comma-separated whitelist. Blank entries are dropped: ''.split(',') is
    # [''], which is truthy, so without the filter an unset variable would look
    # like a non-empty whitelist to ``if not Config.USER_WHITELIST`` checks.
    USER_WHITELIST = [
        entry.strip()
        for entry in os.getenv('USER_WHITELIST', '').split(',')
        if entry.strip()
    ]
    MAX_REQUESTS_PER_MINUTE = int(os.getenv('MAX_REQUESTS_PER_MINUTE', '30'))

    # --- Logging ---
    LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO')
    LOG_FILE = os.getenv('LOG_FILE', './logs/jd_bot.log')

三、核心模块实现

3.1 京东爬虫实现(Scrapy)

3.1.1 商品搜索爬虫

# spiders/jd_spider.py
"""Scrapy spiders for JD: keyword search (rendered with Selenium) and product pages."""
import scrapy
import json
import re
from urllib.parse import quote, urlencode
from typing import Dict, List, Optional
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.common.exceptions import TimeoutException
import time


class JDSpider(scrapy.Spider):
    """Search JD for a keyword and yield one item holding the result list.

    The result page is rendered in a headless Chrome instance. The yielded item
    has the keys ``keyword``, ``results`` (list of product dicts) and ``count``.
    """

    name = 'jd_spider'
    allowed_domains = ['jd.com', 'search.jd.com']

    def __init__(self, keyword: str = None, max_results: int = 20, *args, **kwargs):
        """Store the search parameters and start the browser.

        Args:
            keyword: Search phrase; no request is issued when it is empty.
            max_results: Maximum number of products to extract.
        """
        super().__init__(*args, **kwargs)
        self.keyword = keyword
        # ``scrapy crawl -a max_results=5`` passes a string; it is used as a
        # slice bound below, so it must be an int.
        self.max_results = int(max_results)
        self.driver = None
        self.setup_driver()

    def setup_driver(self):
        """Create the headless Chrome WebDriver used to render result pages."""
        options = webdriver.ChromeOptions()
        options.add_argument('--headless')
        options.add_argument('--no-sandbox')
        options.add_argument('--disable-dev-shm-usage')
        options.add_argument('--user-agent=Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36')

        self.driver = webdriver.Chrome(options=options)
        self.driver.implicitly_wait(10)

    def start_requests(self):
        """Issue the single search request (nothing when no keyword was given)."""
        if self.keyword:
            url = self.build_search_url(self.keyword)
            yield scrapy.Request(url=url, callback=self.parse_search_results)

    def build_search_url(self, keyword: str) -> str:
        """Return the JD search URL for *keyword* with URL-encoded parameters."""
        params = {
            'keyword': keyword,
            'enc': 'utf-8',
            'wq': keyword,
            'pvid': self.generate_pvid(),
        }
        return f"https://search.jd.com/Search?{urlencode(params)}"

    def generate_pvid(self) -> str:
        """Return a random 16-character hexadecimal ``pvid`` value."""
        import uuid
        return str(uuid.uuid4()).replace('-', '')[:16]

    def parse_search_results(self, response):
        """Render the result page in Selenium and yield the extracted products."""
        try:
            # The driver starts on a blank page: it has to load the URL itself
            # before waiting for product nodes, otherwise the wait always
            # times out.
            self.driver.get(response.url)
            WebDriverWait(self.driver, 10).until(
                EC.presence_of_element_located((By.CSS_SELECTOR, ".gl-item"))
            )

            products = self.driver.find_elements(By.CSS_SELECTOR, ".gl-item")
            results = []
            for product in products[:self.max_results]:
                try:
                    product_info = self.extract_product_info(product)
                    if product_info:
                        results.append(product_info)
                        self.logger.info(f"提取商品: {product_info['title'][:30]}")
                except Exception as e:
                    self.logger.error(f"提取商品信息失败: {e}")
                    continue

            yield {
                'keyword': self.keyword,
                'results': results,
                'count': len(results),
            }

        except TimeoutException:
            self.logger.error("搜索页面加载超时")
        finally:
            self.close_driver()

    @staticmethod
    def _find_optional(parent, css: str):
        """Return the first element under *parent* matching *css*, or None.

        ``find_element`` raises when nothing matches; ``find_elements`` returns
        an empty list, which lets optional fields fall back to a default.
        """
        matches = parent.find_elements(By.CSS_SELECTOR, css)
        return matches[0] if matches else None

    def extract_product_info(self, product_element) -> Optional[Dict]:
        """Build a product dict from one ``.gl-item`` element.

        Returns:
            The product dict, or None when the element has no ``data-sku`` or
            extraction fails (the error is logged).
        """
        try:
            data_sku = product_element.get_attribute('data-sku')
            if not data_sku:
                return None

            title_element = self._find_optional(product_element, ".p-name a em")
            title = title_element.text.strip() if title_element else ""

            price_element = self._find_optional(product_element, ".p-price strong i")
            price_text = price_element.text.strip() if price_element else ""
            price = float(price_text) if price_text else 0.0

            shop_element = self._find_optional(product_element, ".p-shop a")
            shop = shop_element.text.strip() if shop_element else ""

            comment_element = self._find_optional(product_element, ".p-commit a")
            comment_text = comment_element.text.strip() if comment_element else ""
            comment_count = self.extract_comment_count(comment_text)

            link_element = self._find_optional(product_element, ".p-name a")
            link = link_element.get_attribute('href') if link_element else ""

            img_element = self._find_optional(product_element, ".p-img img")
            image_url = ""
            if img_element:
                # Fall back to the lazy-load attribute when src is empty.
                image_url = (img_element.get_attribute('src')
                             or img_element.get_attribute('data-lazy-img'))

            return {
                'product_id': data_sku,
                'title': title,
                'price': price,
                'shop': shop,
                'comment_count': comment_count,
                'link': link,
                'image_url': image_url,
                'crawl_time': int(time.time()),
            }

        except Exception as e:
            self.logger.error(f"提取商品信息异常: {e}")
            return None

    def extract_comment_count(self, comment_text: str) -> int:
        """Convert a comment label such as ``"2万+"`` or ``"500+"`` to an int.

        The unit 万 multiplies the number by 10000. It is applied exactly once;
        rewriting 万 to "0000" and then multiplying again turned "2万+" into
        200000000.
        """
        if not comment_text:
            return 0

        match = re.search(r'\d+(?:\.\d+)?', comment_text.replace('+', '').replace(',', ''))
        if not match:
            return 0

        count = float(match.group())
        if '万' in comment_text:
            count *= 10000
        # round() guards against float artefacts such as 22999.999999999996.
        return int(round(count))

    # Packaged API provider demo: url=https://console.open.onebound.cn/console/?i=Lex
    def close_driver(self):
        """Quit the browser if it is still running (safe to call repeatedly)."""
        if self.driver:
            self.driver.quit()
            self.driver = None

    def closed(self, reason):
        """Release the browser when the spider closes.

        ``scrapy.Spider`` defines no ``closed`` method of its own, so there is
        no parent implementation to call here.
        """
        self.close_driver()


# Product detail spider
class JDPriceSpider(scrapy.Spider):
    """Fetch JD product pages and yield price, promotions and stock per product."""

    name = 'jd_price_spider'

    def __init__(self, product_ids: List[str] = None, *args, **kwargs):
        """Store the product ids to crawl.

        Args:
            product_ids: List of SKU ids. A comma-separated string (as passed
                by ``scrapy crawl -a product_ids=1,2``) is also accepted.
        """
        super().__init__(*args, **kwargs)
        if isinstance(product_ids, str):
            product_ids = [pid.strip() for pid in product_ids.split(',') if pid.strip()]
        self.product_ids = list(product_ids or [])

    def start_requests(self):
        """Request the detail page of every configured product."""
        for product_id in self.product_ids:
            url = f"https://item.jd.com/{product_id}.html"
            yield scrapy.Request(
                url=url,
                callback=self.parse_product_page,
                meta={'product_id': product_id}
            )

    def parse_product_page(self, response):
        """Yield one item with price, promotions and stock for the product."""
        product_id = response.meta['product_id']

        try:
            price_script = response.xpath('//script[contains(text(), "price:")]/text()').get()
            price = self.extract_price_from_script(price_script)
            promotions = self.extract_promotions(response)
            stock = self.extract_stock_info(response)

            yield {
                'product_id': product_id,
                'price': price,
                'promotions': promotions,
                'stock': stock,
                'crawl_time': int(time.time()),
            }

        except Exception as e:
            self.logger.error(f"解析商品页面失败: {e}")

    def extract_price_from_script(self, script_text: str) -> float:
        """Return the first price found in inline JavaScript, or 0.0."""
        if not script_text:
            return 0.0

        price_patterns = [
            r'"price"\s*:\s*"([\d.]+)"',
            r"'price'\s*:\s*'([\d.]+)'",
            r'price\s*=\s*([\d.]+)',
            # Unquoted key: the form the "price:" XPath filter actually selects.
            r'price\s*:\s*["\']?([\d.]+)',
        ]

        for pattern in price_patterns:
            match = re.search(pattern, script_text)
            if not match:
                continue
            try:
                return float(match.group(1))
            except ValueError:
                # [\d.]+ also matches things like "1.2.3"; try the next pattern.
                continue

        return 0.0

    def extract_promotions(self, response) -> List[str]:
        """Return coupon and promotion texts found on the page."""
        promotions = []

        # All coupon text is merged into a single entry.
        coupons = response.xpath('//div[contains(@class, "coupon-item")]//text()').getall()
        coupons_text = ' '.join(c.strip() for c in coupons if c.strip())
        if coupons_text:
            promotions.append(f"优惠券: {coupons_text}")

        # One entry per promotion element.
        for promo in response.xpath('//div[contains(@class, "promotion-item")]'):
            fragments = promo.xpath('.//text()').getall()
            promo_text = ' '.join(p.strip() for p in fragments if p.strip())
            if promo_text:
                promotions.append(promo_text)

        return promotions

    def extract_stock_info(self, response) -> str:
        """Return the stock prompt text, else "有货"/"无货" based on the cart button."""
        # Skip whitespace-only text nodes so a leading blank node does not
        # produce an empty stock string.
        for fragment in response.xpath('//div[contains(@class, "store-prompt")]//text()').getall():
            if fragment.strip():
                return fragment.strip()

        buy_btn = response.xpath('//a[contains(@class, "btn-addtocart")]')
        return "有货" if buy_btn else "无货"

3.1.2 价格监控爬虫

# spiders/price_monitor.py
"""Spider that re-checks monitored JD products and records price changes."""
import scrapy
from scrapy import signals
import json
import re
import time
from datetime import datetime, timedelta
from typing import Dict, List


class JDPriceMonitor(scrapy.Spider):
    """Fetch each monitored product page, store its price and raise drop alerts."""

    name = 'jd_price_monitor'
    custom_settings = {
        'CONCURRENT_REQUESTS': 1,
        'DOWNLOAD_DELAY': 3,
        'AUTOTHROTTLE_ENABLED': True,
    }

    def __init__(self, product_ids: List[str] = None, *args, **kwargs):
        """Store optional explicit product ids; otherwise they are loaded on open."""
        super().__init__(*args, **kwargs)
        self.product_ids = product_ids or []
        self.price_history = {}
        self.redis_client = None
        self._price_service = None  # created lazily, see _get_price_service()

    @classmethod
    def from_crawler(cls, crawler, *args, **kwargs):
        """Build the spider and hook its open/close signal handlers."""
        spider = super().from_crawler(crawler, *args, **kwargs)
        crawler.signals.connect(spider.spider_opened, signal=signals.spider_opened)
        crawler.signals.connect(spider.spider_closed, signal=signals.spider_closed)
        return spider

    def _get_price_service(self):
        """Return one shared PriceService instead of building one per call."""
        if self._price_service is None:
            # Imported lazily, as elsewhere in this module.
            from services.price_service import PriceService
            self._price_service = PriceService()
        return self._price_service

    def spider_opened(self, spider):
        """Connect to Redis and load the product list if none was passed in."""
        from utils.cache import get_redis_client
        self.redis_client = get_redis_client()

        if not self.product_ids:
            self.product_ids = self.load_monitored_products()

    def spider_closed(self, spider):
        """Close the Redis connection."""
        if self.redis_client:
            self.redis_client.close()

    def load_monitored_products(self) -> List[str]:
        """Return the ids of monitored products, or [] when loading fails."""
        try:
            products = self._get_price_service().get_monitored_products()
            return [p.product_id for p in products]
        except Exception as e:
            self.logger.error(f"加载监控商品失败: {e}")
            return []

    def start_requests(self):
        """Request the detail page of every monitored product."""
        for product_id in self.product_ids:
            url = f"https://item.jd.com/{product_id}.html"
            yield scrapy.Request(
                url=url,
                callback=self.parse_product_price,
                meta={'product_id': product_id},
                dont_filter=True
            )

    def parse_product_price(self, response):
        """Yield a status item for the product; store the price when found."""
        product_id = response.meta['product_id']

        try:
            price = self.extract_price(response)

            if price > 0:
                # Compare with history BEFORE saving the new record.
                # NOTE(review): presumably get_price_history() includes the
                # newest row; saving first would then make every change 0.
                price_change = self.check_price_change(product_id, price)
                self.save_price_record(product_id, price)

                yield {
                    'product_id': product_id,
                    'price': price,
                    'price_change': price_change,
                    'timestamp': int(time.time()),
                    'status': 'success',
                }
            else:
                yield {
                    'product_id': product_id,
                    'price': 0.0,
                    'timestamp': int(time.time()),
                    'status': 'failed',
                    'error': '价格提取失败',
                }

        except Exception as e:
            self.logger.error(f"价格监控失败: {e}")
            yield {
                'product_id': product_id,
                'price': 0.0,
                'timestamp': int(time.time()),
                'status': 'failed',
                'error': str(e),
            }

    @staticmethod
    def _parse_price_text(price_text) -> float:
        """Turn text such as "¥1,299.00" into a float; 0.0 when not a number."""
        if not price_text:
            return 0.0
        cleaned = price_text.replace('¥', '').replace('￥', '').replace(',', '').strip()
        if re.fullmatch(r'\d+(?:\.\d+)?', cleaned):
            return float(cleaned)
        return 0.0

    def extract_price(self, response) -> float:
        """Return the product price from the page, or 0.0 when none is found."""
        product_id = response.meta.get('product_id', '')

        # The first two selectors are specific to this SKU; the "{}" placeholder
        # has to be filled in or the XPath can never match.
        price_selectors = [
            '//span[@class="price J-p-{}"]/text()'.format(product_id),
            '//strong[@class="J-p-{}"]/i/text()'.format(product_id),
            '//span[contains(@class, "price J-p-")]/text()',
        ]
        for selector in price_selectors:
            price = self._parse_price_text(response.xpath(selector).get())
            if price > 0:
                return price

        # Last resort: inline JavaScript. The script body is code, not a bare
        # number, so the value is pulled out with a regex.
        script_text = response.xpath('//script[contains(text(), "price:")]/text()').get()
        if script_text:
            match = re.search(r'price\s*:\s*["\']?(\d+(?:\.\d+)?)', script_text)
            if match:
                return float(match.group(1))

        return 0.0

    # Packaged API provider demo: url=https://console.open.onebound.cn/console/?i=Lex
    def save_price_record(self, product_id: str, price: float):
        """Persist the price and cache it in Redis for one hour."""
        try:
            self._get_price_service().save_price_record(product_id, price)

            if self.redis_client:
                cache_key = f"jd:price:{product_id}"
                self.redis_client.setex(
                    cache_key,
                    3600,  # 1 hour
                    json.dumps({
                        'price': price,
                        'timestamp': int(time.time()),
                    })
                )
        except Exception as e:
            self.logger.error(f"保存价格记录失败: {e}")

    def check_price_change(self, product_id: str, current_price: float) -> Dict:
        """Compare *current_price* with the latest stored price.

        Returns:
            ``{'change': percent, 'type': 'drop'|'rise'|'unchanged',
            'last_price': ...}``; ``type`` is ``'new'`` without history and
            ``'unknown'`` when the lookup fails.
        """
        try:
            from utils.config import Config

            history = self._get_price_service().get_price_history(product_id, days=7)
            if not history:
                return {'change': 0.0, 'type': 'new'}

            # NOTE(review): assumes history is ordered oldest -> newest.
            last_price = history[-1].price
            change = current_price - last_price
            change_percent = (change / last_price) * 100 if last_price > 0 else 0.0

            # Config.PRICE_DROP_THRESHOLD is a fraction (0.1 == 10 %).
            threshold_percent = Config.PRICE_DROP_THRESHOLD * 100
            if Config.ENABLE_PRICE_ALERT and change_percent < -threshold_percent:
                self.trigger_price_alert(product_id, current_price, change_percent)

            if change_percent < 0:
                change_type = 'drop'
            elif change_percent > 0:
                change_type = 'rise'
            else:
                change_type = 'unchanged'

            return {
                'change': change_percent,
                'type': change_type,
                'last_price': last_price,
            }

        except Exception as e:
            self.logger.error(f"检查价格变化失败: {e}")
            return {'change': 0.0, 'type': 'unknown'}

    def trigger_price_alert(self, product_id: str, price: float, change_percent: float):
        """Send a price-drop notification for the product (errors are logged)."""
        try:
            from services.notification_service import NotificationService
            notification_service = NotificationService()

            product = self._get_price_service().get_product_info(product_id)

            if product:
                message = f"🚨 价格预警!\n商品:{product.title[:30]}...\n当前价格:¥{price}\n降价幅度:{abs(change_percent):.1f}%"
                notification_service.send_price_alert(product_id, message)
        except Exception as e:
            self.logger.error(f"发送价格预警失败: {e}")

3.2 Flask Web应用

3.2.1 主应用

# web/app.py
"""Flask entry point: HTTP routes, blueprints and the hourly price-monitor job."""
from flask import Flask, request, jsonify, render_template
from flask_restful import Api
from flask_apscheduler import APScheduler
import logging
import multiprocessing
import os
import time
from utils.config import Config
from utils.database import db, init_db
from services.wechat_service import WeChatService
from services.price_service import PriceService
from services.notification_service import NotificationService

# Configure logging. FileHandler does not create missing directories, so make
# sure the target directory exists first.
_log_dir = os.path.dirname(Config.LOG_FILE)
if _log_dir:
    os.makedirs(_log_dir, exist_ok=True)

logging.basicConfig(
    level=getattr(logging, Config.LOG_LEVEL),
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(Config.LOG_FILE),
        logging.StreamHandler()
    ]
)

app = Flask(__name__)
app.config.from_object(Config)

# Initialise the database
init_db(app)

# Initialise the REST API
api = Api(app)

# Initialise the scheduler
scheduler = APScheduler()
scheduler.init_app(app)
scheduler.start()

# Initialise services
wechat_service = WeChatService()
price_service = PriceService()
notification_service = NotificationService()

# Register routes (imported here, after the services exist)
from web.routes.wechat_routes import wechat_bp
from web.routes.price_routes import price_bp
from web.routes.product_routes import product_bp
# Packaged API provider demo: url=https://console.open.onebound.cn/console/?i=Lex
app.register_blueprint(wechat_bp, url_prefix='/wechat')
app.register_blueprint(price_bp, url_prefix='/price')
app.register_blueprint(product_bp, url_prefix='/product')


@app.route('/')
def index():
    """Render the home page."""
    return render_template('index.html')


@app.route('/health')
def health_check():
    """Return a liveness payload with the current Unix timestamp."""
    return jsonify({'status': 'healthy', 'timestamp': int(time.time())})


@app.route('/search')
def search_products():
    """Search products by the ``keyword`` query parameter."""
    keyword = request.args.get('keyword', '')
    if not keyword:
        return jsonify({'error': '请输入搜索关键词'}), 400

    try:
        results = price_service.search_products(keyword)
        return jsonify({
            'keyword': keyword,
            'results': results,
            'count': len(results),
        })
    except Exception as e:
        logging.error(f"搜索失败: {e}")
        return jsonify({'error': '搜索失败'}), 500


def _run_price_monitor_crawl():
    """Run the price-monitor spider to completion (child-process entry point)."""
    from spiders.price_monitor import JDPriceMonitor
    from scrapy.crawler import CrawlerProcess
    from scrapy.utils.project import get_project_settings

    process = CrawlerProcess(get_project_settings())
    process.crawl(JDPriceMonitor)
    process.start()


# Scheduled job
@scheduler.task('interval', id='monitor_prices', seconds=Config.PRICE_CHECK_INTERVAL)
def monitor_prices():
    """Run the price-monitor crawl on a fixed interval.

    The crawl runs in a child process: the Twisted reactor behind
    CrawlerProcess cannot be restarted within one process, and it expects to
    run in the main thread, so an in-process crawl from the scheduler thread
    fails from the second run at the latest.
    """
    try:
        worker = multiprocessing.Process(
            target=_run_price_monitor_crawl, name='jd-price-monitor'
        )
        worker.start()
        worker.join()
        if worker.exitcode != 0:
            raise RuntimeError(f"crawler process exited with code {worker.exitcode}")

        logging.info("价格监控任务执行完成")
    except Exception as e:
        logging.error(f"价格监控任务失败: {e}")


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=Config.DEBUG)

3.2.2 微信路由

# web/routes/wechat_routes.py
"""Blueprint that turns incoming WeChat messages into bot replies."""
from flask import Blueprint, request, jsonify
import re
from typing import Optional
from services.wechat_service import WeChatService
from services.price_service import PriceService
import logging
# Packaged API provider demo: url=https://console.open.onebound.cn/console/?i=Lex
wechat_bp = Blueprint('wechat', __name__)
wechat_service = WeChatService()
price_service = PriceService()

# NOTE(review): handle_image_message, get_price_history, add_to_favorites and
# get_favorites are called below but are not defined in the visible part of
# this module; confirm they exist before relying on those commands. The help
# text also advertises 趋势/trend, which has no handler here.


@wechat_bp.route('/message', methods=['POST'])
def handle_wechat_message():
    """Handle one WeChat message posted as JSON.

    Expects the keys ``type``, ``content`` and ``user_id``. Responds 400 for a
    missing or non-JSON body, 403 for a user that fails validation and 500 on
    an unexpected error.
    """
    try:
        # silent=True returns None for a missing/invalid JSON body instead of
        # raising, so the client gets the intended 400 and not a 500.
        data = request.get_json(silent=True)
        if not data:
            return jsonify({'error': '无效的请求数据'}), 400

        message_type = data.get('type', '')
        content = data.get('content', '')
        user_id = data.get('user_id', '')

        if not wechat_service.validate_user(user_id):
            return jsonify({'error': '用户无权限'}), 403

        if message_type == 'text':
            response = handle_text_message(content, user_id)
        elif message_type == 'image':
            response = handle_image_message(content, user_id)
        else:
            response = {'type': 'text', 'content': '暂不支持此类型消息'}

        return jsonify(response)

    except Exception as e:
        logging.error(f"处理微信消息失败: {e}")
        return jsonify({'error': '处理消息失败'}), 500


def _command_argument(text: str, *prefixes: str) -> Optional[str]:
    """Return what follows the first matching prefix, or None if none matches.

    Only the leading command word is removed; ``str.replace`` would also strip
    the same word from inside the argument (e.g. "search search engine").
    """
    for prefix in prefixes:
        if text.startswith(prefix):
            return text[len(prefix):].strip()
    return None


def handle_text_message(content: str, user_id: str) -> dict:
    """Dispatch a text command and return the reply as ``{'type', 'content'}``."""
    text = (content or '').strip()

    # Help
    if text in ['帮助', 'help', '?']:
        return {
            'type': 'text',
            'content': get_help_message(),
        }

    # Search products
    keyword = _command_argument(text, '搜索 ', 'search ')
    if keyword is not None:
        return search_products(keyword, user_id)

    # Query price
    product_id = _command_argument(text, '价格 ', 'price ')
    if product_id is not None:
        return get_product_price(product_id, user_id)

    # Set alert: "<command> <product id> <target price>"
    alert_args = _command_argument(text, '预警 ', 'alert ')
    if alert_args is not None:
        parts = alert_args.split()
        try:
            product_id, target_price = parts[0], float(parts[1])
        except (IndexError, ValueError):
            # Missing or non-numeric price: explain the format instead of
            # letting float() bubble up as a 500.
            return {
                'type': 'text',
                'content': '预警格式:预警 商品ID 目标价格,如:预警 100012345678 5999',
            }
        return set_price_alert(product_id, target_price, user_id)

    # Price history
    product_id = _command_argument(text, '历史 ', 'history ')
    if product_id is not None:
        return get_price_history(product_id, user_id)

    # Add favourite
    product_id = _command_argument(text, '收藏 ', 'fav ')
    if product_id is not None:
        return add_to_favorites(product_id, user_id)

    # List favourites
    if text in ['收藏列表', 'favorites']:
        return get_favorites(user_id)

    # Default reply
    return {
        'type': 'text',
        'content': '请输入"帮助"查看可用命令',
    }


def get_help_message() -> str:
    """Return the usage guide sent in reply to the help command."""
    return """
🤖 京东比价机器人使用指南:

🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑

💰 查询价格:
价格 100012345678
price 100012345678

📊 历史价格:
历史 100012345678
history 100012345678

🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999

⭐ 收藏管理:
收藏 100012345678
收藏列表

📈 价格趋势图:
趋势 100012345678
trend 100012345678
"""


def search_products(keyword: str, user_id: str) -> dict:
    """Search for *keyword* and format up to five results as a text reply."""
    try:
        results = price_service.search_products(keyword, limit=5)

        if not results:
            return {
                'type': 'text',
                'content': f'未找到与"{keyword}"相关的商品',
            }

        lines = [f"🔍 搜索“{keyword}”结果:\n\n"]
        for i, product in enumerate(results, 1):
            lines.append(f"{i}. {product['title'][:30]}...\n")
            lines.append(f"   💰 ¥{product['price']} | 👑 {product['shop']}\n")
            lines.append(f"   📞 评论: {product['comment_count']}\n")
            lines.append(f"   🔗 ID: {product['product_id']}\n\n")
        lines.append("输入“价格 ID”查看详情,如:价格 100012345678")

        return {
            'type': 'text',
            'content': ''.join(lines),
        }

    except Exception as e:
        logging.error(f"搜索商品失败: {e}")
        return {
            'type': 'text',
            'content': '搜索失败,请稍后重试',
        }


def get_product_price(product_id: str, user_id: str) -> dict:
    """Reply with the current price and 7-day change of a product."""
    try:
        product = price_service.get_product_info(product_id)
        if not product:
            return {
                'type': 'text',
                'content': f'未找到商品 ID: {product_id}',
            }

        current_price = price_service.get_current_price(product_id)
        price_change = price_service.get_price_change(product_id, days=7)

        message = f"📱 {product.title}\n\n"
        message += f"💰 当前价格: ¥{current_price}\n"

        if price_change:
            change_type = "📈" if price_change['change'] > 0 else "📉"
            message += f"{change_type} 7天变化: {price_change['change']:.1f}%\n"

        message += f"🏪 店铺: {product.shop}\n"
        message += f"📞 评论: {product.comment_count}\n\n"
        message += "输入“历史 ID”查看价格趋势"

        return {
            'type': 'text',
            'content': message,
        }

    except Exception as e:
        logging.error(f"获取商品价格失败: {e}")
        return {
            'type': 'text',
            'content': '获取价格失败,请稍后重试',
        }


def set_price_alert(product_id: str, target_price: float, user_id: str) -> dict:
    """Register a price alert for the user and report whether it was stored."""
    try:
        success = price_service.set_price_alert(user_id, product_id, target_price)

        if success:
            return {
                'type': 'text',
                'content': f'✅ 预警设置成功!\n当价格 ≤ ¥{target_price} 时会通知您',
            }
        return {
            'type': 'text',
            'content': '预警设置失败,请检查商品ID',
        }

    except Exception as e:
        logging.error(f"设置价格预警失败: {e}")
        return {
            'type': 'text',
            'content': '预警设置失败,请稍后重试',
        }

3.3 核心服务

3.3.1 微信服务

# services/wechat_service.py
"""Thin wrapper around itchat: login, message listening, sending, whitelist."""
import itchat
import qrcode
import os
import threading
import time
from typing import Dict, List, Optional
from utils.config import Config
import logging


class WeChatService:
    """WeChat account session used by the bot."""

    def __init__(self):
        self.is_logged_in = False
        # itchat.get_friends() results are iterated as a sequence of friend
        # records below, so start with an empty list rather than a dict.
        self.friends = []
        self.msg_handler = None
        self.login_thread = None
        self.qr_path = Config.WECHAT_QR_CODE_PATH

    def login(self) -> bool:
        """Log in through itchat and start the listener thread.

        Returns:
            True on success, False when login raised (the error is logged).
        """
        try:
            # itchat writes its own login QR code to picDir. Pre-generating a
            # QR image of the plain login URL at the same path is not a valid
            # login code and would be overwritten anyway.
            print(f"请扫描二维码登录微信: {self.qr_path}")

            itchat.auto_login(
                hotReload=Config.WECHAT_AUTO_LOGIN,
                picDir=self.qr_path
            )

            self.is_logged_in = True
            self.friends = itchat.get_friends(update=True)

            # itchat.run() blocks, so listen on a daemon thread.
            self.login_thread = threading.Thread(target=self.start_listening, daemon=True)
            self.login_thread.start()

            logging.info("微信登录成功")
            return True

        except Exception as e:
            logging.error(f"微信登录失败: {e}")
            return False

    def start_listening(self):
        """Register the text handler and run itchat's loop (blocking call)."""

        @itchat.msg_register(itchat.content.TEXT)
        def handle_text_message(msg):
            try:
                user_id = msg['FromUserName']
                content = msg['Text']

                # Hand the message to the registered handler and send back
                # whatever it returns.
                if self.msg_handler:
                    response = self.msg_handler(user_id, content)
                    if response:
                        self.send_message(user_id, response)

            except Exception as e:
                logging.error(f"处理微信消息失败: {e}")

        itchat.run()

    def send_message(self, user_id: str, message: str) -> bool:
        """Send *message* to *user_id*; return False (and log) on failure."""
        try:
            itchat.send(message, toUserName=user_id)
            return True
        except Exception as e:
            logging.error(f"发送微信消息失败: {e}")
            return False

    def _find_friend(self, user_id: str):
        """Return the friend record whose UserName equals *user_id*, or None."""
        for friend in self.friends:
            if friend['UserName'] == user_id:
                return friend
        return None

    def validate_user(self, user_id: str) -> bool:
        """Return True when *user_id* may use the bot.

        With no whitelist configured everyone is allowed; otherwise the user
        must be a known friend whose nickname is on the whitelist.
        """
        # Drop blank entries: an unset USER_WHITELIST parsed with
        # ''.split(',') is [''], which must count as "no whitelist".
        whitelist = {name.strip() for name in Config.USER_WHITELIST if name and name.strip()}
        if not whitelist:
            return True

        friend = self._find_friend(user_id)
        return friend is not None and friend['NickName'] in whitelist

    def get_friend_info(self, user_id: str) -> Optional[Dict]:
        """Return nickname, remark name and sex of a friend, or None if unknown."""
        friend = self._find_friend(user_id)
        if friend is None:
            return None
        return {
            'nickname': friend['NickName'],
            'remark_name': friend['RemarkName'],
            'sex': friend['Sex'],
        }

    def set_message_handler(self, handler):
        """Set the callable invoked as ``handler(user_id, content)`` per text message."""
        self.msg_handler = handler

3.3.2 价格服务

# services/price_service.py
import json
import logging
import math
from datetime import datetime, timedelta
from typing import Dict, List, Optional

from utils.cache import get_redis_client
from utils.database import db
from web.models.product_model import Product, PriceHistory, PriceAlert


class PriceService:
    """Price lookup, price history and price-alert bookkeeping.

    Reads and writes the ``Product``, ``PriceHistory`` and ``PriceAlert``
    models and keeps short-lived copies of search results and current prices
    in Redis.
    """

    # Cache lifetimes, in seconds.
    SEARCH_CACHE_TTL = 1800  # 30 minutes
    PRICE_CACHE_TTL = 3600  # 1 hour

    # Prices closer than this are treated as unchanged. Exact ``==`` on floats
    # that went through a FLOAT column can fail for equal prices and would
    # produce a new history row on every check.
    PRICE_TOLERANCE = 0.005

    def __init__(self):
        self.redis_client = get_redis_client()

    @staticmethod
    def _run_spider(spider_cls, **spider_kwargs) -> list:
        """Run one spider to completion and return every item it scraped."""
        from scrapy import signals
        from scrapy.crawler import CrawlerProcess
        from scrapy.utils.project import get_project_settings

        items = []

        def collect_item(item, **_kwargs):
            items.append(item)

        process = CrawlerProcess(get_project_settings())
        crawler = process.create_crawler(spider_cls)
        # Without this connection nothing ever feeds the collector and the
        # crawl output is silently discarded.
        crawler.signals.connect(collect_item, signal=signals.item_scraped)
        process.crawl(crawler, **spider_kwargs)
        # NOTE(review): start() runs the Twisted reactor, which cannot be
        # restarted in the same process. A second crawl in a long-lived
        # process will raise; consider running crawls in a child process.
        process.start()
        return items

    def _cache_price(self, product_id: str, price: float, timestamp: datetime) -> None:
        """Store the current price of a product in Redis."""
        self.redis_client.setex(
            f"jd:price:{product_id}",
            self.PRICE_CACHE_TTL,
            json.dumps({
                'price': price,
                'timestamp': int(timestamp.timestamp()),
            }),
        )

    def search_products(self, keyword: str, limit: int = 10) -> List[Dict]:
        """Search products by keyword.

        Args:
            keyword: Search term passed to the spider.
            limit: Maximum number of results to return.

        Returns:
            A list of result dicts; an empty list on any failure.
        """
        try:
            # Check the cache first.
            cache_key = f"jd:search:{keyword}"
            cached = self.redis_client.get(cache_key)
            if cached:
                # The cache key does not include ``limit``, so trim here.
                return json.loads(cached)[:limit]

            # Fall back to a live crawl.
            from spiders.jd_spider import JDSpider

            results = []
            for item in self._run_spider(JDSpider, keyword=keyword, max_results=limit):
                # Assumes each item carries its products under 'results', as
                # the original collector did -- TODO confirm against JDSpider.
                results.extend(item.get('results', []))

            if results:
                self.redis_client.setex(
                    cache_key, self.SEARCH_CACHE_TTL, json.dumps(results)
                )

            return results

        except Exception as e:
            logging.error(f"搜索商品失败: {e}")
            return []

    def get_product_info(self, product_id: str) -> Optional[Product]:
        """Return the stored product, crawling and persisting it if missing.

        Args:
            product_id: JD product id.

        Returns:
            The ``Product`` row, or None when it is neither stored nor
            obtainable from the spider, or when an error occurs.
        """
        try:
            product = Product.query.filter_by(product_id=product_id).first()
            if product:
                return product

            # Not stored yet: fetch it from JD.
            from spiders.jd_price_spider import JDPriceSpider

            items = self._run_spider(JDPriceSpider, product_ids=[product_id])
            # The original collector kept only the most recent item.
            result = items[-1] if items else None
            if not result:
                return None

            product = Product(
                product_id=product_id,
                title=result.get('title', ''),
                shop=result.get('shop', ''),
                comment_count=result.get('comment_count', 0),
                image_url=result.get('image_url', ''),
                link=result.get('link', ''),
            )
            db.session.add(product)
            try:
                db.session.commit()
            except Exception:
                # Leave the session usable for the next caller.
                db.session.rollback()
                raise
            return product

        except Exception as e:
            logging.error(f"获取商品信息失败: {e}")
            return None

    def get_current_price(self, product_id: str) -> float:
        """Return the latest known price of a product.

        Args:
            product_id: JD product id.

        Returns:
            The cached price if present, otherwise the newest
            ``PriceHistory`` price. ``0.0`` when no price is known or an
            error occurs.
        """
        try:
            # Check the cache first.
            cached = self.redis_client.get(f"jd:price:{product_id}")
            if cached:
                return json.loads(cached).get('price', 0.0)

            # Fall back to the newest database record.
            price_record = PriceHistory.query.filter_by(
                product_id=product_id
            ).order_by(PriceHistory.timestamp.desc()).first()

            if price_record is None:
                return 0.0

            self._cache_price(product_id, price_record.price, price_record.timestamp)
            return price_record.price

        except Exception as e:
            logging.error(f"获取当前价格失败: {e}")
            return 0.0

    def save_price_record(self, product_id: str, price: float) -> bool:
        """Append a price to the history unless it repeats the latest one.

        Args:
            product_id: JD product id.
            price: Observed price.

        Returns:
            True when the record was stored or skipped as a duplicate, False
            when storing failed (the transaction is rolled back).
        """
        try:
            latest_record = PriceHistory.query.filter_by(
                product_id=product_id
            ).order_by(PriceHistory.timestamp.desc()).first()

            if latest_record and math.isclose(
                latest_record.price, price, abs_tol=self.PRICE_TOLERANCE
            ):
                # Price unchanged: do not store a duplicate.
                return True

            recorded_at = datetime.now()
            db.session.add(PriceHistory(
                product_id=product_id,
                price=price,
                timestamp=recorded_at,
            ))
            db.session.commit()

        except Exception as e:
            logging.error(f"保存价格记录失败: {e}")
            db.session.rollback()
            return False

        logging.info(f"保存价格记录: {product_id} - ¥{price}")

        # Refresh the cached price so get_current_price() does not keep
        # serving the previous value until the cache entry expires. The row
        # is already committed, so a cache failure must not report failure.
        try:
            self._cache_price(product_id, price, recorded_at)
        except Exception as e:
            logging.warning(f"更新价格缓存失败: {e}")

        return True

    def get_price_history(self, product_id: str, days: int = 30) -> List[PriceHistory]:
        """Return the price records of the last ``days`` days, oldest first.

        Args:
            product_id: JD product id.
            days: Size of the look-back window in days.

        Returns:
            Matching ``PriceHistory`` rows; an empty list on error.
        """
        try:
            start_date = datetime.now() - timedelta(days=days)
            return PriceHistory.query.filter(
                PriceHistory.product_id == product_id,
                PriceHistory.timestamp >= start_date,
            ).order_by(PriceHistory.timestamp.asc()).all()

        except Exception as e:
            logging.error(f"获取价格历史失败: {e}")
            return []

    def get_price_change(self, product_id: str, days: int = 7) -> Optional[Dict]:
        """Compare the newest and oldest price inside the window.

        Args:
            product_id: JD product id.
            days: Size of the look-back window in days.

        Returns:
            A dict with ``change`` (percentage relative to the oldest price),
            ``type`` (``'drop'`` for a negative change, otherwise ``'rise'``),
            ``current_price`` and ``old_price``. None when fewer than two
            records exist or an error occurs.
        """
        try:
            history = self.get_price_history(product_id, days=days)
            if len(history) < 2:
                return None

            current_price = history[-1].price
            old_price = history[0].price

            change = current_price - old_price
            # Guard against division by zero for a zero or negative old price.
            change_percent = (change / old_price) * 100 if old_price > 0 else 0.0

            return {
                'change': change_percent,
                'type': 'drop' if change_percent < 0 else 'rise',
                'current_price': current_price,
                'old_price': old_price,
            }

        except Exception as e:
            logging.error(f"计算价格变化失败: {e}")
            return None

    def set_price_alert(self, user_id: str, product_id: str, target_price: float) -> bool:
        """Create or update a user's price alert for a product.

        Args:
            user_id: WeChat user id.
            product_id: JD product id.
            target_price: Price at or below which the user wants a notification.

        Returns:
            True on success, False when storing failed (the transaction is
            rolled back).
        """
        try:
            existing_alert = PriceAlert.query.filter_by(
                user_id=user_id,
                product_id=product_id,
            ).first()

            now = datetime.now()
            if existing_alert:
                existing_alert.target_price = target_price
                existing_alert.updated_at = now
            else:
                db.session.add(PriceAlert(
                    user_id=user_id,
                    product_id=product_id,
                    target_price=target_price,
                    created_at=now,
                    updated_at=now,
                ))

            db.session.commit()
            return True

        except Exception as e:
            logging.error(f"设置价格预警失败: {e}")
            db.session.rollback()
            return False

    def get_monitored_products(self) -> List[Product]:
        """Return every product that has at least one price alert.

        Returns:
            The matching ``Product`` rows; an empty list when there are no
            alerts or an error occurs.
        """
        try:
            # Select the distinct ids directly. Passing a column to
            # Query.distinct() is DISTINCT ON, which only PostgreSQL supports.
            rows = db.session.query(PriceAlert.product_id).distinct().all()
            product_ids = [row[0] for row in rows]
            if not product_ids:
                return []

            return Product.query.filter(Product.product_id.in_(product_ids)).all()

        except Exception as e:
            logging.error(f"获取监控商品失败: {e}")
            return []

3.3.3 通知服务

# services/notification_service.py
import logging
from typing import Dict, List

from services.wechat_service import WeChatService
from utils.database import db
from web.models.product_model import PriceAlert


class NotificationService:
    """Sends price alerts and daily summaries to users over WeChat."""

    def __init__(self):
        self.wechat_service = WeChatService()

    def send_price_alert(self, product_id: str, message: str) -> bool:
        """Notify every user whose target price for a product has been reached.

        Args:
            product_id: JD product id.
            message: Text to send to each matching user.

        Returns:
            True when the alerts were processed (individual delivery failures
            are logged and skipped), False when processing itself failed.
        """
        try:
            alerts = PriceAlert.query.filter_by(product_id=product_id).all()
            if not alerts:
                return True

            # Imported here rather than at module level, as in the original.
            from services.price_service import PriceService

            # The price is the same for every alert on this product, so look
            # it up once instead of once per subscriber.
            current_price = PriceService().get_current_price(product_id)

            # get_current_price() reports 0.0 when the price is unknown or the
            # lookup failed; treating that as a real price would satisfy every
            # target and notify all subscribers.
            if current_price <= 0:
                logging.warning(f"商品 {product_id} 当前价格未知,跳过预警")
                return True

            for alert in alerts:
                if current_price > alert.target_price:
                    continue

                try:
                    sent = self.wechat_service.send_message(alert.user_id, message)
                except Exception as e:
                    logging.error(f"发送预警给用户 {alert.user_id} 失败: {e}")
                    continue

                # Presumably send_message() returns False on failure instead
                # of raising -- verify against WeChatService.
                if sent is False:
                    logging.error(f"发送预警给用户 {alert.user_id} 失败")
                else:
                    logging.info(f"发送价格预警给用户 {alert.user_id}: {message}")

            return True

        except Exception as e:
            logging.error(f"发送价格预警失败: {e}")
            return False

    def send_daily_summary(self, user_id: str) -> bool:
        """Send one user a summary of notable 24-hour price changes.

        Only products with an alert set by this user are considered, and only
        changes larger than 1% in either direction are listed. Nothing is sent
        when no product qualifies.

        Args:
            user_id: WeChat user id.

        Returns:
            True when processing completed, False when an error occurred.
        """
        try:
            alerts = PriceAlert.query.filter_by(user_id=user_id).all()
            if not alerts:
                return True

            from services.price_service import PriceService

            price_service = PriceService()

            entries = []
            for alert in alerts:
                # Price change over the last 24 hours.
                price_change = price_service.get_price_change(alert.product_id, days=1)
                if not price_change or abs(price_change['change']) <= 1.0:
                    continue

                change_icon = "📈" if price_change['change'] > 0 else "📉"
                entries.append(f"{change_icon} {alert.product.title[:20]}...\n")
                entries.append(
                    f"   变化: {price_change['change']:.1f}% | 当前: ¥{price_change['current_price']}\n\n"
                )

            if entries:
                message = "📊 今日价格变化摘要:\n\n" + "".join(entries)
                self.wechat_service.send_message(user_id, message)

            return True

        except Exception as e:
            logging.error(f"发送每日摘要失败: {e}")
            return False

3.4 数据模型

# web/models/product_model.py
from utils.database import db
from datetime import datetime


class Product(db.Model):
    """A JD product tracked by the bot."""
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    product_id = db.Column(db.String(20), unique=True, nullable=False)  # JD product id
    title = db.Column(db.Text, nullable=False)  # product title
    shop = db.Column(db.String(200), nullable=False)  # shop name
    comment_count = db.Column(db.Integer, default=0)  # number of comments
    image_url = db.Column(db.Text)  # image URL
    link = db.Column(db.Text)  # product page link
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

    def __repr__(self):
        return f'<Product {self.product_id}: {self.title[:20]}>'


class PriceHistory(db.Model):
    """One recorded price of a product at a point in time."""
    __tablename__ = 'price_history'

    id = db.Column(db.Integer, primary_key=True)
    product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
    # NOTE(review): Float is inexact for currency; a Numeric column would be
    # safer but changes the schema.
    price = db.Column(db.Float, nullable=False)  # price
    timestamp = db.Column(db.DateTime, nullable=False)  # time of the record

    # Relationship; also exposes Product.price_history.
    product = db.relationship('Product', backref=db.backref('price_history', lazy=True))

    def __repr__(self):
        return f'<PriceHistory {self.product_id}: ¥{self.price} at {self.timestamp}>'


class PriceAlert(db.Model):
    """A user's request to be notified about a product's price."""
    __tablename__ = 'price_alerts'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.String(100), nullable=False)  # WeChat user id
    product_id = db.Column(db.String(20), db.ForeignKey('products.product_id'), nullable=False)
    target_price = db.Column(db.Float, nullable=False)  # target price
    created_at = db.Column(db.DateTime, default=datetime.now)
    updated_at = db.Column(db.DateTime, default=datetime.now, onupdate=datetime.now)

    # Relationship; also exposes Product.price_alerts.
    product = db.relationship('Product', backref=db.backref('price_alerts', lazy=True))

    def __repr__(self):
        return f'<PriceAlert {self.user_id} for {self.product_id}: ¥{self.target_price}>'

四、部署配置

4.1 Docker配置

# docker-compose.yml
version: '3.8'

services:
  web:
    build: .
    ports:
      - "5000:5000"
    environment:
      - DEBUG=False
      - MYSQL_HOST=mysql
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      - mysql
      - redis
    volumes:
      - ./logs:/app/logs
      # The host file must exist before the first start; otherwise Docker
      # creates a directory with this name instead of a file.
      - ./qr_code.png:/app/qr_code.png
    restart: unless-stopped

  mysql:
    image: mysql:8.0
    environment:
      # Taken from the environment / .env file; falls back to the original default.
      - MYSQL_ROOT_PASSWORD=${MYSQL_ROOT_PASSWORD:-password}
      - MYSQL_DATABASE=jd_price_bot
    volumes:
      - mysql_data:/var/lib/mysql
    restart: unless-stopped

  redis:
    image: redis:7-alpine
    restart: unless-stopped

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./ssl:/etc/nginx/ssl
    depends_on:
      - web
    restart: unless-stopped

volumes:
  mysql_data:

4.2 Dockerfile

FROM python:3.11-slim

WORKDIR /app

# Install system dependencies (curl is required by the ChromeDriver step below)
RUN apt-get update && apt-get install -y \
    gcc \
    g++ \
    wget \
    curl \
    gnupg \
    unzip \
    && rm -rf /var/lib/apt/lists/*

# Install Chrome. The signing key goes into a dedicated keyring because
# apt-key is deprecated.
RUN wget -q -O - https://dl-ssl.google.com/linux/linux_signing_key.pub \
        | gpg --dearmor -o /usr/share/keyrings/google-chrome.gpg \
    && echo "deb [arch=amd64 signed-by=/usr/share/keyrings/google-chrome.gpg] http://dl.google.com/linux/chrome/deb/ stable main" \
        > /etc/apt/sources.list.d/google.list \
    && apt-get update \
    && apt-get install -y google-chrome-stable \
    && rm -rf /var/lib/apt/lists/*

# Install the ChromeDriver build matching the installed Chrome major version.
# The legacy chromedriver.storage.googleapis.com endpoint stopped at v114, so
# the version is resolved through Chrome for Testing instead.
RUN CHROME_MAJOR=$(google-chrome --version | grep -oE '[0-9]+' | head -n 1) \
    && CHROMEDRIVER_VERSION=$(curl -fsSL "https://googlechromelabs.github.io/chrome-for-testing/LATEST_RELEASE_${CHROME_MAJOR}") \
    && wget -q -O /tmp/chromedriver.zip "https://storage.googleapis.com/chrome-for-testing-public/${CHROMEDRIVER_VERSION}/linux64/chromedriver-linux64.zip" \
    && unzip -j /tmp/chromedriver.zip 'chromedriver-linux64/chromedriver' -d /usr/local/bin/ \
    && chmod +x /usr/local/bin/chromedriver \
    && rm /tmp/chromedriver.zip

# Copy the dependency list
COPY requirements.txt .

# Install Python dependencies
RUN pip install --no-cache-dir -r requirements.txt

# Copy the application code
COPY . .

# Create the log directory
RUN mkdir -p logs

# Expose the application port
EXPOSE 5000

# Start command
CMD ["python", "web/app.py"]

4.3 Nginx配置

# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream jd_price_bot {
        server web:5000;
    }

    server {
        listen 80;
        server_name your-domain.com;

        # Redirect to HTTPS
        return 301 https://$server_name$request_uri;
    }

    server {
        listen 443 ssl;
        server_name your-domain.com;

        ssl_certificate /etc/nginx/ssl/cert.pem;
        ssl_certificate_key /etc/nginx/ssl/private.key;

        ssl_protocols TLSv1.2 TLSv1.3;
        ssl_ciphers HIGH:!aNULL:!MD5;

        location / {
            proxy_pass http://jd_price_bot;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        # WebSocket support
        location /websocket {
            proxy_pass http://jd_price_bot;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}

五、使用指南

5.1 启动应用

# 1. Clone the project
git clone https://github.com/your-username/jd-price-bot.git
cd jd-price-bot

# 2. Configure environment variables
cp .env.example .env
# Edit .env to configure the database, WeChat and other settings

# 3. Start the services
docker-compose up -d

# 4. Initialize the database
# db.create_all() needs an active application context under Flask-SQLAlchemy 3.x
docker-compose exec web python -c "from utils.database import db, init_db; from web.app import app; init_db(app); app.app_context().push(); db.create_all()"

# 5. Scan the WeChat QR code to log in
# View the generated QR code file ("open" is the macOS command; use xdg-open on Linux)
open qr_code.png

5.2 微信交互示例

用户:帮助
机器人:
🤖 京东比价机器人使用指南:

🔍 搜索商品:
搜索 iPhone 15
search 笔记本电脑

💰 查询价格:
价格 100012345678
price 100012345678

📊 历史价格:
历史 100012345678
history 100012345678

🔔 价格预警:
预警 100012345678 5999
alert 100012345678 5999

⭐ 收藏管理:
收藏 100012345678
收藏列表

📈 价格趋势图:
趋势 100012345678
trend 100012345678

用户:搜索 iPhone 15
机器人:
🔍 搜索“iPhone 15”结果:

1. Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...
   💰 ¥4999 | 👑 Apple产品京东自营旗舰店
   📞 评论: 200000
   🔗 ID: 100012345678

2. Apple iPhone 15 (A3092) 256GB 蓝色 支持移动联通电信5G...
   💰 ¥5999 | 👑 Apple产品京东自营旗舰店
   📞 评论: 150000
   🔗 ID: 100012345679

输入“价格 ID”查看详情,如:价格 100012345678

用户:价格 100012345678
机器人:
📱 Apple iPhone 15 (A3092) 128GB 粉色 支持移动联通电信5G...

💰 当前价格: ¥4999
📉 7天变化: -5.2%
🏪 店铺: Apple产品京东自营旗舰店
📞 评论: 200000

输入“历史 ID”查看价格趋势

用户:预警 100012345678 4500
机器人:
✅ 预警设置成功!
当价格 ≤ ¥4500 时会通知您

六、项目特点

6.1 技术亮点

  1. 智能爬虫:结合Scrapy+Selenium,支持动态页面渲染

  2. 实时监控:定时任务自动监控价格变化

  3. 微信集成:无缝对接微信,支持多种交互方式

  4. 数据可视化:生成价格趋势图,提供购买建议

  5. 高性能缓存:Redis缓存提升查询性能

6.2 应用场景

  1. 个人购物:实时比价,抓住最佳购买时机

  2. 电商运营:监控竞品价格,制定定价策略

  3. 数据分析:分析商品价格波动规律

  4. 智能推荐:基于价格趋势推荐购买时间

6.3 注意事项

  1. 合规使用:遵守京东Robots协议,控制爬取频率

  2. 用户隐私:妥善保管用户微信信息

  3. API限制:合理使用第三方服务,避免被封禁

  4. 数据安全:加密存储敏感信息


通过本项目,你可以:
  • ✅ 掌握Scrapy爬虫开发技巧

  • ✅ 实现Flask RESTful API开发

  • ✅ 构建微信机器人应用

  • ✅ 设计高性能数据存储方案

  • ✅ 部署完整的Web应用系统

项目代码已包含完整的错误处理、日志记录、性能优化,可直接用于生产环境或作为学习参考。


群贤毕至

访客