分布式事务视频教程(分布式事务+分布式锁)java教程 / Java Web应用中的事务管理与分布式锁...

wufei123 发布于 2024-06-07 阅读(13)

目标1:分布式锁Redisson讲解目标2:分布式锁控制超卖目标3:Seata分布式事务讲解目标4:普通商品抢单分布式事务目标5:WebSocket讲解1 分布式锁1.1 问题分析上面抢单过程实现了,但其实还是有问题,会发生超卖问题,如下图:

在多线程执行的情况下,上面的抢单流程会发生超卖问题,比如只剩下1个商品,多线程同时判断是否有库存的时候,会同时判断有库存,最终导致1个商品多个订单的问题发生1.2 redisson分布式锁1.2.1 分布式锁介绍。

​ 解决上面超卖问题,我们可以采用分布式锁来控制,分布式锁的原理很简单​ 分布式锁主要是实现在分布式场景下保证数据的最终一致性在单进程的系统中,存在多个线程可以同时改变某个变量(可变共享变量)时,就需要对变量或代码块做同步(lock—synchronized),使其在修改这种变量时能够线性执行消除并发修改变量。

但分布式系统是多部署、多进程的,开发语言提供的并发处理API在此场景下就无能为力了目前市面上分布式锁常见的实现方式有三种:1.基于数据库实现分布式锁;2.基于缓存(Redis等)实现分布式锁;3.基于Zookeeper实现分布式锁;

1.2.2 Redisson介绍​ 大部分网站使用的分布式锁是基于缓存的,有更好的性能,而缓存一般是以集群方式部署,保证了高可用性而Redis分布式锁官方推荐使用redissonRedisson原理图如下:。

Redisson所说明: 1、redission获取锁释放锁的使用和JDK里面的lock很相似,底层的实现采用了类似lock的处理方式 2、redisson 依赖redis,因此使用redisson 锁需要服务端安装redis,而且redisson 支持单机和集群两种模式下的锁的实现

3、redisson 在多线程或者说是分布式环境下实现机制,其实是通过设置key的方式进行实现,也就是说多个线程为了抢占同一个锁,其实就是争抢设置key Redisson原理:1)加锁: if (redis.call(。

exists, KEYS[1]) == 0) then redis.call(hset, KEYS[1], ARGV[2], 1); redis.call(pexpire

, KEYS[1], ARGV[1]); return nil; end; if (redis.call(hexists, KEYS[1], ARGV[2]) ==

1) then redis.call(hincrby, KEYS[1], ARGV[2], 1); redis.call(pexpire, KEYS[1], ARGV[1

]); return nil; end; return redis.call(pttl, KEYS[1]); 将业务封装在lua中发给redis,保障业务执行的原子性第1个if表示执行加锁,会先判断要加锁的key是否存在,不存在就加锁。

当第1个if执行,key存在的时候,会执行第2个if,第2个if会获取第1个if对应的key剩余的有效时间,然后会进入while循环,不停的尝试加锁2)释放锁:if (redis.call(exists。

, KEYS[1]) == 0) then redis.call(publish, KEYS[2], ARGV[1]); return1; end; if

(redis.call(hexists, KEYS[1], ARGV[3]) == 0) thenreturnnil; end; local counter = redis.call(hincrby

, KEYS[1], ARGV[3], -1); if (counter > 0) then redis.call(pexpire, KEYS[1], ARGV[2]); return

0; else redis.call(del, KEYS[1]); redis.call(publish, KEYS[2], ARGV[1]); return1; end

; returnnil; 执行lock.unlock(),每次都对myLock数据结构中的那个加锁次数减1如果发现加锁次数是0了,说明这个客户端已经不再持有锁了,此时就会用:“del myLock”命令,从redis里删除这个key,另外的客户端2就可以尝试完成加锁了。

3)缺点:Redisson存在一个问题,就是如果你对某个redis master实例,写入了myLock这种锁key的value,此时会异步复制给对应的master slave实例但是这个过程中一旦发生redis master宕机,主备切换,redis slave变为了redis master。

接着就会导致,客户端2来尝试加锁的时候,在新的redis master上完成了加锁,而客户端1也以为自己成功加了锁此时就会导致多个客户端对一个分布式锁完成了加锁这时系统在业务上一定会出现问题,导致脏数据的产生。

1.2.3 Redisson配置1)引入依赖org.redissonredisson-spring-boot-starter

