分布式锁
1、分布式锁
这里是在我的一个分布式项目中演示的,我们只关注分布式锁相关的代码即可。
1.1 本地锁的局限性
我们在Java中学习过了synchronized及lock锁,这些锁都是本地锁,我们通过一个案例演示本地锁的问题。
我们通过并发操作对一个redis中的值进行自增操作。
1.1.1 测试代码
在service-product中的TestController中添加测试方法
@RestController
@RequestMapping("/admin/product/test")
public class TestController {
@Autowired
private TestService testService;
@GetMapping("/testLock")
public Result testLock(){
testService.testLock();
return Result.ok();
}
}
接口:
public interface TestService {
void testLock();
}
实现类:
/**
* 本地锁演示
*/
@Override
public synchronized void testLock() {
//从redis中查询num数据
String value = stringRedisTemplate.opsForValue().get("num");
//判断是否为空
if(StringUtils.isEmpty(value)){
return;
}
//对num数据+1处理
int num = Integer.parseInt(value);
//存储到redis
stringRedisTemplate.opsForValue().set("num",String.valueOf(num+1));
}
先通过redis工具设置num=0
1.1.2 使用ab工具测试(单节点)
使用ab测试工具:httpd-tools(yum install -y httpd-tools)
这里我在linux上使用下面这个命令有问题,我把这个工具装在windows上了,当然用Jmeter压测工具也可以的。
测试如下:5000请求,100并发
abs.exe -n 5000 -c 100 http://localhost:8206/admin/product/test/testLock
初始为0,执行上述命令之后,
从目前的执行结果来看似乎没有问题,可是这只是单个节点的测试,如果碰到微服务集群结果是怎样的呢?
1.1.3 本地锁问题演示(集群情况)
这里将service-product服务建立三个节点,端口号分别为8206,8216,8226
先将redis中的num置零,然后再次执行压测命令,此时需要修改一下,由于是测试集群,这里我们走网关,让网关去转发。
压测命令如下:我的网关是80端口,所以这里不带端口号就是默认走网关了。
abs.exe -n 5000 -c 100 http://localhost/admin/product/test/testLock
查看Redis中的值,此时num并不是5000
集群情况下又出问题了!!!
以上测试,可以发现:本地锁只能锁住同一工程内的资源,在分布式系统里面都存在局限性。
此时需要分布式锁。
1.2 分布式锁实现的解决方案
随着业务发展的需要,原单体单机部署的系统被演化成分布式集群系统后,由于分布式系统多线程、多进程并且分布在不同机器上,这将使原单机部署情况下的并发控制锁策略失效,单纯的Java API并不能提供分布式锁的能力。为了解决这个问题就需要一种跨JVM的互斥机制来控制共享资源的访问,这就是分布式锁要解决的问题!
分布式锁主流的实现方案:
-
基于数据库实现分布式锁
-
基于缓存(Redis等)
-
基于Zookeeper
每一种分布式锁解决方案都有各自的优缺点:
-
性能:redis最高
-
可靠性:zookeeper最高
这里,我们就基于redis实现分布式锁。
1.3 使用Redis实现分布式锁(了解即可)
1.3.1 编写代码
@Override
public void testLock() {
//0、生成UUID
String uuid = UUID.randomUUID().toString().replaceAll("-", "");
//1、从redis中获取锁,setnx
// Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", "111");
Boolean lock = stringRedisTemplate.opsForValue().setIfAbsent("lock", uuid,7,TimeUnit.SECONDS);
//设置超时时间
// stringRedisTemplate.expire("lock",7, TimeUnit.SECONDS);
if(lock){
//从redis中查询num数据
String value = stringRedisTemplate.opsForValue().get("num");
//判断是否为空
if(StringUtils.isEmpty(value)){
return;
}
//对num数据+1处理
int num = Integer.parseInt(value);
//存储到redis
stringRedisTemplate.opsForValue().set("num",String.valueOf(num+1));
//定义lua脚本
String script="if redis.call(\"get\",KEYS[1]) == ARGV[1]\n" +
"then\n" +
" return redis.call(\"del\",KEYS[1])\n" +
"else\n" +
" return 0\n" +
"end";
//创建脚本对象
DefaultRedisScript<Long> defaultRedisScript=new DefaultRedisScript<>();
//设置脚本
defaultRedisScript.setScriptText(script);
//设置返回值类型
defaultRedisScript.setResultType(Long.class);
//执行删除
stringRedisTemplate.execute(defaultRedisScript, Arrays.asList("lock"),uuid);
//获取当前锁的value
//判断
// if(uuid.equals(stringRedisTemplate.opsForValue().get("lock"))){
// //释放锁
// stringRedisTemplate.delete("lock");
// }
}else{
//睡眠100
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
//重试调用
testLock();
}
}
这里我们做了如下优化:
1、设置锁的过期时间
2、设置UUID防误删
3、使用LUA脚本保证删除的原子性。
由于这个方式也是有问题的,我们只是演示一下
1.3.2 压测
先将num设置为0,然后执行压测命令
abs.exe -n 5000 -c 100 http://localhost/admin/product/test/testLock
看起来好像成功了,但是这个只是redis单节点的情况。
redis集群状态下的问题:
-
客户端A从master获取到锁
-
在master将锁同步到slave之前,master宕掉了。
-
slave节点被晋级为master节点
-
客户端B取得了同一个资源被客户端A已经获取到的另外一个锁。
安全失效!
解决方案:了解即可!
1.4 使用Redisson解决分布式锁
Github 地址:https://github.com/redisson/redisson
*Redisson是一个在Redis的基础上实现的Java驻内存数据网格*(In-Memory Data Grid)。它不仅提供了一系列的分布式的Java常用对象,还提供了许多分布式服务。其中包括(BitSet, Set, Multimap, SortedSet, Map, List, Queue, BlockingQueue, Deque, BlockingDeque, Semaphore, Lock, AtomicLong, CountDownLatch, Publish / Subscribe, Bloom filter, Remote service, Spring cache, Executor service, Live Object service, Scheduler service) Redisson提供了使用Redis的最简单和最便捷的方法。Redisson的宗旨是促进使用者对Redis的关注分离(Separation of Concern),从而让使用者能够将精力更集中地放在处理业务逻辑上。
官方文档地址:https://github.com/redisson/redisson/wiki
1.4.1 实现代码
导入依赖
<!-- redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson</artifactId>
<version>3.15.3</version>
</dependency>
1、配置Redisson
/**
* redisson配置信息
*/
@Data
@Configuration
@ConfigurationProperties("spring.redis")
public class RedissonConfig {
private String host;
private String addresses;
private String password;
private String port;
private int timeout = 3000;
private int connectionPoolSize = 64;
private int connectionMinimumIdleSize=10;
private int pingConnectionInterval = 60000;
private static String ADDRESS_PREFIX = "redis://";
/**
* 自动装配
*
*/
@Bean
RedissonClient redissonSingle() {
Config config = new Config();
if(StringUtils.isEmpty(host)){
throw new RuntimeException("host is empty");
}
SingleServerConfig serverConfig = config.useSingleServer()
//redis://127.0.0.1:7181
.setAddress(ADDRESS_PREFIX + this.host + ":" + port)
.setTimeout(this.timeout)
.setPingConnectionInterval(pingConnectionInterval)
.setConnectionPoolSize(this.connectionPoolSize)
.setConnectionMinimumIdleSize(this.connectionMinimumIdleSize)
;
if(!StringUtils.isEmpty(this.password)) {
serverConfig.setPassword(this.password);
}
// RedissonClient redisson = Redisson.create(config);
return Redisson.create(config);
}
}
2、修改实现类
/**
* Redisson使用步骤:
* 1、将RedissonClient对象注入到使用的类中
* 2、获取锁
* Lock lock=redisionClient.getLock();
* lock.lock();
* 3、释放锁
* lock.unlock();
* 注意:在使用Redisson时,可以不用自己实现
*/
@Override
public void testLock() {
//模拟sku查询
//定义key sku:1314:info
String lockSku="sku:"+1314+":info";
//获取锁
RLock lock = redissonClient.getLock(lockSku);
//加锁
// lock.lock();
//加锁时,设置超时时间
// lock.lock(7,TimeUnit.SECONDS);
//加锁时,设置超时时间和等待时间
try {
boolean flag = lock.tryLock(100, 10, TimeUnit.SECONDS);
//判断
if (flag) {
//从redis中查询num数据
String value = stringRedisTemplate.opsForValue().get("num");
//判断是否为空
if(StringUtils.isEmpty(value)){
return;
}
//对num数据+1处理
int num = Integer.parseInt(value);
//存储到redis
stringRedisTemplate.opsForValue().set("num",String.valueOf(num+1));
}else{
Thread.sleep(50);
testLock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
//释放锁
lock.unlock();
}
}
锁的获取方式有很多种,这里我在注释中也写了常用的,只是演示一下效果
1.4.1 压测
abs.exe -n 5000 -c 100 http://localhost/admin/product/test/testLock
查看redis中的值
此时测试是没有问题的。
1.4.2 可重入锁(Reentrant Lock)
基于Redis的Redisson分布式可重入锁RLock Java对象实现了java.util.concurrent.locks.Lock接口。
大家都知道,如果负责储存这个分布式锁的Redisson节点宕机以后,而且这个锁正好处于锁住的状态时,这个锁会出现锁死的状态。为了避免这种情况的发生,Redisson内部提供了一个监控锁的看门狗,它的作用是在Redisson实例被关闭前,不断的延长锁的有效期。默认情况下,看门狗的检查锁的超时时间是30秒钟,也可以通过修改Config.lockWatchdogTimeout来另行指定。
另外Redisson还通过加锁的方法提供了leaseTime的参数来指定加锁的时间。超过这个时间后锁便自动解开了。
快速入门使用的就是可重入锁。也是最常使用的锁。
最常见的使用:
RLock lock = redisson.getLock("anyLock");
// 最常使用
lock.lock();
// 加锁以后10秒钟自动解锁
// 无需调用unlock方法手动解锁
lock.lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = lock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
try {
...
} finally {
lock.unlock();
}
}
1.4.3 读写锁(ReadWriteLock)
基于Redis的Redisson分布式可重入读写锁RReadWriteLock Java对象实现了java.util.concurrent.locks.ReadWriteLock接口。其中读锁和写锁都继承了RLock接口。
分布式可重入读写锁允许同时有多个读锁和一个写锁处于加锁状态。
RReadWriteLock rwlock = redisson.getReadWriteLock("anyRWLock");
// 最常见的使用方法
rwlock.readLock().lock();
// 或
rwlock.writeLock().lock();
// 10秒钟以后自动解锁
// 无需调用unlock方法手动解锁
rwlock.readLock().lock(10, TimeUnit.SECONDS);
// 或
rwlock.writeLock().lock(10, TimeUnit.SECONDS);
// 尝试加锁,最多等待100秒,上锁以后10秒自动解锁
boolean res = rwlock.readLock().tryLock(100, 10, TimeUnit.SECONDS);
// 或
boolean res = rwlock.writeLock().tryLock(100, 10, TimeUnit.SECONDS);
...
lock.unlock();
代码实现
@GetMapping("/read")
public Result read(){
String msg=testService.readLock();
return Result.ok(msg);
}
@GetMapping("/write")
public Result write(){
String msg=testService.writeLock();
return Result.ok(msg);
}
接口
//读锁测试
String readLock();
//写锁测试
String writeLock();
实现类
//读锁测试
@Override
public String readLock() {
//初始化获取锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
//获取读锁
RLock rLock = readWriteLock.readLock();
//加锁
rLock.lock(10,TimeUnit.SECONDS);
//读取数据
String msg = stringRedisTemplate.opsForValue().get("msg");
//释放锁
//rLock.unlock();
return msg;
}
//写锁测试
@Override
public String writeLock() {
//初始化获取锁
RReadWriteLock readWriteLock = redissonClient.getReadWriteLock("readWriteLock");
//获取写锁
RLock rLock = readWriteLock.writeLock();
//加锁
// rLock.lock();
rLock.lock(10,TimeUnit.SECONDS);
// boolean flag = rLock.tryLock(100, 10, TimeUnit.SECONDS);
//写入到redis
stringRedisTemplate.opsForValue().set("msg",String.valueOf(System.currentTimeMillis()));
//释放锁
//rLock.unlock();
return "写入了一条数据到redis";
}
这里我将读锁和写锁的释放锁的代码注释掉,目的是为了测试。
打开两个浏览器窗口测试:
http://localhost:8206/admin/product/test/read
http://localhost:8206/admin/product/test/write
- 同时访问写:一个写完之后,等待一会儿(约10s),另一个写开始
- 同时访问读:不用等待
- 先写后读:读要等待(约10s)写完成
- 先读后写:写要等待(约10s)读完成
分布式锁入门大概写到这里,用确实是跟着文档来,但是要谈理解还差的太远。
- 0
- 0
-
分享