×

🔥 电商库存扣减防超卖:分布式锁的三种实现(附Python源码)

万邦科技Lex 万邦科技Lex 发表于2026-05-31 16:44:44 浏览19 评论0

抢沙发发表评论

电商库存扣减是高并发场景下的经典难题。超卖的本质是:多个并发请求同时读到库存>0,然后同时扣减,导致实际扣减数超过库存总量。下面我将从最基础的方案讲起,逐步深入到生产级方案。

一、 超卖问题的根源

# 错误示范:经典的超卖代码
def deduct_stock(product_id, quantity):
    stock = db.query("SELECT stock FROM products WHERE id = ?", product_id)
    if stock >= quantity:
        db.execute("UPDATE products SET stock = stock - ? WHERE id = ?", quantity, product_id)
        return True
    return False
问题:在高并发下,两个请求同时读到 stock=10,都认为库存充足,都执行了扣减,最终库存变成 -2

二、 分布式锁的三种实现方案

方案一:数据库乐观锁(最简单,适合低并发)

原理:利用数据库的行锁或版本号机制,保证扣减操作的原子性。
# optimistic_lock.py
import pymysql
from contextlib import contextmanager
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class OptimisticLockDeduction:
    """数据库乐观锁扣减库存"""
    
    def __init__(self, db_config):
        self.db_config = db_config
    
    @contextmanager
    def get_connection(self):
        conn = pymysql.connect(**self.db_config)
        try:
            yield conn
            conn.commit()
        except:
            conn.rollback()
            raise
        finally:
            conn.close()
    
    def deduct_stock(self, product_id, quantity):
        """使用乐观锁扣减库存(CAS思想)"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            
            # 1. 查询当前库存和版本号
            cursor.execute(
                "SELECT stock, version FROM products WHERE id = %s",
                (product_id,)
            )
            row = cursor.fetchone()
            if not row:
                return False, "商品不存在"
            
            stock, version = row
            
            # 2. 检查库存是否充足
            if stock < quantity:
                return False, "库存不足"
            
            # 3. 使用版本号进行乐观锁更新
            affected = cursor.execute(
                """UPDATE products 
                   SET stock = stock - %s, version = version + 1 
                   WHERE id = %s AND version = %s""",
                (quantity, product_id, version)
            )
            
            # 4. 检查是否更新成功
            if affected == 0:
                return False, "并发冲突,请重试"
            
            return True, "扣减成功"
    
    def deduct_stock_with_sql(self, product_id, quantity):
        """使用SQL直接扣减(更简洁的方式)"""
        with self.get_connection() as conn:
            cursor = conn.cursor()
            
            # 直接在SQL中判断库存
            affected = cursor.execute(
                """UPDATE products 
                   SET stock = stock - %s 
                   WHERE id = %s AND stock >= %s""",
                (quantity, product_id, quantity)
            )
            
            if affected == 0:
                # 可能是库存不足或商品不存在
                cursor.execute(
                    "SELECT stock FROM products WHERE id = %s",
                    (product_id,)
                )
                row = cursor.fetchone()
                if row and row[0] < quantity:
                    return False, "库存不足"
                elif not row:
                    return False, "商品不存在"
                return False, "并发冲突"
            
            return True, "扣减成功"

# 使用示例
if __name__ == "__main__":
    db_config = {
        'host': 'localhost',
        'user': 'root',
        'password': 'password',
        'database': 'shop',
        'charset': 'utf8mb4'
    }
    
    deduction = OptimisticLockDeduction(db_config)
    success, msg = deduction.deduct_stock(1001, 2)
    print(f"{'✅' if success else '❌'} {msg}")
优点:实现简单,不需要额外组件
缺点:高并发下大量重试,性能下降明显

方案二:Redis分布式锁(高性能,适合中等并发)

原理:利用Redis的单线程特性和SETNX命令,实现分布式锁。
# redis_lock.py
import redis
import time
import uuid
import threading
from contextlib import contextmanager
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class RedisDistributedLock:
    """Redis分布式锁实现"""
    
    def __init__(self, redis_client):
        self.redis = redis_client
        self.lock_timeout = 30  # 锁的超时时间(秒)
    
    def acquire_lock(self, lock_key, request_id, timeout=None):
        """
        获取分布式锁
        
        Args:
            lock_key: 锁的键
            request_id: 请求唯一标识(用于解锁验证)
            timeout: 锁的超时时间
        
        Returns:
            bool: 是否获取成功
        """
        timeout = timeout or self.lock_timeout
        
        # SETNX + EXPIRE 原子操作(Redis 2.6.12+)
        result = self.redis.set(
            lock_key,
            request_id,
            nx=True,  # 只在键不存在时设置
            ex=timeout  # 设置过期时间
        )
        
        return result is True
    
    def release_lock(self, lock_key, request_id):
        """
        释放分布式锁(使用Lua脚本保证原子性)
        
        只有持有锁的客户端才能释放锁,防止误删别人的锁
        """
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        
        result = self.redis.eval(lua_script, 1, lock_key, request_id)
        return result == 1
    
    def spin_lock(self, lock_key, request_id, acquire_timeout=10):
        """
        自旋获取锁(带超时)
        
        Args:
            lock_key: 锁的键
            request_id: 请求唯一标识
            acquire_timeout: 获取锁的超时时间
        
        Returns:
            bool: 是否获取成功
        """
        start_time = time.time()
        while time.time() - start_time < acquire_timeout:
            if self.acquire_lock(lock_key, request_id):
                return True
            time.sleep(0.1)  # 休眠100ms再试
        
        return False
    
    @contextmanager
    def lock(self, lock_key, acquire_timeout=10):
        """
        上下文管理器方式使用锁
        
        Usage:
            with lock_manager.lock('product:1001'):
                # 执行业务逻辑
                pass
        """
        request_id = str(uuid.uuid4())
        
        if not self.spin_lock(lock_key, request_id, acquire_timeout):
            raise TimeoutError(f"获取锁超时: {lock_key}")
        
        try:
            yield
        finally:
            self.release_lock(lock_key, request_id)


class RedisStockDeduction:
    """基于Redis分布式锁的库存扣减"""
    
    def __init__(self, redis_client, db_connection):
        self.redis = redis_client
        self.db = db_connection
        self.lock_manager = RedisDistributedLock(redis_client)
    
    def deduct_stock(self, product_id, quantity):
        """
        扣减库存(带分布式锁)
        
        流程:
        1. 获取分布式锁
        2. 查询库存
        3. 扣减库存
        4. 释放锁
        """
        lock_key = f"lock:product:{product_id}"
        
        try:
            with self.lock_manager.lock(lock_key):
                # 在锁的保护下执行扣减
                cursor = self.db.cursor()
                
                # 查询当前库存
                cursor.execute(
                    "SELECT stock FROM products WHERE id = %s FOR UPDATE",
                    (product_id,)
                )
                row = cursor.fetchone()
                
                if not row:
                    return False, "商品不存在"
                
                if row[0] < quantity:
                    return False, "库存不足"
                
                # 扣减库存
                cursor.execute(
                    "UPDATE products SET stock = stock - %s WHERE id = %s",
                    (quantity, product_id)
                )
                self.db.commit()
                
                return True, "扣减成功"
                
        except TimeoutError:
            return False, "系统繁忙,请稍后重试"
        except Exception as e:
            self.db.rollback()
            return False, f"扣减失败: {str(e)}"


# Redlock算法实现(Redis官方推荐的分布式锁算法)
class Redlock:
    """Redlock算法:适用于Redis集群的高可靠分布式锁"""
    # 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
    def __init__(self, redis_nodes):
        """
        Args:
            redis_nodes: Redis节点列表 [client1, client2, client3]
        """
        self.redis_nodes = redis_nodes
        self.quorum = len(redis_nodes) // 2 + 1  # 多数派
    
    def acquire_lock(self, resource, ttl):
        """
        获取Redlock
        
        Args:
            resource: 资源名称
            ttl: 锁的存活时间(毫秒)
        """
        request_id = str(uuid.uuid4())
        start_time = int(time.time() * 1000)
        acquired_count = 0
        
        # 向所有Redis节点请求锁
        for node in self.redis_nodes:
            try:
                if node.set(resource, request_id, nx=True, px=ttl):
                    acquired_count += 1
            except:
                pass
        
        # 检查是否获取了多数派的锁
        elapsed = int(time.time() * 1000) - start_time
        if acquired_count >= self.quorum and elapsed < ttl:
            return True, request_id
        else:
            # 释放已获取的锁
            for node in self.redis_nodes:
                try:
                    node.delete(resource)
                except:
                    pass
            return False, None
    
    def release_lock(self, resource, request_id):
        """释放Redlock"""
        lua_script = """
        if redis.call('get', KEYS[1]) == ARGV[1] then
            return redis.call('del', KEYS[1])
        else
            return 0
        end
        """
        
        released_count = 0
        for node in self.redis_nodes:
            try:
                if node.eval(lua_script, 1, resource, request_id):
                    released_count += 1
            except:
                pass
        
        return released_count >= self.quorum


# 使用示例
if __name__ == "__main__":
    # 连接Redis
    redis_client = redis.Redis(
        host='localhost',
        port=6379,
        db=0,
        decode_responses=True
    )
    
    # 创建锁管理器
    lock_manager = RedisDistributedLock(redis_client)
    
    # 模拟并发扣减
    import concurrent.futures
    
    def worker(product_id, quantity, worker_id):
        request_id = f"worker_{worker_id}_{uuid.uuid4()}"
        lock_key = f"product_lock:{product_id}"
        
        if lock_manager.acquire_lock(lock_key, request_id):
            try:
                print(f"Worker {worker_id}: 获取锁成功,开始扣减")
                # 模拟业务处理
                time.sleep(0.1)
                print(f"Worker {worker_id}: 扣减完成")
                return True
            finally:
                lock_manager.release_lock(lock_key, request_id)
        else:
            print(f"Worker {worker_id}: 获取锁失败")
            return False
    
    # 启动10个并发线程
    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        futures = [
            executor.submit(worker, 1001, 1, i)
            for i in range(10)
        ]
        
        results = [f.result() for f in concurrent.futures.as_completed(futures)]
        print(f"成功: {sum(results)}/{len(results)}")

方案三:ZooKeeper分布式锁(强一致性,适合金融级场景)

原理:利用ZooKeeper的临时顺序节点和Watcher机制,实现公平锁。
# zk_lock.py
from kazoo.client import KazooClient
from kazoo.exceptions import NodeExistsError, NoNodeError
import uuid
import time
from contextlib import contextmanager
# 封装好API供应商demo url=https://console.open.onebound.cn/console/?i=Lex
class ZooKeeperDistributedLock:
    """ZooKeeper分布式锁实现"""
    
    def __init__(self, hosts='localhost:2181'):
        self.zk = KazooClient(hosts=hosts)
        self.zk.start()
        
        # 锁的基础路径
        self.lock_base_path = "/locks"
        
        # 确保基础路径存在
        self.zk.ensure_path(self.lock_base_path)
    
    def create_lock_node(self, lock_path):
        """
        创建锁节点
        
        使用临时顺序节点,保证:
        1. 客户端断开连接时自动删除
        2. 节点有序,实现公平锁
        """
        request_id = str(uuid.uuid4())
        node_path = f"{lock_path}/{request_id}-"
        
        # 创建临时顺序节点
        actual_path = self.zk.create(
            node_path,
            value=b"locked",
            sequence=True,
            ephemeral=True
        )
        
        return actual_path
    
    def acquire_lock(self, lock_name, timeout=10):
        """
        获取分布式锁
        
        使用ZooKeeper的顺序节点实现公平锁
        """
        lock_path = f"{self.lock_base_path}/{lock_name}"
        self.zk.ensure_path(lock_path)
        
        # 创建自己的锁节点
        my_node = self.create_lock_node(lock_path)
        
        try:
            # 获取所有子节点
            children = self.zk.get_children(lock_path)
            
            # 排序找到最小的节点(最早的请求)
            sorted_children = sorted(children)
            my_node_name = my_node.split('/')[-1]
            
            # 如果自己是第一个,获取锁成功
            if my_node_name == sorted_children[0]:
                return True, my_node
            
            # 否则,监听前一个节点
            my_index = sorted_children.index(my_node_name)
            prev_node = f"{lock_path}/{sorted_children[my_index - 1]}"
            
            # 等待前一个节点释放
            event = self.zk.get(prev_node, watch=self._watch_func)
            
            # 等待事件触发或超时
            if event and event[0] is not None:
                # 前一个节点还存在,等待
                start_time = time.time()
                while time.time() - start_time < timeout:
                    if not self.zk.exists(prev_node):
                        return True, my_node
                    time.sleep(0.1)
                
                # 超时,释放自己的节点
                self.release_lock(my_node)
                return False, None
            
            return True, my_node
            
        except Exception as e:
            self.release_lock(my_node)
            raise e
    
    def _watch_func(self, event):
        """Watcher回调函数"""
        pass  # 实际使用时可以记录日志
    
    def release_lock(self, node_path):
        """释放锁"""
        try:
            self.zk.delete(node_path)
            return True
        except NoNodeError:
            return False
        except Exception as e:
            print(f"释放锁失败: {e}")
            return False
    
    @contextmanager
    def lock(self, lock_name, timeout=10):
        """
        上下文管理器方式使用锁
        
        Usage:
            with zk_lock.lock('product:1001'):
                # 执行业务逻辑
                pass
        """
        acquired, node_path = self.acquire_lock(lock_name, timeout)
        
        if not acquired:
            raise TimeoutError(f"获取锁超时: {lock_name}")
        
        try:
            yield
        finally:
            self.release_lock(node_path)
    
    def close(self):
        """关闭ZooKeeper连接"""
        self.zk.stop()
        self.zk.close()


class ZKStockDeduction:
    """基于ZooKeeper分布式锁的库存扣减"""
    
    def __init__(self, zk_hosts, db_connection):
        self.zk_lock = ZooKeeperDistributedLock(zk_hosts)
        self.db = db_connection
    
    def deduct_stock(self, product_id, quantity):
        """
        使用ZooKeeper锁扣减库存
        """
        lock_name = f"product_stock_{product_id}"
        
        try:
            with self.zk_lock.lock(lock_name):
                # 在锁的保护下执行扣减
                cursor = self.db.cursor()
                
                cursor.execute(
                    "SELECT stock FROM products WHERE id = %s FOR UPDATE",
                    (product_id,)
                )
                row = cursor.fetchone()
                
                if not row:
                    return False, "商品不存在"
                
                if row[0] < quantity:
                    return False, "库存不足"
                
                cursor.execute(
                    "UPDATE products SET stock = stock - %s WHERE id = %s",
                    (quantity, product_id)
                )
                self.db.commit()
                
                return True, "扣减成功"
                
        except TimeoutError:
            return False, "系统繁忙,请稍后重试"
        except Exception as e:
            self.db.rollback()
            return False, f"扣减失败: {str(e)}"


# 使用示例
if __name__ == "__main__":
    # 创建ZooKeeper锁管理器
    zk_lock = ZooKeeperDistributedLock('localhost:2181')
    
    try:
        # 测试锁的基本功能
        print("测试ZooKeeper分布式锁...")
        
        # 获取锁
        acquired, node_path = zk_lock.acquire_lock("test_lock")
        print(f"获取锁: {'✅' if acquired else '❌'}")
        
        if acquired:
            # 执行业务逻辑
            print("执行业务逻辑...")
            time.sleep(2)
            
            # 释放锁
            released = zk_lock.release_lock(node_path)
            print(f"释放锁: {'✅' if released else '❌'}")
        
    finally:
        zk_lock.close()

三、 三种方案对比

方案
一致性
性能
可靠性
复杂度
适用场景
数据库乐观锁
低并发,简单场景
Redis分布式锁
中等
中等
高并发,允许短暂不一致
ZooKeeper锁
中等
极高
金融级,强一致性要求

四、 生产级最佳实践

  1. 组合使用:Redis锁做高性能扣减 + 数据库兜底校验

  2. 库存预热:秒杀开始前将库存加载到Redis,减少数据库压力

  3. 异步扣减:先扣Redis库存,异步同步到数据库

  4. 兜底策略:即使分布式锁失效,数据库的乐观锁也能防止超卖

记住:没有完美的分布式锁方案,只有最适合你业务场景的方案。对于电商秒杀场景,Redis分布式锁 + 数据库乐观锁兜底是最常见的生产级方案。


群贤毕至

访客