数据源

数据上锁
悲观锁和乐观锁
悲观锁:高并发场景下,需要先验证数据是否上锁才会执行相关的sql语句或其他io事务。
乐观锁:高并发场景下,先执行sql语句或数据库io事务,再根据指定字段判断是否进行下一个行为操作。
悲观锁:
共享锁(mysql)
排他锁(mysql)
意向锁(mysql)
分布式锁(redis或其他数据源)
共享锁
共享锁实现案例:
# 查询订单时加共享锁(读锁)
@app.route('/order/<int:order_id>', methods=['GET'])
def get_order(order_id):
# 使用SELECT ... FOR SHARE语句(MySQL 8.0+)或SELECT ... LOCK IN SHARE MODE(旧版)
order = db.session.execute(
text("SELECT * FROM orders WHERE id = :id FOR SHARE"),
{'id': order_id}
).fetchone()
# 处理订单数据...特点:
允许多个事务同时读取同一订单数据
阻止其他事务获取排他锁(即阻止修改)
适用于读取频繁但修改少的场景
排他锁
实现案例:
# 修改订单时加排他锁(写锁)
@app.route('/order/<int:order_id>', methods=['PUT'])
def update_order(order_id):
# 开始事务
with db.session.begin():
# 先获取排他锁
order = db.session.execute(
text("SELECT * FROM orders WHERE id = :id FOR UPDATE"),
{'id': order_id}
).fetchone()
# 修改订单逻辑...
db.session.execute(
text("UPDATE orders SET status = :status WHERE id = :id"),
{'status': 'processed', 'id': order_id}
)特点:
独占锁定,阻止其他事务读取或修改(取决于隔离级别)
适用于订单状态变更等写操作
事务结束后自动释放
意向锁
意向锁是MySQL内部自动管理的表级锁,开发者通常不直接操作,但需要理解其作用:
意向共享锁(IS):表示事务打算在表中的某些行上设置共享锁
意向排他锁(IX):表示事务打算在表中的某些行上设置排他锁
作用:
提高锁冲突检测效率
避免逐行检查锁状态
redis锁实现
Redis通常用于实现分布式锁,解决跨服务/跨进程的并发问题
# 获取排他锁(写锁)
def acquire_exclusive_lock(lock_key, owner_id, timeout=10):
r = current_app.redis
# 使用SET命令实现原子性获取锁
return r.set(lock_key, owner_id, nx=True, ex=timeout)
# 释放锁
def release_lock(lock_key, owner_id):
r = current_app.redis
# 使用Lua脚本确保只有锁的持有者能释放
script = """
if redis.call("get", KEYS[1]) == ARGV[1] then
return redis.call("del", KEYS[1])
else
return 0
end
"""
return r.eval(script, 1, lock_key, owner_id)redis分布式锁实现案例:
from flask import Flask, jsonify, request
import redis
import time
app = Flask(__name__)
redis_conn = redis.Redis(host='localhost', port=6379, db=0, password='xxx.')
@app.route('/order_create', methods=['POST'])
def order_create():
order_id = None
goods_id = request.json['goods_id']
lock_name = str(goods_id) + '_lock'
lock = redis_conn.lock(lock_name, timeout=30)
if lock.acquire(): # lock.acquire() 获取锁,如果已经占锁了,会等待直到释放锁
time.sleep(10)
order_id = int(time.time())
lock.release()
if order_id:
lock.release()
return jsonify({'order_id': order_id}), 200
else:
return jsonify({'error': 'Failed to generate order. Please try again later.'}), 503
if __name__ == '__main__':
app.run(host='0.0.0.0', port=7999, debug=True)
实现区别总结
锁机制选型
读多写少:对订单查询使用共享锁,保证读取一致性
写操作:对订单修改使用排他锁,防止并发修改
分布式环境:使用Redis锁协调跨服务操作
死锁预防:设置合理的锁超时时间,按固定顺序获取锁
锁粒度:尽量使用行级锁而非表级锁,提高并发性
数据上锁测试方法
对于核心业务场景或核心数据,如果存在高并发场景,需要评估是否需要实现数据上锁,如果需要,测试这边需要保障数据上锁策略。
需要验证的内容包括:
数据上锁机制是否生效
测试用例:
对同一事务存在高并发的处理时,其他并发事务是否命中锁生效的逻辑(阻塞等待 or 拒绝处理)。
对同一事务连续处理2次,验证第一个事务处理完成后是否释放锁,第二个事务处理时不会遇到阻塞等待 or 拒绝处理。
测试方法:模拟高并发场景的方法,基于python准备2个线程,2个线程都是一样的事务,不过都有记录现程开始时间和结束时间,如果这2个事务处理时间差不多 说明数据没上锁。
如果数据上锁是基于redis实现的分布式锁,有可能还会时间锁超时机制,如果锁超时时间是10秒钟
测试用例:
第一请求模拟超时占锁(找开发配合),模拟占锁8秒左右,进行第二个事务请求,是否命中占锁后的效果。
第一请求模拟超时占锁(找开发配合),模拟占锁11秒左右,进行第二个事务请求,事务是否处理成功。
模拟死锁场景,是否有自动释放策略(找开发配合,比方说让开发把数据生成上锁后的操作代码增加5分钟时间,在数据上锁2分钟后对同一数据再进行查询操作,验证该事务是否可以快速获取结果)
有时也需要评估数据上锁的合理性,比方说是非核心操作但却是每个用户都会涉及的功能,存在频繁操作,数据上锁有可能会影响用户体验。
幂等性测试
同一事务或同一请求进行重复操作,验证结果合理性
同一请求多次提交(例如用户快速双击提交按钮、网络超时重试)
同一事务的重试(例如分布式事务因超时回滚后自动重试)
消息队列的重复消费(例如消费者消费后未及时ACK,消息被重新投递)