电商库存扣减是高并发场景下的经典难题。超卖的本质是:多个并发请求同时读到库存>0,然后同时扣减,导致实际扣减数超过库存总量。下面我将从最基础的方案讲起,逐步深入到生产级方案。
一、 超卖问题的根源
# 错误示范:经典的超卖代码
def deduct_stock(product_id, quantity):
stock = db.query("SELECT stock FROM products WHERE id = ?", product_id)
if stock >= quantity:
db.execute("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, product_id)
return True
return False
问题:在高并发下,两个请求可能同时读到 stock=10,都认为库存充足并各自执行扣减(例如各扣减 6 件),最终库存变成 -2,即发生超卖。
二、 分布式锁的三种实现方案
方案一:数据库乐观锁(最简单,适合低并发)
原理:乐观锁在读取时不加锁,而是在 UPDATE 的 WHERE 条件中校验版本号(或直接校验库存是否充足),由数据库保证“比较并更新”(CAS)的原子性;受影响行数为 0 即表示发生了并发冲突或库存不足。
# optimistic_lock.py
import pymysql
from contextlib import contextmanager
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class OptimisticLockDeduction:
    """Deduct stock using optimistic concurrency control in the database.

    Two strategies are offered:

    * ``deduct_stock`` reads ``stock`` and ``version`` and then issues an
      ``UPDATE`` that only matches if ``version`` is unchanged.
    * ``deduct_stock_with_sql`` issues a single conditional ``UPDATE`` that
      only matches while ``stock >= quantity``.

    Every call opens its own connection through ``get_connection``.
    """

    def __init__(self, db_config):
        """Remember the keyword arguments later passed to ``pymysql.connect``."""
        self.db_config = db_config

    @contextmanager
    def get_connection(self):
        """Yield a new connection; commit on success, roll back on any error.

        The connection is closed when the ``with`` block exits, whether the
        body completed, returned early, or raised.
        """
        conn = pymysql.connect(**self.db_config)
        try:
            yield conn
            conn.commit()
        except BaseException:
            # Explicit spelling of the original bare ``except``: roll back on
            # every kind of exit-by-exception, then let it propagate.
            conn.rollback()
            raise
        finally:
            conn.close()

    def deduct_stock(self, product_id, quantity):
        """Deduct ``quantity`` units using a version-checked UPDATE.

        Args:
            product_id: Value matched against ``products.id``.
            quantity: Number of units to deduct; must be greater than zero.

        Returns:
            A ``(success, message)`` tuple. ``success`` is False when the
            quantity is not positive, the product does not exist, the stock
            is insufficient, or another writer changed the row between the
            read and the update (the caller may retry in that case).
        """
        # A non-positive quantity would turn ``stock - quantity`` into a
        # no-op or a stock *increase*, so reject it before touching the DB.
        if quantity <= 0:
            return False, "扣减数量必须大于0"

        with self.get_connection() as conn:
            with conn.cursor() as cursor:
                # 1. Read the current stock together with its version.
                cursor.execute(
                    "SELECT stock, version FROM products WHERE id = %s",
                    (product_id,)
                )
                row = cursor.fetchone()
                if not row:
                    return False, "商品不存在"

                stock, version = row
                # 2. Cheap pre-check; the UPDATE below is what guards the race.
                if stock < quantity:
                    return False, "库存不足"

                # 3. Compare-and-swap: only matches if nobody bumped the
                #    version since step 1.
                affected = cursor.execute(
                    """UPDATE products
                       SET stock = stock - %s, version = version + 1
                       WHERE id = %s AND version = %s""",
                    (quantity, product_id, version)
                )

            # 4. Zero affected rows means a concurrent writer won.
            if affected == 0:
                return False, "并发冲突,请重试"
            return True, "扣减成功"

    def deduct_stock_with_sql(self, product_id, quantity):
        """Deduct ``quantity`` units with one conditional UPDATE statement.

        Args:
            product_id: Value matched against ``products.id``.
            quantity: Number of units to deduct; must be greater than zero.

        Returns:
            A ``(success, message)`` tuple. When the UPDATE matches no row, a
            follow-up SELECT is used only to choose the failure message.
        """
        if quantity <= 0:
            return False, "扣减数量必须大于0"

        with self.get_connection() as conn:
            with conn.cursor() as cursor:
                # The stock check lives in the WHERE clause, so the check and
                # the decrement happen in the same statement.
                affected = cursor.execute(
                    """UPDATE products
                       SET stock = stock - %s
                       WHERE id = %s AND stock >= %s""",
                    (quantity, product_id, quantity)
                )
                if affected:
                    return True, "扣减成功"

                # Nothing was updated: find out why, for the message only.
                cursor.execute(
                    "SELECT stock FROM products WHERE id = %s",
                    (product_id,)
                )
                row = cursor.fetchone()

            if not row:
                return False, "商品不存在"
            if row[0] < quantity:
                return False, "库存不足"
            return False, "并发冲突"
# 使用示例
if __name__ == "__main__":
db_config = {
'host': 'localhost',
'user': 'root',
'password': 'password',
'database': 'shop',
'charset': 'utf8mb4'
}
deduction = OptimisticLockDeduction(db_config)
success, msg = deduction.deduct_stock(1001, 2)
print(f"{'✅' if success else '❌'} {msg}")
优点:实现简单,不需要额外组件
缺点:高并发下大量请求因版本冲突而失败、需要重试,性能下降明显
方案二:Redis分布式锁(高性能,适合中等并发)
原理:利用 Redis 单线程执行命令的特性,通过 `SET key value NX EX seconds`(带过期时间的原子 SETNX)实现分布式锁;释放时用 Lua 脚本先校验持有者再删除,避免误删他人的锁。
# redis_lock.py
import redis
import time
import uuid
import threading
from contextlib import contextmanager
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class RedisDistributedLock:
    """Single-instance Redis lock built on ``SET key value NX EX``.

    Each holder stores a unique ``request_id`` as the key's value; release
    deletes the key only if the stored value still matches, so one client
    cannot delete a lock that now belongs to another.
    """

    # Compare-and-delete executed inside Redis so the check and the delete
    # cannot be interleaved with another client's commands.
    _RELEASE_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """

    # Seconds to sleep between attempts in ``spin_lock``.
    retry_interval = 0.1

    def __init__(self, redis_client):
        """Wrap ``redis_client``; locks expire after 30 seconds by default."""
        self.redis = redis_client
        self.lock_timeout = 30  # default lock expiry, in seconds

    def acquire_lock(self, lock_key, request_id, timeout=None):
        """Try once to take the lock.

        Args:
            lock_key: Redis key that represents the lock.
            request_id: Unique token identifying this holder; needed again
                to release the lock.
            timeout: Lock expiry in seconds; a falsy value selects
                ``self.lock_timeout``.

        Returns:
            True if the key was set (lock acquired), False otherwise.
        """
        timeout = timeout or self.lock_timeout
        # NX + EX in one command: set-if-absent and expiry are atomic.
        result = self.redis.set(lock_key, request_id, nx=True, ex=timeout)
        return bool(result)

    def release_lock(self, lock_key, request_id):
        """Delete the lock only if it is still held by ``request_id``.

        Returns:
            True if the key was deleted, False if it was absent or holds a
            different token.
        """
        result = self.redis.eval(self._RELEASE_SCRIPT, 1, lock_key, request_id)
        return result == 1

    def spin_lock(self, lock_key, request_id, acquire_timeout=10):
        """Retry ``acquire_lock`` until it succeeds or the wait budget is spent.

        At least one attempt is always made, so ``acquire_timeout=0`` means
        "try once, do not wait".

        Args:
            lock_key: Redis key that represents the lock.
            request_id: Unique token identifying this holder.
            acquire_timeout: Maximum number of seconds to keep retrying.

        Returns:
            True if the lock was acquired, False if the wait timed out.
        """
        # Monotonic clock: the wait budget is unaffected by wall-clock jumps.
        deadline = time.monotonic() + acquire_timeout
        while True:
            if self.acquire_lock(lock_key, request_id):
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(self.retry_interval, remaining))

    @contextmanager
    def lock(self, lock_key, acquire_timeout=10):
        """Hold the lock for the duration of a ``with`` block.

        Usage:
            with lock_manager.lock('product:1001'):
                ...  # critical section

        Raises:
            TimeoutError: The lock could not be acquired within
                ``acquire_timeout`` seconds.
        """
        request_id = str(uuid.uuid4())
        if not self.spin_lock(lock_key, request_id, acquire_timeout):
            raise TimeoutError(f"获取锁超时: {lock_key}")
        try:
            yield
        finally:
            if not self.release_lock(lock_key, request_id):
                # The key was gone or re-owned: the critical section outlived
                # the lock expiry and was not exclusive for its whole run.
                import logging
                logging.getLogger(__name__).warning(
                    "锁在释放前已过期或已被其他请求持有: %s", lock_key
                )
class RedisStockDeduction:
    """Stock deduction serialized per product by a Redis distributed lock."""

    def __init__(self, redis_client, db_connection):
        """Keep the Redis client and DB connection and build the lock manager."""
        self.redis = redis_client
        self.db = db_connection
        self.lock_manager = RedisDistributedLock(redis_client)

    def deduct_stock(self, product_id, quantity):
        """Deduct ``quantity`` units of ``product_id`` while holding its lock.

        Steps: acquire the per-product lock, read the stock with
        ``SELECT ... FOR UPDATE``, decrement it, commit, release the lock.

        Args:
            product_id: Value matched against ``products.id``.
            quantity: Number of units to deduct; must be greater than zero.

        Returns:
            A ``(success, message)`` tuple; this method does not raise for
            lock timeouts or database errors, it reports them in the message.
        """
        # A non-positive quantity would leave stock unchanged or increase it.
        if quantity <= 0:
            return False, "扣减数量必须大于0"

        lock_key = f"lock:product:{product_id}"
        try:
            with self.lock_manager.lock(lock_key):
                return self._deduct_in_transaction(product_id, quantity)
        except TimeoutError:
            return False, "系统繁忙,请稍后重试"
        except Exception as e:
            self.db.rollback()
            return False, f"扣减失败: {str(e)}"

    def _deduct_in_transaction(self, product_id, quantity):
        """Run the read-check-update on ``self.db``; always end the transaction."""
        cursor = self.db.cursor()
        try:
            cursor.execute(
                "SELECT stock FROM products WHERE id = %s FOR UPDATE",
                (product_id,)
            )
            row = cursor.fetchone()
            if not row:
                # End the transaction opened by the SELECT so the shared
                # connection does not carry it into the next call.
                self.db.rollback()
                return False, "商品不存在"
            if row[0] < quantity:
                # Same reason: release the FOR UPDATE row lock right away.
                self.db.rollback()
                return False, "库存不足"

            cursor.execute(
                "UPDATE products SET stock = stock - %s WHERE id = %s",
                (quantity, product_id)
            )
            # Commit before the distributed lock is released by the caller.
            self.db.commit()
            return True, "扣减成功"
        finally:
            cursor.close()
# Redlock算法实现(Redis官方推荐的分布式锁算法)
class Redlock:
    """Majority-based lock across several independent Redis nodes.

    The lock counts as held only if a majority of the nodes accepted the
    same random token and acquiring them took less than the lock's TTL.
    """

    # Compare-and-delete: a node's key is removed only if it still holds
    # the caller's token, so another client's lock is never deleted.
    _RELEASE_SCRIPT = """
    if redis.call('get', KEYS[1]) == ARGV[1] then
        return redis.call('del', KEYS[1])
    else
        return 0
    end
    """

    def __init__(self, redis_nodes):
        """
        Args:
            redis_nodes: List of Redis clients, one per node,
                e.g. ``[client1, client2, client3]``.
        """
        self.redis_nodes = redis_nodes
        self.quorum = len(redis_nodes) // 2 + 1  # strict majority

    def acquire_lock(self, resource, ttl):
        """Try once to take the lock on a majority of nodes.

        Args:
            resource: Key name used for the lock on every node.
            ttl: Lock lifetime in milliseconds.

        Returns:
            ``(True, request_id)`` on success, where ``request_id`` is the
            token required by ``release_lock``; ``(False, None)`` otherwise.
        """
        request_id = str(uuid.uuid4())
        # Monotonic clock: elapsed time is immune to wall-clock adjustments.
        started = time.monotonic()
        acquired_count = 0

        for node in self.redis_nodes:
            try:
                if node.set(resource, request_id, nx=True, px=ttl):
                    acquired_count += 1
            except Exception:
                # An unreachable or failing node simply does not vote.
                continue

        elapsed_ms = int((time.monotonic() - started) * 1000)
        if acquired_count >= self.quorum and elapsed_ms < ttl:
            return True, request_id

        # Undo partial acquisition. This must be token-checked: a plain
        # DELETE here would remove the lock of whichever client currently
        # holds the resource on that node.
        self.release_lock(resource, request_id)
        return False, None

    def release_lock(self, resource, request_id):
        """Release the lock on every node where ``request_id`` still owns it.

        Returns:
            True if the key was deleted on at least a majority of nodes.
        """
        released_count = 0
        for node in self.redis_nodes:
            try:
                if node.eval(self._RELEASE_SCRIPT, 1, resource, request_id):
                    released_count += 1
            except Exception:
                # Best effort: the key on an unreachable node will expire
                # on its own after the TTL.
                continue
        return released_count >= self.quorum
# 使用示例
if __name__ == "__main__":
# 连接Redis
redis_client = redis.Redis(
host='localhost',
port=6379,
db=0,
decode_responses=True
)
# 创建锁管理器
lock_manager = RedisDistributedLock(redis_client)
# 模拟并发扣减
import concurrent.futures
def worker(product_id, quantity, worker_id):
    """Demo task: make one attempt at the product lock and simulate a deduction.

    Uses the script-level ``lock_manager``. ``quantity`` is accepted but not
    used by this simulation.

    Returns:
        True if the lock was obtained and the simulated work ran, else False.
    """
    token = f"worker_{worker_id}_{uuid.uuid4()}"
    key = f"product_lock:{product_id}"

    # Single non-blocking attempt; workers that lose the race give up.
    if not lock_manager.acquire_lock(key, token):
        print(f"Worker {worker_id}: 获取锁失败")
        return False

    try:
        print(f"Worker {worker_id}: 获取锁成功,开始扣减")
        time.sleep(0.1)  # stand-in for the real business logic
        print(f"Worker {worker_id}: 扣减完成")
        return True
    finally:
        lock_manager.release_lock(key, token)
# 启动10个并发线程
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
futures = [
executor.submit(worker, 1001, 1, i)
for i in range(10)
]
results = [f.result() for f in concurrent.futures.as_completed(futures)]
print(f"成功: {sum(results)}/{len(results)}")
方案三:ZooKeeper分布式锁(强一致性,适合金融级场景)
原理:每个客户端在锁目录下创建一个临时顺序节点,序号最小的节点持有锁,其余客户端通过Watcher机制监听前一个节点的删除事件,从而实现公平锁;客户端断开连接时临时节点会自动删除,锁随之释放。
# zk_lock.py
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError
import uuid
import time
from contextlib import contextmanager
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class ZooKeeperDistributedLock:
    """Fair distributed lock built on ZooKeeper ephemeral sequential nodes.

    Every contender creates a child named ``<uuid>-<sequence>`` under the
    lock's path. The contender whose child has the lowest sequence number
    holds the lock; the others wait until they become the lowest.

    NOTE(review): kazoo ships a ready-made ``Lock`` recipe (``zk.Lock(path)``)
    that waits on watches instead of polling — prefer it in production.
    """

    # Seconds between checks while waiting for the lock.
    poll_interval = 0.1

    def __init__(self, hosts='localhost:2181'):
        """Connect to ZooKeeper at ``hosts`` and make sure ``/locks`` exists."""
        self.zk = KazooClient(hosts=hosts)
        self.zk.start()
        # Parent path under which every lock gets its own directory.
        self.lock_base_path = "/locks"
        self.zk.ensure_path(self.lock_base_path)

    def create_lock_node(self, lock_path):
        """Create this contender's node under ``lock_path``.

        The node is ephemeral and sequential: ``ephemeral=True`` and
        ``sequence=True`` are passed to ``zk.create``.

        Returns:
            The full path of the created node, including the sequence suffix.
        """
        request_id = str(uuid.uuid4())
        node_path = f"{lock_path}/{request_id}-"
        return self.zk.create(
            node_path,
            value=b"locked",
            sequence=True,
            ephemeral=True
        )

    @staticmethod
    def _sequence_of(node_name):
        """Return the numeric sequence suffix of ``<uuid>-<sequence>``."""
        return int(node_name.rsplit("-", 1)[-1])

    def acquire_lock(self, lock_name, timeout=10):
        """Wait up to ``timeout`` seconds to become the lock holder.

        Args:
            lock_name: Name of the lock; its nodes live under
                ``<lock_base_path>/<lock_name>``.
            timeout: Maximum number of seconds to wait. ``0`` means check
                once and give up if the lock is taken.

        Returns:
            ``(True, node_path)`` when the lock is held — pass ``node_path``
            to ``release_lock`` — or ``(False, None)`` on timeout, in which
            case this contender's node has already been removed.
        """
        lock_path = f"{self.lock_base_path}/{lock_name}"
        self.zk.ensure_path(lock_path)

        my_node = self.create_lock_node(lock_path)
        my_node_name = my_node.split('/')[-1]
        deadline = time.monotonic() + timeout

        try:
            while True:
                # Order by the sequence suffix, NOT by the whole name: names
                # start with a random UUID, so a plain string sort would let
                # a later contender sort first and take a lock that is held.
                children = sorted(
                    self.zk.get_children(lock_path), key=self._sequence_of
                )
                # Ownership is re-evaluated from the full child list on every
                # pass; a predecessor disappearing (e.g. a waiter that timed
                # out) does not by itself mean this node is now the lowest.
                if children and children[0] == my_node_name:
                    return True, my_node

                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    # Give up our place in the queue so we do not block others.
                    self.release_lock(my_node)
                    return False, None
                time.sleep(min(self.poll_interval, remaining))
        except Exception:
            self.release_lock(my_node)
            raise

    def _watch_func(self, event):
        """Watcher callback placeholder; does nothing."""
        pass  # a real deployment could log the event here

    def release_lock(self, node_path):
        """Delete the lock node at ``node_path``.

        Returns:
            True if the node was deleted; False if it no longer existed or
            the delete failed (the failure is printed).
        """
        try:
            self.zk.delete(node_path)
            return True
        except NoNodeError:
            return False
        except Exception as e:
            print(f"释放锁失败: {e}")
            return False

    @contextmanager
    def lock(self, lock_name, timeout=10):
        """Hold the lock for the duration of a ``with`` block.

        Usage:
            with zk_lock.lock('product:1001'):
                ...  # critical section

        Raises:
            TimeoutError: The lock was not acquired within ``timeout`` seconds.
        """
        acquired, node_path = self.acquire_lock(lock_name, timeout)
        if not acquired:
            raise TimeoutError(f"获取锁超时: {lock_name}")
        try:
            yield
        finally:
            self.release_lock(node_path)

    def close(self):
        """Stop and close the ZooKeeper client."""
        self.zk.stop()
        self.zk.close()
class ZKStockDeduction:
    """Stock deduction serialized per product by a ZooKeeper lock."""

    def __init__(self, zk_hosts, db_connection):
        """Create the ZooKeeper lock manager and keep the DB connection."""
        self.zk_lock = ZooKeeperDistributedLock(zk_hosts)
        self.db = db_connection

    def deduct_stock(self, product_id, quantity):
        """Deduct ``quantity`` units of ``product_id`` while holding its lock.

        Args:
            product_id: Value matched against ``products.id``.
            quantity: Number of units to deduct; must be greater than zero.

        Returns:
            A ``(success, message)`` tuple; lock timeouts and database errors
            are reported through the message rather than raised.
        """
        # A non-positive quantity would leave stock unchanged or increase it.
        if quantity <= 0:
            return False, "扣减数量必须大于0"

        lock_name = f"product_stock_{product_id}"
        try:
            with self.zk_lock.lock(lock_name):
                return self._deduct_in_transaction(product_id, quantity)
        except TimeoutError:
            return False, "系统繁忙,请稍后重试"
        except Exception as e:
            self.db.rollback()
            return False, f"扣减失败: {str(e)}"

    def _deduct_in_transaction(self, product_id, quantity):
        """Run the read-check-update on ``self.db``; always end the transaction."""
        cursor = self.db.cursor()
        try:
            cursor.execute(
                "SELECT stock FROM products WHERE id = %s FOR UPDATE",
                (product_id,)
            )
            row = cursor.fetchone()
            if not row:
                # End the transaction opened by the SELECT so the shared
                # connection does not carry it into the next call.
                self.db.rollback()
                return False, "商品不存在"
            if row[0] < quantity:
                # Same reason: release the FOR UPDATE row lock right away.
                self.db.rollback()
                return False, "库存不足"

            cursor.execute(
                "UPDATE products SET stock = stock - %s WHERE id = %s",
                (quantity, product_id)
            )
            # Commit before the distributed lock is released by the caller.
            self.db.commit()
            return True, "扣减成功"
        finally:
            cursor.close()
# 使用示例
if __name__ == "__main__":
# 创建ZooKeeper锁管理器
zk_lock = ZooKeeperDistributedLock('localhost:2181')
try:
# 测试锁的基本功能
print("测试ZooKeeper分布式锁...")
# 获取锁
acquired, node_path = zk_lock.acquire_lock("test_lock")
print(f"获取锁: {'✅' if acquired else '❌'}")
if acquired:
# 执行业务逻辑
print("执行业务逻辑...")
time.sleep(2)
# 释放锁
released = zk_lock.release_lock(node_path)
print(f"释放锁: {'✅' if released else '❌'}")
finally:
zk_lock.close()
三、 三种方案对比
| 方案 | 一致性 | 性能 | 可靠性 | 复杂度 | 适用场景 |
|---|---|---|---|---|---|
| 数据库乐观锁 | 弱 | 低 | 高 | 低 | 低并发,简单场景 |
| Redis分布式锁 | 中等 | 高 | 中等 | 中 | 高并发,允许短暂不一致 |
| ZooKeeper锁 | 强 | 中等 | 极高 | 高 | 金融级,强一致性要求 |
四、 生产级最佳实践
- 组合使用:Redis锁做高性能扣减 + 数据库兜底校验
- 库存预热:秒杀开始前将库存加载到Redis,减少数据库压力
- 异步扣减:先扣Redis库存,异步同步到数据库
- 兜底策略:即使分布式锁失效,数据库的乐观锁也能防止超卖
记住:没有完美的分布式锁方案,只有最适合你业务场景的方案。对于电商秒杀场景,Redis分布式锁 + 数据库乐观锁兜底是最常见的生产级方案。