×

🔥 别再手动比价了!手把手教你用Python爬虫自动监控淘宝/京东价格

万邦科技Lex 万邦科技Lex 发表于2026-05-25 16:27:13 浏览20 评论0

抢沙发发表评论

还在每天打开N个浏览器标签页手动比价?价格波动总是错过?今天我就用100行代码帮你实现自动价格监控,每天节省1小时,还能抓住最佳购买时机!


一、 价格监控爬虫的“四大核心能力”

能力
传统方式
智能爬虫
节省时间
价格采集
手动查看
自动24小时监控
每天1小时
历史追踪
Excel记录
自动记录数据库
每周2小时
降价提醒
凭记忆
自动微信/邮件提醒
不错过任何优惠
比价分析
大脑计算
自动生成报表
每月8小时
总计:每月节省 40+ 小时,相当于多出1个完整工作日!

二、 完整源码:智能价格监控系统

1. 核心爬虫类(支持淘宝/京东/拼多多)

# price_monitor.py
import requests
import json
import time
import re
from datetime import datetime
from urllib.parse import quote
import sqlite3
from typing import Dict, List, Optional
import logging
import smtplib
from email.mime.text import MIMEText
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use('Agg')  # 无GUI模式
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class PriceMonitor:
    """智能价格监控器(支持淘宝、京东、拼多多)"""
    
    def __init__(self, db_path='price_data.db'):
        # 初始化数据库
        self.init_database(db_path)
        
        # 配置请求头
        self.headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
            'Accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8',
            'Accept-Encoding': 'gzip, deflate, br',
            'DNT': '1',
            'Connection': 'keep-alive',
            'Upgrade-Insecure-Requests': '1',
        }
        
        # 配置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('price_monitor.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def init_database(self, db_path):
        """初始化数据库"""
        self.conn = sqlite3.connect(db_path)
        self.cursor = self.conn.cursor()
        
        # 创建商品表
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS products (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL,
                url TEXT NOT NULL UNIQUE,
                platform TEXT,
                current_price REAL,
                lowest_price REAL,
                highest_price REAL,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 创建价格历史表
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS price_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER,
                price REAL,
                timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')
        
        # 创建监控规则表
        self.cursor.execute('''
            CREATE TABLE IF NOT EXISTS alert_rules (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                product_id INTEGER,
                target_price REAL,
                alert_type TEXT,  -- 'below'/'above'
                enabled INTEGER DEFAULT 1,
                FOREIGN KEY (product_id) REFERENCES products (id)
            )
        ''')
        
        self.conn.commit()
    
    def add_product(self, url: str, name: str = None):
        """添加监控商品"""
        try:
            platform = self.detect_platform(url)
            
            # 如果未提供名称,尝试从URL提取
            if not name:
                name = self.extract_name_from_url(url)
            
            # 获取当前价格
            price_info = self.fetch_price(url)
            
            if price_info and price_info.get('price'):
                current_price = float(price_info['price'])
                
                # 插入或更新商品
                self.cursor.execute('''
                    INSERT OR REPLACE INTO products 
                    (name, url, platform, current_price, lowest_price, highest_price, updated_at)
                    VALUES (?, ?, ?, ?, ?, ?, CURRENT_TIMESTAMP)
                ''', (name, url, platform, current_price, current_price, current_price))
                
                product_id = self.cursor.lastrowid
                
                # 记录价格历史
                self.cursor.execute('''
                    INSERT INTO price_history (product_id, price)
                    VALUES (?, ?)
                ''', (product_id, current_price))
                
                self.conn.commit()
                
                self.logger.info(f"✅ 添加商品: {name}, 价格: ¥{current_price}")
                return product_id
            else:
                self.logger.error(f"❌ 无法获取商品价格: {url}")
                return None
                
        except Exception as e:
            self.logger.error(f"添加商品失败: {e}")
            return None
    
    def detect_platform(self, url: str) -> str:
        """检测电商平台"""
        if 'taobao.com' in url or 'tmall.com' in url:
            return 'taobao'
        elif 'jd.com' in url:
            return 'jd'
        elif 'pinduoduo.com' in url or 'yangkeduo.com' in url:
            return 'pdd'
        elif 'sunings.com' in url:
            return 'suning'
        else:
            return 'unknown'
    
    def extract_name_from_url(self, url: str) -> str:
        """从URL提取商品名称"""
        try:
            # 尝试从URL参数提取
            if 'id=' in url:
                product_id = url.split('id=')[1].split('&')[0]
                return f"商品_{product_id}"
            elif '/item/' in url:
                parts = url.split('/item/')[1].split('.')[0]
                return parts
            else:
                return url.split('/')[-1].split('.')[0]
        except:
            return "未知商品"
    
    def fetch_price(self, url: str) -> Optional[Dict]:
        """获取商品价格(支持多个平台)"""
        platform = self.detect_platform(url)
        
        try:
            if platform == 'taobao':
                return self._fetch_taobao_price(url)
            elif platform == 'jd':
                return self._fetch_jd_price(url)
            elif platform == 'pdd':
                return self._fetch_pdd_price(url)
            else:
                return self._fetch_generic_price(url)
        except Exception as e:
            self.logger.error(f"获取价格失败 [{platform}]: {e}")
            return None
    
    def _fetch_taobao_price(self, url: str) -> Dict:
        """获取淘宝/天猫价格"""
        # 方法1: 尝试从HTML页面提取
        response = requests.get(url, headers=self.headers, timeout=10)
        # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
        if response.status_code != 200:
            raise Exception(f"HTTP {response.status_code}")
        
        html = response.text
        
        # 查找价格信息
        price_patterns = [
            r'"price"\s*:\s*"([\d.]+)"',  # JSON格式
            r'"reservePrice"\s*:\s*"([\d.]+)"',  # 原价
            r'"auctionPrice"\s*:\s*"([\d.]+)"',  # 拍卖价
            r'<em class="tb-rmb-num">([\d.]+)</em>',  # 价格标签
            r'"viewPrice"\s*:\s*"([\d.]+)"',  # 视图价格
        ]
        
        price = None
        for pattern in price_patterns:
            match = re.search(pattern, html)
            if match:
                price = float(match.group(1))
                break
        
        # 查找商品名称
        name_patterns = [
            r'"title"\s*:\s*"([^"]+)"',
            r'<title>([^<]+)</title>',
            r'"itemTitle"\s*:\s*"([^"]+)"',
        ]
        
        name = None
        for pattern in name_patterns:
            match = re.search(pattern, html)
            if match:
                name = match.group(1).strip()
                if len(name) > 100:  # 标题可能太长
                    name = name[:100] + '...'
                break
        
        if not price:
            # 尝试查找促销价
            promo_pattern = r'"promotionPrice"\s*:\s*"([\d.]+)"'
            match = re.search(promo_pattern, html)
            if match:
                price = float(match.group(1))
        
        return {'price': price, 'name': name, 'url': url}
    
    def _fetch_jd_price(self, url: str) -> Dict:
        """获取京东价格"""
        response = requests.get(url, headers=self.headers, timeout=10)
        
        if response.status_code != 200:
            raise Exception(f"HTTP {response.status_code}")
        
        html = response.text
        
        # 京东价格通常在JavaScript中
        price_patterns = [
            r'"p":"([\d.]+)"',  # 价格
            r'jd\.price\s*=\s*([\d.]+)',  # 价格变量
            r'"salePrice":\s*([\d.]+)',  # 销售价
            r'<em class="price J-p-[\d]+">([\d.]+)</em>',  # 价格标签
        ]
        
        price = None
        for pattern in price_patterns:
            match = re.search(pattern, html)
            if match:
                try:
                    price = float(match.group(1))
                    break
                except:
                    continue
        
        # 查找商品名称
        name = None
        name_match = re.search(r'<title>([^<]+)</title>', html)
        if name_match:
            name = name_match.group(1).split('【')[0].split('(')[0].strip()
        
        return {'price': price, 'name': name, 'url': url}
    
    def _fetch_pdd_price(self, url: str) -> Dict:
        """获取拼多多价格"""
        # 拼多多有较强的反爬,需要简化处理
        response = requests.get(url, headers=self.headers, timeout=10)
        
        if response.status_code != 200:
            raise Exception(f"HTTP {response.status_code}")
        
        html = response.text
        
        # 尝试多种模式
        price_patterns = [
            r'"marketPrice"\s*:\s*(\d+)',
            r'"goodsPrice":\s*"([\d.]+)"',
            r'"price":\s*"([\d.]+)"',
        ]
        
        price = None
        for pattern in price_patterns:
            match = re.search(pattern, html)
            if match:
                try:
                    price = float(match.group(1))
                    break
                except:
                    continue
        
        # 查找商品名称
        name = None
        name_match = re.search(r'<title>([^<]+)</title>', html)
        if name_match:
            name = name_match.group(1).split('-')[0].strip()
        
        return {'price': price, 'name': name, 'url': url}
    
    def _fetch_generic_price(self, url: str) -> Dict:
        """通用价格获取方法"""
        response = requests.get(url, headers=self.headers, timeout=10)
        
        if response.status_code != 200:
            raise Exception(f"HTTP {response.status_code}")
        
        html = response.text
        
        # 尝试查找各种价格格式
        price_patterns = [
            r'¥\s*([\d,.]+)',  # ¥ 符号
            r'¥\s*([\d,.]+)',  # ¥ 符号
            r'价格\s*[::]\s*([\d,.]+)',  # 价格标签
            r'price["\']?\s*[:=]\s*["\']?([\d,.]+)',  # price属性
        ]
        
        price = None
        for pattern in price_patterns:
            matches = re.findall(pattern, html, re.IGNORECASE)
            if matches:
                try:
                    # 取第一个匹配的价格
                    price_str = matches[0].replace(',', '')
                    price = float(price_str)
                    break
                except:
                    continue
        
        # 查找商品名称
        name = None
        name_match = re.search(r'<title>([^<]+)</title>', html)
        if name_match:
            name = name_match.group(1).strip()
        
        return {'price': price, 'name': name, 'url': url}
    
    def update_all_prices(self):
        """更新所有商品价格"""
        self.logger.info("开始更新所有商品价格...")
        
        # 获取所有商品
        self.cursor.execute('SELECT id, url, name FROM products')
        products = self.cursor.fetchall()
        
        updated_count = 0
        for product_id, url, name in products:
            try:
                # 获取最新价格
                price_info = self.fetch_price(url)
                
                if price_info and price_info.get('price'):
                    current_price = float(price_info['price'])
                    
                    # 获取历史最低/最高价
                    self.cursor.execute(
                        'SELECT MIN(price), MAX(price) FROM price_history WHERE product_id = ?',
                        (product_id,)
                    )
                    min_price, max_price = self.cursor.fetchone()
                    
                    # 更新商品信息
                    new_min = min(min_price or current_price, current_price)
                    new_max = max(max_price or current_price, current_price)
                    
                    self.cursor.execute('''
                        UPDATE products 
                        SET current_price = ?, lowest_price = ?, highest_price = ?, updated_at = CURRENT_TIMESTAMP
                        WHERE id = ?
                    ''', (current_price, new_min, new_max, product_id))
                    
                    # 记录价格历史
                    self.cursor.execute('''
                        INSERT INTO price_history (product_id, price)
                        VALUES (?, ?)
                    ''', (product_id, current_price))
                    
                    updated_count += 1
                    
                    # 检查价格提醒
                    self.check_price_alerts(product_id, current_price, name)
                    
                    self.logger.info(f"✅ 更新: {name} -> ¥{current_price}")
                    
                else:
                    self.logger.warning(f"⚠️ 无法获取价格: {name}")
                    
                # 避免请求过快
                time.sleep(1)
                
            except Exception as e:
                self.logger.error(f"更新失败 {name}: {e}")
                continue
        
        self.conn.commit()
        self.logger.info(f"价格更新完成: {updated_count}/{len(products)} 个商品")
        
        return updated_count
    
    def add_alert_rule(self, product_id: int, target_price: float, 
                      alert_type: str = 'below'):
        """添加价格提醒规则"""
        try:
            self.cursor.execute('''
                INSERT OR REPLACE INTO alert_rules (product_id, target_price, alert_type)
                VALUES (?, ?, ?)
            ''', (product_id, target_price, alert_type))
            self.conn.commit()
            
            self.logger.info(f"✅ 添加提醒规则: 商品{product_id}, 目标价¥{target_price}")
            return True
        except Exception as e:
            self.logger.error(f"添加提醒失败: {e}")
            return False
    
    def check_price_alerts(self, product_id: int, current_price: float, product_name: str):
        """检查价格提醒"""
        self.cursor.execute('''
            SELECT target_price, alert_type 
            FROM alert_rules 
            WHERE product_id = ? AND enabled = 1
        ''', (product_id,))
        
        rules = self.cursor.fetchall()
        
        for target_price, alert_type in rules:
            should_alert = False
            message = ""
            
            if alert_type == 'below' and current_price <= target_price:
                should_alert = True
                message = f"🎉 降价啦!{product_name} 当前价: ¥{current_price} ≤ 目标价: ¥{target_price}"
            elif alert_type == 'above' and current_price >= target_price:
                should_alert = True
                message = f"📈 涨价提醒!{product_name} 当前价: ¥{current_price} ≥ 目标价: ¥{target_price}"
            
            if should_alert:
                self.send_alert(product_id, message)
    
    def send_alert(self, product_id: int, message: str):
        """发送提醒"""
        # 这里可以实现多种通知方式
        print(f"🔔 价格提醒: {message}")
        
        # 1. 控制台打印
        self.logger.info(message)
        
        # 2. 可以扩展为邮件提醒
        # self.send_email_alert(message)
        
        # 3. 可以扩展为微信提醒
        # self.send_wechat_alert(message)
        
        # 禁用已触发的提醒
        self.cursor.execute(
            'UPDATE alert_rules SET enabled = 0 WHERE product_id = ?',
            (product_id,)
        )
        self.conn.commit()
    
    def send_email_alert(self, message: str, to_email: str = None):
        """发送邮件提醒(需要配置SMTP)"""
        if not to_email:
            return
        
        try:
            # 邮件配置
            smtp_server = "smtp.163.com"
            smtp_port = 465
            sender = "your_email@163.com"
            password = "your_password"
            
            msg = MIMEText(message, 'plain', 'utf-8')
            msg['Subject'] = '🔔 价格监控提醒'
            msg['From'] = sender
            msg['To'] = to_email
            
            with smtplib.SMTP_SSL(smtp_server, smtp_port) as server:
                server.login(sender, password)
                server.send_message(msg)
            
            self.logger.info(f"📧 邮件提醒已发送: {to_email}")
            
        except Exception as e:
            self.logger.error(f"发送邮件失败: {e}")
    
    def generate_report(self, days: int = 30) -> Dict:
        """生成价格监控报告"""
        # 获取时间范围
        cutoff_date = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
        # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
        # 商品统计
        self.cursor.execute('''
            SELECT 
                COUNT(*) as total_products,
                COUNT(DISTINCT platform) as platforms,
                AVG(current_price) as avg_price,
                MIN(lowest_price) as global_min,
                MAX(highest_price) as global_max
            FROM products
        ''')
        stats = self.cursor.fetchone()
        
        # 价格变化统计
        self.cursor.execute('''
            SELECT 
                p.name,
                p.platform,
                p.current_price,
                p.lowest_price,
                p.highest_price,
                (p.current_price - p.lowest_price) as diff_from_low,
                ROUND((p.current_price - p.lowest_price) / p.lowest_price * 100, 2) as discount_pct
            FROM products p
            ORDER BY discount_pct DESC
        ''')
        
        products = self.cursor.fetchall()
        
        # 转换为DataFrame便于分析
        columns = ['name', 'platform', 'current', 'lowest', 'highest', 'diff', 'discount_pct']
        df = pd.DataFrame(products, columns=columns)
        
        report = {
            'generated_at': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
            'stats': {
                'total_products': stats[0],
                'platforms': stats[1],
                'avg_price': round(stats[2], 2) if stats[2] else 0,
                'global_min_price': stats[3],
                'global_max_price': stats[4]
            },
            'best_deals': df.nsmallest(5, 'current').to_dict('records'),
            'price_changes': self.get_price_changes(days)
        }
        
        return report
    
    def get_price_changes(self, days: int = 7) -> List[Dict]:
        """获取价格变化趋势"""
        self.cursor.execute('''
            SELECT 
                p.name,
                ph.price,
                ph.timestamp
            FROM price_history ph
            JOIN products p ON p.id = ph.product_id
            WHERE ph.timestamp >= datetime('now', ?)
            ORDER BY ph.timestamp DESC
        ''', (f'-{days} days',))
        
        return self.cursor.fetchall()
    
    def plot_price_history(self, product_id: int, save_path: str = None):
        """绘制价格历史图表"""
        self.cursor.execute('''
            SELECT price, timestamp 
            FROM price_history 
            WHERE product_id = ? 
            ORDER BY timestamp
        ''', (product_id,))
        
        data = self.cursor.fetchall()
        
        if not data:
            self.logger.warning(f"无价格数据: 商品{product_id}")
            return
        
        # 获取商品信息
        self.cursor.execute(
            'SELECT name, current_price FROM products WHERE id = ?',
            (product_id,)
        )
        name, current_price = self.cursor.fetchone()
        
        # 准备数据
        prices = [row[0] for row in data]
        timestamps = [row[1] for row in data]
        
        # 转换为datetime
        dates = [datetime.strptime(ts[:19], '%Y-%m-%d %H:%M:%S') for ts in timestamps]
        
        # 创建图表
        plt.figure(figsize=(12, 6))
        plt.plot(dates, prices, 'b-', linewidth=2, marker='o', markersize=4)
        
        # 标记当前价格
        plt.axhline(y=current_price, color='r', linestyle='--', alpha=0.5, 
                   label=f'当前: ¥{current_price}')
        
        # 标记最低价
        min_price = min(prices)
        min_date = dates[prices.index(min_price)]
        plt.scatter([min_date], [min_price], color='g', s=100, zorder=5)
        plt.annotate(f'最低: ¥{min_price}', 
                    xy=(min_date, min_price),
                    xytext=(10, 10),
                    textcoords='offset points',
                    bbox=dict(boxstyle='round,pad=0.5', fc='green', alpha=0.3))
        
        # 图表美化
        plt.title(f'{name} 价格走势图', fontsize=16, fontweight='bold')
        plt.xlabel('日期', fontsize=12)
        plt.ylabel('价格 (¥)', fontsize=12)
        plt.grid(True, alpha=0.3)
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        
        # 保存或显示
        if save_path:
            plt.savefig(save_path, dpi=300, bbox_inches='tight')
            self.logger.info(f"📈 图表已保存: {save_path}")
        else:
            plt.show()
        
        plt.close()
    
    def close(self):
        """关闭连接"""
        if hasattr(self, 'conn'):
            self.conn.close()

# 使用示例
if __name__ == "__main__":
    print("🛒 智能价格监控系统启动!")
    print("=" * 50)
    
    # 创建监控器
    monitor = PriceMonitor()
    
    # 示例商品(替换为你想监控的商品)
    sample_products = [
        # 格式: [商品URL, 商品名称(可选)]
        ["https://item.taobao.com/item.htm?id=674904123402", "智能手表"],
        ["https://item.jd.com/100008348542.html", "无线耳机"],
        # 添加更多商品...
    ]
    
    # 1. 添加监控商品
    print("\n1. 添加监控商品...")
    for url, name in sample_products:
        product_id = monitor.add_product(url, name)
        if product_id:
            print(f"   ✅ 已添加: {name}")
        else:
            print(f"   ❌ 添加失败: {name}")
        time.sleep(2)  # 避免请求过快
    
    # 2. 设置价格提醒
    print("\n2. 设置价格提醒...")
    # 假设第一个商品ID是1,设置当价格低于500时提醒
    monitor.add_alert_rule(1, 500.0, 'below')
    print("   ✅ 已设置: 智能手表 < ¥500 时提醒")
    
    # 3. 更新所有价格
    print("\n3. 开始更新价格...")
    updated = monitor.update_all_prices()
    print(f"   更新完成: {updated} 个商品")
    
    # 4. 生成报告
    print("\n4. 生成监控报告...")
    report = monitor.generate_report(7)
    print(f"   监控商品总数: {report['stats']['total_products']}")
    print(f"   平台数量: {report['stats']['platforms']}")
    print(f"   平均价格: ¥{report['stats']['avg_price']}")
    
    # 5. 绘制价格图表
    print("\n5. 生成价格走势图...")
    monitor.plot_price_history(1, 'price_chart.png')
    print("   📈 图表已生成: price_chart.png")
    
    # 6. 关闭连接
    monitor.close()
    
    print("\n" + "=" * 50)
    print("🎯 监控系统已就绪!")
    print("定时任务可以添加到cron或Windows任务计划")

2. 自动化调度脚本

# scheduler.py
import schedule
import time
from datetime import datetime
from price_monitor import PriceMonitor
import logging
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
def job():
    """定时执行的价格更新任务"""
    print(f"\n{'='*50}")
    print(f"🕐 开始执行定时任务: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    monitor = PriceMonitor()
    
    try:
        # 更新所有价格
        updated = monitor.update_all_prices()
        
        # 生成报告
        report = monitor.generate_report(1)
        
        print(f"✅ 任务完成: 更新 {updated} 个商品")
        print(f"   今日最佳优惠:")
        
        for deal in report.get('best_deals', [])[:3]:
            print(f"   - {deal['name']}: ¥{deal['current']} (最低¥{deal['lowest']})")
            
    except Exception as e:
        print(f"❌ 任务失败: {e}")
    finally:
        monitor.close()

if __name__ == "__main__":
    print("⏰ 价格监控定时任务启动")
    print("每天 9:00, 12:00, 15:00, 18:00, 21:00 自动检查价格")
    
    # 设置定时任务
    schedule.every().day.at("09:00").do(job)
    schedule.every().day.at("12:00").do(job)
    schedule.every().day.at("15:00").do(job)
    schedule.every().day.at("18:00").do(job)
    schedule.every().day.at("21:00").do(job)
    
    # 立即执行一次
    job()
    
    # 保持运行
    while True:
        schedule.run_pending()
        time.sleep(60)  # 每分钟检查一次

3. 网页可视化界面(可选)

# web_dashboard.py
from flask import Flask, render_template, jsonify, request
import json
from price_monitor import PriceMonitor
import plotly.graph_objects as go
import plotly.utils
import pandas as pd

app = Flask(__name__)
monitor = PriceMonitor()

@app.route('/')
def index():
    """首页 - 显示监控看板"""
    return render_template('dashboard.html')

@app.route('/api/products')
def get_products():
    """获取所有监控商品"""
    monitor.cursor.execute('''
        SELECT id, name, url, platform, current_price, lowest_price, highest_price, 
               updated_at,
               ROUND((current_price - lowest_price) / lowest_price * 100, 1) as discount_pct
        FROM products
        ORDER BY updated_at DESC
    ''')
    
    products = []
    for row in monitor.cursor.fetchall():
        products.append({
            'id': row[0],
            'name': row[1],
            'url': row[2],
            'platform': row[3],
            'current_price': row[4],
            'lowest_price': row[5],
            'highest_price': row[6],
            'updated_at': row[7],
            'discount_pct': row[8],
            'is_best_price': row[4] <= row[5] * 1.05  # 接近最低价5%以内
        })
    
    return jsonify(products)

@app.route('/api/price_history/<int:product_id>')
def get_price_history(product_id):
    """获取价格历史"""
    monitor.cursor.execute('''
        SELECT price, timestamp 
        FROM price_history 
        WHERE product_id = ?
        ORDER BY timestamp
    ''', (product_id,))
    
    data = monitor.cursor.fetchall()
    
    # 转换为Plotly图表数据
    prices = [row[0] for row in data]
    timestamps = [row[1] for row in data]
    
    fig = go.Figure(
        data=go.Scatter(
            x=timestamps,
            y=prices,
            mode='lines+markers',
            name='价格走势'
        )
    )
    
    fig.update_layout(
        title='价格历史走势',
        xaxis_title='时间',
        yaxis_title='价格 (¥)',
        template='plotly_white'
    )
    
    return json.dumps(fig, cls=plotly.utils.PlotlyJSONEncoder)

@app.route('/api/add_product', methods=['POST'])
def add_product():
    """添加新商品"""
    data = request.json
    url = data.get('url')
    name = data.get('name')
    
    if not url:
        return jsonify({'error': 'URL不能为空'}), 400
    
    product_id = monitor.add_product(url, name)
    
    if product_id:
        return jsonify({'success': True, 'product_id': product_id})
    else:
        return jsonify({'error': '添加失败'}), 500

@app.route('/api/update_prices')
def update_prices():
    """手动更新价格"""
    updated = monitor.update_all_prices()
    return jsonify({'success': True, 'updated': updated})

if __name__ == '__main__':
    app.run(debug=True, port=5000)

三、 部署指南:3分钟快速上手

步骤1:安装依赖

pip install requests pandas matplotlib flask plotly schedule

步骤2:创建配置文件

# config.py
# 邮件提醒配置(可选)
EMAIL_CONFIG = {
    'smtp_server': 'smtp.163.com',
    'smtp_port': 465,
    'sender': 'your_email@163.com',
    'password': 'your_password',
    'receivers': ['your_email@163.com']
}

# 监控频率
CHECK_INTERVALS = ['09:00', '12:00', '15:00', '18:00', '21:00']

步骤3:添加你的商品

# my_products.py
MY_PRODUCTS = [
    # 淘宝/天猫
    "https://item.taobao.com/item.htm?id=674904123402",
    
    # 京东
    "https://item.jd.com/100008348542.html",
    
    # 拼多多
    "https://mobile.yangkeduo.com/goods.html?goods_id=123456789",
    
    # 添加更多...
]

步骤4:运行监控

# 一次性运行
python price_monitor.py

# 定时运行(后台)
nohup python scheduler.py > monitor.log 2>&1 &

# 网页看板
python web_dashboard.py

四、 高级功能扩展

1. 微信/钉钉机器人提醒

def send_wechat_alert(message: str):
    """发送微信机器人提醒"""
    webhook_url = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key=YOUR_KEY"
    
    data = {
        "msgtype": "text",
        "text": {
            "content": f"🔔 价格提醒\n{message}\n{datetime.now().strftime('%H:%M')}"
        }
    }
    
    requests.post(webhook_url, json=data)

def send_dingtalk_alert(message: str):
    """发送钉钉机器人提醒"""
    webhook_url = "https://oapi.dingtalk.com/robot/send?access_token=YOUR_TOKEN"
    
    data = {
        "msgtype": "text",
        "text": {
            "content": f"价格提醒: {message}"
        },
        "at": {
            "atMobiles": ["13800138000"],  # @某人
            "isAtAll": False
        }
    }
    
    requests.post(webhook_url, json=data)

2. 智能比价推荐

def find_best_deal(product_name: str, platform: str = None) -> Dict:
    """智能比价推荐"""
    # 1. 搜索同款商品
    search_urls = {
        'taobao': f'https://s.taobao.com/search?q={quote(product_name)}',
        'jd': f'https://search.jd.com/Search?keyword={quote(product_name)}',
    }
    
    best_deal = {
        'platform': '',
        'price': float('inf'),
        'url': '',
        'title': ''
    }
    
    for platform, url in search_urls.items():
        # 获取搜索结果页面
        # 解析多个商品价格
        # 找到最低价
        
        if price < best_deal['price']:
            best_deal.update({
                'platform': platform,
                'price': price,
                'url': item_url,
                'title': title
            })
    
    return best_deal

3. 反爬虫策略增强

def rotate_proxies():
    """轮换代理IP"""
    proxies = [
        "http://proxy1.com:8080",
        "http://proxy2.com:8080",
        # 更多代理...
    ]
    
    return random.choice(proxies)

def use_selenium_for_js():
    """使用Selenium处理JavaScript渲染"""
    from selenium import webdriver
    from selenium.webdriver.chrome.options import Options
    
    options = Options()
    options.add_argument('--headless')
    options.add_argument('--disable-blink-features=AutomationControlled')
    
    driver = webdriver.Chrome(options=options)
    driver.get(url)
    
    # 等待页面加载
    time.sleep(2)
    
    # 获取完整页面
    html = driver.page_source
    driver.quit()
    
    return html

五、 注意事项与法律合规

✅ 允许的行为:

  1. 监控自己购买意愿的商品

  2. 合理频率(每天几次)

  3. 用于个人决策参考

⚠️ 禁止的行为:

  1. 大规模采集(可能违反网站Robots协议)

  2. 用于商业竞争

  3. 影响网站正常运营

  4. 绕过付费API

📊 建议策略:

  1. 设置合理的延迟(time.sleep(1-3)秒)

  2. 使用随机User-Agent

  3. 遵守robots.txt

  4. 考虑使用官方API(如有)


💡 最后建议

立即行动清单
  1. [ ] 安装Python和依赖库

  2. [ ] 复制上面的核心代码

  3. [ ] 添加3个你最想监控的商品

  4. [ ] 运行一次,看看效果

  5. [ ] 设置定时任务,忘记比价烦恼

记住:这个工具的价值不在于技术多复杂,而在于帮你节省的时间和抓住的机会。一次大促省下的钱,可能就值回你学习Python的投入了。
互动话题:你最想监控什么商品的价格?是电子产品、化妆品还是母婴用品?评论区聊聊,我可以给你针对性的采集建议!


群贤毕至

访客