×

🔥 遗留系统:企业的“技术负债”与现代化集成实战指南(附Python源码)

万邦科技Lex 万邦科技Lex 发表于2026-05-19 16:57:49 浏览23 评论0

抢沙发发表评论

🔥 遗留系统:企业的“技术负债”与现代化集成实战指南(附Python源码)

在数字化转型浪潮中,遗留系统(Legacy System) 是企业最头疼的“技术负债”——它像一座“信息孤岛”,数据有价值但难以获取,功能能用但难以扩展。本文将深入剖析遗留系统的本质,并提供完整的现代化集成方案与Python实战源码。

一、 什么是遗留系统?不只是“老代码”那么简单

1. 遗留系统的精准定义

遗留系统不是简单的“旧系统”,它通常至少具备以下特征中的三个:
| 特征 | 表现 | 典型案例 |
| --- | --- | --- |
| 技术过时 | 使用淘汰技术栈(COBOL、VB6、Delphi) | 银行核心系统、制造业ERP |
| 文档缺失 | 无API文档、无架构图、原开发团队离职 | 自研的库存管理系统 |
| 维护困难 | 无人敢修改、改一处崩一片 | 20年前的财务系统 |
| 数据孤岛 | 只能通过专有协议/界面访问 | 串口通信的工控系统 |
| 高耦合 | 与硬件/操作系统深度绑定 | Windows XP + Access数据库应用 |

2. 为什么企业无法抛弃遗留系统?

  • 业务关键性:运行核心业务流程(如银行交易清算)

  • 迁移成本:重写需数年,成本数百万到数亿

  • 数据价值:积累数十年的业务数据

  • 合规要求:满足特定行业监管(如医疗HIPAA)


二、 遗留系统集成的五种策略模式

根据系统复杂度和业务需求,选择适合的集成策略:
graph TD
    A[遗留系统] --> B{选择集成策略}
    B --> C[策略1: 数据库直连]
    B --> D[策略2: 文件交换]
    B --> E[策略3: API包装]
    B --> F[策略4: 消息队列]
    B --> G[策略5: 界面自动化]
    
    C --> H[简单快速 有安全风险]
    D --> I[稳定通用 延迟高]
    E --> J[现代优雅 需改造]
    F --> K[异步解耦 架构复杂]
    G --> L[无侵入 脆弱不稳定]

三、 Python实战:五种集成策略源码实现

策略1:数据库直连模式

