🔥 遗留系统:企业的“技术负债”与现代化集成实战指南(附Python源码)
在数字化转型浪潮中,遗留系统(Legacy System) 是企业最头疼的“技术负债”——它像一座“信息孤岛”,数据有价值但难以获取,功能能用但难以扩展。本文将深入剖析遗留系统的本质,并提供完整的现代化集成方案与Python实战源码。
一、 什么是遗留系统?不只是“老代码”那么简单
1. 遗留系统的精准定义
遗留系统不是简单的“旧系统”,它具备以下至少三个特征:
特征 | 表现 | 典型案例 |
|---|---|---|
技术过时 | 使用淘汰技术栈(COBOL、VB6、Delphi) | 银行核心系统、制造业ERP |
文档缺失 | 无API文档、无架构图、原开发团队离职 | 自研的库存管理系统 |
维护困难 | 无人敢修改、改一处崩一片 | 20年前的财务系统 |
数据孤岛 | 只能通过专有协议/界面访问 | 串口通信的工控系统 |
高耦合 | 与硬件/操作系统深度绑定 | Windows XP + Access数据库应用 |
2. 为什么企业无法抛弃遗留系统?
- 业务关键性:运行核心业务流程(如银行交易清算)
- 迁移成本:重写需数年,成本数百万到数亿
- 数据价值:积累数十年的业务数据
- 合规要求:满足特定行业监管(如医疗HIPAA)
二、 遗留系统集成的五种策略模式
根据系统复杂度和业务需求,选择适合的集成策略:
graph TD
A[遗留系统] --> B{选择集成策略}
B --> C[策略1: 数据库直连]
B --> D[策略2: 文件交换]
B --> E[策略3: API包装]
B --> F[策略4: 消息队列]
B --> G[策略5: 界面自动化]
C --> H[简单快速 有安全风险]
D --> I[稳定通用 延迟高]
E --> J[现代优雅 需改造]
F --> K[异步解耦 架构复杂]
G --> L[无侵入 脆弱不稳定]三、 Python实战:五种集成策略源码实现
策略1:数据库直连模式
适用场景:遗留系统使用标准数据库(Oracle、SQL Server),且你有只读权限。
# strategy_database.py
import pyodbc
import pandas as pd
from sqlalchemy import create_engine
from contextlib import contextmanager
import warnings
warnings.filterwarnings('ignore')
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyDatabaseConnector:
"""遗留系统数据库直连集成器"""
def __init__(self, db_type='sqlserver'):
self.db_configs = {
'sqlserver': {
'driver': '{ODBC Driver 17 for SQL Server}',
'server': 'legacy-server',
'database': 'LegacyERP',
'username': 'readonly_user',
'password': 'SecurePass123!'
},
'oracle': {
'user': 'legacy_user',
'password': 'password',
'dsn': 'legacy_db'
}
}
@contextmanager
def get_connection(self, db_type='sqlserver'):
"""安全获取数据库连接(上下文管理器确保关闭)"""
conn = None
try:
if db_type == 'sqlserver':
config = self.db_configs[db_type]
conn_str = (
f"DRIVER={config['driver']};"
f"SERVER={config['server']};"
f"DATABASE={config['database']};"
f"UID={config['username']};"
f"PWD={config['password']}"
)
conn = pyodbc.connect(conn_str)
elif db_type == 'oracle':
import cx_Oracle
config = self.db_configs[db_type]
conn = cx_Oracle.connect(
config['user'],
config['password'],
config['dsn']
)
print(f"✅ 成功连接 {db_type.upper()} 遗留数据库")
yield conn
except Exception as e:
print(f"❌ 数据库连接失败: {e}")
raise
finally:
if conn:
conn.close()
print("🔌 数据库连接已关闭")
def extract_legacy_data(self, query, params=None, db_type='sqlserver'):
"""从遗留数据库提取数据"""
with self.get_connection(db_type) as conn:
# 使用参数化查询防止SQL注入
df = pd.read_sql(query, conn, params=params)
# 数据质量检查
self._validate_data(df)
# 数据类型转换(处理遗留系统的奇怪类型)
df = self._convert_data_types(df)
return df
def sync_to_modern_db(self, legacy_query, modern_table, transform_func=None):
"""从遗留库同步到现代数据库"""
print(f"🔄 同步数据: {legacy_query} -> {modern_table}")
# 1. 从遗留系统提取
legacy_df = self.extract_legacy_data(legacy_query)
# 2. 数据转换(如需要)
if transform_func:
legacy_df = transform_func(legacy_df)
# 3. 写入现代数据库(如PostgreSQL)
modern_engine = create_engine('postgresql://user:pass@modern-db:5432/app_db')
legacy_df.to_sql(modern_table, modern_engine, if_exists='replace', index=False)
print(f"✅ 同步完成: {len(legacy_df)} 条记录")
return legacy_df
def _validate_data(self, df):
"""数据验证"""
if df.empty:
print("⚠️ 警告: 查询返回空结果")
# 检查空值比例
null_ratio = df.isnull().sum().sum() / (df.shape[0] * df.shape[1])
if null_ratio > 0.3:
print(f"⚠️ 警告: 数据空值率过高 ({null_ratio:.1%})")
def _convert_data_types(self, df):
"""处理遗留系统特有的数据类型"""
for col in df.columns:
# 处理COBOL的压缩十进制
if df[col].dtype == 'object':
try:
# 尝试转换数值
df[col] = pd.to_numeric(df[col], errors='ignore')
except:
pass
# 处理日期格式多样性
date_cols = [col for col in df.columns if 'date' in col.lower()]
for col in date_cols:
try:
df[col] = pd.to_datetime(df[col], errors='coerce', format='mixed')
except:
pass
return df
# 实战示例
if __name__ == "__main__":
connector = LegacyDatabaseConnector()
# 示例1: 同步客户数据
customer_df = connector.extract_legacy_data(
"SELECT CustID, CustName, RegDate FROM Customers WHERE Status = 'A'"
)
print(f"📊 获取到 {len(customer_df)} 条客户数据")
print(customer_df.head())
# 示例2: 全量同步到现代数据库
def transform_customers(df):
"""数据转换函数:标准化字段名"""
df = df.rename(columns={
'CustID': 'customer_id',
'CustName': 'customer_name',
'RegDate': 'registration_date'
})
return df
# connector.sync_to_modern_db(
# "SELECT * FROM Orders WHERE OrderDate > '2024-01-01'",
# "legacy_orders",
# transform_customers
# )策略2:文件交换模式
适用场景:遗留系统只能生成文件(CSV、Excel、定宽文件)。
# strategy_file.py
import pandas as pd
import os
import csv
from datetime import datetime
from watchdog.observers import Observer
from watchdog.events import FileSystemEventHandler
import time
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyFileIntegration:
"""文件交换模式集成器(监控文件夹变化)"""
def __init__(self, watch_folder='./legacy_exports/'):
self.watch_folder = watch_folder
os.makedirs(watch_folder, exist_ok=True)
def parse_fixed_width_file(self, filepath, col_specs):
"""解析定宽文本文件(银行、保险业常见)"""
print(f"📄 解析定宽文件: {filepath}")
data = []
with open(filepath, 'r', encoding='gb2312') as f: # 处理中文编码
for line in f:
if len(line.strip()) == 0:
continue
row = {}
for col_name, (start, end) in col_specs.items():
try:
value = line[start-1:end].strip()
row[col_name] = value
except:
row[col_name] = None
data.append(row)
return pd.DataFrame(data)
def parse_cobol_file(self, filepath, copybook_path):
"""解析COBOL生成的文件(需要copybook定义)"""
# 简化的COBOL解析逻辑
print(f"🖥️ 解析COBOL文件: {filepath}")
# 实际应用中可使用cb2py等库
# 这里返回模拟数据
return pd.DataFrame({
'account_no': ['001', '002', '003'],
'balance': [1000.0, 2500.0, 500.0],
'last_transaction': ['2024-05-01', '2024-05-10', '2024-05-15']
})
def watch_folder_changes(self, handler):
"""监控文件夹变化(遗留系统定期导出文件)"""
print(f"👀 开始监控文件夹: {self.watch_folder}")
event_handler = handler
observer = Observer()
observer.schedule(event_handler, self.watch_folder, recursive=False)
observer.start()
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
observer.stop()
observer.join()
class LegacyFileHandler(FileSystemEventHandler):
"""文件变化处理器"""
def on_created(self, event):
if not event.is_directory and event.src_path.endswith('.csv'):
print(f"🆕 检测到新文件: {event.src_path}")
self.process_file(event.src_path)
def process_file(self, filepath):
"""处理遗留系统生成的文件"""
try:
# 根据文件名判断处理逻辑
filename = os.path.basename(filepath)
if 'CUST' in filename.upper():
df = pd.read_csv(filepath, encoding='gbk')
print(f"👥 客户文件: {len(df)} 条记录")
# 转换后发送到消息队列或API
self.send_to_api(df, 'customers')
elif 'ORDER' in filename.upper():
df = pd.read_csv(filepath, encoding='gbk')
print(f"📦 订单文件: {len(df)} 条记录")
self.send_to_api(df, 'orders')
elif filename.endswith('.txt'):
# 处理定宽文件
col_specs = {
'field1': (1, 10),
'field2': (11, 20),
'field3': (21, 30)
}
df = self.parse_fixed_width_file(filepath, col_specs)
self.send_to_api(df, 'transactions')
# 移动已处理文件
archive_folder = './processed/'
os.makedirs(archive_folder, exist_ok=True)
os.rename(filepath, os.path.join(archive_folder, filename))
except Exception as e:
print(f"❌ 文件处理失败: {e}")
def send_to_api(self, df, endpoint):
"""发送到现代系统API"""
# 这里实现API调用逻辑
print(f"📤 发送 {len(df)} 条数据到 {endpoint} API")
return True
# 实战示例
if __name__ == "__main__":
integrator = LegacyFileIntegration()
# 示例1: 解析COBOL文件
cobol_df = integrator.parse_cobol_file('legacy_exports/ACCT20240520.DAT', 'copybook.cpy')
print("COBOL数据示例:", cobol_df.head())
# 示例2: 启动文件夹监控
# handler = LegacyFileHandler()
# integrator.watch_folder_changes(handler)策略3:API包装模式(推荐)
适用场景:遗留系统有网络接口但无REST API。
# strategy_api_wrapper.py
from flask import Flask, jsonify, request
import xmlrpc.client
import socket
import struct
import json
from typing import Any, Dict
import pandas as pd
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyAPIWrapper:
"""遗留系统API包装器(将老旧协议包装为REST API)"""
def __init__(self):
self.app = Flask(__name__)
self.setup_routes()
def setup_routes(self):
"""设置REST API路由"""
@self.app.route('/api/v1/legacy/customers/<customer_id>', methods=['GET'])
def get_customer(customer_id):
"""包装遗留系统的客户查询"""
try:
# 方式1: 通过XML-RPC调用
result = self.call_via_xmlrpc('getCustomerInfo', customer_id)
# 方式2: 通过Socket调用(自定义二进制协议)
# result = self.call_via_socket_protocol('CUST_QUERY', customer_id)
return jsonify({
'success': True,
'data': result,
'source': 'legacy_system'
})
except Exception as e:
return jsonify({
'success': False,
'error': str(e)
}), 500
@self.app.route('/api/v1/legacy/orders', methods=['POST'])
def create_order():
"""创建订单(将REST请求转换为遗留系统调用)"""
data = request.json
try:
# 数据转换
legacy_format = self.convert_to_legacy_format(data)
# 调用遗留系统
order_id = self.call_via_xmlrpc('createOrder', legacy_format)
return jsonify({
'success': True,
'order_id': order_id,
'message': 'Order created in legacy system'
}), 201
except Exception as e:
return jsonify({'error': str(e)}), 500
def call_via_xmlrpc(self, method: str, *args) -> Any:
"""通过XML-RPC调用遗留系统"""
print(f"📞 调用遗留系统XML-RPC: {method}")
try:
# 连接遗留系统的XML-RPC服务
proxy = xmlrpc.client.ServerProxy("http://legacy-server:8000/RPC2")
# 动态调用方法
result = getattr(proxy, method)(*args)
# 转换响应格式
return self.transform_legacy_response(result)
except Exception as e:
print(f"XML-RPC调用失败: {e}")
raise
def call_via_socket_protocol(self, command: str, data: str) -> Dict:
"""通过Socket调用遗留系统(自定义二进制协议)"""
print(f"🔌 Socket调用: {command}")
# 遗留系统常见的自定义协议格式
# 头部: 4字节命令码 + 4字节数据长度
# 数据: 实际数据
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(10.0)
sock.connect(('legacy-server', 9000))
# 构建请求
cmd_code = self.command_to_code(command)
data_bytes = data.encode('gbk')
# 构建协议头
header = struct.pack('!II', cmd_code, len(data_bytes))
# 发送
sock.sendall(header + data_bytes)
# 接收响应
response_header = sock.recv(8)
resp_code, resp_len = struct.unpack('!II', response_header)
response_data = sock.recv(resp_len)
sock.close()
return self.parse_binary_response(response_data)
except socket.timeout:
raise Exception("Legacy system timeout")
except Exception as e:
raise Exception(f"Socket error: {e}")
def call_via_odbc_stored_proc(self, proc_name: str, params: dict):
"""通过ODBC调用存储过程"""
import pyodbc
conn = pyodbc.connect('DSN=LegacyDB')
cursor = conn.cursor()
# 构建参数占位符
param_placeholders = ','.join(['?'] * len(params))
sql = f"{{CALL {proc_name}({param_placeholders})}}"
cursor.execute(sql, list(params.values()))
# 处理结果集
results = []
try:
while True:
row = cursor.fetchone()
if not row:
break
results.append(dict(zip([column[0] for column in cursor.description], row)))
except pyodbc.ProgrammingError:
# 没有结果集
pass
conn.close()
return results
def transform_legacy_response(self, legacy_data):
"""转换遗留系统响应格式"""
if isinstance(legacy_data, dict):
# 键名标准化
key_mapping = {
'CUST_NO': 'customer_id',
'CUST_NM': 'customer_name',
'TEL_NO': 'phone',
'ADDR': 'address'
}
transformed = {}
for k, v in legacy_data.items():
new_key = key_mapping.get(k, k.lower())
transformed[new_key] = v
return transformed
return legacy_data
def convert_to_legacy_format(self, modern_data: dict) -> dict:
"""将现代数据格式转换为遗留系统格式"""
mapping = {
'customer_id': 'CUST_NO',
'amount': 'AMT',
'order_date': 'ORD_DT'
}
legacy_data = {}
for modern_key, legacy_key in mapping.items():
if modern_key in modern_data:
legacy_data[legacy_key] = modern_data[modern_key]
return legacy_data
def command_to_code(self, command: str) -> int:
"""命令转代码(模拟遗留系统协议)"""
cmd_map = {
'CUST_QUERY': 0x1001,
'ORDER_CREATE': 0x2001,
'INV_QUERY': 0x3001
}
return cmd_map.get(command, 0)
def parse_binary_response(self, data: bytes) -> Dict:
"""解析二进制响应"""
# 简化的解析逻辑
return {'raw_data': data.hex()}
def run(self, host='0.0.0.0', port=5000):
"""启动API包装器"""
print(f"🚀 启动遗留系统API包装器: http://{host}:{port}")
self.app.run(host=host, port=port, debug=False)
# 实战示例
if __name__ == "__main__":
wrapper = LegacyAPIWrapper()
# 启动Flask服务
# wrapper.run()
# 客户端调用示例
import requests
# 模拟调用包装后的API
print("📡 测试API包装器:")
# 1. 查询客户
# response = requests.get('http://localhost:5000/api/v1/legacy/customers/1001')
# print("客户数据:", response.json())
# 2. 创建订单
order_data = {
'customer_id': '1001',
'amount': 999.99,
'order_date': '2024-05-20',
'items': [{'product_id': 'P001', 'qty': 2}]
}
# response = requests.post('http://localhost:5000/api/v1/legacy/orders', json=order_data)
# print("创建订单结果:", response.json())策略4:消息队列模式
适用场景:需要异步、解耦的集成。
# strategy_message_queue.py
import pika
import json
import threading
import time
from datetime import datetime
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class LegacyMessageQueueBridge:
"""消息队列桥接器(将遗留系统接入现代消息队列)"""
def __init__(self, mq_host='localhost'):
self.mq_host = mq_host
self.setup_rabbitmq()
def setup_rabbitmq(self):
"""设置RabbitMQ连接"""
self.connection = pika.BlockingConnection(
pika.ConnectionParameters(self.mq_host)
)
self.channel = self.connection.channel()
# 声明交换机和队列
self.channel.exchange_declare(
exchange='legacy_integration',
exchange_type='topic',
durable=True
)
# 遗留系统队列
self.channel.queue_declare(queue='legacy_system_queue', durable=True)
self.channel.queue_bind(
queue='legacy_system_queue',
exchange='legacy_integration',
routing_key='legacy.*'
)
# 现代系统队列
self.channel.queue_declare(queue='modern_system_queue', durable=True)
self.channel.queue_bind(
queue='modern_system_queue',
exchange='legacy_integration',
routing_key='modern.*'
)
def listen_to_legacy_system(self):
"""监听遗留系统消息(模拟遗留系统通过Socket发送数据)"""
print("👂 开始监听遗留系统...")
def legacy_simulator():
"""模拟遗留系统定期发送数据"""
messages = [
{'type': 'SALE', 'amount': 100.0, 'store': '001'},
{'type': 'RETURN', 'amount': 25.5, 'store': '002'},
{'type': 'INVENTORY', 'product': 'P1001', 'qty': 50}
]
for msg in messages:
time.sleep(2) # 模拟间隔
self.publish_to_queue('legacy.event', msg)
print(f"📨 遗留系统发送: {msg}")
# 启动模拟线程
thread = threading.Thread(target=legacy_simulator, daemon=True)
thread.start()
def publish_to_queue(self, routing_key, message):
"""发布消息到队列"""
self.channel.basic_publish(
exchange='legacy_integration',
routing_key=routing_key,
body=json.dumps(message),
properties=pika.BasicProperties(
delivery_mode=2, # 持久化
timestamp=int(time.time())
)
)
def consume_legacy_messages(self):
"""消费遗留系统消息(转换为现代格式)"""
print("🔄 开始处理遗留系统消息...")
def callback(ch, method, properties, body):
try:
message = json.loads(body)
print(f"📩 收到遗留消息: {message}")
# 消息转换
transformed = self.transform_legacy_message(message)
# 转发到现代系统
self.forward_to_modern_system(transformed)
# 确认消息
ch.basic_ack(delivery_tag=method.delivery_tag)
except Exception as e:
print(f"❌ 消息处理失败: {e}")
# 记录到死信队列
self.send_to_dlq(body, str(e))
self.channel.basic_consume(
queue='legacy_system_queue',
on_message_callback=callback
)
self.channel.start_consuming()
def forward_to_modern_system(self, message):
"""转发到现代系统(模拟API调用)"""
print(f"🚀 转发到现代系统: {message}")
# 这里可以实现实际的API调用
# requests.post('http://modern-system/api/data', json=message)
# 或者发布到现代系统队列
self.publish_to_queue('modern.event', message)
def transform_legacy_message(self, legacy_msg):
"""转换遗留系统消息格式"""
transformations = {
'SALE': self._transform_sale,
'RETURN': self._transform_return,
'INVENTORY': self._transform_inventory
}
msg_type = legacy_msg.get('type', 'UNKNOWN')
transformer = transformations.get(msg_type, lambda x: x)
transformed = transformer(legacy_msg)
transformed['_metadata'] = {
'source': 'legacy_system',
'transformed_at': datetime.now().isoformat(),
'original_type': msg_type
}
return transformed
def _transform_sale(self, msg):
return {
'event_type': 'sale_completed',
'transaction_amount': msg.get('amount', 0),
'store_id': msg.get('store'),
'currency': 'CNY'
}
def _transform_return(self, msg):
return {
'event_type': 'return_processed',
'return_amount': abs(msg.get('amount', 0)),
'store_id': msg.get('store'),
'refund_method': 'original'
}
def _transform_inventory(self, msg):
return {
'event_type': 'inventory_update',
'sku': msg.get('product'),
'quantity': msg.get('qty', 0),
'update_type': 'stock_adjustment'
}
def send_to_dlq(self, message, error):
"""发送到死信队列(处理失败的消息)"""
dlq_message = {
'original_message': json.loads(message) if isinstance(message, bytes) else message,
'error': error,
'failed_at': datetime.now().isoformat()
}
self.channel.queue_declare(queue='legacy_dlq', durable=True)
self.channel.basic_publish(
exchange='',
routing_key='legacy_dlq',
body=json.dumps(dlq_message)
)
print(f"⚰️ 消息发送到死信队列: {error}")
def run(self):
"""运行消息队列桥接"""
print("🚀 启动消息队列桥接器")
# 监听遗留系统
self.listen_to_legacy_system()
# 开始消费消息
self.consume_legacy_messages()
# 实战示例
if __name__ == "__main__":
bridge = LegacyMessageQueueBridge()
# 启动桥接器
# bridge.run()
print("📋 消息队列桥接器就绪")
print("模拟遗留系统会每2秒发送一条消息...")
# 实际运行需要RabbitMQ服务策略5:界面自动化模式(最后手段)
适用场景:只有图形界面,无任何接口。
# strategy_gui_automation.py
from selenium import webdriver
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.support.ui import Select
import pandas as pd
import time
import pyautogui
import pyperclip
class LegacyGUIAutomation:
"""界面自动化集成(最后手段)"""
def __init__(self, headless=False):
self.headless = headless
self.setup_driver()
def setup_driver(self):
"""设置浏览器驱动"""
options = webdriver.ChromeOptions()
if self.headless:
options.add_argument('--headless')
# 绕过反自动化检测
options.add_argument('--disable-blink-features=AutomationControlled')
options.add_experimental_option("excludeSwitches", ["enable-automation"])
options.add_experimental_option('useAutomationExtension', False)
self.driver = webdriver.Chrome(options=options)
# 修改webdriver属性
self.driver.execute_script(
"Object.defineProperty(navigator, 'webdriver', {get: () => undefined})"
)
def login_to_legacy_system(self, url, username, password):
"""登录遗留系统(模拟人工操作)"""
print(f"🔐 登录遗留系统: {url}")
self.driver.get(url)
time.sleep(2) # 等待页面加载
try:
# 尝试多种定位方式
selectors = [
"//input[@name='username']",
"//input[@id='userid']",
"//input[@type='text']"
]
for selector in selectors:
try:
user_input = self.driver.find_element(By.XPATH, selector)
user_input.send_keys(username)
break
except:
continue
# 查找密码框
pwd_input = self.driver.find_element(By.XPATH, "//input[@type='password']")
pwd_input.send_keys(password)
# 查找登录按钮
login_btn = self.driver.find_element(By.XPATH, "//input[@type='submit']")
login_btn.click()
time.sleep(3)
print("✅ 登录成功")
return True
except Exception as e:
print(f"❌ 登录失败: {e}")
return False
def extract_data_from_grid(self, grid_xpath):
"""从表格中提取数据(遗留系统常见)"""
print("📊 从表格提取数据...")
data = []
try:
# 定位表格
table = self.driver.find_element(By.XPATH, grid_xpath)
# 获取表头
headers = []
header_rows = table.find_elements(By.XPATH, ".//th")
for th in header_rows:
headers.append(th.text.strip())
# 获取数据行
rows = table.find_elements(By.XPATH, ".//tr[position()>1]")
for row in rows:
cols = row.find_elements(By.XPATH, ".//td")
if len(cols) == len(headers):
row_data = {}
for i, col in enumerate(cols):
row_data[headers[i]] = col.text.strip()
data.append(row_data)
return pd.DataFrame(data)
except Exception as e:
print(f"表格提取失败: {e}")
return pd.DataFrame()
def fill_form_and_submit(self, form_data):
"""自动填写表单并提交"""
print("📝 自动填写表单...")
for field, value in form_data.items():
try:
# 尝试多种定位方式
selectors = [
f"//input[@name='{field}']",
f"//input[@id='{field}']",
f"//*[contains(@name, '{field.upper()}')]"
]
element = None
for selector in selectors:
try:
element = self.driver.find_element(By.XPATH, selector)
break
except:
continue
if element:
element.clear()
element.send_keys(value)
else:
print(f"⚠️ 未找到字段: {field}")
except Exception as e:
print(f"字段 {field} 填写失败: {e}")
# 提交表单
try:
submit_btn = self.driver.find_element(By.XPATH, "//input[@type='submit']")
submit_btn.click()
time.sleep(2)
print("✅ 表单提交成功")
except:
print("❌ 提交失败")
def export_report(self, report_name, export_format='excel'):
"""导出报表(模拟点击导出按钮)"""
print(f"📄 导出报表: {report_name}")
try:
# 查找导出按钮
export_btns = self.driver.find_elements(
By.XPATH, "//button[contains(text(), '导出')]"
)
if not export_btns:
export_btns = self.driver.find_elements(
By.XPATH, "//a[contains(text(), 'Export')]"
)
if export_btns:
export_btns[0].click()
time.sleep(2)
# 处理文件下载对话框
self._handle_download_dialog(export_format)
return True
return False
except Exception as e:
print(f"导出失败: {e}")
return False
def _handle_download_dialog(self, file_type):
"""处理文件下载对话框(平台相关)"""
# Windows系统处理
if file_type == 'excel':
pyautogui.write('report.xlsx')
pyautogui.press('enter')
elif file_type == 'pdf':
pyautogui.write('report.pdf')
pyautogui.press('enter')
time.sleep(1)
def screen_scrape_legacy_app(self):
"""屏幕抓取(针对桌面应用)"""
print("🖥️ 屏幕抓取桌面应用...")
# 定位应用窗口(需要知道窗口标题)
app_window = pyautogui.getWindowsWithTitle('Legacy ERP System')[0]
app_window.activate()
# 截屏
screenshot = pyautogui.screenshot(region=(
app_window.left,
app_window.top,
app_window.width,
app_window.height
))
screenshot.save('legacy_app_screenshot.png')
print("📸 屏幕截图已保存")
def close(self):
"""关闭浏览器"""
if self.driver:
self.driver.quit()
print("👋 浏览器已关闭")
# 实战示例
if __name__ == "__main__":
print("⚠️ 警告: 界面自动化是集成遗留系统的最后手段")
print(" 仅当无其他接口可用时考虑此方案")
# 示例: 自动化登录和数据提取
automator = LegacyGUIAutomation(headless=True)
# 模拟登录
# success = automator.login_to_legacy_system(
# 'http://legacy-erp.internal/login',
# 'admin',
# 'password123'
# )
# if success:
# # 提取客户数据
# df = automator.extract_data_from_grid("//table[@id='custGrid']")
# print(f"提取到 {len(df)} 条客户数据")
# print(df.head())
automator.close()四、 遗留系统现代化架构演进路径
graph LR A[遗留系统] --> B[阶段1: 数据同步] B --> C[阶段2: API包装] C --> D[阶段3: 功能迁移] D --> E[阶段4: 系统退役] B --> B1[数据库/文件同步] C --> C1[REST API包装] D --> D1[微服务化改造] E --> E1[归档历史数据]
建议采用逐步演进的“绞杀者模式”:
- 先集成:用上述策略打通数据流
- 再包装:为遗留功能提供现代API
- 后替换:逐个功能迁移到新系统
- 终退役:当所有功能迁移完成后关闭旧系统
💡 总结
遗留系统集成没有“银弹”,关键在于:
- 评估现状:分析遗留系统的技术栈、数据格式、接口能力
- 选择策略:根据业务需求选择合适集成模式
- 渐进改造:采用绞杀者模式,避免“大爆炸”式重写
- 保障稳定:任何集成都要有回滚和降级方案
记住:遗留系统是企业的“数字资产”,不是“技术负债”。通过恰当的集成策略,你可以让这些“老古董”继续为数字化转型贡献力量,直到最终完成现代化改造。