幸运蛋蛋pc开奖
这篇文章主要介绍了Redis实现分布式锁和等待序列的方法示例,小编觉得挺不错的,现在分享给大家,也给大家做个参考。一起跟随小编过来看看吧

在集群下,经常会因为同时处理发生资源争抢和并发问题,但是我们都知道同步锁 synchronized 、 cas 、 ReentrankLock 这些锁的作用范围都是 JVM ,说白了在集群下没啥用。这时我们就需要能在多台 JVM 之间决定执行顺序的锁了,现在分布式锁主要有 redis 、 Zookeeper 实现的,还有数据库的方式,不过性能太差,也就是需要一个第三方的

背景

最近在做一个消费 Kafka 消息的时候发现,由于线上的消费者过多,经常会遇到,多个机器同时处理一个主键类型的数据的情况发生,如果最后是执行更新操作的话,也就是一个更新顺序的问题,但是如果恰好都需要插入数据的时候,会出现主键重复的问题。这是生产上不被允许的(因为公司有异常的机制,扣分啥的),这是就需要个分布式锁了,斟酌后用了 Redis 的实现方式(因为网上例子多)

分析

redis 实现的分布式锁,实现原理是 set 方法,因为多个线程同时请求的时候,只有一个线程可以成功并返回结果,还可以设置?#34892;?#26399;,来避免死锁的发生,一切都是这么的完美,不过有个问题,在 set 的时候,会直接返回结果,成功或者失败,不具有阻塞效果,需要我们自己?#20801;?#36133;的线程进程处理,有两种方式

    丢弃 等待重试 由于我们的系统需要这些数据,那么只能重新尝试获取。这里使用 redis 的 List 类型实现等待序列的作用

代码

直接上代码 其实直接redis的工具类就可以解决了

package com.test
import redis.clients.jedis.Jedis;

import java.util.Collections;
import java.util.List;

/**
 * @desc redis队列实现方式
 * @anthor 
 * @date 
 **/
public class RedisUcUitl {

  private static final String LOCK_SUCCESS = "OK";
  private static final String SET_IF_NOT_EXIST = "NX";
  private static final String SET_WITH_EXPIRE_TIME = "PX";

  private static final Long RELEASE_SUCCESS = 1L;

  private RedisUcUitl() {

  }
  /**
   * logger
   **/

  /**
   * 存储redis队列顺序存储 在队列首?#30475;?#20837;
   *
   * @param key  字节类型
   * @param value 字节类型
   */
  public static Long lpush(Jedis jedis, final byte[] key, final byte[] value) {

    return jedis.lpush(key, value);
  
  }

  /**
   * 移除列表中最后一个元素 并将?#33041;?#32032;添加入另一个列表中 ,当列表为空时 将阻塞连接 直到等待超时
   *
   * @param srckey
   * @param dstkey
   * @param timeout 0 表示永不超时
   * @return
   */
  public static byte[] brpoplpush(Jedis jedis,final byte[] srckey, final byte[] dstkey, final int timeout) {

    return jedis.brpoplpush(srckey, dstkey, timeout);

  }

  /**
   * 返回制定的key,起始位置的redis数据
   * @param redisKey
   * @param start
   * @param end -1 表示到最后
   * @return
   */
  public static List<byte[]> lrange(Jedis jedis,final byte[] redisKey, final long start, final long end) {
    
    return jedis.lrange(redisKey, start, end);
  }

  /**
   * 删除key
   * @param redisKey
   */
  public static void delete(Jedis jedis, final byte[] redisKey) {
    
     return jedis.del(redisKey);
  }

  /**
   * 尝试加锁
   * @param lockKey key名称
   * @param requestId 身份标识
   * @param expireTime 过期时间
   * @return
   */
  public static boolean tryGetDistributedLock(Jedis jedis,final String lockKey, final String requestId, final int expireTime) {
    String result = jedis.set(lockKey, requestId, SET_IF_NOT_EXIST, SET_WITH_EXPIRE_TIME, expireTime);
    return LOCK_SUCCESS.equals(result);

  }

  /**
   * 释放锁
   * @param lockKey key名称
   * @param requestId 身份标识
   * @return
   */
  public static boolean releaseDistributedLock(Jedis jedis,final String lockKey, final String requestId) {
    final String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end";
    jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId));

    return RELEASE_SUCCESS.equals(result);

  }
}

业务逻辑主要代码如下

1.先消耗队列中的