适用场景:遗留系统使用标准数据库(Oracle、SQL Server),且你有只读权限
# strategy_database.py
import pyodbc
import pandas as pd
from sqlalchemy import create_engine
from contextlib import contextmanager
import warnings
warnings.filterwarnings('ignore')
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyDatabaseConnector:
    """Direct-database integration for a legacy system.

    Opens a pyodbc (SQL Server) or cx_Oracle connection from the settings in
    ``self.db_configs``, loads query results into pandas DataFrames, and can
    copy them into a modern database through SQLAlchemy.

    NOTE(review): the built-in settings contain hard-coded demo credentials.
    Supply real settings through ``db_configs`` instead of editing them here.
    """

    # Backends that _open_connection knows how to open.
    SUPPORTED_DB_TYPES = ('sqlserver', 'oracle')

    def __init__(self, db_type='sqlserver', db_configs=None):
        """Store the default backend and the connection settings.

        Args:
            db_type: Backend used whenever a method is called without an
                explicit ``db_type``.
            db_configs: Optional mapping of backend name to settings dict;
                entries replace the built-in demo settings for the same key.
        """
        # The original accepted db_type but never used it; it is now the
        # default backend for every method.
        self.db_type = db_type
        self.db_configs = {
            'sqlserver': {
                'driver': '{ODBC Driver 17 for SQL Server}',
                'server': 'legacy-server',
                'database': 'LegacyERP',
                'username': 'readonly_user',
                'password': 'SecurePass123!'
            },
            'oracle': {
                'user': 'legacy_user',
                'password': 'password',
                'dsn': 'legacy_db'
            }
        }
        if db_configs:
            self.db_configs.update(db_configs)

    def _open_connection(self, db_type):
        """Open and return a raw DB-API connection for a supported backend."""
        config = self.db_configs[db_type]
        if db_type == 'sqlserver':
            conn_str = (
                f"DRIVER={config['driver']};"
                f"SERVER={config['server']};"
                f"DATABASE={config['database']};"
                f"UID={config['username']};"
                f"PWD={config['password']}"
            )
            return pyodbc.connect(conn_str)

        # Imported lazily so the Oracle driver is only required when used.
        import cx_Oracle
        return cx_Oracle.connect(
            config['user'],
            config['password'],
            config['dsn']
        )

    @contextmanager
    def get_connection(self, db_type=None):
        """Yield an open connection and always close it afterwards.

        Args:
            db_type: Backend to connect to; defaults to ``self.db_type``.

        Raises:
            ValueError: If the backend is not supported or has no settings.
            Exception: Whatever the driver raises when connecting fails
                (reported on stdout, then re-raised).
        """
        db_type = db_type or self.db_type
        if db_type not in self.SUPPORTED_DB_TYPES or db_type not in self.db_configs:
            # Previously an unknown backend yielded None and still printed a
            # success message; fail loudly instead.
            raise ValueError(f"Unsupported db_type: {db_type!r}")

        try:
            conn = self._open_connection(db_type)
        except Exception as e:
            print(f"❌ 数据库连接失败: {e}")
            raise

        print(f"✅ 成功连接 {db_type.upper()} 遗留数据库")
        # The yield is kept out of the try/except above so that errors raised
        # by the caller's with-body are not misreported as connection failures.
        try:
            yield conn
        finally:
            conn.close()
            print("🔌 数据库连接已关闭")

    def extract_legacy_data(self, query, params=None, db_type=None):
        """Run ``query`` against the legacy database and return a DataFrame.

        Args:
            query: SQL text; use driver placeholders together with ``params``
                rather than string formatting.
            params: Optional parameters passed through to ``pd.read_sql``.
            db_type: Backend to query; defaults to ``self.db_type``.

        Returns:
            The result set after validation and type conversion.
        """
        with self.get_connection(db_type) as conn:
            df = pd.read_sql(query, conn, params=params)

        # Post-processing needs no connection, so it runs after the close.
        self._validate_data(df)
        return self._convert_data_types(df)

    def sync_to_modern_db(self, legacy_query, modern_table, transform_func=None,
                          modern_db_url='postgresql://user:pass@modern-db:5432/app_db',
                          params=None, db_type=None):
        """Copy the result of ``legacy_query`` into ``modern_table``.

        The target table is replaced on every call (``if_exists='replace'``).

        Args:
            legacy_query: SQL executed on the legacy database.
            modern_table: Name of the table written in the modern database.
            transform_func: Optional callable taking and returning a DataFrame.
            modern_db_url: SQLAlchemy URL of the target database.
            params: Optional parameters for ``legacy_query``.
            db_type: Legacy backend; defaults to ``self.db_type``.

        Returns:
            The (possibly transformed) DataFrame that was written.
        """
        print(f"🔄 同步数据: {legacy_query} -> {modern_table}")

        legacy_df = self.extract_legacy_data(legacy_query, params=params, db_type=db_type)

        if transform_func:
            legacy_df = transform_func(legacy_df)

        modern_engine = create_engine(modern_db_url)
        try:
            legacy_df.to_sql(modern_table, modern_engine, if_exists='replace', index=False)
        finally:
            # Release pooled connections held by the short-lived engine.
            modern_engine.dispose()

        print(f"✅ 同步完成: {len(legacy_df)} 条记录")
        return legacy_df

    def _validate_data(self, df):
        """Print warnings for an empty result or a null ratio above 30%."""
        if df.empty:
            print("⚠️ 警告: 查询返回空结果")
            # Nothing to measure; also avoids dividing by a zero cell count.
            return

        null_ratio = df.isnull().sum().sum() / df.size
        if null_ratio > 0.3:
            print(f"⚠️ 警告: 数据空值率过高 ({null_ratio:.1%})")

    def _convert_data_types(self, df):
        """Coerce text columns to numbers and date-named columns to datetimes.

        The frame is modified in place and also returned. A column counts as a
        date column when its name contains "date" (case-insensitive). Text
        columns that are not fully numeric are left unchanged. This is plain
        text-to-number coercion; it does not decode COBOL packed decimals.
        """
        date_cols = [col for col in df.columns if 'date' in str(col).lower()]

        for col in df.columns:
            if col in date_cols:
                # Dates such as "20240101" must not be turned into integers
                # before they are parsed as dates.
                continue
            series = df[col]
            if not (pd.api.types.is_object_dtype(series)
                    or pd.api.types.is_string_dtype(series)):
                continue
            try:
                df[col] = pd.to_numeric(series)
            except (ValueError, TypeError):
                # Same outcome as the deprecated errors='ignore': keep the
                # column untouched when it is not entirely numeric.
                pass

        for col in date_cols:
            try:
                # format='mixed' infers the format per element (pandas >= 2.0);
                # unparseable values become NaT.
                df[col] = pd.to_datetime(df[col], errors='coerce', format='mixed')
            except (ValueError, TypeError):
                pass

        return df

# 实战示例
if __name__ == "__main__":
    connector = LegacyDatabaseConnector()

    # Example 1: pull the active customers out of the legacy database.
    customer_df = connector.extract_legacy_data(
        query="SELECT CustID, CustName, RegDate FROM Customers WHERE Status = 'A'"
    )
    print(f"📊 获取到 {len(customer_df)} 条客户数据")
    print(customer_df.head())

    # Example 2: full sync into the modern database.
    def transform_customers(df):
        """Rename the legacy column names to their modern equivalents."""
        column_map = {
            'CustID': 'customer_id',
            'CustName': 'customer_name',
            'RegDate': 'registration_date'
        }
        return df.rename(columns=column_map)

    # connector.sync_to_modern_db(
    #     "SELECT * FROM Orders WHERE OrderDate > '2024-01-01'",
    #     "legacy_orders",
    #     transform_customers
    # )

策略2:文件交换模式

