ERP 是企业资源规划系统的简称,但它的本质远不止一个软件——它是企业的“数字中枢神经系统”。让我用一个简单的比喻帮你理解:如果把企业比作人体,ERP 就是大脑+神经系统,负责协调所有器官(部门)的工作。
一、 ERP 的核心本质:从“信息孤岛”到“数据统一”
ERP 解决的三大核心痛点:
痛点 | 无 ERP 时 | 有 ERP 后 |
|---|---|---|
数据孤岛 | 销售不知道库存,财务不知道采购 | 所有数据实时同步 |
流程割裂 | 订单→生产→发货要跑5个部门 | 全流程自动化流转 |
决策盲目 | 靠 Excel 和“感觉”做决策 | 数据驱动的智能决策 |
ERP 的“三层蛋糕”结构:
应用层(用户看到) 销售、采购、生产、财务、HR ↓ 业务层(逻辑核心) 订单处理、库存管理、生产排程 ↓ 数据层(基础底座) 客户、产品、供应商、BOM
二、 ERP 六大核心模块详解
ERP 通常包含几十个模块,但最核心的是这六个:
1. 销售与分销(SD)
作用:从报价到回款的完整销售流程管理
# sales_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class OrderStatus(Enum):
QUOTATION = "quotation" # 报价
CONFIRMED = "confirmed" # 已确认
IN_PRODUCTION = "in_production" # 生产中
SHIPPED = "shipped" # 已发货
DELIVERED = "delivered" # 已交付
INVOICED = "invoiced" # 已开票
PAID = "paid" # 已付款
@dataclass
class Customer:
"""客户主数据"""
code: str
name: str
credit_limit: float # 信用额度
payment_terms: int # 付款条件(天数)
current_balance: float = 0.0
@dataclass
class SalesOrderItem:
"""销售订单行项目"""
product_code: str
quantity: int
unit_price: float
delivery_date: datetime
@property
def line_total(self):
return self.quantity * self.unit_price
class SalesOrder:
"""销售订单(ERP核心对象)"""
def __init__(self, order_number: str, customer: Customer):
self.order_number = order_number
self.customer = customer
self.order_date = datetime.now()
self.status = OrderStatus.QUOTATION
self.items: List[SalesOrderItem] = []
self.delivery_address: str = ""
self.created_at = datetime.now()
self.updated_at = datetime.now()
def add_item(self, product_code: str, quantity: int,
unit_price: float, delivery_date: datetime):
"""添加订单行"""
# 检查库存可用性(调用库存模块)
if not self.check_inventory(product_code, quantity):
raise ValueError(f"产品 {product_code} 库存不足")
item = SalesOrderItem(product_code, quantity, unit_price, delivery_date)
self.items.append(item)
self.updated_at = datetime.now()
print(f"✅ 添加订单行: {product_code} x{quantity}")
def confirm_order(self):
"""确认订单(触发后续流程)"""
if self.status != OrderStatus.QUOTATION:
raise ValueError("只有报价单才能确认")
# 检查信用额度
order_total = self.get_total_amount()
if order_total + self.customer.current_balance > self.customer.credit_limit:
raise ValueError(f"超出信用额度 {self.customer.credit_limit}")
self.status = OrderStatus.CONFIRMED
self.updated_at = datetime.now()
# 触发后续流程
self.trigger_downstream_processes()
print(f"📋 订单 {self.order_number} 已确认,总额: ¥{order_total:,.2f}")
def get_total_amount(self) -> float:
"""计算订单总额"""
return sum(item.line_total for item in self.items)
def check_inventory(self, product_code: str, quantity: int) -> bool:
"""检查库存(模拟调用库存模块)"""
# 实际中这里会调用库存管理模块
inventory = {
"P001": 100, # 产品编码 -> 可用库存
"P002": 50,
"P003": 200
}
return inventory.get(product_code, 0) >= quantity
def trigger_downstream_processes(self):
"""触发下游流程"""
# 1. 创建生产计划
self.create_production_order()
# 2. 预留库存
self.reserve_inventory()
# 3. 安排发货
self.plan_delivery()
print("🚀 已触发生产、库存、发货流程")
def create_production_order(self):
"""创建生产订单(调用生产模块)"""
for item in self.items:
# 这里会调用生产模块的API
print(f"🏭 为 {item.product_code} 创建生产计划")
def reserve_inventory(self):
"""预留库存(调用库存模块)"""
for item in self.items:
# 这里会调用库存模块的API
print(f"📦 预留库存: {item.product_code} x{item.quantity}")
def plan_delivery(self):
"""安排发货(调用物流模块)"""
earliest_date = min(item.delivery_date for item in self.items)
print(f"🚚 安排发货,最早交付: {earliest_date.strftime('%Y-%m-%d')}")
def to_dict(self):
"""转换为字典(用于API或存储)"""
return {
"order_number": self.order_number,
"customer": self.customer.name,
"status": self.status.value,
"order_date": self.order_date.strftime("%Y-%m-%d"),
"total_amount": self.get_total_amount(),
"items": [
{
"product": item.product_code,
"quantity": item.quantity,
"price": item.unit_price,
"line_total": item.line_total
}
for item in self.items
],
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat()
}
# 使用示例
if __name__ == "__main__":
print("🎯 ERP 销售模块演示")
print("-" * 40)
# 创建客户
customer = Customer("C001", "阿里巴巴集团", credit_limit=1000000, payment_terms=30)
# 创建销售订单
order = SalesOrder("SO20240520001", customer)
# 添加产品
order.add_item("P001", 10, 2999.00, datetime.now() + timedelta(days=7))
order.add_item("P002", 5, 599.00, datetime.now() + timedelta(days=10))
# 确认订单
try:
order.confirm_order()
except ValueError as e:
print(f"❌ 确认失败: {e}")
# 查看订单详情
print("\n📄 订单详情:")
import json
print(json.dumps(order.to_dict(), indent=2, ensure_ascii=False))2. 物料管理(MM)
作用:采购、库存、供应商管理
# material_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import Dict, List
import heapq
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class InventoryItem:
"""库存物料"""
def __init__(self, material_code: str, description: str,
unit: str, unit_cost: float):
self.material_code = material_code
self.description = description
self.unit = unit
self.unit_cost = unit_cost
self.batches = [] # 批次库存
def add_batch(self, quantity: int, purchase_date: datetime,
expiry_date: datetime = None):
"""添加库存批次(先进先出)"""
batch = {
'quantity': quantity,
'purchase_date': purchase_date,
'expiry_date': expiry_date,
'remaining': quantity
}
self.batches.append(batch)
# 按采购日期排序(FIFO)
self.batches.sort(key=lambda x: x['purchase_date'])
def consume(self, quantity: int) -> float:
"""消耗库存(先进先出)返回成本"""
if self.get_available_qty() < quantity:
raise ValueError(f"库存不足: 需要 {quantity}, 可用 {self.get_available_qty()}")
total_cost = 0.0
remaining = quantity
for batch in self.batches:
if remaining <= 0:
break
if batch['remaining'] > 0:
consume_qty = min(batch['remaining'], remaining)
batch['remaining'] -= consume_qty
remaining -= consume_qty
total_cost += consume_qty * self.unit_cost
return total_cost
def get_available_qty(self) -> int:
"""获取可用数量"""
return sum(batch['remaining'] for batch in self.batches)
def get_value(self) -> float:
"""获取库存价值"""
return self.get_available_qty() * self.unit_cost
class MRPPlanner:
"""物料需求计划(MRP)计算器"""
def __init__(self):
self.bom = {} # 物料清单 {成品: [(物料, 数量)]}
def add_bom(self, finished_product: str, components: List[tuple]):
"""添加物料清单"""
self.bom[finished_product] = components
def calculate_requirements(self, product: str, quantity: int,
lead_time: int = 0) -> Dict:
"""
计算物料需求
Args:
product: 要生产的产品
quantity: 需求数量
lead_time: 提前期(天)
Returns:
各物料的需求量
"""
requirements = {}
self._calculate_recursive(product, quantity, requirements)
# 计算需求日期
requirement_date = datetime.now() + timedelta(days=lead_time)
return {
'product': product,
'quantity': quantity,
'requirement_date': requirement_date.strftime('%Y-%m-%d'),
'material_requirements': requirements
}
def _calculate_recursive(self, product: str, quantity: int,
requirements: Dict, level: int = 0):
"""递归计算物料需求"""
indent = " " * level
if product not in self.bom:
# 原材料
requirements[product] = requirements.get(product, 0) + quantity
print(f"{indent}📦 {product}: {quantity}")
return
# 半成品,需要展开
print(f"{indent}🔧 {product} (需要 {quantity}):")
for component, comp_qty in self.bom[product]:
total_needed = comp_qty * quantity
self._calculate_recursive(component, total_needed, requirements, level + 1)
class PurchaseRequisition:
"""采购申请"""
def __init__(self, pr_number: str):
self.pr_number = pr_number
self.items = [] # [(物料, 数量, 需求日期)]
self.status = "draft"
self.created_at = datetime.now()
def add_item(self, material: str, quantity: int, required_date: datetime):
self.items.append((material, quantity, required_date))
def approve(self):
self.status = "approved"
print(f"✅ 采购申请 {self.pr_number} 已批准")
return self.generate_po()
def generate_po(self) -> str:
"""生成采购订单"""
po_number = f"PO{datetime.now().strftime('%Y%m%d%H%M%S')}"
print(f"📄 生成采购订单 {po_number}")
return po_number
# 使用示例
if __name__ == "__main__":
print("\n📦 ERP 物料管理模块演示")
print("=" * 40)
# 1. 库存管理
print("\n1. 库存管理(FIFO):")
item = InventoryItem("RAW001", "铝合金板", "KG", 50.0)
# 添加不同批次
item.add_batch(100, datetime(2024, 5, 1))
item.add_batch(200, datetime(2024, 5, 10))
item.add_batch(150, datetime(2024, 5, 20))
print(f"初始库存: {item.get_available_qty()} KG")
# 消耗 120 KG
cost = item.consume(120)
print(f"消耗 120KG 后:")
print(f" 可用库存: {item.get_available_qty()} KG")
print(f" 消耗成本: ¥{cost:,.2f}")
print(f" 库存价值: ¥{item.get_value():,.2f}")
# 2. MRP 计算
print("\n2. MRP 物料需求计划:")
planner = MRPPlanner()
# 定义BOM(物料清单)
planner.add_bom("成品A", [("半成品B", 2), ("包装材料", 1)])
planner.add_bom("半成品B", [("原材料X", 3), ("原材料Y", 2)])
# 计算生产100个成品A的需求
requirements = planner.calculate_requirements("成品A", 100, lead_time=7)
print(f"\n需求汇总:")
for material, qty in requirements['material_requirements'].items():
print(f" {material}: {qty}")
# 3. 采购申请
print("\n3. 采购流程:")
pr = PurchaseRequisition("PR20240520001")
pr.add_item("原材料X", 600, datetime.now() + timedelta(days=5))
pr.add_item("原材料Y", 400, datetime.now() + timedelta(days=5))
po_number = pr.approve()
print(f"生成的采购订单: {po_number}")3. 生产计划(PP)
作用:生产订单、排程、报工
# production_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
import json
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
@dataclass
class WorkCenter:
"""工作中心(生产线)"""
code: str
name: str
capacity_hours: float # 日产能(小时)
setup_time: float # 换线时间(小时)
hourly_cost: float # 小时成本
def __post_init__(self):
self.schedule = [] # 排程计划
self.available_hours = self.capacity_hours # 剩余可用时间
class ProductionOrder:
"""生产订单"""
def __init__(self, order_number: str, material: str, quantity: int):
self.order_number = order_number
self.material = material
self.quantity = quantity
self.status = "created"
self.created_date = datetime.now()
self.start_date = None
self.end_date = None
self.routing = [] # 工艺路线
self.actual_output = 0
self.scrap_qty = 0
def add_routing_step(self, work_center: WorkCenter,
operation_time: float, sequence: int):
"""添加工序"""
self.routing.append({
'sequence': sequence,
'work_center': work_center.code,
'operation': f"OP{sequence:02d}",
'std_time': operation_time, # 标准工时(小时/件)
'total_time': operation_time * self.quantity,
'status': 'pending'
})
# 按工序排序
self.routing.sort(key=lambda x: x['sequence'])
def calculate_lead_time(self) -> float:
"""计算总生产时间"""
return sum(step['total_time'] for step in self.routing)
def schedule_production(self, start_date: datetime):
"""安排生产计划"""
self.start_date = start_date
current_date = start_date
for step in self.routing:
# 计算工序所需天数
days_needed = step['total_time'] / 8 # 按8小时工作制
step['planned_start'] = current_date
step['planned_end'] = current_date + timedelta(days=days_needed)
# 更新当前日期(考虑工序间等待时间)
current_date = step['planned_end'] + timedelta(days=0.5)
self.end_date = self.routing[-1]['planned_end']
self.status = "scheduled"
def report_progress(self, operation_seq: int,
good_qty: int, scrap_qty: int = 0):
"""报工"""
for step in self.routing:
if step['sequence'] == operation_seq:
step['status'] = 'completed'
step['actual_good'] = good_qty
step['actual_scrap'] = scrap_qty
break
self.actual_output += good_qty
self.scrap_qty += scrap_qty
# 检查是否全部完成
if all(step['status'] == 'completed' for step in self.routing):
self.status = "completed"
self.end_date = datetime.now()
def get_efficiency(self) -> float:
"""计算生产效率"""
if self.actual_output == 0:
return 0.0
planned_output = self.quantity - self.scrap_qty
return self.actual_output / planned_output
def to_gantt_data(self) -> List[Dict]:
"""生成甘特图数据"""
gantt_data = []
for step in self.routing:
gantt_data.append({
'Task': f"{self.order_number}-{step['operation']}",
'Start': step['planned_start'].strftime('%Y-%m-%d'),
'End': step['planned_end'].strftime('%Y-%m-%d'),
'Resource': step['work_center']
})
return gantt_data
class ProductionScheduler:
"""生产排程器"""
def __init__(self):
self.work_centers: Dict[str, WorkCenter] = {}
self.orders: List[ProductionOrder] = []
def add_work_center(self, wc: WorkCenter):
self.work_centers[wc.code] = wc
def add_order(self, order: ProductionOrder):
self.orders.append(order)
def schedule_all(self):
"""安排所有订单的生产计划"""
# 按优先级排序(这里简化按创建时间)
self.orders.sort(key=lambda x: x.created_date)
current_date = datetime.now()
for order in self.orders:
if order.status == "created":
# 检查物料可用性
if self.check_material_availability(order):
order.schedule_production(current_date)
# 更新下一个订单的开始时间
if order.end_date:
current_date = order.end_date + timedelta(days=1)
print(f"📅 安排订单 {order.order_number}: "
f"{order.start_date.strftime('%m-%d')} 到 "
f"{order.end_date.strftime('%m-%d')}")
else:
print(f"⚠️ 订单 {order.order_number} 物料不足,无法安排")
def check_material_availability(self, order: ProductionOrder) -> bool:
"""检查物料可用性(简化)"""
# 实际中会调用库存模块
return True
# 使用示例
if __name__ == "__main__":
print("\n🏭 ERP 生产计划模块演示")
print("=" * 40)
# 创建工作中心
wc1 = WorkCenter("WC001", "冲压车间", 16.0, 1.0, 200.0)
wc2 = WorkCenter("WC002", "组装线", 20.0, 0.5, 150.0)
wc3 = WorkCenter("WC003", "喷漆车间", 12.0, 2.0, 180.0)
# 创建生产订单
order1 = ProductionOrder("PO001", "成品A", 100)
order1.add_routing_step(wc1, 0.5, 1) # 冲压 0.5小时/件
order1.add_routing_step(wc2, 0.8, 2) # 组装 0.8小时/件
order1.add_routing_step(wc3, 0.3, 3) # 喷漆 0.3小时/件
# 计算生产时间
total_time = order1.calculate_lead_time()
print(f"订单 {order1.order_number} 总工时: {total_time:.1f} 小时")
print(f"标准生产周期: {total_time/8:.1f} 天 (按8小时/天)")
# 排程
scheduler = ProductionScheduler()
scheduler.add_work_center(wc1)
scheduler.add_work_center(wc2)
scheduler.add_work_center(wc3)
scheduler.add_order(order1)
scheduler.schedule_all()
# 模拟报工
print("\n📊 生产进度跟踪:")
order1.report_progress(1, 100, 5) # 工序1完成,100良品,5废品
order1.report_progress(2, 95, 3) # 工序2完成
order1.report_progress(3, 92, 1) # 工序3完成
print(f"生产完成! 状态: {order1.status}")
print(f"良品数量: {order1.actual_output}")
print(f"废品数量: {order1.scrap_qty}")
print(f"生产效率: {order1.get_efficiency():.1%}")
# 生成甘特图数据
gantt_data = order1.to_gantt_data()
print(f"\n📅 甘特图数据:")
print(json.dumps(gantt_data, indent=2, ensure_ascii=False))4. 财务会计(FI)
作用:总账、应收应付、固定资产
# finance_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
import json
class AccountType(Enum):
ASSET = "asset" # 资产
LIABILITY = "liability" # 负债
EQUITY = "equity" # 所有者权益
REVENUE = "revenue" # 收入
EXPENSE = "expense" # 费用
class TransactionType(Enum):
DEBIT = "debit" # 借方
CREDIT = "credit" # 贷方
@dataclass
class Account:
"""会计科目"""
code: str
name: str
type: AccountType
parent_code: str = None
def __post_init__(self):
self.balance = 0.0
self.transactions: List[Transaction] = []
@dataclass
class Transaction:
"""会计凭证分录"""
account_code: str
amount: float
type: TransactionType
date: datetime
description: str
reference: str # 参考凭证号
class GeneralLedger:
"""总账系统"""
def __init__(self):
self.accounts: Dict[str, Account] = {}
self.transactions: List[Transaction] = []
self.period_start: datetime = None
self.period_end: datetime = None
# 初始化常用科目
self.init_chart_of_accounts()
def init_chart_of_accounts(self):
"""初始化会计科目表"""
# 资产类
self.add_account(Account("1001", "现金", AccountType.ASSET))
self.add_account(Account("1122", "应收账款", AccountType.ASSET))
self.add_account(Account("1405", "库存商品", AccountType.ASSET))
# 负债类
self.add_account(Account("2202", "应付账款", AccountType.LIABILITY))
# 权益类
self.add_account(Account("3001", "实收资本", AccountType.EQUITY))
# 损益类
self.add_account(Account("6001", "主营业务收入", AccountType.REVENUE))
self.add_account(Account("6401", "主营业务成本", AccountType.EXPENSE))
self.add_account(Account("6602", "管理费用", AccountType.EXPENSE))
def add_account(self, account: Account):
self.accounts[account.code] = account
def make_journal_entry(self, entries: List[Transaction],
description: str = ""):
"""制作日记账凭证(借贷必相等)"""
# 验证借贷平衡
total_debit = sum(e.amount for e in entries if e.type == TransactionType.DEBIT)
total_credit = sum(e.amount for e in entries if e.type == TransactionType.CREDIT)
if abs(total_debit - total_credit) > 0.01: # 允许微小误差
raise ValueError(f"借贷不平衡: 借{total_debit:.2f} ≠ 贷{total_credit:.2f}")
# 生成凭证号
voucher_no = f"V{datetime.now().strftime('%Y%m%d%H%M%S')}"
# 记录交易
for entry in entries:
entry.reference = voucher_no
self.transactions.append(entry)
# 更新账户余额
account = self.accounts[entry.account_code]
if entry.type == TransactionType.DEBIT:
if account.type in [AccountType.ASSET, AccountType.EXPENSE]:
account.balance += entry.amount
else:
account.balance -= entry.amount
else: # CREDIT
if account.type in [AccountType.ASSET, AccountType.EXPENSE]:
account.balance -= entry.amount
else:
account.balance += entry.amount
account.transactions.append(entry)
print(f"📒 凭证 {voucher_no}: {description}")
return voucher_no
def post_sales_revenue(self, order_no: str, amount: float,
customer: str, cost: float = 0):
"""过账销售收入"""
# 1. 确认收入
entries = [
Transaction("1122", amount, TransactionType.DEBIT,
datetime.now(), f"销售给{customer}", order_no),
Transaction("6001", amount, TransactionType.CREDIT,
datetime.now(), f"销售收入", order_no)
]
self.make_journal_entry(entries, f"确认销售收入 {order_no}")
# 2. 结转成本(如果有)
if cost > 0:
entries = [
Transaction("6401", cost, TransactionType.DEBIT,
datetime.now(), f"销售成本", order_no),
Transaction("1405", cost, TransactionType.CREDIT,
datetime.now(), f"减少库存", order_no)
]
self.make_journal_entry(entries, f"结转销售成本 {order_no}")
def post_purchase(self, po_no: str, amount: float,
supplier: str, is_inventory: bool = True):
"""过账采购"""
debit_account = "1405" if is_inventory else "6602" # 库存或费用
entries = [
Transaction(debit_account, amount, TransactionType.DEBIT,
datetime.now(), f"采购自{supplier}", po_no),
Transaction("2202", amount, TransactionType.CREDIT,
datetime.now(), f"应付账款", po_no)
]
self.make_journal_entry(entries, f"采购入库 {po_no}")
def get_trial_balance(self) -> Dict:
"""生成试算平衡表"""
trial_balance = []
for account in self.accounts.values():
trial_balance.append({
'account_code': account.code,
'account_name': account.name,
'account_type': account.type.value,
'debit': account.balance if account.balance > 0 else 0,
'credit': -account.balance if account.balance < 0 else 0
})
total_debit = sum(item['debit'] for item in trial_balance)
total_credit = sum(item['credit'] for item in trial_balance)
return {
'report_date': datetime.now().strftime('%Y-%m-%d'),
'trial_balance': trial_balance,
'total_debit': total_debit,
'total_credit': total_credit,
'is_balanced': abs(total_debit - total_credit) < 0.01
}
def generate_income_statement(self, start_date: datetime,
end_date: datetime) -> Dict:
"""生成利润表"""
revenue = 0.0
expense = 0.0
for account in self.accounts.values():
if account.type == AccountType.REVENUE:
revenue += account.balance
elif account.type == AccountType.EXPENSE:
expense += account.balance
net_income = revenue - expense
return {
'period': f"{start_date.strftime('%Y-%m-%d')} 至 {end_date.strftime('%Y-%m-%d')}",
'revenue': revenue,
'expense': expense,
'net_income': net_income,
'gross_margin': (revenue - expense) / revenue if revenue > 0 else 0
}
def generate_balance_sheet(self) -> Dict:
"""生成资产负债表"""
assets = 0.0
liabilities = 0.0
equity = 0.0
for account in self.accounts.values():
if account.type == AccountType.ASSET:
assets += account.balance
elif account.type == AccountType.LIABILITY:
liabilities += account.balance
elif account.type == AccountType.EQUITY:
equity += account.balance
# 检查平衡: 资产 = 负债 + 所有者权益
is_balanced = abs(assets - (liabilities + equity)) < 0.01
return {
'report_date': datetime.now().strftime('%Y-%m-%d'),
'assets': assets,
'liabilities': liabilities,
'equity': equity,
'total_liabilities_equity': liabilities + equity,
'is_balanced': is_balanced
}
# 使用示例
if __name__ == "__main__":
print("\n💰 ERP 财务会计模块演示")
print("=" * 40)
ledger = GeneralLedger()
# 模拟一个月业务
print("1. 初始投资:")
entries = [
Transaction("1001", 1000000, TransactionType.DEBIT,
datetime.now(), "收到投资款", "INV001"),
Transaction("3001", 1000000, TransactionType.CREDIT,
datetime.now(), "实收资本", "INV001")
]
ledger.make_journal_entry(entries, "公司成立收到投资")
print("\n2. 采购原材料:")
ledger.post_purchase("PO001", 200000, "供应商A", is_inventory=True)
print("\n3. 支付管理费用:")
entries = [
Transaction("6602", 50000, TransactionType.DEBIT,
datetime.now(), "支付工资", "PAY001"),
Transaction("1001", 50000, TransactionType.CREDIT,
datetime.now(), "银行支付", "PAY001")
]
ledger.make_journal_entry(entries, "支付员工工资")
print("\n4. 销售产品:")
ledger.post_sales_revenue("SO001", 350000, "客户A", cost=120000)
# 生成财务报表
print("\n" + "=" * 40)
print("📈 财务报表")
print("=" * 40)
# 试算平衡表
trial_balance = ledger.get_trial_balance()
print("\n试算平衡表:")
print(f" 总借方: ¥{trial_balance['total_debit']:,.2f}")
print(f" 总贷方: ¥{trial_balance['total_credit']:,.2f}")
print(f" 是否平衡: {'✅' if trial_balance['is_balanced'] else '❌'}")
# 利润表
income_stmt = ledger.generate_income_statement(
datetime.now() - timedelta(days=30),
datetime.now()
)
print("\n利润表:")
print(f" 期间: {income_stmt['period']}")
print(f" 收入: ¥{income_stmt['revenue']:,.2f}")
print(f" 费用: ¥{income_stmt['expense']:,.2f}")
print(f" 净利润: ¥{income_stmt['net_income']:,.2f}")
print(f" 毛利率: {income_stmt['gross_margin']:.1%}")
# 资产负债表
balance_sheet = ledger.generate_balance_sheet()
print("\n资产负债表:")
print(f" 资产: ¥{balance_sheet['assets']:,.2f}")
print(f" 负债: ¥{balance_sheet['liabilities']:,.2f}")
print(f" 所有者权益: ¥{balance_sheet['equity']:,.2f}")
print(f" 负债+权益: ¥{balance_sheet['total_liabilities_equity']:,.2f}")
print(f" 会计恒等式: {'✅' if balance_sheet['is_balanced'] else '❌'}")5. 控制(CO)
作用:成本中心、利润中心、内部订单
# controlling_module.py
from datetime import datetime
from dataclasses import dataclass
from typing import Dict, List
import json
@dataclass
class CostCenter:
"""成本中心"""
code: str
name: str
manager: str
department: str
cost_element: str # 成本要素
def __post_init__(self):
self.planned_cost = 0.0
self.actual_cost = 0.0
self.variance = 0.0
@dataclass
class ProfitCenter:
"""利润中心"""
code: str
name: str
responsible: str
def __post_init__(self):
self.revenue = 0.0
self.cost = 0.0
self.profit = 0.0
class InternalOrder:
"""内部订单(用于项目成本归集)"""
def __init__(self, order_no: str, description: str,
cost_center: str, budget: float):
self.order_no = order_no
self.description = description
self.cost_center = cost_center
self.budget = budget
self.actual_cost = 0.0
self.status = "open"
self.start_date = datetime.now()
self.end_date = None
def post_cost(self, amount: float, description: str):
"""过账成本"""
if self.actual_cost + amount > self.budget:
print(f"⚠️ 警告: 内部订单 {self.order_no} 将超出预算")
self.actual_cost += amount
print(f"📊 内部订单 {self.order_no}: 增加成本 ¥{amount:,.2f} ({description})")
def calculate_variance(self) -> float:
"""计算预算差异"""
return self.actual_cost - self.budget
def close_order(self):
"""关闭内部订单"""
self.status = "closed"
self.end_date = datetime.now()
variance = self.calculate_variance()
print(f"🔒 关闭内部订单 {self.order_no}")
print(f" 预算: ¥{self.budget:,.2f}")
print(f" 实际: ¥{self.actual_cost:,.2f}")
print(f" 差异: ¥{variance:,.2f} ({'超支' if variance > 0 else '节约'})")
return variance
class CostController:
"""成本控制器"""
def __init__(self):
self.cost_centers: Dict[str, CostCenter] = {}
self.profit_centers: Dict[str, ProfitCenter] = {}
self.internal_orders: Dict[str, InternalOrder] = {}
def add_cost_center(self, cc: CostCenter):
self.cost_centers[cc.code] = cc
def add_profit_center(self, pc: ProfitCenter):
self.profit_centers[pc.code] = pc
def create_internal_order(self, order: InternalOrder):
self.internal_orders[order.order_no] = order
def allocate_cost(self, from_cc: str, to_cc: str, amount: float,
description: str):
"""成本分配"""
if from_cc in self.cost_centers and to_cc in self.cost_centers:
self.cost_centers[from_cc].actual_cost -= amount
self.cost_centers[to_cc].actual_cost += amount
print(f"🔄 成本分配: {from_cc} → {to_cc} ¥{amount:,.2f}")
def calculate_center_performance(self):
"""计算中心绩效"""
print("\n" + "=" * 40)
print("📊 成本中心绩效报告")
print("=" * 40)
for cc in self.cost_centers.values():
cc.variance = cc.actual_cost - cc.planned_cost
variance_pct = (cc.variance / cc.planned_cost * 100) if cc.planned_cost > 0 else 0
status = "✅" if cc.variance <= 0 else "⚠️"
print(f"{cc.code} {cc.name}:")
print(f" 预算: ¥{cc.planned_cost:,.2f}")
print(f" 实际: ¥{cc.actual_cost:,.2f}")
print(f" 差异: {status} ¥{cc.variance:,.2f} ({variance_pct:+.1f}%)")
print("\n" + "=" * 40)
print("📈 利润中心绩效报告")
print("=" * 40)
for pc in self.profit_centers.values():
pc.profit = pc.revenue - pc.cost
margin = (pc.profit / pc.revenue * 100) if pc.revenue > 0 else 0
print(f"{pc.code} {pc.name}:")
print(f" 收入: ¥{pc.revenue:,.2f}")
print(f" 成本: ¥{pc.cost:,.2f}")
print(f" 利润: ¥{pc.profit:,.2f} (利润率: {margin:.1f}%)")
def generate_cost_report(self, report_type: str = "monthly"):
"""生成成本报告"""
report = {
"report_type": report_type,
"generated_at": datetime.now().isoformat(),
"cost_centers": [],
"profit_centers": [],
"internal_orders": []
}
for cc in self.cost_centers.values():
report["cost_centers"].append({
"code": cc.code,
"name": cc.name,
"planned": cc.planned_cost,
"actual": cc.actual_cost,
"variance": cc.variance
})
for pc in self.profit_centers.values():
report["profit_centers"].append({
"code": pc.code,
"name": pc.name,
"revenue": pc.revenue,
"cost": pc.cost,
"profit": pc.profit
})
for io in self.internal_orders.values():
report["internal_orders"].append({
"order_no": io.order_no,
"description": io.description,
"budget": io.budget,
"actual": io.actual_cost,
"variance": io.calculate_variance(),
"status": io.status
})
return report
# 使用示例
if __name__ == "__main__":
print("\n📈 ERP 控制模块演示")
print("=" * 40)
controller = CostController()
# 创建成本中心
cc1 = CostCenter("CC001", "生产一部", "张三", "生产部", "500101")
cc2 = CostCenter("CC002", "研发中心", "李四", "研发部", "500102")
cc3 = CostCenter("CC003", "销售部", "王五", "销售部", "500103")
cc1.planned_cost = 500000
cc2.planned_cost = 300000
cc3.planned_cost = 200000
controller.add_cost_center(cc1)
controller.add_cost_center(cc2)
controller.add_cost_center(cc3)
# 创建利润中心
pc1 = ProfitCenter("PC001", "华东大区", "赵六")
pc2 = ProfitCenter("PC002", "华南大区", "钱七")
pc1.revenue = 1500000
pc1.cost = 900000
pc2.revenue = 1200000
pc2.cost = 750000
controller.add_profit_center(pc1)
controller.add_profit_center(pc2)
# 创建内部订单(项目)
project1 = InternalOrder("IO202405001", "新产品研发项目", "CC002", 200000)
project2 = InternalOrder("IO202405002", "营销活动项目", "CC003", 80000)
controller.create_internal_order(project1)
controller.create_internal_order(project2)
# 模拟成本发生
print("\n💰 模拟业务发生:")
cc1.actual_cost = 520000 # 生产一部实际成本
cc2.actual_cost = 280000 # 研发中心实际成本
cc3.actual_cost = 210000 # 销售部实际成本
project1.post_cost(150000, "研发人员工资")
project1.post_cost(40000, "研发材料")
project1.post_cost(30000, "测试费用")
project2.post_cost(50000, "广告费")
project2.post_cost(25000, "活动费用")
# 成本分配
controller.allocate_cost("CC002", "CC001", 50000, "技术支持费用")
# 计算绩效
controller.calculate_center_performance()
# 关闭内部订单
print("\n🔒 关闭内部订单:")
project1.close_order()
project2.close_order()
# 生成报告
report = controller.generate_cost_report()
print(f"\n📄 成本控制报告已生成,包含:")
print(f" - 成本中心: {len(report['cost_centers'])} 个")
print(f" - 利润中心: {len(report['profit_centers'])} 个")
print(f" - 内部订单: {len(report['internal_orders'])} 个")6. 人力资源(HR)
作用:组织、人事、薪酬、考勤
# hr_module.py
from datetime import datetime, timedelta
from dataclasses import dataclass
from typing import List, Dict
from enum import Enum
import json
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class EmployeeStatus(Enum):
ACTIVE = "active"
PROBATION = "probation"
LEAVE = "leave"
RESIGNED = "resigned"
RETIRED = "retired"
class AttendanceStatus(Enum):
PRESENT = "present"
ABSENT = "absent"
LATE = "late"
LEAVE = "leave"
OVERTIME = "overtime"
@dataclass
class Employee:
"""员工主数据"""
employee_id: str
name: str
department: str
position: str
hire_date: datetime
base_salary: float
def __post_init__(self):
self.status = EmployeeStatus.ACTIVE
self.email = f"{self.name.lower().replace(' ', '.')}@company.com"
self.manager_id: str = None
self.attendance_records: List[Dict] = []
self.leave_balance = 0 # 剩余年假
self.salary_history: List[Dict] = []
def get_service_years(self) -> float:
"""计算司龄"""
days = (datetime.now() - self.hire_date).days
return days / 365.25
def apply_leave(self, start_date: datetime, end_date: datetime,
leave_type: str, reason: str) -> bool:
"""申请休假"""
if self.leave_balance <= 0 and leave_type == "annual":
print(f"❌ {self.name} 年假余额不足")
return False
leave_days = (end_date - start_date).days + 1
if leave_type == "annual":
if leave_days > self.leave_balance:
print(f"❌ 申请 {leave_days} 天,但只有 {self.leave_balance} 天年假")
return False
self.leave_balance -= leave_days
print(f"✅ {self.name} 申请 {leave_type} 假 {leave_days} 天")
return True
def record_attendance(self, date: datetime, status: AttendanceStatus,
hours: float = 8.0, notes: str = ""):
"""记录考勤"""
record = {
"date": date.strftime("%Y-%m-%d"),
"status": status.value,
"check_in": "09:00",
"check_out": "18:00",
"hours": hours,
"notes": notes
}
self.attendance_records.append(record)
status_icon = {
AttendanceStatus.PRESENT: "✅",
AttendanceStatus.ABSENT: "❌",
AttendanceStatus.LATE: "⏰",
AttendanceStatus.OVERTIME: "💪"
}.get(status, "📝")
print(f"{status_icon} {self.name} {date.strftime('%m-%d')}: {status.value}")
class PayrollCalculator:
"""薪资计算器"""
def __init__(self):
self.tax_rates = [
(36000, 0.03, 0),
(144000, 0.10, 2520),
(300000, 0.20, 16920),
(420000, 0.25, 31920),
(660000, 0.30, 52920),
(960000, 0.35, 85920),
(float('inf'), 0.45, 181920)
]
self.social_insurance_rate = 0.105 # 社保比例
self.housing_fund_rate = 0.12 # 公积金比例
def calculate_tax(self, annual_income: float) -> float:
"""计算个人所得税"""
for threshold, rate, deduction in self.tax_rates:
if annual_income <= threshold:
return annual_income * rate - deduction
return 0
def calculate_salary(self, employee: Employee,
attendance_days: int = 22,
overtime_hours: float = 0,
bonuses: float = 0) -> Dict:
"""计算薪资"""
# 基本工资
daily_rate = employee.base_salary / 22
basic_salary = daily_rate * attendance_days
# 加班费
overtime_rate = daily_rate / 8 * 1.5
overtime_pay = overtime_hours * overtime_rate
# 应发工资
gross_salary = basic_salary + overtime_pay + bonuses
# 扣除项
social_insurance = gross_salary * self.social_insurance_rate
housing_fund = gross_salary * self.housing_fund_rate
# 个人所得税(简化计算)
annual_income = gross_salary * 12
income_tax = self.calculate_tax(annual_income) / 12
# 实发工资
net_salary = gross_salary - social_insurance - housing_fund - income_tax
payslip = {
"employee_id": employee.employee_id,
"employee_name": employee.name,
"pay_month": datetime.now().strftime("%Y-%m"),
"basic_salary": round(basic_salary, 2),
"overtime_pay": round(overtime_pay, 2),
"bonuses": round(bonuses, 2),
"gross_salary": round(gross_salary, 2),
"deductions": {
"social_insurance": round(social_insurance, 2),
"housing_fund": round(housing_fund, 2),
"income_tax": round(income_tax, 2)
},
"total_deductions": round(social_insurance + housing_fund + income_tax, 2),
"net_salary": round(net_salary, 2)
}
# 记录薪资历史
employee.salary_history.append(payslip)
return payslip
class OrganizationChart:
"""组织架构图"""
def __init__(self):
self.employees: Dict[str, Employee] = {}
self.departments: Dict[str, List[str]] = {} # 部门 -> [员工ID]
def add_employee(self, employee: Employee, department: str):
self.employees[employee.employee_id] = employee
if department not in self.departments:
self.departments[department] = []
self.departments[department].append(employee.employee_id)
def set_manager(self, employee_id: str, manager_id: str):
"""设置汇报关系"""
if employee_id in self.employees and manager_id in self.employees:
self.employees[employee_id].manager_id = manager_id
def get_subordinates(self, manager_id: str) -> List[Employee]:
"""获取下属"""
return [
emp for emp in self.employees.values()
if emp.manager_id == manager_id
]
def get_department_staff(self, department: str) -> List[Employee]:
"""获取部门员工"""
staff_ids = self.departments.get(department, [])
return [self.employees[eid] for eid in staff_ids]
def generate_org_chart(self) -> Dict:
"""生成组织架构"""
org_chart = {}
for dept, emp_ids in self.departments.items():
managers = []
for emp_id in emp_ids:
emp = self.employees[emp_id]
if emp.manager_id is None: # 部门负责人
subordinates = self.get_subordinates(emp_id)
manager_node = {
"name": emp.name,
"position": emp.position,
"subordinates": [
{"name": sub.name, "position": sub.position}
for sub in subordinates
]
}
managers.append(manager_node)
org_chart[dept] = managers
return org_chart
# 使用示例
if __name__ == "__main__":
print("\n👥 ERP 人力资源模块演示")
print("=" * 40)
org = OrganizationChart()
# 创建员工
ceo = Employee("E001", "张三", "总裁办", "CEO", datetime(2020, 1, 1), 100000)
cto = Employee("E002", "李四", "技术部", "CTO", datetime(2021, 3, 1), 80000)
dev1 = Employee("E003", "王五", "技术部", "高级工程师", datetime(2022, 6, 1), 40000)
dev2 = Employee("E004", "赵六", "技术部", "工程师", datetime(2023, 8, 1), 30000)
hr_manager = Employee("E005", "钱七", "人力资源部", "HR经理", datetime(2021, 5, 1), 50000)
# 添加到组织
org.add_employee(ceo, "总裁办")
org.add_employee(cto, "技术部")
org.add_employee(dev1, "技术部")
org.add_employee(dev2, "技术部")
org.add_employee(hr_manager, "人力资源部")
# 设置汇报关系
org.set_manager(cto.employee_id, ceo.employee_id)
org.set_manager(dev1.employee_id, cto.employee_id)
org.set_manager(dev2.employee_id, cto.employee_id)
# 考勤管理
print("📅 考勤记录:")
today = datetime.now()
dev1.record_attendance(today, AttendanceStatus.PRESENT)
dev2.record_attendance(today, AttendanceStatus.LATE, notes="堵车")
# 休假管理
print("\n🏖️ 休假管理:")
dev1.leave_balance = 10
dev1.apply_leave(
datetime.now() + timedelta(days=7),
datetime.now() + timedelta(days=9),
"annual",
"年度休假"
)
# 薪资计算
print("\n💰 薪资计算:")
calculator = PayrollCalculator()
payslip = calculator.calculate_salary(
dev1,
attendance_days=22,
overtime_hours=20,
bonuses=5000
)
print(f"员工: {payslip['employee_name']}")
print(f"应发工资: ¥{payslip['gross_salary']:,.2f}")
print(f"扣除合计: ¥{payslip['total_deductions']:,.2f}")
print(f"实发工资: ¥{payslip['net_salary']:,.2f}")
# 组织架构
print("\n🏢 组织架构:")
org_chart = org.generate_org_chart()
for dept, managers in org_chart.items():
print(f"\n{dept}:")
for manager in managers:
print(f" ├─ {manager['name']} ({manager['position']})")
for sub in manager['subordinates']:
print(f" │ ├─ {sub['name']} ({sub['position']})")
# 司龄统计
print("\n📊 员工司龄:")
for emp in org.employees.values():
years = emp.get_service_years()
print(f" {emp.name}: {years:.1f} 年")三、 ERP 的现代演进:从本地部署到云原生
时代 | 技术架构 | 特点 | 代表产品 |
|---|---|---|---|
ERP 1.0 (1990s) | 单机版 | 部门级,无集成 | 用友U8、金蝶K3 |
ERP 2.0 (2000s) | C/S 架构 | 企业级,模块化 | SAP R/3、Oracle EBS |
ERP 3.0 (2010s) | B/S 架构 | 集团化,国际化 | SAP S/4HANA |
ERP 4.0 (2020s+) | 云原生 | AI驱动,微服务 | 金蝶云星空、用友YonSuite |
现代 ERP 关键技术栈:
# 现代云ERP技术架构示意
modern_erp_stack = {
"前端": ["React/Vue", "移动优先", "PWA"],
"网关": ["Kong/Apigee", "GraphQL", "gRPC"],
"微服务": ["Spring Cloud", "Kubernetes", "Docker"],
"数据层": ["分布式数据库", "数据湖", "实时数仓"],
"AI能力": ["预测分析", "RPA", "智能推荐"]
}四、 ERP 实施成功率的关键因素
根据 Gartner 研究,ERP 项目失败率高达 70%。成功的关键在于:
- 业务流程重组(BPR):先优化流程,再上系统
- 数据治理:主数据(客户、产品、供应商)质量是生命线
- 分阶段实施:先财务、进销存,再生产、HR
- 用户培训:系统最终是给人用的
- 持续优化:上线只是开始,不是结束
💡 总结
ERP 不是银弹,而是企业管理的放大镜:
- 管理好的企业,ERP 让其效率提升 30-50%
- 管理差的企业,ERP 会让问题暴露无遗
记住:选择 ERP 不是选择软件,而是选择管理理念。最好的 ERP 是那个最匹配你业务模式和管理水平的系统,而不是最贵或功能最多的。
互动话题:你的公司正在使用或考虑哪个 ERP 系统?实施过程中遇到了哪些挑战?评论区聊聊你的经验!