首页 > 编程 > Java > 正文

详解Java如何实现基于Redis的分布式锁

2019-11-26 13:55:16
字体:
来源:转载
供稿:网友

前言

单JVM内同步好办, 直接用JDK提供的锁就可以了,但是跨进程同步靠这个肯定是不可能的,这种情况下肯定要借助第三方,我这里实现用Redis,当然还有很多其他的实现方式。其实基于Redis实现的原理还算比较简单的,在看代码之前建议大家先去看看原理,看懂了之后看代码应该就容易理解了。

我这里不实现JDK的java.util.concurrent.locks.Lock接口,而是自定义一个,因为JDK的有个newCondition方法我这里暂时没实现。这个Lock提供了5个lock方法的变体,可以自行选择使用哪一个来获取锁,我的想法是最好用带超时返回的那几个方法,因为不这样的话,假如redis挂了,线程永远都在那死循环了(关于这里,应该还可以进一步优化,如果redis挂了,Jedis的操作肯定会抛异常之类的,可以定义个机制让redis挂了的时候通知使用这个lock的用户,或者说是线程)

package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;public interface Lock { /** * 阻塞性的获取锁, 不响应中断 */ void lock;  /** * 阻塞性的获取锁, 响应中断 *  * @throws InterruptedException */ void lockInterruptibly throws InterruptedException;  /** * 尝试获取锁, 获取不到立即返回, 不阻塞 */ boolean tryLock;  /** * 超时自动返回的阻塞性的获取锁, 不响应中断 *  * @param time * @param unit * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未���取到锁  *  */ boolean tryLock(long time, TimeUnit unit);  /** * 超时自动返回的阻塞性的获取锁, 响应中断 *  * @param time * @param unit * @return {@code true} 若成功获取到锁, {@code false} 若在指定时间内未获取到锁 * @throws InterruptedException 在尝试获取锁的当前线程被中断 */ boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException;  /** * 释放锁 */ void unlock; }

看其抽象实现:

package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;/** * 锁的骨架实现, 真正的获取锁的步骤由子类去实现. *  * @author lixiaohui * */public abstract class AbstractLock implements Lock { /** * <pre> * 这里需不需要保证可见性值得讨论, 因为是分布式的锁,  * 1.同一个jvm的多个线程使用不同的锁对象其实也是可以的, 这种情况下不需要保证可见性  * 2.同一个jvm的多个线程使用同一个锁对象, 那可见性就必须要保证了. * </pre> */ protected volatile boolean locked; /** * 当前jvm内持有该锁的线程(if have one) */ private Thread exclusiveOwnerThread; public void lock { try { lock(false, 0, null, false); } catch (InterruptedException e) { // TODO ignore } } public void lockInterruptibly throws InterruptedException { lock(false, 0, null, true); } public boolean tryLock(long time, TimeUnit unit) { try { return lock(true, time, unit, false); } catch (InterruptedException e) { // TODO ignore } return false; } public boolean tryLockInterruptibly(long time, TimeUnit unit) throws InterruptedException { return lock(true, time, unit, true); } public void unlock { // TODO 检查当前线程是否持有锁 if (Thread.currentThread != getExclusiveOwnerThread) { throw new IllegalMonitorStateException("current thread does not hold the lock"); }  unlock0; setExclusiveOwnerThread(null); } protected void setExclusiveOwnerThread(Thread thread) { exclusiveOwnerThread = thread; } protected final Thread getExclusiveOwnerThread { return exclusiveOwnerThread; } protected abstract void unlock0;  /** * 阻塞式获取锁的实现 *  * @param useTimeout  * @param time * @param unit * @param interrupt 是否响应中断 * @return * @throws InterruptedException */ protected abstract boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException;}

基于Redis的最终实现,关键的获取锁,释放锁的代码在这个类的lock方法和unlock0方法里,大家可以只看这两个方法然后完全自己写一个:

package cc.lixiaohui.lock;import java.util.concurrent.TimeUnit;import redis.clients.jedis.Jedis;/** * <pre> * 基于Redis的SETNX操作实现的分布式锁 *  * 获取锁时最好用lock(long time, TimeUnit unit), 以免网路问题而导致线程一直阻塞 *  * <a href="http://redis.io/commands/setnx">SETNC操作参考资料</a> * </pre> *  * @author lixiaohui * */public class RedisBasedDistributedLock extends AbstractLock {  private Jedis jedis;  // 锁的名字 protected String lockKey;  // 锁的有效时长(毫秒) protected long lockExpires;  public RedisBasedDistributedLock(Jedis jedis, String lockKey, long lockExpires) { this.jedis = jedis; this.lockKey = lockKey; this.lockExpires = lockExpires; } // 阻塞式获取锁的实现 protected boolean lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt) throws InterruptedException{ if (interrupt) { checkInterruption; }  long start = System.currentTimeMillis; long timeout = unit.toMillis(time); // if !useTimeout, then it's useless  while (useTimeout ? isTimeout(start, timeout) : true) { if (interrupt) { checkInterruption; }  long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间 String stringOfLockExpireTime = String.valueOf(lockExpireTime);  if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁 // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; }  String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的) // 加入拿到的oldValue依然是expired的,那么就说明拿到锁了 if (oldValue != null && isTimeExpired(oldValue)) {  // TODO 成功获取到锁, 设置相关标识  locked = true;  setExclusiveOwnerThread(Thread.currentThread);  return true; } } else {  // TODO lock is not expired, enter next loop retrying } } return false; }  public boolean tryLock { long lockExpireTime = System.currentTimeMillis + lockExpires + 1;//锁超时时间 String stringOfLockExpireTime = String.valueOf(lockExpireTime);  if (jedis.setnx(lockKey, stringOfLockExpireTime) == 1) { // 获取到锁 // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; }  String value = jedis.get(lockKey); if (value != null && isTimeExpired(value)) { // lock is expired // 假设多个线程(非单jvm)同时走到这里 String oldValue = jedis.getSet(lockKey, stringOfLockExpireTime); // getset is atomic // 但是走到这里时每个线程拿到的oldValue肯定不可能一样(因为getset是原子性的) // 假如拿到的oldValue依然是expired的,那么就说明拿到锁了 if (oldValue != null && isTimeExpired(oldValue)) { // TODO 成功获取到锁, 设置相关标识 locked = true; setExclusiveOwnerThread(Thread.currentThread); return true; } } else {  // TODO lock is not expired, enter next loop retrying }  return false; }  /** * Queries if this lock is held by any thread. *  * @return {@code true} if any thread holds this lock and  *   {@code false} otherwise */ public boolean isLocked { if (locked) { return true; } else { String value = jedis.get(lockKey); // TODO 这里其实是有问题的, 想:当get方法返回value后, 假设这个value已经是过期的了, // 而就在这瞬间, 另一个节点set了value, 这时锁是被别的线程(节点持有), 而接下来的判断 // 是检测不出这种情况的.不过这个问题应该不会导致其它的问题出现, 因为这个方法的目的本来就 // 不是同步控制, 它只是一种锁状态的报告. return !isTimeExpired(value); } } @Override protected void unlock0 { // TODO 判断锁是否过期 String value = jedis.get(lockKey); if (!isTimeExpired(value)) { doUnlock; } } private void checkInterruption throws InterruptedException { if(Thread.currentThread.isInterrupted) { throw new InterruptedException; } }  private boolean isTimeExpired(String value) { return Long.parseLong(value) < System.currentTimeMillis; }  private boolean isTimeout(long start, long timeout) { return start + timeout > System.currentTimeMillis; }  private void doUnlock { jedis.del(lockKey); }}

如果将来还换一种实现方式(比如zookeeper之类的),到时直接继承AbstractLock并实现lock(boolean useTimeout, long time, TimeUnit unit, boolean interrupt)unlock0方法即可(所谓抽象嘛)

测试

模拟全局ID增长器,设计一个IDGenerator类,该类负责生成全局递增ID,其代码如下:

package cc.lixiaohui.lock;import java.math.BigInteger;import java.util.concurrent.TimeUnit;/** * 模拟ID生成  * @author lixiaohui * */public class IDGenerator { private static BigInteger id = BigInteger.valueOf(0); private final Lock lock; private static final BigInteger INCREMENT = BigInteger.valueOf(1); public IDGenerator(Lock lock) { this.lock = lock; }  public String getAndIncrement { if (lock.tryLock(3, TimeUnit.SECONDS)) { try { // TODO 这里获取到锁, 访问临界区资源 return getAndIncrement0; } finally { lock.unlock; } } return null; //return getAndIncrement0; } private String getAndIncrement0 { String s = id.toString; id = id.add(INCREMENT); return s; }}

测试主逻辑:同一个JVM内开两个线程死循环地(循环之间无间隔,有的话测试就没意义了)获取ID(我这里并不是死循环而是跑20s),获取到ID存到同一个Set里面,在存之前先检查该IDset中是否存在,如果已存在,则让两个线程都停止。如果程序能正常跑完20s,那么说明这个分布式锁还算可以满足要求,如此测试的效果应该和不同JVM(也就是真正的分布式环境中)测试的效果是一样的,下面是测试类的代码:

package cc.lixiaohui.DistributedLock.DistributedLock;import java.util.HashSet;import java.util.Set;import org.junit.Test;import redis.clients.jedis.Jedis;import cc.lixiaohui.lock.IDGenerator;import cc.lixiaohui.lock.Lock;import cc.lixiaohui.lock.RedisBasedDistributedLock;public class IDGeneratorTest {  private static Set<String> generatedIds = new HashSet<String>;  private static final String LOCK_KEY = "lock.lock"; private static final long LOCK_EXPIRE = 5 * 1000;  @Test public void test throws InterruptedException { Jedis jedis1 = new Jedis("localhost", 6379); Lock lock1 = new RedisBasedDistributedLock(jedis1, LOCK_KEY, LOCK_EXPIRE); IDGenerator g1 = new IDGenerator(lock1); IDConsumeMission consume1 = new IDConsumeMission(g1, "consume1");  Jedis jedis2 = new Jedis("localhost", 6379); Lock lock2 = new RedisBasedDistributedLock(jedis2, LOCK_KEY, LOCK_EXPIRE); IDGenerator g2 = new IDGenerator(lock2); IDConsumeMission consume2 = new IDConsumeMission(g2, "consume2");  Thread t1 = new Thread(consume1); Thread t2 = new Thread(consume2); t1.start; t2.start;  Thread.sleep(20 * 1000); //让两个线程跑20秒  IDConsumeMission.stop;  t1.join; t2.join; }  static String time { return String.valueOf(System.currentTimeMillis / 1000); }  static class IDConsumeMission implements Runnable { private IDGenerator idGenerator;  private String name;  private static volatile boolean stop;  public IDConsumeMission(IDGenerator idGenerator, String name) { this.idGenerator = idGenerator; this.name = name; }  public static void stop { stop = true; }  public void run { System.out.println(time + ": consume " + name + " start "); while (!stop) { String id = idGenerator.getAndIncrement; if(generatedIds.contains(id)) {  System.out.println(time + ": duplicate id generated, id = " + id);  stop = true;  continue; }   generatedIds.add(id); System.out.println(time + ": consume " + name + " add id = " + id); } System.out.println(time + ": consume " + name + " done "); }  } }

说明一点,我这里停止两个线程的方式并不是很好,我是为了方便才这么做的,因为只是测试,最好不要这么做。

测试结果

跑20s打印的东西太多,前面打印的被clear了,只有差不多跑完的时候才有,下面截图。说明了这个锁能正常工作:

IDGererator没有加锁(即IDGereratorgetAndIncrement方法内部获取id时不上锁)时,测试是不通过的,非常大的概率中途就会停止,下面是不加锁时的测试结果:

这个1秒都不到:

这个也1秒都不到:

结束语

好了,以上就是Java实现基于Redis的分布式锁的全部内容,各位如果发现问题希望能指正,希望这篇文章能对大家的学习和工作带来一定的帮助,如果有疑问可以留言交流。

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表