一、Semaphore概述

Semaphore 通常叫它信号量, 可以用来控制同时访问特定资源的线程数量,也可以用来保持资源生产者和消费者之间的资源限制关系,通过协调各个线程,以保证合理的使用资源。Semaphore 具有公平和非公平两种模式,本文通过源码对 Semaphore 的实现做简单分析。

二、使用示例

使用 Semaphore 控制最多有 2 个线程同时运行。

public class SemaphoreTest {
    private static AtomicInteger integer = new AtomicInteger(0);

    public static void main(String[] args) {
        Semaphore semaphore = new Semaphore(2);
        for (int i = 0; i < 6; i++) {
            new Thread(() -> {
                try {
                    semaphore.acquire();
                    System.out.println(Thread.currentThread().getName() + ":"
                                       + integer.incrementAndGet());
                    Thread.sleep((long) (Math.random() * 1000));
                    integer.decrementAndGet();
                    semaphore.release();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }, "Thread-" + i).start();
        }
    }
}

输出结果:

Thread-1:1
Thread-2:2
Thread-5:2
Thread-0:2
Thread-4:2
Thread-3:2

三、执行原理

Semaphore 内部继承了 AbstractQueuedSynchronizer 类, 使用 state 来表示可用资源数,通过 AbstractQueuedSynchronizer 获取共享锁流程进行资源控制。

Semaphore 每获取一个资源 state 就会扣除,应该算是一个独占的场景,不是很理解为什么用共享锁流程,而不是用独占锁的流程。

3.1 Sync

Lockstate 表示锁的持有计数,成功获取锁后 state 增大,在 Semaphore 中成功获取资源之后,可用资源数减少,state 变小。

可以看到,tryReleaseShared 方法并没有判断当前是否是持有锁线程的相关逻辑,所以是允许在任意情况下执行来增加可用资源数的。

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;
    Sync(int permits) {
        // 初始化锁状态码,作为资源数
        setState(permits);
    }
    final int getPermits() {
        // 取得可用资源数
        return getState();
    }
    // 非公平的方式占用acquires个资源,资源数-1
    final int nonfairTryAcquireShared(int acquires) {
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            // 如果占用后资源数小于0,表示资源数已用尽,获取锁失败
            if (remaining < 0 ||
                // CAS修改state
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
    // 释放releases个资源
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            // 资源数相加
            int next = current + releases;
            // 资源数溢出
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS修改state
            if (compareAndSetState(current, next))
                return true;
        }
    }
    // 减少reductions个资源
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            //资源数相减
            int next = current - reductions;
            // 资源数溢出
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS修改state
            if (compareAndSetState(current, next))
                return;
        }
    }
    // 返回可用资源数,并将可用资源数设置为0
    final int drainPermits() {
        for (;;) {
            int current = getState();
            // 可用资源数设置为0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

3.2 NonfairSync

非公平的实现方式

static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;
    NonfairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

3.3 FairSync

公平的实现方式,相比于非公平的实现方式,公平的实现方式在获取资源前会先执行 hasQueuedPredecessors 判断当前线程是否是队列中第一个等待的节点。

static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;
    FairSync(int permits) {
        super(permits);
    }
    protected int tryAcquireShared(int acquires) {
        for (;;) {
            // 判断当前是否是队列首个等待的节点
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            // 资源数相减
            int remaining = available - acquires;
            if (remaining < 0 ||
                // CAS修改资源数
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

关于 AbstractQueuedSynchronizer 中共享锁和独占锁流程的实现,见锁系列文章。