JUC供工具包
# JUC供工具包
# 1. coutDownLatch
CountDownLatch就是JUC包下的一个工具,整个工具最核心的功能就是计数器。
如果有三个业务需要并行处理,并且需要知道三个业务全部都处理完毕了。
需要一个并发安全的计数器来操作。
CountDownLatch就可以实现。
给CountDownLatch设置一个数值。可以设置3。
每个业务处理完毕之后,执行一次countDown方法,指定的3每次在执行countDown方法时,对3进行-1。
主线程可以在业务处理时,执行await,主线程会阻塞等待任务处理完毕。
当设置的3基于countDown方法减为0之后,主线程就会被唤醒,继续处理后续业务。
# 1.1 应用
模拟有三个任务需要并行处理,在三个任务全部处理完毕后,再执行后续操作
CountDownLatch中,执行countDown方法,代表一个任务结束,对计数器 - 1
执行await方法,代表等待计数器变为0时,再继续执行
执行await(time,unit)方法,代表等待time时长,如果计数器不为0,返回false,如果在等待期间,计数器为0,方法就返回true
一般CountDownLatch更多的是基于业务去构建,不采用成员变量。
static ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.newFixedThreadPool(3);
static CountDownLatch countDownLatch = new CountDownLatch(3);
public static void main(String[] args) throws InterruptedException {
System.out.println("主业务开始执行");
sleep(1000);
executor.execute(CompanyTest::a);
executor.execute(CompanyTest::b);
executor.execute(CompanyTest::c);
System.out.println("三个任务并行执行,主业务线程等待");
// 死等任务结束
// countDownLatch.await();
// 如果在规定时间内,任务没有结束,返回false
if (countDownLatch.await(10, TimeUnit.SECONDS)) {
System.out.println("三个任务处理完毕,主业务线程继续执行");
}else{
System.out.println("三个任务没有全部处理完毕,执行其他的操作");
}
}
private static void a() {
System.out.println("A任务开始");
sleep(1000);
System.out.println("A任务结束");
countDownLatch.countDown();
}
private static void b() {
System.out.println("B任务开始");
sleep(1500);
System.out.println("B任务结束");
countDownLatch.countDown();
}
private static void c() {
System.out.println("C任务开始");
sleep(2000);
System.out.println("C任务结束");
countDownLatch.countDown();
}
private static void sleep(long timeout){
try {
Thread.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
# 1.2 源码
TODO
# 2. CyclicBarrier
从名字上来看CyclicBarrier,就是代表循环屏障
Barrier屏障:让一个或多个线程达到一个屏障点,会被阻塞。屏障点会有一个数值,当达到一个线程阻塞在屏障点时,就会对屏障点的数值进行-1操作,当屏障点数值减为0时,屏障就会打开,唤醒所有阻塞在屏障点的线程。在释放屏障点之后,可以先执行一个任务,再让所有阻塞被唤醒的线程继续之后后续任务。
Cyclic循环:所有线程被释放后,屏障点的数值可以再次被重置。
CyclicBarrier一般被称为栅栏。
CyclicBarrier是一种同步机制,允许一组线程互相等待。现成的达到屏障点其实是基于await方法在屏障点阻塞。
CyclicBarrier并没有基于AQS实现,他是基于ReentrantLock锁的机制去实现了对屏障点--,以及线程挂起的操作。(CountDownLatch本身是基于AQS,对state进行release操作后,可以-1)
CyclicBarrier没来一个线程执行await,都会对屏障数值进行-1操作,每次-1后,立即查看数值是否为0,如果为0,直接唤醒所有的互相等待线程。
CyclicBarrier对比CountDownLatch区别
- 底层实现不同。CyclicBarrier基于ReentrantLock做的。CountDownLatch直接基于AQS做的。
- 应用场景不同。CountDownLatch的计数器只能使用一次。而CyclicBarrier在计数器达到0之后,可以重置计数器。CyclicBarrier可以实现相比CountDownLatch更复杂的业务,执行业务时出现了错误,可以重置CyclicBarrier计数器,再次执行一次。
- CyclicBarrier还提供了很多其他的功能:
- 可以获取到阻塞的现成有多少
- 在线程互相等待时,如果有等待的线程中断,可以抛出异常,避免无限等待的问题。
- CountDownLatch一般是让主线程等待,让子线程对计数器--。CyclicBarrier更多的让子线程也一起计数和等待,等待的线程达到数值后,再统一唤醒
CyclicBarrier:多个线程互相等待,直到到达同一个同步点,再一次执行。
# 2.1 应用
出国旅游。
导游小姐姐需要等待所有乘客都到位后,发送护照,签证等等文件,再一起出发
比如Tom,Jack,Rose三个人组个团出门旅游
在构建CyclicBarrier可以指定barrierAction,可以选择性指定,如果指定了,那么会在barrier归0后,优先执行barrierAction任务,然后再去唤醒所有阻塞挂起的线程,并行去处理后续任务。
所有互相等待的线程,可以指定等待时间,并且在等待的过程中,如果有线程中断,所有互相的等待的线程都会被唤醒。
如果在等待期间,有线程中断了,唤醒所有线程后,CyclicBarrier无法继续使用。
如果线程中断后,需要继续使用当前的CyclicBarrier,需要调用reset方法,让CyclicBarrier重置。
如果CyclicBarrier的屏障数值到达0之后,他默认会重置屏障数值,CyclicBarrier在没有线程中断时,是可以重复使用的。
public static void main(String[] args) throws InterruptedException {
CyclicBarrier barrier = new CyclicBarrier(3,() -> {
System.out.println("等到各位大佬都到位之后,分发护照和签证等内容!");
});
new Thread(() -> {
System.out.println("Tom到位!!!");
try {
barrier.await();
} catch (Exception e) {
System.out.println("悲剧,人没到齐!");
return;
}
System.out.println("Tom出发!!!");
}).start();
Thread.sleep(100);
new Thread(() -> {
System.out.println("Jack到位!!!");
try {
barrier.await();
} catch (Exception e) {
System.out.println("悲剧,人没到齐!");
return;
}
System.out.println("Jack出发!!!");
}).start();
Thread.sleep(100);
new Thread(() -> {
System.out.println("Rose到位!!!");
try {
barrier.await();
} catch (Exception e) {
System.out.println("悲剧,人没到齐!");
return;
}
System.out.println("Rose出发!!!");
}).start();
/*
tom到位,jack到位,rose到位
导游发签证
tom出发,jack出发,rose出发
*/
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
# 2.2 源码
TODO
# 3. Semaphone
sync,ReentrantLock是互斥锁,保证一个资源同一时间只允许被一个线程访问
Semaphore(信号量)保证1个或多个资源可以被指定数量的线程同时访问
底层实现是基于AQS去做的。
Semaphore底层也是基于AQS的state属性做一个计数器的维护。state的值就代表当前共享资源的个数。如果一个线程需要获取的1或多个资源,直接查看state的标识的资源个数是否足够,如果足够的,直接对state - 1拿到当前资源。如果资源不够,当前线程就需要挂起等待。知道持有资源的线程释放资源后,会归还给Semaphore中的state属性,挂起的线程就可以被唤醒。
Semaphore也分为公平和非公平的概念。
使用场景:连接池对象就可以基础信号量去实现管理。在一些流量控制上,也可以采用信号量去实现。再比如去迪士尼或者是环球影城,每天接受的人流量是固定的,指定一个具体的人流量,可能接受10000人,每有一个人购票后,就对信号量进行--操作,如果信号量已经达到了0,或者是资源不足,此时就不能买票。
# 3.1 应用
以上面环球影城每日人流量为例子去测试一下。
public static void main(String[] args) throws InterruptedException {
// 今天环球影城还有人个人流量
Semaphore semaphore = new Semaphore(10);
new Thread(() -> {
System.out.println("一家三口要去~~");
try {
semaphore.acquire(3);
System.out.println("一家三口进去了~~~");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println("一家三口走了~~~");
semaphore.release(3);
}
}).start();
for (int i = 0; i < 7; i++) {
int j = i;
new Thread(() -> {
System.out.println(j + "大哥来了。");
try {
semaphore.acquire();
System.out.println(j + "大哥进去了~~~");
Thread.sleep(10000);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
System.out.println(j + "大哥走了~~~");
semaphore.release();
}
}).start();
}
Thread.sleep(10);
System.out.println("main大哥来了。");
if (semaphore.tryAcquire()) {
System.out.println("main大哥进来了。");
}else{
System.out.println("资源不够,main大哥进来了。");
}
Thread.sleep(10000);
System.out.println("main大哥又来了。");
if (semaphore.tryAcquire()) {
System.out.println("main大哥进来了。");
semaphore.release();
}else{
System.out.println("资源不够,main大哥进来了。");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
其实Semaphore整体就是对构建Semaphore时,指定的资源数的获取和释放操作
获取资源方式:
- acquire():获取一个资源,没有资源就挂起等待,如果中断,直接抛异常
- acquire(int):获取指定个数资源,资源不够,或者没有资源就挂起等待,如果中断,直接抛异常
- tryAcquire():获取一个资源,没有资源返回false,有资源返回true
- tryAcquire(int):获取指定个数资源,没有资源返回false,有资源返回true
- tryAcquire(time,unit):获取一个资源,如果没有资源,等待time.unit,如果还没有,就返回false
- tryAcquire(int,time,unit):获取指定个数资源,如果没有资源,等待time.unit,如果还没有,就返回false
- acquireUninterruptibly():获取一个资源,没有资源就挂起等待,中断线程不结束,继续等
- acquireUninterruptibly(int):获取指定个数资源,没有资源就挂起等待,中断线程不结束,继续等
归还资源方式:
- release():归还一个资源
- release(int):归还指定个数资源
# 3.2 源码
TODO