×

🔥 人工抓取数据革命:从“人肉爬虫”到“智能数据工厂”全面转型指南

万邦科技Lex 万邦科技Lex 发表于2026-05-23 13:43:52 浏览17 评论0

抢沙发发表评论

🔥 人工抓取数据革命:从“人肉爬虫”到“智能数据工厂”全面转型指南

“人肉爬虫”时代即将终结。当企业还在用人工复制粘贴、Excel导出、微信群发索取数据时,竞争对手已经用智能数据工厂实现了分钟级数据洞察。这不是危言耸听——2024年,数据采集效率差距将决定企业生死
本文将为你提供从“人肉爬虫”到“智能数据工厂”的完整转型路线图,包含可直接部署的Python源码。

一、 现状诊断:你的企业处于哪个阶段?

# stage_diagnosis.py
class DataAcquisitionStage:
    """数据采集成熟度诊断"""
    
    STAGES = {
        0: "🔴 石器时代 - 全人工",
        1: "🟡 铁器时代 - 半自动脚本",
        2: "🟢 工业时代 - 自动化工具",
        3: "🔵 信息时代 - 智能系统",
        4: "🟣 智能时代 - 数据工厂"
    }
    
    @staticmethod
    def diagnose(your_situation: dict) -> int:
        """诊断企业数据采集成熟度"""
        score = 0
        
        # 1. 数据来源多样性
        if your_situation.get('sources', 0) > 20:
            score += 1
        if your_situation.get('has_api', False):
            score += 1
        
        # 2. 自动化程度
        if your_situation.get('automation_rate', 0) > 0.8:
            score += 1
        if your_situation.get('has_scheduler', False):
            score += 1
        
        # 3. 智能化程度
        if your_situation.get('has_ai_parsing', False):
            score += 1
        if your_situation.get('has_self_healing', False):
            score += 1
        
        # 4. 数据治理
        if your_situation.get('has_data_quality', False):
            score += 1
        if your_situation.get('has_lineage', False):
            score += 1
        
        return min(score, 4)

# 自我诊断
your_company = {
    'sources': 5,           # 数据源数量
    'has_api': False,       # 是否有API接入
    'automation_rate': 0.3, # 自动化比例
    'has_scheduler': False, # 是否有定时任务
    'has_ai_parsing': False,# 是否用AI解析
    'has_self_healing': False, # 是否自愈
    'has_data_quality': False, # 数据质量监控
    'has_lineage': False    # 数据血缘
}

stage = DataAcquisitionStage.diagnose(your_company)
print(f"📊 你的企业处于: {DataAcquisitionStage.STAGES[stage]}")
大多数企业卡在0-1阶段,表现为:
  • 👨💻 人工复制粘贴Excel

  • 📧 微信群发索取数据

  • 🔁 重复性工作占80%时间

  • 📈 数据延迟1-3天

  • ❌ 错误率5-10%


二、 转型蓝图:五级火箭模型

graph TD
    A[人肉爬虫 Stage 0] --> B[脚本辅助 Stage 1]
    B --> C[自动化工具 Stage 2]
    C --> D[智能系统 Stage 3]
    D --> E[数据工厂 Stage 4]
    
    E --> E1[数据即服务 DaaS]
    E --> E2[实时数据流]
    E --> E3[预测性采集]

三、 核心组件:智能数据工厂架构

组件1:自适应采集引擎

能自动识别网页结构变化,支持API/WebSocket/文件等多源接入。
# adaptive_crawler.py
import requests
import json
from bs4 import BeautifulSoup
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
import undetected_chromedriver as uc
import pandas as pd
import time
from urllib.parse import urlparse
import re