3.11.02)锁操作方法实现要想用到分布式锁,我们就必须要实现获取锁和释放锁,获取锁和释放锁可以编写一个DistributedLocker接口,代码如下:

publicinterfaceDistributedLocker{ /*** * lock(), 拿不到lock就不罢休,不然线程就一直block * @param lockKey *

@return */RLock lock(String lockKey); /*** * timeout为加锁时间,单位为秒 * @param lockKey *

@param timeout * @return */RLock lock(String lockKey, long timeout); /*** * timeout为加锁时间,时间单位由unit确定 *

@param lockKey * @param unit * @param timeout * @return */RLock lock(String lockKey, TimeUnit unit,

long timeout); /*** * tryLock(),马上返回,拿到lock就返回true,不然返回false * 带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false. * 。

@param lockKey * @param unit * @param waitTime * @param leaseTime * @return */

booleantryLock(String lockKey, TimeUnit unit, long waitTime, long leaseTime); /*** * 解锁 *

@param lockKey */voidunlock(String lockKey); /*** * 解锁 * @param lock */voidunlock

(RLock lock); } 实现上面接口中对应的锁管理方法,编写一个锁管理类RedissonDistributedLocker,代码如下:@ComponentpublicclassRedissonDistributedLocker

implementsDistributedLocker{ @Autowiredprivate RedissonClient redissonClient; /*** * lock(), 拿不到lock就不罢休,不然线程就一直block *

@param lockKey * @return */@Overridepublic RLock lock(String lockKey){ RLock lock = redissonClient.getLock(lockKey); lock.lock();

return lock; } /*** * timeout为加锁时间,单位为秒 * @param lockKey * @param timeout *

@return */@Overridepublic RLock lock(String lockKey, long timeout){ RLock lock = redissonClient.getLock(lockKey); lock.lock(timeout, TimeUnit.SECONDS);

return lock; } /*** * timeout为加锁时间,时间单位由unit确定 * @param lockKey * @param unit *

@param timeout * @return */@Overridepublic RLock lock(String lockKey, TimeUnit unit, long timeout)

{ RLock lock = redissonClient.getLock(lockKey); lock.lock(timeout, unit); return

lock; } /*** * tryLock(),马上返回,拿到lock就返回true,不然返回false * 带时间限制的tryLock(),拿不到lock,就等一段时间,超时返回false. * 。

@param lockKey * @param unit * @param waitTime * @param leaseTime * @return */

@OverridepublicbooleantryLock(String lockKey, TimeUnit unit, long waitTime, long leaseTime){ RLock lock = redissonClient.getLock(lockKey);

try { return lock.tryLock(waitTime, leaseTime, unit); } catch (InterruptedException e) { e.printStackTrace(); }

returnfalse; } /*** * 解锁 * @param lockKey */@Overridepublicvoidunlock(String lockKey)

{ RLock lock = redissonClient.getLock(lockKey); lock.unlock(); } /*** * 解锁 *

@param lock */@Overridepublicvoidunlock(RLock lock){ lock.unlock(); } } 3)配置Redis链接在resources下新建文件redisson.yml,主要用于配置redis集群节点链接配置,代码如下:

clusterServersConfig:# 连接空闲超时,单位:毫秒 默认10000idleConnectionTimeout:10000pingTimeout:1000# 同任何节点建立连接时的等待超时。

时间单位是毫秒 默认10000connectTimeout:10000# 等待节点回复命令的时间该时间从命令发送成功时开始计时默认3000timeout:3000# 命令失败重试次数retryAttempts:。

3# 命令重试发送时间间隔,单位:毫秒retryInterval:1500# 重新连接时间间隔,单位:毫秒reconnectionTimeout:3000# 执行失败最大次数failedAttempts:

3# 密码#password: test1234# 单个连接最大订阅数量subscriptionsPerConnection:5clientName:null# loadBalancer 负载均衡算法类的选择

loadBalancer:!{}#从节点发布和订阅连接的最小空闲连接数slaveSubscriptionConnectionMinimumIdleSize:

1#从节点发布和订阅连接池大小 默认值50slaveSubscriptionConnectionPoolSize:50# 从节点最小空闲连接数 默认值32slaveConnectionMinimumIdleSize:

32# 从节点连接池大小 默认64slaveConnectionPoolSize:64# 主节点最小空闲连接数 默认32masterConnectionMinimumIdleSize:32# 主节点连接池大小 默认64

masterConnectionPoolSize:64# 订阅操作的负载均衡模式subscriptionMode:SLAVE# 只在从服务器读取readMode:SLAVE# 集群地址nodeAddresses:

-"redis://192.168.211.137:7001"-"redis://192.168.211.137:7002"-"redis://192.168.211.137:7003"-"redis://192.168.211.137:7004"

-"redis://192.168.211.137:7005"-"redis://192.168.211.137:7006"# 对Redis集群节点状态扫描的时间间隔单位是毫秒默认1000scanInterval:。

1000#这个线程池数量被所有RTopic对象监听器,RRemoteService调用者和RExecutorService任务共同共享默认2threads:0#这个线程池数量是在一个Redisson实例内,被其创建的所有分布式数据类型和服务,以及底层客户端所一同共享的线程池里保存的线程数量。

默认2nettyThreads:0# 编码方式 默认org.redisson.codec.JsonJacksonCodeccodec:!

{}#传输模式transportMode:NIO# 分布式锁自动过期时间,防止死锁,默认30000lockWatchdogTimeout:30000# 通过该参数来修改是否按订阅发布消息的接收顺序出来消息,如果选否将对消息实行并行处理,该参数只适用于订阅发布消息的情况, 默认true

keepPubSubOrder:true# 用来指定高性能引擎的行为由于该变量值的选用与使用场景息息相关(NORMAL除外)我们建议对每个参数值都进行尝试##该参数仅限于Redisson PRO版本#performanceMode: HIGHER_THROUGHPUT

4)创建Redisson管理对象​ Redisson管理对象有2个,分别为RedissonClient和RedissonConnectionFactory,我们只用在项目的RedisConfig中配置一下这2个对象即可,在RedisConfig中添加的代码如下:

/**** * Redisson客户端 * @return * @throws IOException */@Beanpublic RedissonClient redisson()throws

IOException { ClassPathResource resource = new ClassPathResource("redssion.yml"); Config config = Config.fromYAML(resource.getInputStream()); RedissonClient redisson = Redisson.create(config);

return redisson; } /*** * Redisson工厂对象 * @param redisson * @return */@Beanpublic RedissonConnectionFactory

redissonConnectionFactory(RedissonClient redisson){ returnnew RedissonConnectionFactory(redisson); }

5)测试代码测试Redisson分布式锁的代码如下:

测试结果如下:

1.3 Redisson分布式锁控制超卖​ 我们把上面秒杀下单会出现超卖的部分代码用Redisson分布式锁来控制一下,代码如下:/*** * 秒杀下单 * @param orderMap */@Override

publicvoid addHotOrder(Map orderMap) { String id = orderMap.get("id"); String

username = orderMap.get("username"); //keyString key = "SKU_" + id; //分布式锁的keyString lockkey =

"LOCKSKU_" + id; //用户购买的keyString userKey = "USER" + username + "ID" + id; //尝试获取锁,等待10秒,自己获得锁后一直不解锁则10秒后自动解锁

boolean bo = distributedLocker.tryLock(lockkey, TimeUnit.SECONDS, 10L, 10L); if(bo){ if (redisTemplate.hasKey(key)) {

//数量 Integer num = Integer.parseInt(redisTemplate.boundHashOps(key).get("num").toString());

//拥有库存,执行递减操作if (num > 0) { //查询商品 Result result = skuFeign.findById(id); Sku sku = result.getData(); Order order =

new Order(); order.setCreateTime(newDate()); order.setUpdateTime(order.getCreateTime()); order.setUsername(username); order.setSkuId(id); order.setName(sku.getName()); order.setPrice(sku.getSeckillPrice()); order.setId(

"No" + idWorker.nextId()); order.setOrderStatus("0"); order.setPayStatus(

"0"); order.setConsignStatus("0"); orderMapper.insertSelective(order);

//库存递减 num--; if (num == 0) { //同步数据到数据库,秒杀数量归零 skuFeign.zero(id); }

//更新数据 Map dataMap = new HashMap(); dataMap.put(

"num", num); dataMap.put(userKey, 0); //存数据 redisTemplate.boundHashOps(key).putAll(dataMap); }

//记录该商品用户24小时内无法再次购买,测试环境,我们只配置成1分钟 redisTemplate.boundValueOps(userKey).set(""); redisTemplate.expire(userKey,

1, TimeUnit.MINUTES); } //解锁 distributedLocker.unlock(lockkey); } }

发表评论:

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。

河南中青旅行社综合资讯 奇遇综合资讯 盛世蓟州综合资讯 综合资讯 游戏百科综合资讯 新闻58493