适用场景:遗留系统只能生成文件(CSV、Excel、定宽文件)。
# strategy_file.py
import pandas as pd
import os
import csv
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyFileIntegration:
    """File-exchange integration: parse legacy exports and watch a folder."""

    def __init__(self, watch_folder='./legacy_exports/'):
        """Remember the export folder and create it if it does not exist."""
        self.watch_folder = watch_folder
        os.makedirs(watch_folder, exist_ok=True)

    def parse_fixed_width_file(self, filepath, col_specs, encoding='gb2312'):
        """Parse a fixed-width text file into a DataFrame of strings.

        Args:
            filepath: Path of the text file.
            col_specs: Mapping of column name to ``(start, end)``, 1-based and
                inclusive character positions.
            encoding: Text encoding of the file.

        Returns:
            One row per non-blank line, with the columns of ``col_specs`` in
            order. A field is the stripped slice of the line, or ``None`` when
            the line ends before the field starts. An empty file yields an
            empty frame that still carries the expected columns.
        """
        print(f"📄 解析定宽文件: {filepath}")

        rows = []
        with open(filepath, 'r', encoding=encoding) as f:
            for raw_line in f:
                line = raw_line.rstrip('\r\n')
                if not line.strip():
                    continue

                # Slicing never raises, so a short line has to be detected
                # explicitly to report the field as missing.
                rows.append({
                    col_name: (line[start - 1:end].strip()
                               if start - 1 < len(line) else None)
                    for col_name, (start, end) in col_specs.items()
                })

        return pd.DataFrame(rows, columns=list(col_specs))

    def parse_cobol_file(self, filepath, copybook_path):
        """Return placeholder account data for a COBOL export.

        This is a stub: neither ``filepath`` nor ``copybook_path`` is read.
        A real implementation would decode the file using the copybook layout.
        """
        print(f"🖥️ 解析COBOL文件: {filepath}")

        return pd.DataFrame({
            'account_no': ['001', '002', '003'],
            'balance': [1000.0, 2500.0, 500.0],
            'last_transaction': ['2024-05-01', '2024-05-10', '2024-05-15']
        })

    def watch_folder_changes(self, handler):
        """Block while ``handler`` receives events for the watch folder.

        Runs until interrupted with Ctrl-C. The observer thread is stopped and
        joined on every exit path, not only on KeyboardInterrupt.

        Args:
            handler: A watchdog event handler.
        """
        print(f"👀 开始监控文件夹: {self.watch_folder}")

        observer = Observer()
        observer.schedule(handler, self.watch_folder, recursive=False)
        observer.start()

        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            # Ctrl-C is the normal way to end the watch loop.
            pass
        finally:
            observer.stop()
            observer.join()

class LegacyFileHandler(FileSystemEventHandler):
    """Watchdog handler that processes files exported by the legacy system."""

    # Extensions that trigger processing. '.txt' is included because
    # process_file has a fixed-width branch that was otherwise unreachable.
    WATCHED_SUFFIXES = ('.csv', '.txt')

    def on_created(self, event):
        """Process newly created export files; ignore directories."""
        # NOTE(review): this fires on creation, which may be before the
        # exporter has finished writing the file — confirm how it writes.
        if event.is_directory:
            return
        if event.src_path.lower().endswith(self.WATCHED_SUFFIXES):
            print(f"🆕 检测到新文件: {event.src_path}")
            self.process_file(event.src_path)

    def parse_fixed_width_file(self, filepath, col_specs):
        """Parse a fixed-width file with LegacyFileIntegration's parser.

        The parser does not read instance state, so it is called as a plain
        function; building an integrator would create its watch folder.
        """
        return LegacyFileIntegration.parse_fixed_width_file(self, filepath, col_specs)

    def process_file(self, filepath):
        """Load one export file, send it to the API, then archive it.

        The file name selects the route: names containing "CUST" or "ORDER"
        are read as GBK CSV, and other ``.txt`` files as fixed-width text.
        Unrecognised files are left in place rather than archived as if they
        had been processed. Failures are reported on stdout and swallowed so
        the watcher keeps running.
        """
        try:
            filename = os.path.basename(filepath)
            upper_name = filename.upper()

            if 'CUST' in upper_name:
                df = pd.read_csv(filepath, encoding='gbk')
                print(f"👥 客户文件: {len(df)} 条记录")
                self.send_to_api(df, 'customers')

            elif 'ORDER' in upper_name:
                df = pd.read_csv(filepath, encoding='gbk')
                print(f"📦 订单文件: {len(df)} 条记录")
                self.send_to_api(df, 'orders')

            elif upper_name.endswith('.TXT'):
                col_specs = {
                    'field1': (1, 10),
                    'field2': (11, 20),
                    'field3': (21, 30)
                }
                df = self.parse_fixed_width_file(filepath, col_specs)
                self.send_to_api(df, 'transactions')

            else:
                print(f"⚠️ 未识别的文件, 已跳过: {filename}")
                return

            # Move the processed file out of the watch folder.
            archive_folder = './processed/'
            os.makedirs(archive_folder, exist_ok=True)
            os.rename(filepath, os.path.join(archive_folder, filename))

        except Exception as e:
            print(f"❌ 文件处理失败: {e}")

    def send_to_api(self, df, endpoint):
        """Placeholder for the call to the modern system; only logs the size."""
        print(f"📤 发送 {len(df)} 条数据到 {endpoint} API")
        return True

