python 实现 redis 分布式锁

作于:2018-12-17 14:57 ,最后更新于:2025-08-17 01:14 ,预计阅读时间 3 分钟

Intro

本文概述如何借住 redis 实现一个分布式锁。

为何是 Lua

redis 保证了 lua 解释器执行脚本的事务性,即执行结果要么不可见,要么已完成。

参考这篇文档

简单锁

简单锁指的是简单互斥锁,一旦锁定,则其他锁定请求都必须等待。

直觉的想法是通过 redis 的键来保持锁,故准备一个用于锁定互斥的名字(比如说 mutex-1)然后指定为键。

直接 SET KEY VALUE 是显然不正确的,如果临界区内程序崩溃或意外断网将导致死锁。

只判断是否存在也不够,临界区内程序崩溃会导致锁无法被释放。最直接的兜底措施就是设置一个超时时间,保证业务能在超时时间内完成,如果崩溃也可以在一段时间后自动释放。

redis 的 SET 命令提供了 NXEX seconds | PX milliseconds 选项,可以实现只在不存在键的时候设置,并同时指定过期时间,有原子性保证。所以我们可以使用 SET 命令实现分布式锁。

因为有超时自动释放存在,所以还存在一个竞争:在解锁时 KEY 已经因超时释放,锁被其他人持有。这时候无条件删除就会导致意外释放他人占有的锁。我们还需要增加一个持有者的判断,决定是否释放。

import datetime
import logging
import threading
import time
import uuid
from contextlib import contextmanager

import redis

logging.basicConfig(
    level=logging.DEBUG,
    format="[%(asctime)s] %(levelname).1s %(name)s:%(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("main")
client = redis.Redis()


@contextmanager
def lock(client: redis.Redis, key: str, expire: datetime.timedelta):
    wait = 0.1
    maximum = 5
    owner = str(uuid.uuid1())
    while True:
        resp = client.set(key, owner, ex=expire, nx=True)
        if resp:
            break

        logger.debug(f"unable to acquire lock, retrying in {wait} seconds...")
        time.sleep(wait)

        wait *= 2  # exponential backoff
        if wait > maximum:
            wait = maximum

    logger.debug("entering critical section")
    yield
    logger.debug("exiting critical section")

    script = """\
if redis.call("get", KEYS[1]) == ARGV[1] then
    return redis.call("del", KEYS[1])
else
    return 0
end
"""
    logger.debug("releasing lock")
    resp = client.eval(script, 1, key, owner)
    logger.debug("lock released")

# 验证

def run(client: redis.Redis):
    with lock(client, "run", datetime.timedelta(seconds=10)):
        logger.info(f"thread {threading.current_thread().ident} lock acquired")
        time.sleep(3)


threads = [
    threading.Thread(target=run, args=(client,)),
    threading.Thread(target=run, args=(client,)),
]

for thread in threads:
    thread.start()

for thread in threads:
    thread.join()

互斥锁不能提供事务性保证,如何在超时时保证一致性是个问题。

读写锁部分存在严重错误,已删除。

/python/ /redis/ /lua/ /分布式/