Intro
本文概述如何借住 redis 实现一个分布式锁。
为何是 Lua
redis 保证了 lua 解释器执行脚本的事务性,即执行结果要么不可见,要么已完成。
参考这篇文档。
简单锁
简单锁指的是简单互斥锁,一旦锁定,则其他锁定请求都必须等待。
直觉的想法是通过 redis 的键来保持锁,故准备一个用于锁定互斥的名字(比如说 mutex-1)然后指定为键。
直接 SET KEY VALUE
是显然不正确的,如果临界区内程序崩溃或意外断网将导致死锁。
只判断是否存在也不够,临界区内程序崩溃会导致锁无法被释放。最直接的兜底措施就是设置一个超时时间,保证业务能在超时时间内完成,如果崩溃也可以在一段时间后自动释放。
redis 的 SET 命令提供了 NX
和 EX seconds | PX milliseconds
选项,可以实现只在不存在键的时候设置,并同时指定过期时间,有原子性保证。所以我们可以使用 SET
命令实现分布式锁。
因为有超时自动释放存在,所以还存在一个竞争:在解锁时 KEY 已经因超时释放,锁被其他人持有。这时候无条件删除就会导致意外释放他人占有的锁。我们还需要增加一个持有者的判断,决定是否释放。
import datetime
import logging
import threading
import time
import uuid
from contextlib import contextmanager
import redis
logging.basicConfig(
level=logging.DEBUG,
format="[%(asctime)s] %(levelname).1s %(name)s:%(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
logger = logging.getLogger("main")
client = redis.Redis()
@contextmanager
def lock(client: redis.Redis, key: str, expire: datetime.timedelta):
wait = 0.1
maximum = 5
owner = str(uuid.uuid1())
while True:
resp = client.set(key, owner, ex=expire, nx=True)
if resp:
break
logger.debug(f"unable to acquire lock, retrying in {wait} seconds...")
time.sleep(wait)
wait *= 2 # exponential backoff
if wait > maximum:
wait = maximum
logger.debug("entering critical section")
yield
logger.debug("exiting critical section")
script = """\
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
logger.debug("releasing lock")
resp = client.eval(script, 1, key, owner)
logger.debug("lock released")
# 验证
def run(client: redis.Redis):
with lock(client, "run", datetime.timedelta(seconds=10)):
logger.info(f"thread {threading.current_thread().ident} lock acquired")
time.sleep(3)
threads = [
threading.Thread(target=run, args=(client,)),
threading.Thread(target=run, args=(client,)),
]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
互斥锁不能提供事务性保证,如何在超时时保证一致性是个问题。
读写锁部分存在严重错误,已删除。