class AdaptiveCrawler:
    """自适应采集引擎 - 智能识别网站类型并选择最佳采集策略"""
    
    def __init__(self, use_selenium=False, headless=True):
        self.session = requests.Session()
        self.session.headers.update({
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36'
        })
        
        self.use_selenium = use_selenium
        if use_selenium:
            options = uc.ChromeOptions()
            if headless:
                options.add_argument('--headless')
            options.add_argument('--disable-blink-features=AutomationControlled')
            self.driver = uc.Chrome(options=options)
    
    def detect_page_type(self, url: str) -> dict:
        """智能检测页面类型"""
        try:
            response = self.session.get(url, timeout=10)
            content_type = response.headers.get('content-type', '')
            
            detection = {
                'type': 'unknown',
                'features': [],
                'suggested_method': 'requests'
            }
            
            # 检测JSON API
            if 'application/json' in content_type:
                detection['type'] = 'json_api'
                detection['suggested_method'] = 'requests'
                detection['features'].append('has_json')
                
            # 检测HTML页面
            elif 'text/html' in content_type:
                detection['type'] = 'html_page'
                soup = BeautifulSoup(response.text, 'html.parser')
                
                # 检查是否是SPA
                script_tags = soup.find_all('script')
                spa_indicators = ['react', 'vue', 'angular', 'webpack']
                is_spa = any(indicator in str(script_tags).lower() 
                           for indicator in spa_indicators)
                
                if is_spa or len(soup.find_all()) < 50:
                    detection['suggested_method'] = 'selenium'
                    detection['features'].append('is_spa')
                else:
                    detection['suggested_method'] = 'requests'
                
                # 检查是否有表格数据
                tables = soup.find_all('table')
                if tables:
                    detection['features'].append('has_tables')
                
                # 检查是否有分页
                pagination = soup.find_all(['a', 'div'], 
                                         string=re.compile(r'下一页|next|more', re.I))
                if pagination:
                    detection['features'].append('has_pagination')
            
            return detection
            
        except Exception as e:
            print(f"检测失败: {e}")
            return {'type': 'error', 'suggested_method': 'requests'}
    
    def extract_with_best_method(self, url: str, config: dict = None) -> pd.DataFrame:
        """使用最佳方法提取数据"""
        detection = self.detect_page_type(url)
        method = detection['suggested_method']
        
        print(f"🔍 检测到页面类型: {detection['type']}")
        print(f"🎯 使用采集方法: {method}")
        
        if method == 'selenium' and self.use_selenium:
            return self.extract_with_selenium(url, config)
        else:
            return self.extract_with_requests(url, config)
    
    def extract_with_requests(self, url: str, config: dict = None) -> pd.DataFrame:
        """使用Requests提取"""
        try:
            response = self.session.get(url, timeout=30)
            response.raise_for_status()
            
            # 自动识别内容类型
            content_type = response.headers.get('content-type', '')
            
            if 'application/json' in content_type:
                data = json.loads(response.text)
                # 尝试自动展平JSON
                df = self.flatten_json(data)
            else:
                # HTML解析
                df = self.parse_html_table(response.text)
            
            return df
            
        except Exception as e:
            print(f"Requests提取失败: {e}")
            return pd.DataFrame()
    
    def extract_with_selenium(self, url: str, config: dict = None) -> pd.DataFrame:
        """使用Selenium处理动态页面"""
        if not hasattr(self, 'driver'):
            print("❌ Selenium未初始化")
            return pd.DataFrame()
        
        try:
            self.driver.get(url)
            time.sleep(2)  # 等待加载
            
            # 智能等待元素出现
            wait = WebDriverWait(self.driver, 10)
            
            # 尝试多种数据提取方式
            data = []
            
            # 方式1: 提取表格
            tables = self.driver.find_elements(By.TAG_NAME, 'table')
            if tables:
                for table in tables[:3]:  # 最多处理3个表格
                    html = table.get_attribute('outerHTML')
                    df = pd.read_html(html)[0]
                    data.append(df)
            
            # 方式2: 提取列表
            if not data and config and config.get('item_selector'):
                items = self.driver.find_elements(By.CSS_SELECTOR, config['item_selector'])
                for item in items[:50]:  # 限制数量
                    item_data = {}
                    for field, selector in config.get('field_selectors', {}).items():
                        try:
                            element = item.find_element(By.CSS_SELECTOR, selector)
                            item_data[field] = element.text
                        except:
                            item_data[field] = None
                    if item_data:
                        data.append(item_data)
            
            # 合并所有数据
            if data:
                if isinstance(data[0], pd.DataFrame):
                    result = pd.concat(data, ignore_index=True)
                else:
                    result = pd.DataFrame(data)
                return result
            
            return pd.DataFrame()
            
        except Exception as e:
            print(f"Selenium提取失败: {e}")
            return pd.DataFrame()
        finally:
            # 不关闭driver以便重用
            pass
    
    def parse_html_table(self, html: str) -> pd.DataFrame:
        """解析HTML表格"""
        try:
            dfs = pd.read_html(html)
            if dfs:
                return dfs[0]  # 返回第一个表格
        except:
            pass
        
        # 尝试用BeautifulSoup解析
        soup = BeautifulSoup(html, 'html.parser')
        tables = soup.find_all('table')
        
        if tables:
            return pd.read_html(str(tables[0]))[0]
        
        return pd.DataFrame()
    
    def flatten_json(self, data: dict, prefix: str = '') -> pd.DataFrame:
        """展平嵌套JSON"""
        items = []
        
        def flatten(x, name=''):
            if isinstance(x, dict):
                for a in x:
                    flatten(x[a], f'{name}{a}.')
            elif isinstance(x, list):
                for i, a in enumerate(x):
                    flatten(a, f'{name}{i}.')
            else:
                items.append((name[:-1], x))
        
        flatten(data)
        df = pd.DataFrame(dict(items), index=[0])
        return df
    
    def close(self):
        """清理资源"""
        if hasattr(self, 'driver'):
            self.driver.quit()