# 实战示例
if __name__ == "__main__":
    integrator = LegacyFileIntegration()

    # Example 1: parse a COBOL export using its copybook definition.
    cobol_df = integrator.parse_cobol_file(
        'legacy_exports/ACCT20240520.DAT',
        'copybook.cpy',
    )
    print("COBOL数据示例:", cobol_df.head())

    # Example 2: start watching the export folder.
    # handler = LegacyFileHandler()
    # integrator.watch_folder_changes(handler)

策略3:API包装模式(推荐)

适用场景:遗留系统有网络接口但无REST API。
# strategy_api_wrapper.py
from flask import Flask, jsonify, request
import xmlrpc.client
import socket
import struct
import json
from typing import Any, Dict
import pandas as pd
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyAPIWrapper:
    """Expose legacy protocols (XML-RPC, raw socket, ODBC) as a REST API."""

    def __init__(self):
        """Create the Flask application and register its routes."""
        self.app = Flask(__name__)
        self.setup_routes()

    def setup_routes(self):
        """Register the REST endpoints on ``self.app``."""

        @self.app.route('/api/v1/legacy/customers/<customer_id>', methods=['GET'])
        def get_customer(customer_id):
            """Look up one customer in the legacy system."""
            try:
                # Transport 1: XML-RPC.
                result = self.call_via_xmlrpc('getCustomerInfo', customer_id)

                # Transport 2: custom binary protocol over a socket.
                # result = self.call_via_socket_protocol('CUST_QUERY', customer_id)

                return jsonify({
                    'success': True,
                    'data': result,
                    'source': 'legacy_system'
                })
            except Exception as e:
                return jsonify({
                    'success': False,
                    'error': str(e)
                }), 500

        @self.app.route('/api/v1/legacy/orders', methods=['POST'])
        def create_order():
            """Create an order by translating the REST body for the legacy call."""
            # silent=True returns None for a missing or malformed body, so a
            # bad request is answered with 400 instead of ending in a 500.
            data = request.get_json(silent=True)
            if not isinstance(data, dict):
                return jsonify({
                    'success': False,
                    'error': 'Request body must be a JSON object'
                }), 400

            try:
                legacy_format = self.convert_to_legacy_format(data)
                order_id = self.call_via_xmlrpc('createOrder', legacy_format)

                return jsonify({
                    'success': True,
                    'order_id': order_id,
                    'message': 'Order created in legacy system'
                }), 201
            except Exception as e:
                # 'success' added so both routes share one error shape.
                return jsonify({'success': False, 'error': str(e)}), 500

    def call_via_xmlrpc(self, method: str, *args) -> Any:
        """Call ``method`` on the legacy XML-RPC service.

        Returns:
            The response after ``transform_legacy_response``.

        Raises:
            Exception: Any transport or fault error, logged and re-raised.
        """
        print(f"📞 调用遗留系统XML-RPC: {method}")

        try:
            # The context manager closes the underlying transport.
            with xmlrpc.client.ServerProxy("http://legacy-server:8000/RPC2") as proxy:
                result = getattr(proxy, method)(*args)
            return self.transform_legacy_response(result)
        except Exception as e:
            print(f"XML-RPC调用失败: {e}")
            raise

    @staticmethod
    def _recv_exact(sock, size: int) -> bytes:
        """Read exactly ``size`` bytes from ``sock``.

        A single ``recv`` may return fewer bytes than requested, so reads are
        repeated until the full amount has arrived.

        Raises:
            ConnectionError: If the peer closes before ``size`` bytes arrive.
        """
        chunks = []
        remaining = size
        while remaining > 0:
            chunk = sock.recv(remaining)
            if not chunk:
                raise ConnectionError(
                    f"Connection closed with {remaining} of {size} bytes outstanding"
                )
            chunks.append(chunk)
            remaining -= len(chunk)
        return b''.join(chunks)

    def call_via_socket_protocol(self, command: str, data: str,
                                 host: str = 'legacy-server', port: int = 9000,
                                 timeout: float = 10.0) -> Dict:
        """Send one request over the custom binary protocol.

        Frame layout in both directions: 4-byte command/response code and
        4-byte payload length (network byte order), followed by the payload.
        The request payload is ``data`` encoded as GBK.

        Args:
            command: Symbolic command name, mapped by ``command_to_code``.
            data: Request payload text.
            host: Legacy server host.
            port: Legacy server port.
            timeout: Socket timeout in seconds.

        Returns:
            The parsed response from ``parse_binary_response``.

        Raises:
            Exception: "Legacy system timeout" on timeout, otherwise
                "Socket error: ..." wrapping the underlying error.
        """
        print(f"🔌 Socket调用: {command}")

        try:
            # The with-block closes the socket on every path, errors included.
            with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                sock.settimeout(timeout)
                sock.connect((host, port))

                cmd_code = self.command_to_code(command)
                data_bytes = data.encode('gbk')
                header = struct.pack('!II', cmd_code, len(data_bytes))
                sock.sendall(header + data_bytes)

                response_header = self._recv_exact(sock, 8)
                _resp_code, resp_len = struct.unpack('!II', response_header)
                response_data = self._recv_exact(sock, resp_len)

            return self.parse_binary_response(response_data)

        except socket.timeout as e:
            raise Exception("Legacy system timeout") from e
        except Exception as e:
            raise Exception(f"Socket error: {e}") from e

    def call_via_odbc_stored_proc(self, proc_name: str, params: dict):
        """Call a stored procedure through ODBC and return its rows.

        Args:
            proc_name: Procedure name, optionally qualified with dots and
                with each part optionally wrapped in brackets, for example
                ``dbo.usp_x`` or ``[dbo].[usp_x]``.
            params: Mapping whose values are passed positionally, in order.

        Returns:
            A list of dicts, one per row of the first result set; an empty
            list when the procedure returns no result set.

        Raises:
            ValueError: If ``proc_name`` is not a plain identifier path.
        """
        # The name is interpolated into the SQL text (it cannot be bound as a
        # parameter), so anything but identifier characters is rejected.
        name_parts = str(proc_name).split('.')
        if not all(part.strip('[]').isidentifier() for part in name_parts):
            raise ValueError(f"Invalid stored procedure name: {proc_name!r}")

        import pyodbc

        params = params or {}
        conn = pyodbc.connect('DSN=LegacyDB')
        try:
            cursor = conn.cursor()

            param_placeholders = ','.join(['?'] * len(params))
            sql = f"{{CALL {proc_name}({param_placeholders})}}"
            cursor.execute(sql, list(params.values()))

            # NOTE(review): no commit is issued before close; confirm whether
            # procedures that modify data rely on autocommit.
            if cursor.description is None:
                return []

            columns = [column[0] for column in cursor.description]
            try:
                return [dict(zip(columns, row)) for row in cursor.fetchall()]
            except pyodbc.ProgrammingError:
                # Raised by the driver when there is no result set to fetch.
                return []
        finally:
            conn.close()

    def transform_legacy_response(self, legacy_data):
        """Rename legacy keys in a dict response; pass other values through.

        Known keys are mapped to modern names and unknown keys are lowercased.
        """
        if not isinstance(legacy_data, dict):
            return legacy_data

        key_mapping = {
            'CUST_NO': 'customer_id',
            'CUST_NM': 'customer_name',
            'TEL_NO': 'phone',
            'ADDR': 'address'
        }
        return {key_mapping.get(k, k.lower()): v for k, v in legacy_data.items()}

    def convert_to_legacy_format(self, modern_data: dict) -> dict:
        """Keep only the mapped fields of ``modern_data``, under legacy names."""
        mapping = {
            'customer_id': 'CUST_NO',
            'amount': 'AMT',
            'order_date': 'ORD_DT'
        }
        return {
            legacy_key: modern_data[modern_key]
            for modern_key, legacy_key in mapping.items()
            if modern_key in modern_data
        }

    def command_to_code(self, command: str) -> int:
        """Map a command name to its protocol code; unknown names map to 0."""
        cmd_map = {
            'CUST_QUERY': 0x1001,
            'ORDER_CREATE': 0x2001,
            'INV_QUERY': 0x3001
        }
        return cmd_map.get(command, 0)

    def parse_binary_response(self, data: bytes) -> Dict:
        """Return the payload as a hex string (simplified placeholder parser)."""
        return {'raw_data': data.hex()}

    def run(self, host='0.0.0.0', port=5000):
        """Start the Flask development server.

        The default host binds to all interfaces.
        """
        print(f"🚀 启动遗留系统API包装器: http://{host}:{port}")
        self.app.run(host=host, port=port, debug=False)

