Java并发包提供的并发工具类 - 《java核心》笔记

  • 提供了各种丰富的同步结构,包括CountDownLatch、CyclicBarrier、Semaphore等,可以实现更加丰富的多线程操作,比如利用Semaphore作为资源控制器,限制同时进行工作的线程数量;
  • 各种线程安全的容器,比如ConcurrentHashMap,有序的ConcurrentSkipListMap,通过类似快照机制实现线程安全的动态数组CopyOnWriteArrayList;
  • 各种并发队列实现,比如BlockedQueue,ArrayBlockingQueue、SynchorousQueue,或针对特定场景的PriorityBlockingQueue等;
  • 强大的Executor,可以创建各种不同类型的线程池,调度任务运行等。

问题

一排人在等车的时候,调度员指挥等待坐车的队伍一次进来5个人上车,等这5个人坐车触发,再放进去下一批。

Semaphore

Semaphore就是个计数器,其基本逻辑基于acquire/release。

import java.util.concurrent.Semaphore;
public class AbnormalSemaphoreSample {
  public static void main(String[] args) throws InterruptedException {
      Semaphore semaphore = new Semaphore(0);
      for (int i = 0; i < 10; i++) {
          Thread t = new Thread(new MyWorker(semaphore));
          t.start();
      }
      System.out.println("Action...GO!");
      semaphore.release(5);
      System.out.println("Wait for permits off");
      while (semaphore.availablePermits()!=0) {
          Thread.sleep(100L);
      }
      System.out.println("Action...GO again!");
      semaphore.release(5);
  }
}
class MyWorker implements Runnable {
  private Semaphore semaphore;
  public MyWorker(Semaphore semaphore) {
      this.semaphore = semaphore;
  }
  @Override
  public void run() {
      try {
          semaphore.acquire();
          System.out.println("Executed!");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}

CountDownLatch/CyclicBarrier

CountDownLatch和CyclicBarrier的区别:

  • CountDownLatch不可重置,CyclicBarrier可以重用;
  • CountDownLatch基本操作是countDown/await,调用await的线程阻塞等待countDown足够的次数。CountDownLatch操作的是事件
  • CyclicBarrier基本操作是await,当所有的parties都调用了await,才会继续进行任务,并自动进行重置。CyclicBarrier侧重点是线程,而不是调用事件,其典型应用场景是用来等待并发线程结束。

CountDownLatch实现

import java.util.concurrent.CountDownLatch;
public class LatchSample {
  public static void main(String[] args) throws InterruptedException {
      CountDownLatch latch = new CountDownLatch(6);
           for (int i = 0; i < 5; i++) {
                Thread t = new Thread(new FirstBatchWorker(latch));
                t.start();
      }
      for (int i = 0; i < 5; i++) {
              Thread t = new Thread(new SecondBatchWorker(latch));
              t.start();
      }
           // 注意这里也是演示目的的逻辑,并不是推荐的协调方式
      while ( latch.getCount() != 1 ){
              Thread.sleep(100L);
      }
      System.out.println("Wait for first batch finish");
      latch.countDown();
  }
}
class FirstBatchWorker implements Runnable {
  private CountDownLatch latch;
  public FirstBatchWorker(CountDownLatch latch) {
      this.latch = latch;
  }
  @Override
  public void run() {
          System.out.println("First batch executed!");
          latch.countDown();
  }
}
class SecondBatchWorker implements Runnable {
  private CountDownLatch latch;
  public SecondBatchWorker(CountDownLatch latch) {
      this.latch = latch;
  }
  @Override
  public void run() {
      try {
          latch.await();
          System.out.println("Second batch executed!");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}

CyclicBarrier实现,使用到了barrierAction,当屏障被触发时,Java自动调度该动作。注意,CyclicBarrier会自动进行重置

import java.util.concurrent.CountDownLatch;
public class LatchSample {
  public static void main(String[] args) throws InterruptedException {
      CountDownLatch latch = new CountDownLatch(6);
           for (int i = 0; i < 5; i++) {
                Thread t = new Thread(new FirstBatchWorker(latch));
                t.start();
      }
      for (int i = 0; i < 5; i++) {
              Thread t = new Thread(new SecondBatchWorker(latch));
              t.start();
      }
           // 注意这里也是演示目的的逻辑,并不是推荐的协调方式
      while ( latch.getCount() != 1 ){
              Thread.sleep(100L);
      }
      System.out.println("Wait for first batch finish");
      latch.countDown();
  }
}
class FirstBatchWorker implements Runnable {
  private CountDownLatch latch;
  public FirstBatchWorker(CountDownLatch latch) {
      this.latch = latch;
  }
  @Override
  public void run() {
          System.out.println("First batch executed!");
          latch.countDown();
  }
}
class SecondBatchWorker implements Runnable {
  private CountDownLatch latch;
  public SecondBatchWorker(CountDownLatch latch) {
      this.latch = latch;
  }
  @Override
  public void run() {
      try {
          latch.await();
          System.out.println("Second batch executed!");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}

CopyOnWrite

CopyOnWriteArraySet通过包装CopyOnWriteArrayList来实现。

CopyOnWrite对于任何修改操作,如add、set、remove,都会拷贝原数组,修改后替换原来的数组。

public boolean add(E e) {
  synchronized (lock) {
      Object[] elements = getArray();
      int len = elements.length;
           // 拷贝
      Object[] newElements = Arrays.copyOf(elements, len + 1);
      newElements[len] = e;
           // 替换
      setArray(newElements);
      return true;
            }
}
final void setArray(Object[] a) {
  array = a;
}
comments powered by Disqus