Semaphore到底该如何使用
事情的起因是最近在看redisson的源码,刚好看到了RedissonSemaphore的acquire/release实现。
public RFuture<Void> releaseAsync(int permits) {if (permits < 0) {throw new IllegalArgumentException("Permits amount can't be negative");}if (permits == 0) {return new CompletableFutureWrapper<>((Void) null);}RFuture<Void> future = commandExecutor.syncedEval(getRawName(), StringCodec.INSTANCE, RedisCommands.EVAL_VOID,"local value = redis.call('incrby', KEYS[1], ARGV[1]); " +"redis.call('publish', KEYS[2], value); ",Arrays.asList(getRawName(), getChannelName()), permits);if (log.isDebugEnabled()) {future.thenAccept(o -> {log.debug("released, permits: {}, name: {}", permits, getName());});}return future;}
这段代码并没有去校验key是否存在,以及设置的semaphore的最大值是多少,直接进行了incr操作。
如果其他客户端存在重复release()的行为,就会导致凭证超发的情况发生。
因此回过头去看了一下jdk的semaphore的实现。
Semaphore semaphore = new Semaphore(1);semaphore.acquire();semaphore.release();semaphore.release();semaphore.release();semaphore.release();boolean b1 = semaphore.tryAcquire();boolean b2 = semaphore.tryAcquire();boolean b3 = semaphore.tryAcquire();System.out.println(b1 + ":" + b2 + ":" + b3);
这段代码的执行结果是:true:true:true
JDK的semaphore的release()核心代码如下:
protected final boolean tryReleaseShared(int releases) {for (;;) {int current = getState();int next = current + releases;if (next < current) // overflowthrow new Error("Maximum permit count exceeded");if (compareAndSetState(current, next))return true;}}
这里只是在release的时候,将当前current和next的值做比对,防止并发情况下修改值异常问题,并不会去校验new Semaphore()时传入的初始化值。其中getState()返回的就是初始化时的state。
protected final int getState() {return state;}
在tryReleaseShared()方法的compareAndSetState(current, next)中,会修改这个state的值。
所以在JDK的semaphore实现中,new Semaphore(int permits)传入的值,只是一个初始化的值,如果在实际使用的时候,进行了重复release()释放,就会导致多余的凭证超发的问题。
在了解了JDK的semaphore的实际实现后,就不难理解redisson的RedissonSemaphore实现了,是完全与JDK保持一致的。
在JDK的semaphore使用时,需要保证多线程进行凭证释放的时候,不会重复release(),防止凭证超发,而使用RedissonSemaphore的时候,一般是分布式的场景,所以常见的凭证超发的情况可能就在于重复消费,所以需要调用方自己保证调用幂等,避免凭证超发。