# 实战示例
if __name__ == "__main__":
    wrapper = LegacyAPIWrapper()

    # Start the Flask service:
    # wrapper.run()

    # Client-side usage example.
    import requests

    print("📡 测试API包装器:")

    # 1. Look up a customer through the wrapped API.
    # response = requests.get('http://localhost:5000/api/v1/legacy/customers/1001')
    # print("客户数据:", response.json())

    # 2. Create an order.
    order_data = dict(
        customer_id='1001',
        amount=999.99,
        order_date='2024-05-20',
        items=[dict(product_id='P001', qty=2)],
    )
    # response = requests.post('http://localhost:5000/api/v1/legacy/orders', json=order_data)
    # print("创建订单结果:", response.json())

策略4:消息队列模式

适用场景:需要异步、解耦的集成。
# strategy_message_queue.py
import pika
import json
import threading
import time
from datetime import datetime
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyMessageQueueBridge:
    """Bridge legacy-system events into RabbitMQ and on to modern consumers."""

    def __init__(self, mq_host='localhost'):
        """Connect to RabbitMQ on ``mq_host`` and declare the topology."""
        self.mq_host = mq_host
        self.setup_rabbitmq()

    def setup_rabbitmq(self):
        """Open the connection and declare the exchange, queues and bindings."""
        self.connection = pika.BlockingConnection(
            pika.ConnectionParameters(self.mq_host)
        )
        self.channel = self.connection.channel()

        self.channel.exchange_declare(
            exchange='legacy_integration',
            exchange_type='topic',
            durable=True
        )

        # Queue receiving events published by the legacy side.
        self.channel.queue_declare(queue='legacy_system_queue', durable=True)
        self.channel.queue_bind(
            queue='legacy_system_queue',
            exchange='legacy_integration',
            routing_key='legacy.*'
        )

        # Queue receiving the transformed events.
        self.channel.queue_declare(queue='modern_system_queue', durable=True)
        self.channel.queue_bind(
            queue='modern_system_queue',
            exchange='legacy_integration',
            routing_key='modern.*'
        )

    def listen_to_legacy_system(self):
        """Start a daemon thread that simulates the legacy system publishing."""
        print("👂 开始监听遗留系统...")

        def legacy_simulator():
            """Publish three sample events, two seconds apart."""
            messages = [
                {'type': 'SALE', 'amount': 100.0, 'store': '001'},
                {'type': 'RETURN', 'amount': 25.5, 'store': '002'},
                {'type': 'INVENTORY', 'product': 'P1001', 'qty': 50}
            ]

            # BlockingConnection must not be shared between threads, so the
            # simulator publishes on a connection of its own instead of the
            # channel the consumer is using.
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(self.mq_host)
            )
            try:
                channel = connection.channel()
                for msg in messages:
                    time.sleep(2)  # simulated interval between events
                    self._publish(channel, 'legacy.event', msg)
                    print(f"📨 遗留系统发送: {msg}")
            finally:
                connection.close()

        thread = threading.Thread(target=legacy_simulator, daemon=True)
        thread.start()

    @staticmethod
    def _publish(channel, routing_key, message):
        """Publish ``message`` as persistent JSON to the integration exchange."""
        channel.basic_publish(
            exchange='legacy_integration',
            routing_key=routing_key,
            body=json.dumps(message),
            properties=pika.BasicProperties(
                delivery_mode=2,  # persistent
                timestamp=int(time.time())
            )
        )

    def publish_to_queue(self, routing_key, message):
        """Publish ``message`` on this bridge's own channel."""
        self._publish(self.channel, routing_key, message)

    def consume_legacy_messages(self):
        """Consume ``legacy_system_queue``, transforming and forwarding each message.

        Blocks in ``start_consuming``. A message that cannot be processed is
        copied to the dead-letter queue. Every message is acknowledged once it
        has been forwarded or dead-lettered.
        """
        print("🔄 开始处理遗留系统消息...")

        def callback(ch, method, properties, body):
            try:
                message = json.loads(body)
                print(f"📩 收到遗留消息: {message}")

                transformed = self.transform_legacy_message(message)
                self.forward_to_modern_system(transformed)

            except Exception as e:
                print(f"❌ 消息处理失败: {e}")
                self.send_to_dlq(body, str(e))

            # Ack on the failure path too: the message is already in the DLQ,
            # and an unacked delivery would stay pending and be redelivered.
            ch.basic_ack(delivery_tag=method.delivery_tag)

        self.channel.basic_consume(
            queue='legacy_system_queue',
            on_message_callback=callback
        )

        self.channel.start_consuming()

    def forward_to_modern_system(self, message):
        """Forward a transformed message by publishing it as ``modern.event``."""
        print(f"🚀 转发到现代系统: {message}")

        # A direct API call could be made here instead, e.g.
        # requests.post('http://modern-system/api/data', json=message)
        self.publish_to_queue('modern.event', message)

    def transform_legacy_message(self, legacy_msg):
        """Convert a legacy message to the modern format and attach metadata.

        Messages of an unknown type keep their fields unchanged. The input
        dict is never modified.

        Returns:
            A new dict containing a ``_metadata`` entry with the source,
            the transformation time and the original type.
        """
        transformations = {
            'SALE': self._transform_sale,
            'RETURN': self._transform_return,
            'INVENTORY': self._transform_inventory
        }

        msg_type = legacy_msg.get('type', 'UNKNOWN')
        transformer = transformations.get(msg_type)
        # Copy for unknown types so adding _metadata below does not mutate
        # the caller's message.
        transformed = transformer(legacy_msg) if transformer else dict(legacy_msg)

        transformed['_metadata'] = {
            'source': 'legacy_system',
            'transformed_at': datetime.now().isoformat(),
            'original_type': msg_type
        }

        return transformed

    def _transform_sale(self, msg):
        """Map a SALE message to a ``sale_completed`` event."""
        return {
            'event_type': 'sale_completed',
            'transaction_amount': msg.get('amount', 0),
            'store_id': msg.get('store'),
            'currency': 'CNY'
        }

    def _transform_return(self, msg):
        """Map a RETURN message to a ``return_processed`` event."""
        return {
            'event_type': 'return_processed',
            'return_amount': abs(msg.get('amount', 0)),
            'store_id': msg.get('store'),
            'refund_method': 'original'
        }

    def _transform_inventory(self, msg):
        """Map an INVENTORY message to an ``inventory_update`` event."""
        return {
            'event_type': 'inventory_update',
            'sku': msg.get('product'),
            'quantity': msg.get('qty', 0),
            'update_type': 'stock_adjustment'
        }

    def _build_dlq_message(self, message, error):
        """Build the dead-letter record for a failed message.

        Raw bytes are decoded as JSON when possible. Undecodable payloads,
        often the reason the message failed, are kept as text instead of
        raising a second error inside the failure handler.
        """
        original = message
        if isinstance(message, (bytes, bytearray)):
            try:
                original = json.loads(message)
            except ValueError:
                original = bytes(message).decode('utf-8', errors='replace')

        return {
            'original_message': original,
            'error': error,
            'failed_at': datetime.now().isoformat()
        }

    def send_to_dlq(self, message, error):
        """Publish a failed message and its error to the ``legacy_dlq`` queue."""
        dlq_message = self._build_dlq_message(message, error)

        self.channel.queue_declare(queue='legacy_dlq', durable=True)
        self.channel.basic_publish(
            exchange='',
            routing_key='legacy_dlq',
            body=json.dumps(dlq_message),
            # Persistent, matching the durable queue and the other publishes.
            properties=pika.BasicProperties(delivery_mode=2)
        )
        print(f"⚰️ 消息发送到死信队列: {error}")

    def run(self):
        """Start the simulated legacy producer, then consume until stopped."""
        print("🚀 启动消息队列桥接器")

        self.listen_to_legacy_system()
        self.consume_legacy_messages()