# 使用示例
if __name__ == "__main__":
    crawler = AdaptiveCrawler(use_selenium=True, headless=True)
    
    # 测试不同网站
    test_urls = [
        "https://httpbin.org/json",  # JSON API
        "https://news.ycombinator.com",  # 传统HTML
        "https://quotes.toscrape.com/js/",  # JavaScript渲染
    ]
    
    for url in test_urls:
        print(f"\n{'='*50}")
        print(f"测试URL: {url}")
        
        df = crawler.extract_with_best_method(url)
        
        if not df.empty:
            print(f"✅ 成功提取 {len(df)} 行数据")
            print(df.head())
        else:
            print("❌ 提取失败")
    
    crawler.close()

组件2:智能解析器

用AI识别非结构化数据,自动适配网站改版。
# smart_parser.py
import cv2
import pytesseract
import numpy as np
from PIL import Image
import io
import pandas as pd
from transformers import pipeline
import easyocr
import re

class SmartParser:
    """智能解析器 - 处理图片、PDF、复杂HTML"""
    
    def __init__(self):
        # 初始化OCR
        self.reader = easyocr.Reader(['ch_sim', 'en'])
        
        # 初始化NLP模型
        try:
            self.ner_pipeline = pipeline("ner", grouped_entities=True)
        except:
            self.ner_pipeline = None
        
        # 规则库
        self.rules = self.load_parsing_rules()
    
    def parse_with_ai(self, content, content_type='html', domain=None):
        """智能解析内容"""
        if content_type == 'image':
            return self.parse_image(content)
        elif content_type == 'pdf':
            return self.parse_pdf(content)
        elif content_type == 'html':
            return self.parse_html(content, domain)
        elif content_type == 'text':
            return self.parse_text(content)
        else:
            return self.parse_unknown(content)
    
    def parse_image(self, image_data):
        """解析图片中的文字和表格"""
        results = []
        
        # 方法1: EasyOCR
        ocr_result = self.reader.readtext(image_data)
        for detection in ocr_result:
            bbox, text, confidence = detection
            if confidence > 0.5:  # 置信度阈值
                results.append({
                    'text': text,
                    'confidence': confidence,
                    'bbox': bbox,
                    'source': 'easyocr'
                })
        
        # 方法2: 表格检测
        img_array = np.array(Image.open(io.BytesIO(image_data)))
        tables = self.detect_tables(img_array)
        
        for table in tables:
            results.append({
                'type': 'table',
                'data': table,
                'source': 'table_detection'
            })
        
        return results
    
    def detect_tables(self, image):
        """检测图片中的表格"""
        # 使用OpenCV检测直线
        gray = cv2.cvtColor(image, cv2.COLOR_BGR2GRAY)
        edges = cv2.Canny(gray, 50, 150, apertureSize=3)
        
        # 霍夫直线检测
        lines = cv2.HoughLinesP(edges, 1, np.pi/180, 100, 
                               minLineLength=100, maxLineGap=10)
        
        tables = []
        if lines is not None:
            # 这里简化处理,实际需要更复杂的表格检测逻辑
            table_data = [['检测到表格', '需进一步处理']]
            tables.append(table_data)
        
        return tables
    
    def parse_pdf(self, pdf_data):
        """解析PDF文档"""
        import PyPDF2
        
        pdf_reader = PyPDF2.PdfReader(io.BytesIO(pdf_data))
        text = ""
        
        for page in pdf_reader.pages:
            text += page.extract_text() + "\n"
        
        # 提取结构化信息
        structured_data = self.extract_structured_info(text)
        
        return {
            'raw_text': text,
            'structured': structured_data,
            'page_count': len(pdf_reader.pages)
        }
    
    def parse_html(self, html, domain=None):
        """智能解析HTML"""
        from bs4 import BeautifulSoup
        
        soup = BeautifulSoup(html, 'html.parser')
        
        # 移除脚本和样式
        for script in soup(["script", "style"]):
            script.decompose()
        
        # 尝试匹配已知网站规则
        if domain and domain in self.rules:
            return self.apply_rules(soup, self.rules[domain])
        
        # 自动检测数据模式
        data = self.auto_detect_data(soup)
        
        return data
    
    def auto_detect_data(self, soup):
        """自动检测HTML中的数据模式"""
        data = {}
        
        # 检测表格
        tables = soup.find_all('table')
        if tables:
            data['tables'] = []
            for table in tables[:3]:  # 最多处理3个表格
                try:
                    df = pd.read_html(str(table))[0]
                    data['tables'].append(df.to_dict('records'))
                except:
                    pass
        
        # 检测列表
        list_items = soup.find_all(['li', 'div', 'tr'])
        if len(list_items) > 3:
            items = []
            for item in list_items[:50]:  # 限制数量
                text = item.get_text(strip=True)
                if text and len(text) > 3:
                    items.append(text)
            if items:
                data['list_items'] = items
        
        # 检测键值对
        key_value_pairs = {}
        for dt in soup.find_all(['dt', 'th', 'strong', 'b']):
            key = dt.get_text(strip=True).rstrip(':')
            # 查找相邻的值
            next_sibling = dt.find_next_sibling(['dd', 'td', 'span', 'div'])
            if next_sibling:
                value = next_sibling.get_text(strip=True)
                if key and value:
                    key_value_pairs[key] = value
        
        if key_value_pairs:
            data['key_value'] = key_value_pairs
        
        return data
    
    def parse_text(self, text):
        """解析纯文本"""
        # 使用NLP提取实体
        entities = []
        if self.ner_pipeline:
            ner_results = self.ner_pipeline(text[:512])  # 限制长度
            entities = [{"entity": ent["entity_group"], "word": ent["word"]} 
                       for ent in ner_results]
        
        # 提取电话号码、邮箱等
        patterns = {
            'phone': r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
            'email': r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b',
            'url': r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+',
            'date': r'\b\d{4}[-/]\d{1,2}[-/]\d{1,2}\b'
        }
        
        extracted = {}
        for name, pattern in patterns.items():
            matches = re.findall(pattern, text, re.IGNORECASE)
            if matches:
                extracted[name] = matches
        
        return {
            'entities': entities,
            'patterns': extracted,
            'length': len(text)
        }
    
    def load_parsing_rules(self):
        """加载解析规则库"""
        return {
            'github.com': {
                'repositories': '.repo-list li',
                'name': 'h3 a',
                'description': 'p',
                'stars': '.muted-link'
            },
            'amazon.com': {
                'products': '[data-component-type="s-search-result"]',
                'title': 'h2 a span',
                'price': '.a-price-whole',
                'rating': '.a-icon-alt'
            }
        }
    
    def apply_rules(self, soup, rules):
        """应用解析规则"""
        data = {}
        
        container_selector = rules.get('container')
        if container_selector:
            items = soup.select(container_selector)
            
            records = []
            for item in items[:100]:  # 限制数量
                record = {}
                for field, selector in rules.items():
                    if field != 'container':
                        element = item.select_one(selector)
                        if element:
                            record[field] = element.get_text(strip=True)
                if record:
                    records.append(record)
            
            data['records'] = records
        
        return data

