Redis 事务
Redis事务是一个单独的隔离操作:
事务中的所有命令都会序列化,按顺序地执行;
事务在执行的过程中,不会被其他客户端发过来的命令所打断
Redis 事务的主要作用就是串联多个命令,防止别的命令插队
Redis事务命令:multi、exec、discard
从输入multi
命令开始,输入的命令都会依次进入命令队列中,但不会执行,直到输入exec
后,Redis会将之前的命令队列中的命令依次执行。
组队的过程中可以通过discard来放弃组队。
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k1 v1
QUEUED
127.0.0.1:6379(TX)> set k2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) OK
127.0.0.1:6379>
127.0.0.1:6379> keys *
1) "k2"
2) "k1"
127.0.0.1:6379>
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set k3 v3
QUEUED
127.0.0.1:6379(TX)> set k4 v4
QUEUED
127.0.0.1:6379(TX)> discard
OK
127.0.0.1:6379> keys *
1) "k2"
2) "k1"
127.0.0.1:6379>
事务失败处理:组队阶段
组队中某个命令出现了报告错误,执行时整个的队列都会被取消。
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set b1 v1
QUEUED
127.0.0.1:6379(TX)> set b2 v2
QUEUED
127.0.0.1:6379(TX)> set b3
(error) ERR wrong number of arguments for 'set' command
127.0.0.1:6379(TX)>
127.0.0.1:6379(TX)> exec
(error) EXECABORT Transaction discarded because of previous errors.
127.0.0.1:6379>
127.0.0.1:6379> keys *
(empty array)
127.0.0.1:6379>
事务失败处理:执行阶段
执行阶段某个命令报出了错误,只有报错的命令不会被执行,而其他的命令都会执行。
127.0.0.1:6379> multi
OK
127.0.0.1:6379(TX)> set c1 v1
QUEUED
127.0.0.1:6379(TX)> incr c1
QUEUED
127.0.0.1:6379(TX)> set c2 v2
QUEUED
127.0.0.1:6379(TX)> exec
1) OK
2) (error) ERR value is not an integer or out of range
3) OK
127.0.0.1:6379>
127.0.0.1:6379> keys *
1) "c2"
2) "c1"
127.0.0.1:6379>
Redis 事务三特性
单独的隔离操作
事务中的所有命令都会序列化,按顺序地执行。事务在执行的过程中,不会被其他客户端发送来的命令请求所打断。
没有隔离级别的概念
队列中的命令没有提交之前都不会实际被执行
不保证原子性
事务中如果有一条命令执行失败,其后的命令依然会被执行
事务冲突的问题
场景:你的 老婆和子女 同时拿你的淘宝去参加双十一抢购。老婆买了8k的包,儿子买了5k的电脑,女儿买了1k的化妆品。
假设,你的账户只有10k。此时,我们的事务对三次请求都进行判断,发现三个请求都小于10k,此时执行的话,显然是不正确的。如下:
怎么办呢?这里,我们就要引出我们的乐观锁和悲观锁了。
通过悲观锁解决事务冲突
悲观锁
每次取数据的时候都认为别人会修改这个数据,所以每次取数据我们都将该数据上锁,如此,当别人也想取这个数据的时候,就会被阻塞,直到第一个人操作完成,解锁了,第二个人才能拿到锁,并操作这个数据。
关键:做操作之前先上锁
他是怎么解决上面的问题的呢?
答:首先,第一个请求拿到10k的数据,并把10k上锁,第二、三个请求就没法访问那个数据了(被阻塞)。然后第一个请求判断8k < 10k,可以买,此时剩下2k,解开锁,第二个请求拿到锁了,锁上,把数据取出来,一看,诶!剩下2k了,糟糕,5k买不了呀,执行失败。解开锁。第三个请求拿到锁,取出数据,哈哈,还有2k,买个1k的东西绰绰有余,你的支付宝就剩下1k了。
通过乐观锁解决事务冲突
乐观锁
每次取数据都认为别人不会修改,所以不上锁,但是在更新数据的时候会判断一下在此期间别人有没有已更新了这个数据,可以使用版本号是否相等机制,适用于多读的应用类型。
关键点:先监听key,获得更新前数据库版本号VX,更新数据后插入数据库前,判断此时数据版本是否依然为VX,如果为VX,则插入数据库并且提高数据版本为VXX,否则不执行更新操作
他是怎么解决上面的问题的呢?
答:首先,第一个请求和第二个请求拿到10k的数据的时候,该数据的版本号是1.0。此时:
我们第一个请求先判断成功,减去8k后,发现数据库中该数据的版本号与自己一致,将数据2k写入数据库,并将版本号更改为1.1。
第二个请求也判断成功了,减去5k后,发现数据库的版本号跟自己不一致,减5k的操作不执行。再次获取新的数据,发现此时2k 小于 5k,操作失败。
第三个请求获取数据为2k和版本号为1.1,减去1k后,检查到数据库中该数据的版本号和自己一样,将数据1k写入数据库,并将版本号改为1.2。
通过Redis实现秒杀案例
基础版本v1.0
关键代码(django view)
@csrf_exempt
def seckill_view(request):
# 获取秒杀商品ID
product_id = request.POST.get("product_id")
if not product_id:
return HttpResponse("商品ID错误!")
# 1 为了演示目的,生成一个随机的uid,
uid = "user_" + str(random.randint(1, 100))
# 2 连接redis
r = redis.Redis(host="xxx.xxx.xxx.xxx", port=6379, db=0)
# 3 拼接key: 库存key,已秒杀用户key
product_key = f"sk:product:{product_id}"
user_key = f"sk:users"
# 4 获取库存,如果库存为null,秒杀未开始
if not r.exists(product_key):
r.close()
return HttpResponse("秒杀未开始,请耐心等待!")
# 5 判断用户是否重复秒杀
if r.sismember(user_key, uid):
r.close()
return HttpResponse("请勿重复参与秒杀!")
# 6 如果库存数量<1,商品已被抢光
if int(r.get(product_key)) < 1:
r.close()
return HttpResponse("很遗憾,商品已被抢光!")
# 7 秒杀过程
# 7.1 库存-1
r.decr(product_key)
# 7.2 记录秒杀成功用户
r.sadd(user_key, uid)
print("[sklog]恭喜您:[%s],抢购成功"%uid,flush=True)
return HttpResponse("恭喜您:[%s],抢购成功"%uid)
ab并发模拟测试
安装:yum install httpd-tools
ab 模拟并发命令
- 将post参数放到文件postfile中,以
&
结尾,内容如下:
product_id=1010&
- ab 命令
ab -n 1000 -c 100 -p postfile -T 'application/x-www-form-urlencoded' http://127.0.0.1:8001/sk
-n: 请求数
-c: 并发数
-p: 指定post参数文件
-T: 指定content_type
存在问题
- 超卖,库存出现了负数
- 用户重复参与了秒杀,48个用户秒了53个商品
乐观锁与悲观锁版本V2.0
关键代码(django view)
@csrf_exempt
def seckill_view(request):
# 获取秒杀商品ID
product_id = request.POST.get("product_id")
if not product_id:
return HttpResponse("商品ID错误!")
# 1 为了演示目的,生成一个随机的uid,为了验证一个用户只能参与一次秒杀,需要重复的uid进行并发
uid = "user_" + str(random.randint(1, 100))
# 2 连接redis
# r = RedisConn(host="112.74.110.215", port=6379, db=0).connect
r = redis.Redis(host="112.74.110.215", port=6379, db=0)
# 3 拼接key: 库存key,已秒杀用户key
product_key = f"sk:product:{product_id}"
user_key = "sk:users"
# 记录参与用户
r.sadd("sk:join",uid)
# 4 获取库存,如果库存为null,秒杀未开始
if not r.exists(product_key):
r.close()
return HttpResponse("秒杀未开始,请耐心等待!")
# 5 判断用户是否重复秒杀
if r.sismember(user_key, uid):
r.close()
return HttpResponse("请勿重复参与秒杀!")
# 6 如果库存数量<1,商品已被抢光
if int(r.get(product_key)) < 1:
r.close()
return HttpResponse("很遗憾,商品已被抢光!")
# 7 秒杀过程
if r.setnx(f"lock:{uid}", 1):
r.expire(f"lock:{uid}", 30)
# 创建管道
pipe = r.pipeline()
# 监听库存
pipe.watch(product_key)
# 开启事务
pipe.multi()
# 7.1 库存-1
pipe.decr(product_key)
# 7.2 记录秒杀成功用户
pipe.sadd(user_key, uid)
# 提交事务
pipe.execute()
print("[sklog]恭喜您:[%s],抢购成功" % uid, flush=True)
return HttpResponse("恭喜您:[%s],抢购成功" % uid)
return HttpResponse("谢谢参与!")
ab并发模拟测试
ab -n 1000 -c 100 -p postfile -T 'application/x-www-form-urlencoded' http://127.0.0.1:8001/sk
- 通过乐观锁,解决了超卖问题
- 通过悲观锁,先锁定用户再进行秒杀活动,避免用户重复参与秒杀
存在问题
- 库存遗留问题:该例子中,有100个不同的用户参与秒杀100个商品,但是只有86个用户秒杀成功,14个用户秒杀不到,库存遗留了14个商品
乐观锁库存遗留问题优化版本V3.0
关键代码(django view)
@csrf_exempt
def seckill_view(request):
# 获取秒杀商品ID
product_id = request.POST.get("product_id")
if not product_id:
return HttpResponse("商品ID错误!")
# 1 为了演示目的,生成一个随机的uid,为了验证一个用户只能参与一次秒杀,需要重复的uid进行并发
uid = "user_" + str(random.randint(1, 100))
# 2 连接redis
# r = RedisConn(host="112.74.110.215", port=6379, db=0).connect
r = redis.Redis(host="112.74.110.215", port=6379, db=0)
# 3 拼接key: 库存key,已秒杀用户key
product_key = f"sk:product:{product_id}"
user_key = "sk:users"
# 记录参与用户
r.sadd("sk:join", uid)
# 4 获取库存,如果库存为null,秒杀未开始
if not r.exists(product_key):
r.close()
return HttpResponse("秒杀未开始,请耐心等待!")
# 5 判断用户是否重复秒杀
if r.sismember(user_key, uid):
r.close()
return HttpResponse("请勿重复参与秒杀!")
# 6 如果库存数量<1,商品已被抢光
if int(r.get(product_key)) < 1:
r.close()
return HttpResponse("很遗憾,商品已被抢光!")
# 7 秒杀过程
if r.setnx(f"lock:{uid}", 1):
r.expire(f"lock:{uid}", 30)
# 创建管道
pipe = r.pipeline()
# 在用户首次参与的情况下,循环该用户的秒杀动作,直到库存以空才退出
while True:
# 监听库存
pipe.watch(product_key)
if int(r.get(product_key)) > 0:
try:
# 开启事务
pipe.multi()
# 7.1 库存-1
pipe.decr(product_key)
# 7.2 记录秒杀成功用户
pipe.sadd(user_key, uid)
# 提交事务
pipe.execute()
print("[sklog]恭喜您:[%s],抢购成功" % uid, flush=True)
return HttpResponse("恭喜您:[%s],抢购成功" % uid)
except WatchError as ex:
# 在用户秒杀过程中,监听到库存发生了变化,重新进入循环重新秒杀
pipe.unwatch()
else:
return HttpResponse("很遗憾,商品已被抢光!")
return HttpResponse("谢谢参与!")
ab并发模拟测试
ab -n 1000 -c 100 -p postfile -T 'application/x-www-form-urlencoded' http://127.0.0.1:8001/sk
转载请注明来源,欢迎对文章中的引用来源进行考证,欢迎指出任何有错误或不够清晰的表达。