# 实战示例
if __name__ == "__main__":
    # Constructing the bridge connects to RabbitMQ, so a broker must be running.
    bridge = LegacyMessageQueueBridge()

    # Start the bridge:
    # bridge.run()

    for status_line in (
        "📋 消息队列桥接器就绪",
        "模拟遗留系统会每2秒发送一条消息...",
    ):
        print(status_line)

策略5:界面自动化模式(最后手段)

适用场景:只有图形界面,无任何接口。
# strategy_gui_automation.py
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
import pandas as pd
import time
import pyautogui
import pyperclip

class LegacyGUIAutomation:
    """UI-automation integration for systems with no programmatic interface."""

    def __init__(self, headless=False):
        """Start a Chrome session.

        Args:
            headless: Run Chrome without a visible window when True.
        """
        self.headless = headless
        self.setup_driver()

    def setup_driver(self):
        """Create ``self.driver`` with the automation markers disabled."""
        options = webdriver.ChromeOptions()
        if self.headless:
            options.add_argument('--headless')

        # Hide the usual signs of an automated browser.
        options.add_argument('--disable-blink-features=AutomationControlled')
        options.add_experimental_option("excludeSwitches", ["enable-automation"])
        options.add_experimental_option('useAutomationExtension', False)

        self.driver = webdriver.Chrome(options=options)

        self.driver.execute_script(
            "Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
        )

    @staticmethod
    def _xpath_literal(value):
        """Return ``value`` as a safely quoted XPath string literal.

        XPath 1.0 has no escape character, so text containing both quote
        kinds is assembled with ``concat()``.
        """
        text = str(value)
        if "'" not in text:
            return f"'{text}'"
        if '"' not in text:
            return f'"{text}"'
        pieces = [f"'{part}'" for part in text.split("'")]
        return "concat(" + ", \"'\", ".join(pieces) + ")"

    def _find_first(self, selectors):
        """Return the first element matching any XPath in ``selectors``, or None."""
        for selector in selectors:
            # find_elements returns an empty list instead of raising.
            matches = self.driver.find_elements(By.XPATH, selector)
            if matches:
                return matches[0]
        return None

    def login_to_legacy_system(self, url, username, password):
        """Open ``url`` and submit the login form.

        Returns:
            True when the form was filled and submitted, False when a field
            was missing or an error occurred. The result page is not
            inspected, so True does not prove the credentials were accepted.
        """
        print(f"🔐 登录遗留系统: {url}")

        self.driver.get(url)
        time.sleep(2)  # wait for the page to load

        try:
            user_input = self._find_first([
                "//input[@name='username']",
                "//input[@id='userid']",
                "//input[@type='text']"
            ])
            if user_input is None:
                # Previously the login carried on without a username and
                # still reported success.
                print("❌ 登录失败: 未找到用户名输入框")
                return False
            user_input.send_keys(username)

            pwd_input = self.driver.find_element(By.XPATH, "//input[@type='password']")
            pwd_input.send_keys(password)

            login_btn = self.driver.find_element(By.XPATH, "//input[@type='submit']")
            login_btn.click()

            time.sleep(3)
            print("✅ 登录成功")
            return True

        except Exception as e:
            print(f"❌ 登录失败: {e}")
            return False

    def extract_data_from_grid(self, grid_xpath):
        """Read an HTML table into a DataFrame.

        Column names come from the ``<th>`` cells. Only rows whose ``<td>``
        count equals the header count are kept.

        Returns:
            The extracted rows, or an empty DataFrame on any error.
        """
        print("📊 从表格提取数据...")

        try:
            table = self.driver.find_element(By.XPATH, grid_xpath)

            headers = [th.text.strip() for th in table.find_elements(By.XPATH, ".//th")]

            # Select rows by content rather than by position: position() is
            # evaluated per parent element, so tr[position()>1] dropped the
            # first data row of every <tbody>.
            rows = table.find_elements(By.XPATH, ".//tr[td]")

            data = []
            for row in rows:
                cols = row.find_elements(By.XPATH, ".//td")
                if len(cols) == len(headers):
                    data.append({
                        header: col.text.strip()
                        for header, col in zip(headers, cols)
                    })

            return pd.DataFrame(data)

        except Exception as e:
            print(f"表格提取失败: {e}")
            return pd.DataFrame()

    def fill_form_and_submit(self, form_data):
        """Fill the inputs named in ``form_data`` and click submit.

        Args:
            form_data: Mapping of field name to the value to type. Fields are
                matched by ``name``, then ``id``, then any element whose
                ``name`` contains the upper-cased field name.

        Returns:
            True when the submit button was clicked, False otherwise.
        """
        print("📝 自动填写表单...")

        for field, value in form_data.items():
            try:
                # Quote field names so quotes in them cannot break the XPath.
                name = self._xpath_literal(field)
                upper_name = self._xpath_literal(str(field).upper())
                element = self._find_first([
                    f"//input[@name={name}]",
                    f"//input[@id={name}]",
                    f"//*[contains(@name, {upper_name})]"
                ])

                if element:
                    element.clear()
                    element.send_keys(value)
                else:
                    print(f"⚠️ 未找到字段: {field}")

            except Exception as e:
                print(f"字段 {field} 填写失败: {e}")

        try:
            submit_btn = self.driver.find_element(By.XPATH, "//input[@type='submit']")
            submit_btn.click()
            time.sleep(2)
            print("✅ 表单提交成功")
            return True
        except Exception:
            print("❌ 提交失败")
            return False

    def export_report(self, report_name, export_format='excel'):
        """Click the page's export control and confirm the download dialog.

        Args:
            report_name: Used for logging only.
            export_format: 'excel' or 'pdf'; selects the typed file name.

        Returns:
            True when an export control was found and clicked, else False.
        """
        print(f"📄 导出报表: {report_name}")

        try:
            export_btns = (
                self.driver.find_elements(By.XPATH, "//button[contains(text(), '导出')]")
                or self.driver.find_elements(By.XPATH, "//a[contains(text(), 'Export')]")
            )
            if not export_btns:
                return False

            export_btns[0].click()
            time.sleep(2)

            self._handle_download_dialog(export_format)
            return True

        except Exception as e:
            print(f"导出失败: {e}")
            return False

    def _handle_download_dialog(self, file_type):
        """Type a file name into the native save dialog and press Enter.

        Only 'excel' and 'pdf' are handled; other values just wait.
        """
        filenames = {'excel': 'report.xlsx', 'pdf': 'report.pdf'}
        filename = filenames.get(file_type)
        if filename is not None:
            pyautogui.write(filename)
            pyautogui.press('enter')

        time.sleep(1)

    def screen_scrape_legacy_app(self):
        """Save a screenshot of the 'Legacy ERP System' desktop window.

        Returns:
            True when the screenshot was saved, False when no window with
            that title exists.
        """
        print("🖥️ 屏幕抓取桌面应用...")

        windows = pyautogui.getWindowsWithTitle('Legacy ERP System')
        if not windows:
            # Indexing [0] on an empty list used to raise IndexError.
            print("⚠️ 未找到窗口: Legacy ERP System")
            return False

        app_window = windows[0]
        app_window.activate()

        screenshot = pyautogui.screenshot(region=(
            app_window.left,
            app_window.top,
            app_window.width,
            app_window.height
        ))

        screenshot.save('legacy_app_screenshot.png')
        print("📸 屏幕截图已保存")
        return True

    def close(self):
        """Quit the browser if one is open; safe to call more than once."""
        driver = getattr(self, 'driver', None)
        if driver:
            driver.quit()
            self.driver = None
            print("👋 浏览器已关闭")