# 使用示例
if __name__ == "__main__":
    parser = SmartParser()
    
    # 测试文本解析
    sample_text = """
    联系人: 张三
    电话: 138-1234-5678
    邮箱: zhangsan@example.com
    地址: 北京市海淀区中关村大街1号
    网址: https://www.example.com
    日期: 2024-05-20
    """
    
    result = parser.parse_text(sample_text)
    print("📄 文本解析结果:")
    print(f"实体识别: {result.get('entities', [])}")
    print(f"模式匹配: {result.get('patterns', {})}")

组件3:自愈式调度器

自动重试、故障转移、数据质量校验。
# self_healing_scheduler.py
import schedule
import time
import threading
from datetime import datetime, timedelta
import json
import sqlite3
from typing import Dict, List, Callable
import logging
from dataclasses import dataclass
from enum import Enum
import hashlib
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    SUCCESS = "success"
    FAILED = "failed"
    RETRYING = "retrying"

@dataclass
class Task:
    """采集任务定义"""
    id: str
    name: str
    url: str
    schedule: str  # cron表达式
    parser_config: Dict
    max_retries: int = 3
    timeout: int = 300
    created_at: datetime = None
    
    def __post_init__(self):
        if self.created_at is None:
            self.created_at = datetime.now()

class SelfHealingScheduler:
    """自愈式任务调度器"""
    
    def __init__(self, db_path='tasks.db'):
        self.db_path = db_path
        self.tasks: Dict[str, Task] = {}
        self.task_handlers: Dict[str, Callable] = {}
        self.running = False
        self.worker_thread = None
        
        # 初始化数据库
        self.init_database()
        
        # 加载任务
        self.load_tasks()
        
        # 设置日志
        logging.basicConfig(
            level=logging.INFO,
            format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
            handlers=[
                logging.FileHandler('scheduler.log'),
                logging.StreamHandler()
            ]
        )
        self.logger = logging.getLogger(__name__)
    
    def init_database(self):
        """初始化任务数据库"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        # 任务表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS tasks (
                id TEXT PRIMARY KEY,
                name TEXT,
                url TEXT,
                schedule TEXT,
                parser_config TEXT,
                max_retries INTEGER,
                timeout INTEGER,
                created_at TIMESTAMP,
                enabled INTEGER DEFAULT 1
            )
        ''')
        
        # 执行记录表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS executions (
                id TEXT PRIMARY KEY,
                task_id TEXT,
                started_at TIMESTAMP,
                finished_at TIMESTAMP,
                status TEXT,
                error_message TEXT,
                data_count INTEGER,
                execution_time REAL,
                retry_count INTEGER,
                FOREIGN KEY (task_id) REFERENCES tasks (id)
            )
        ''')
        
        # 数据质量表
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS data_quality (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                task_id TEXT,
                execution_id TEXT,
                check_time TIMESTAMP,
                metric_name TEXT,
                metric_value REAL,
                threshold REAL,
                passed INTEGER,
                FOREIGN KEY (task_id) REFERENCES tasks (id),
                FOREIGN KEY (execution_id) REFERENCES executions (id)
            )
        ''')
        
        conn.commit()
        conn.close()
    
    def add_task(self, task: Task, handler: Callable):
        """添加任务"""
        self.tasks[task.id] = task
        self.task_handlers[task.id] = handler
        
        # 保存到数据库
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT OR REPLACE INTO tasks 
            (id, name, url, schedule, parser_config, max_retries, timeout, created_at)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
        ''', (
            task.id,
            task.name,
            task.url,
            task.schedule,
            json.dumps(task.parser_config),
            task.max_retries,
            task.timeout,
            task.created_at.isoformat()
        ))
        
        conn.commit()
        conn.close()
        
        self.logger.info(f"✅ 添加任务: {task.name}")
    
    def remove_task(self, task_id: str):
        """移除任务"""
        if task_id in self.tasks:
            del self.tasks[task_id]
            del self.task_handlers[task_id]
            
            conn = sqlite3.connect(self.db_path)
            cursor = conn.cursor()
            cursor.execute('DELETE FROM tasks WHERE id = ?', (task_id,))
            conn.commit()
            conn.close()
            
            self.logger.info(f"🗑️ 移除任务: {task_id}")
    
    def load_tasks(self):
        """从数据库加载任务"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        cursor.execute('SELECT * FROM tasks WHERE enabled = 1')
        
        rows = cursor.fetchall()
        for row in rows:
            task = Task(
                id=row[0],
                name=row[1],
                url=row[2],
                schedule=row[3],
                parser_config=json.loads(row[4]),
                max_retries=row[5],
                timeout=row[6],
                created_at=datetime.fromisoformat(row[7])
            )
            self.tasks[task.id] = task
        
        conn.close()
        self.logger.info(f"📥 加载 {len(self.tasks)} 个任务")
    
    def schedule_all_tasks(self):
        """调度所有任务"""
        for task_id, task in self.tasks.items():
            # 解析cron表达式
            if task.schedule == 'hourly':
                schedule.every().hour.do(self.run_task, task_id)
            elif task.schedule == 'daily':
                schedule.every().day.at("00:00").do(self.run_task, task_id)
            elif task.schedule.startswith('every'):
                # 解析 every.10.minutes
                parts = task.schedule.split('.')
                if len(parts) == 3:
                    interval = int(parts[1])
                    unit = parts[2]
                    
                    if unit == 'minutes':
                        schedule.every(interval).minutes.do(self.run_task, task_id)
                    elif unit == 'hours':
                        schedule.every(interval).hours.do(self.run_task, task_id)
                    elif unit == 'days':
                        schedule.every(interval).days.do(self.run_task, task_id)
            else:
                # 默认每小时
                schedule.every().hour.do(self.run_task, task_id)
            
            self.logger.info(f"⏰ 调度任务: {task.name} - {task.schedule}")
    
    def run_task(self, task_id: str, retry_count: int = 0):
        """执行单个任务"""
        if task_id not in self.tasks or task_id not in self.task_handlers:
            self.logger.error(f"任务不存在: {task_id}")
            return
        
        task = self.tasks[task_id]
        handler = self.task_handlers[task_id]
        
        execution_id = f"{task_id}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
        started_at = datetime.now()
        
        self.logger.info(f"🚀 开始执行: {task.name} (尝试 {retry_count + 1})")
        
        # 记录执行开始
        self.record_execution_start(execution_id, task_id, started_at)
        
        try:
            # 执行任务
            result = handler(task.url, task.parser_config)
            
            finished_at = datetime.now()
            execution_time = (finished_at - started_at).total_seconds()
            
            # 数据质量检查
            quality_passed = self.check_data_quality(task_id, execution_id, result)
            
            if quality_passed:
                status = TaskStatus.SUCCESS
                error_message = None
                self.logger.info(f"✅ 任务成功: {task.name}, 耗时: {execution_time:.1f}s")
            else:
                status = TaskStatus.FAILED
                error_message = "数据质量检查失败"
                self.logger.warning(f"⚠️ 数据质量检查失败: {task.name}")
            
            # 记录执行结果
            self.record_execution_finish(
                execution_id, finished_at, status.value, 
                error_message, len(result) if isinstance(result, list) else 0,
                execution_time, retry_count
            )
            
            return result
            # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
        except Exception as e:
            finished_at = datetime.now()
            execution_time = (finished_at - started_at).total_seconds()
            
            error_msg = str(e)
            self.logger.error(f"❌ 任务失败: {task.name}, 错误: {error_msg}")
            
            # 记录失败
            self.record_execution_finish(
                execution_id, finished_at, TaskStatus.FAILED.value,
                error_msg, 0, execution_time, retry_count
            )
            
            # 重试逻辑
            if retry_count < task.max_retries:
                delay = 2 ** retry_count  # 指数退避
                self.logger.info(f"🔄 将在 {delay} 秒后重试...")
                
                time.sleep(delay)
                return self.run_task(task_id, retry_count + 1)
            else:
                self.logger.error(f"💥 任务重试次数用尽: {task.name}")
                
                # 发送告警
                self.send_alert(task, error_msg)
                
                return None
    
    def check_data_quality(self, task_id: str, execution_id: str, data) -> bool:
        """检查数据质量"""
        if not data:
            self.record_quality_check(task_id, execution_id, 'has_data', 0, 1, False)
            return False
        
        checks = [
            ('data_count', len(data) if isinstance(data, list) else 1, 1),
            ('null_ratio', self.calculate_null_ratio(data), 0.3),
        ]
        
        passed_all = True
        
        for metric_name, metric_value, threshold in checks:
            passed = metric_value <= threshold if metric_name == 'null_ratio' else metric_value >= threshold
            self.record_quality_check(task_id, execution_id, metric_name, metric_value, threshold, passed)
            
            if not passed:
                passed_all = False
                self.logger.warning(f"数据质量检查失败: {metric_name}={metric_value}, 阈值={threshold}")
        
        return passed_all
    
    def calculate_null_ratio(self, data):
        """计算空值比例"""
        if not isinstance(data, list) or not data:
            return 1.0
        
        if isinstance(data[0], dict):
            total_cells = len(data) * len(data[0])
            null_cells = sum(
                1 for row in data 
                for value in row.values() 
                if value is None or str(value).strip() == ''
            )
            return null_cells / total_cells if total_cells > 0 else 1.0
        
        return 0.0
    
    def record_execution_start(self, execution_id: str, task_id: str, started_at: datetime):
        """记录执行开始"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO executions (id, task_id, started_at, status)
            VALUES (?, ?, ?, ?)
        ''', (execution_id, task_id, started_at.isoformat(), TaskStatus.RUNNING.value))
        
        conn.commit()
        conn.close()
    
    def record_execution_finish(self, execution_id: str, finished_at: datetime, 
                              status: str, error_message: str, 
                              data_count: int, execution_time: float, 
                              retry_count: int):
        """记录执行完成"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            UPDATE executions 
            SET finished_at = ?, status = ?, error_message = ?, 
                data_count = ?, execution_time = ?, retry_count = ?
            WHERE id = ?
        ''', (
            finished_at.isoformat(), status, error_message,
            data_count, execution_time, retry_count, execution_id
        ))
        
        conn.commit()
        conn.close()
    
    def record_quality_check(self, task_id: str, execution_id: str, 
                           metric_name: str, metric_value: float, 
                           threshold: float, passed: bool):
        """记录质量检查结果"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cursor.execute('''
            INSERT INTO data_quality 
            (task_id, execution_id, check_time, metric_name, metric_value, threshold, passed)
            VALUES (?, ?, ?, ?, ?, ?, ?)
        ''', (
            task_id, execution_id, datetime.now().isoformat(),
            metric_name, metric_value, threshold, 1 if passed else 0
        ))
        
        conn.commit()
        conn.close()
    
    def send_alert(self, task: Task, error_message: str):
        """发送告警"""
        # 这里可以实现邮件、钉钉、企业微信告警
        alert_message = f"""
        🚨 数据采集告警
        任务: {task.name}
        URL: {task.url}
        时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
        错误: {error_message}
        重试次数: {task.max_retries} 次已用尽
        """
        # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
        self.logger.error(alert_message)
        
        # 示例: 打印到控制台
        print(alert_message)
    
    def start(self):
        """启动调度器"""
        self.running = True
        self.schedule_all_tasks()
        
        self.worker_thread = threading.Thread(target=self.run_scheduler, daemon=True)
        self.worker_thread.start()
        
        self.logger.info("🚀 调度器已启动")
    
    def run_scheduler(self):
        """调度器主循环"""
        while self.running:
            schedule.run_pending()
            time.sleep(1)
    
    def stop(self):
        """停止调度器"""
        self.running = False
        if self.worker_thread:
            self.worker_thread.join(timeout=5)
        
        self.logger.info("🛑 调度器已停止")
    
    def get_task_stats(self, days: int = 7) -> Dict:
        """获取任务统计"""
        conn = sqlite3.connect(self.db_path)
        cursor = conn.cursor()
        
        cutoff = (datetime.now() - timedelta(days=days)).isoformat()
        
        # 成功率
        cursor.execute('''
            SELECT 
                task_id,
                COUNT(*) as total,
                SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) as success
            FROM executions 
            WHERE started_at > ?
            GROUP BY task_id
        ''', (cutoff,))
        
        stats = {}
        for row in cursor.fetchall():
            task_id, total, success = row
            if total > 0:
                stats[task_id] = {
                    'total': total,
                    'success': success,
                    'success_rate': success / total
                }
        
        # 平均执行时间
        cursor.execute('''
            SELECT task_id, AVG(execution_time) as avg_time
            FROM executions 
            WHERE started_at > ? AND status = 'success'
            GROUP BY task_id
        ''', (cutoff,))
        
        for row in cursor.fetchall():
            task_id, avg_time = row
            if task_id in stats:
                stats[task_id]['avg_time'] = avg_time
        
        conn.close()
        return stats

# 使用示例
if __name__ == "__main__":
    # 创建调度器
    scheduler = SelfHealingScheduler()
    
    # 定义任务处理器
    def sample_handler(url, config):
        """示例任务处理器"""
        import requests
        response = requests.get(url, timeout=10)
        
        if response.status_code == 200:
            # 模拟数据处理
            return [{'id': i, 'data': f'sample_{i}'} for i in range(10)]
        else:
            raise Exception(f"HTTP {response.status_code}")
    
    # 添加任务
    task = Task(
        id='test_task_1',
        name='示例采集任务',
        url='https://httpbin.org/json',
        schedule='every.5.minutes',  # 每5分钟
        parser_config={'type': 'json'},
        max_retries=2,
        timeout=30
    )
    
    scheduler.add_task(task, sample_handler)
    
    # 启动调度器
    print("⏰ 启动调度器,运行5分钟后停止...")
    scheduler.start()
    
    try:
        # 运行5分钟
        time.sleep(300)
        
        # 获取统计
        stats = scheduler.get_task_stats(1)
        print("\n📊 任务统计:")
        for task_id, stat in stats.items():
            print(f"{task_id}: 成功率 {stat['success_rate']:.1%}, "
                  f"平均耗时 {stat.get('avg_time', 0):.1f}s")
        
    finally:
        scheduler.stop()

四、 实施路径:12周转型计划

第1-2周:现状评估与试点

  1. 盘点数据源:列出所有人工采集的数据源

  2. 选择试点:选取3-5个高价值、规则明确的数据源

  3. 建立基线:记录当前人工采集的时间、成本、错误率

第3-6周:工具开发与部署

  1. 开发核心引擎:使用上述代码构建基础采集系统

  2. 建立数据管道:从采集→清洗→存储的完整流程

  3. 培训关键用户:教会业务人员使用新工具

第7-9周:规模化推广

  1. 迁移更多数据源:每周迁移5-10个数据源

  2. 建立监控体系:数据质量、系统健康度监控

  3. 优化性能:处理速度、稳定性优化

第10-12周:智能化升级

  1. 引入AI能力:智能解析、异常检测

  2. 建立告警机制:自动发现问题并通知

  3. 编制操作手册:SOP、故障处理指南


五、 成功案例:转型前后对比

指标
转型前(人工)
转型后(智能工厂)
提升
采集效率
1人天/数据源
5分钟/数据源
96倍
数据延迟
1-3天
分钟级
99%减少
错误率
5-10%
<0.1%
99%降低
人力成本
5人团队
1人维护
80%降低
数据价值
滞后决策
实时洞察
无法量化

💡 最后建议

不要试图一步到位。从最有痛点的1-2个场景开始,快速验证价值,然后逐步扩展。记住:完成比完美更重要
立即行动清单
  1. [ ] 盘点你团队中还在人工采集的数据

  2. [ ] 选择一个本周就可以自动化的数据源

  3. [ ] 运行上面的示例代码,看看效果

  4. [ ] 制定你的12周转型计划

数据是新时代的石油,但人工采集就像用勺子挖油井。是时候升级到智能钻井平台了。
互动话题:你的团队还在人工采集哪些数据?最大的痛点是什么?评论区聊聊,我可以给你针对性的自动化建议!


群贤毕至

访客