使用阻塞队列
当我们多个线程下 对 一个队列进行操作,队列满了的情况下,其他线程再次 offer,会一直阻塞等待
对一个队列进行出队操作的时候,队列空的情况下,会一直阻塞等待删除,直到队列有元素的时候,会执行删除操作
一直阻塞等待的时候用自旋锁来进行等待
代码如下
阻塞队列接口
java">public interface BlockingQueue <E>{void offer(E e) throws InterruptedException;Boolean offer(E e,long timeout) throws InterruptedException;E poll() throws InterruptedException;
}
阻塞队列实现
java">
public class ArrayBlockingQueue<E> implements BlockingQueue<E> {private final E[] array;private int head;//记录出队时候的头指针private int tail;//记录入队的指针private int size;//记录数组 个数private ReentrantLock lock = new ReentrantLock();private Condition headWait = lock.newCondition();//控制 入队的 同步队列private Condition tailWait = lock.newCondition();//控制出队的同步队列public ArrayBlockingQueue(int capacity) {array = (E[]) new Object[capacity];}private Boolean isFull() {return size == array.length;}private Boolean isEmpty() {return size == 0;}@Overridepublic String toString() {return "ArrayBlockingQueue{" +"array=" + Arrays.toString(array) +'}';}/*** @param e* @throws InterruptedException 在判断是否满的时候 用while循环而不是 if 为了防止虚假唤醒*/@Overridepublic void offer(E e) throws InterruptedException {lock.lockInterruptibly();try {while (isFull()) {tailWait.await();//如果满了 就让线程加入 同步队列// 一直等待被唤醒 使用while防止 多线程下的虚假唤醒}array[tail] = e;if (++tail == array.length) {tail = 0;}size++;//唤醒 出队的线程是为了 防止 在一个空队列中,删除元素的线程会一直线程等待,我们唤醒该线程//提示他可以删除headWait.signal();} finally {lock.unlock();}}/*** @param e* @param timeout* @return {@code Boolean }* @throws InterruptedException 根据 自己传入的时间 设置 入队最多等待的时间 如果* 入队超时 那么返回false 并且该线程不会再等待唤醒* 跟单独使用awaitNanos方法是不一样的* 单独使用 .awaitNanos 方法 当设置的等待时间到期,线程自动启动来竞争锁* ,竞争不到 会到等待队列中去等待被唤醒*/@Overridepublic Boolean offer(E e, long timeout) throws InterruptedException {lock.lockInterruptibly();//把传入的毫秒 转换成纳秒long nanos = TimeUnit.MILLISECONDS.toNanos(timeout);try {while (isFull()) {if (nanos < 0) {//发现等待的时间 <0 表明超过了自己等待的时间 插入操作入队return false;}nanos = tailWait.awaitNanos(nanos);}array[tail] = e;if (++tail == array.length) {tail = 0;}size++;//唤醒 出队的线程是为了 防止 在一个空队列中,删除元素的线程会一直线程等待,我们唤醒该线程//提示他可以删除headWait.signal();return true;} finally {lock.unlock();}}@Overridepublic E poll() throws InterruptedException {lock.lockInterruptibly();try {while (isEmpty()) {headWait.await();}E e = array[head];array[head] = null;if (++head == array.length) {head = 0;}size--;tailWait.signal();//因为 我们 入队 满了的话线程等待// 删除成功之后 队列不满,唤醒入队线程return e;} finally {lock.unlock();}}
}
测试实现
java">public class ArrayBlockingQueueTest {public static void main(String[] args) throws InterruptedException {ArrayBlockingQueue<String> blockingQueue = new ArrayBlockingQueue<String>(3);new Thread(new Runnable() {@Overridepublic void run() {try {blockingQueue.offer("任务2");blockingQueue.offer("任务3");blockingQueue.offer("任务4");System.out.println(blockingQueue.offer("任务1", 1000));} catch (InterruptedException e) {throw new RuntimeException(e);}}},"线程1").start();
Thread.sleep(2000);System.out.println(blockingQueue.toString());}
注意事项
我们只需要注意 condition的awaitnanos 方法,我们等待的时间一到,该线程自动唤醒争抢锁