# 实战示例
if __name__ == "__main__":
    for warning_line in (
        "⚠️ 警告: 界面自动化是集成遗留系统的最后手段",
        "     仅当无其他接口可用时考虑此方案",
    ):
        print(warning_line)

    # Example: automated login and data extraction.
    automator = LegacyGUIAutomation(headless=True)

    # Simulated login:
    # success = automator.login_to_legacy_system(
    #     'http://legacy-erp.internal/login',
    #     'admin',
    #     'password123'
    # )

    # if success:
    #     # Extract the customer grid
    #     df = automator.extract_data_from_grid("//table[@id='custGrid']")
    #     print(f"提取到 {len(df)} 条客户数据")
    #     print(df.head())

    automator.close()

四、 遗留系统现代化架构演进路径

graph LR
    A[遗留系统] --> B[阶段1: 数据同步]
    B --> C[阶段2: API包装]
    C --> D[阶段3: 功能迁移]
    D --> E[阶段4: 系统退役]
    
    B --> B1[数据库/文件同步]
    C --> C1[REST API包装]
    D --> D1[微服务化改造]
    E --> E1[归档历史数据]
建议采用逐步演进的“绞杀者模式”:
  1. 先集成:用上述策略打通数据流

  2. 再包装:为遗留功能提供现代API

  3. 后替换:逐个功能迁移到新系统

  4. 终退役:当所有功能迁移完成后关闭旧系统


💡 总结

遗留系统集成没有“银弹”,关键在于:
  1. 评估现状:分析遗留系统的技术栈、数据格式、接口能力

  2. 选择策略:根据业务需求选择合适集成模式

  3. 渐进改造:采用绞杀者模式,避免“大爆炸”式重写

  4. 保障稳定:任何集成都要有回滚和降级方案

记住:遗留系统不只是企业的“技术负债”,更是沉淀了数十年业务数据的“数字资产”。通过恰当的集成策略,你可以让这些“老古董”继续为数字化转型贡献力量,直到最终完成现代化改造。


群贤毕至

访客