while(true){
  // 消?#35759;?#21015;
  try{
    // 被放入redis队列的数据 序列化后的
    byte[] bytes = RedisUcUitl.brpoplpush(keyStr.getBytes(UTF_8), dstKeyStr.getBytes(UTF_8), 1);
    if(bytes == null || bytes.isEmpty()){
      // 队列中没数据时?#39034;?      break;
    }
    // 反序列化对象
    Map<String, Object> singleMap = (Map<String, Object>) ObjectSerialUtil.bytesToObject(bytes);
    // 唯一的值 防止被其他线程误解锁
    String requestId = UUID.randomUUID().toString();
    boolean lockGetFlag = RedisUcUitl.tryGetDistributedLock(keyStr,requestId, 100);
    if(lockGetFlag){
      // 成功获取锁 进行业务处理
      //TODO
      // 处理完毕释放锁 
      boolean freeLock = RedisUcUitl.releaseDistributedLock(keyStr, requestId);

    }else{
      // 未能获得锁放入等待队列
     RedisUcUitl.lpush(keyStr.getBytes(UTF_8), ObjectSerialUtil.objectToBytes(param));
  
    }
    
  }catch(Exception e){
    break;
  }
  
}

2.处理最新接到的数据

同样是走尝试获取锁,获取不到放入队列的流程

一般序列化用 fastJson 之列的就可以了,这里用的是 JDK ?#28304;?#30340;,工具类如下

public class ObjectSerialUtil {

  private ObjectSerialUtil() {
//    工具类
  }

  /**
   * 将Object对象序列化为byte[]
   *
   * @param obj 对象
   * @return byte数组
   * @throws Exception
   */
  public static byte[] objectToBytes(Object obj) throws IOException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    ObjectOutputStream oos = new ObjectOutputStream(bos);
    oos.writeObject(obj);
    byte[] bytes = bos.toByteArray();
    bos.close();
    oos.close();
    return bytes;
  }


  /**
   * 将bytes数组还原为对象
   *
   * @param bytes
   * @return
   * @throws Exception
   */
  public static Object bytesToObject(byte[] bytes) {
    try {
      ByteArrayInputStream bin = new ByteArrayInputStream(bytes);
      ObjectInputStream ois = new ObjectInputStream(bin);
      return ois.readObject();
    } catch (Exception e) {
      throw new BaseException("反序列化出错!", e);
    }
  }
}

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持爱安网。

最新资讯
特斯拉计划在?#20998;?号超级工厂实现75万辆EV量产目标

特斯拉计划在?#20998;?号

据外媒报道,特斯拉近期已经为他们在?#20998;?#35759;号了汽车和电
马斯克:太阳能、能源存储业务将超越电动车业务

马斯克:太阳能、能源存

特斯拉首席执行官埃隆·马斯克最近表示,公司的太阳能和
分析师?#28009;还?#26126;年5G iPhone销量将令投资者失望

分析师?#28009;还?#26126;年5G iPh

著名?#36824;?#20998;析师、Loup Ventures执行合伙人吉恩·蒙斯
预付费最后的疯狂

预付费最后的疯狂

在韦博英语、浩沙健身等一系列知名机构频?#24403;?#38647;后,预付
比尔·盖茨2019年度书单出炉,《美国式婚姻》入选

比尔·盖茨2019年度书

比尔·盖茨近日公布了他2019年最?#19981;?#30340;五本书。这些书
人民日报批视频网站VIP额外付费?#22909;?#35270;用户权益

人民日报批视频网站VI

VIP之外设置VVIP,额外掏钱才能享受超前点播,视频网站是
最新文章
详解redis desktop manager安装及连接方式

详解redis desktop ma

这篇文章主要介绍了redis desktop manager安装及连接
Redis集群下过期key监听的实现代码

Redis集群下过期key监

这篇文章主要介绍了Redis集群下过期key监听的实现代码
Redis集群增加节点与删除节点的方法详解

Redis集群增加节点与

这篇文章主要给大家介绍了关于Redis集群增加节点与删
Linux 下redis5.0.0安装教程详解

Linux 下redis5.0.0安

这篇文章主要介绍了Linux 下redis5.0.0安装教程,本文图
Redis 实现“附近的人”功能

Redis 实现“附近的人

Redis基于geohash和有序集合提供了地理位置相关功能。
Redis和Lua使用过程中遇到的小问题

Redis和Lua使用过程中

这篇文章主要给大家介绍了关于Redis和Lua使用过程中遇
幸运蛋蛋pc开奖 豪客彩首页 哪个平台有辽宁十一选五开奖 街机电玩捕鱼达人 快乐12黄金五码组合 棋牌移动版世界 吉林时时彩预测 2014最新网络捕鱼游戏 青海快3今日开奖结果走势图 通化大嘴棋牌 官网 湖南幸运赛车攻略