java并发-如何保证线程按照顺序执行?
【readme】
- 使用只有单个线程的线程池(最简单)
- Thread.join()
- 可重入锁 ReentrantLock + Condition 条件变量(多个) ; 原理如下:
- 任务1执行前在锁1上阻塞;执行完成后在锁2上唤醒;
- 任务2执行前在锁2上阻塞,执行完成后在锁3上唤醒;
- 任务n执行前在锁n上阻塞,执行完成后在锁n+1上唤醒;
- 以此类推 …………..
- 补充:
- 第1条任务执行前可以不阻塞,但执行完成后必须唤醒;(如果要阻塞,则可以让主线程来唤醒第1条任务);
- 补充: 最后一条任务执行后可以不唤醒,但执行前必须阻塞; (如果要唤醒,则最后一条任务执行完成后唤醒主线程);
- 与可重入锁类似,可以使用monitor监视器锁(多个);
- 与可重入锁类似,使用 Semaphore 信号量(多个);
- 与可重入锁类似,CountDownLatch : 倒计时锁存器(多个);
- 与可重入锁类似,CyclicBarrier 循环栅栏(多个) ;
【1】单个线程的线程池
参数设置:核心线程数=1, 最大线程数=1,就能保证线程池中只有1个线程在运行;
public class OrderlySingleThreadPoolTest {
public static void main(String[] args) {
ThreadPoolExecutor singleThreadPool =
new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100));
singleThreadPool.execute(new Task(1));
singleThreadPool.execute(new Task(2));
singleThreadPool.execute(new Task(3));
singleThreadPool.execute(new Task(4));
singleThreadPool.execute(new Task(5));
singleThreadPool.shutdown();
}
private static class Task implements Runnable {
int order; // 执行序号
Task(int order) {
this.order = order;
}
@Override
public void run() {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
PrintUtils.print("序号=" + order + "执行完成");
}
}
}
【打印结果】
2024-06-09 16:07:59.398 序号=1执行完成
2024-06-09 16:08:02.404 序号=2执行完成
2024-06-09 16:08:05.411 序号=3执行完成
2024-06-09 16:08:08.425 序号=4执行完成
2024-06-09 16:08:11.439 序号=5执行完成
【2】thread.join()
main 调用 t1.join(),则main线程阻塞直到t1线程执行完成;如下。
public class ThreadJoinTest {
public static void main(String[] args) {
f1();
PrintUtils.print("主线程结束");
}
public static void f1() {
Thread t1 = new Thread(()->{
try {
TimeUnit.SECONDS.sleep(5);
PrintUtils.print("t1线程结束");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
});
t1.start();
try {
t1.join();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
}
2024-06-09 07:44:38.474 t1线程结束
2024-06-09 07:44:38.573 主线程结束
【3】可重入锁+条件变量实现多个线程顺序执行
1. 补充:
condition.await() 调用前需要获取锁;调用后释放锁(其他线程可以获取该锁,因此得名为可重入),但当前线程阻塞;
condition.signal() 调用前需要获取锁;调用后释放锁;
public class OrderlyReentrantLockTest {
private static Condition[][] build(ReentrantLock reentrantLock, int num) {
Condition[][] arr = new Condition[num][2];
arr[0] = new Condition[]{null, reentrantLock.newCondition()};
int i = 1;
for (; i < num - 1; i++) {
arr[i] = new Condition[]{arr[i - 1][1], reentrantLock.newCondition()};
}
arr[i] = new Condition[]{arr[i - 1][1], null};
return arr;
}
public static void main(String[] args) {
int threadNum = 5;
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(threadNum, threadNum, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100));
ReentrantLock reentrantLock = new ReentrantLock(true);
AtomicInteger unWaitNum = new AtomicInteger(threadNum);
// 构建条件变量数组
Condition[][] conditionTwoArr = build(reentrantLock, threadNum);
// 提交任务
for (int order = threadNum; order >= 1; order--) {
OrderlyTask orderlyTask = new OrderlyTask(reentrantLock, conditionTwoArr[order - 1], order, unWaitNum);
threadPool.execute(orderlyTask);
// 阻塞成功,才提交下一个任务
while (unWaitNum.get() == threadNum) ; // 这里可能死循环,但可以新增超时重试机制来处理
PrintUtils.print("阻塞成功,线程order=" + order);
}
threadPool.shutdown();
}
private static class OrderlyTask implements Runnable {
private ReentrantLock lock;
private Condition[] conditions;
private int order; // 执行序号
private AtomicInteger unWaitNum;
OrderlyTask(ReentrantLock reentrantLock, Condition[] conditions, int order, AtomicInteger unWaitNum) {
this.lock = reentrantLock;
this.conditions = conditions;
this.order = order;
this.unWaitNum = unWaitNum;
}
@Override
public void run() {
lock.lock();
try {
unWaitNum.decrementAndGet();
try {
if (conditions[0] != null) {
conditions[0].await(); // 在第1个条件变量上阻塞
}
} catch (Exception e) {
unWaitNum.incrementAndGet();
throw e;
}
// 处理业务逻辑
TimeUnit.SECONDS.sleep(3);
// 唤醒在第2个条件变量上阻塞的线程
if (conditions[1] != null) {
conditions[1].signal();
}
} catch (Exception e) {
System.err.println(e);
} finally {
lock.unlock();
}
PrintUtils.print("执行完成, 线程order=" + order + ", 线程id=" + Thread.currentThread().getName());
}
}
}
打印结果:
2024-06-09 22:16:00.696 阻塞成功,线程order=5
2024-06-09 22:16:00.698 阻塞成功,线程order=4
2024-06-09 22:16:00.698 阻塞成功,线程order=3
2024-06-09 22:16:00.698 阻塞成功,线程order=2
2024-06-09 22:16:00.698 阻塞成功,线程order=1
2024-06-09 22:16:03.707 执行完成, 线程order=1, 线程id=pool-1-thread-5
2024-06-09 22:16:06.719 执行完成, 线程order=2, 线程id=pool-1-thread-4
2024-06-09 22:16:09.719 执行完成, 线程order=3, 线程id=pool-1-thread-3
2024-06-09 22:16:12.727 执行完成, 线程order=4, 线程id=pool-1-thread-2
2024-06-09 22:16:15.729 执行完成, 线程order=5, 线程id=pool-1-thread-1
【4】使用CountDownLatch倒计时锁存器
public class OrderlyCountDownLatchTest {
private static CountDownLatch[][] build(int num) {
CountDownLatch[][] arr = new CountDownLatch[num][2];
arr[0] = new CountDownLatch[]{null, new CountDownLatch(1)};
int i = 1;
for (; i < num - 1; i++) {
arr[i] = new CountDownLatch[]{arr[i - 1][1], new CountDownLatch(1)};
}
arr[i] = new CountDownLatch[]{arr[i - 1][1], null};
return arr;
}
public static void main(String[] args) {
int threadNum = 5;
ThreadPoolExecutor threadPool =
new ThreadPoolExecutor(threadNum, threadNum, 0, TimeUnit.MILLISECONDS, new ArrayBlockingQueue(100));
AtomicInteger unWaitNum = new AtomicInteger(threadNum);
// 构建倒计时锁存器数组
CountDownLatch[][] latchArr = build(threadNum);
// 提交任务
for (int order = threadNum; order >= 1; order--) {
OrderlyTask orderlyTask = new OrderlyTask(latchArr[order - 1], order, unWaitNum);
threadPool.execute(orderlyTask);
// 阻塞成功,才提交下一个任务
while (unWaitNum.get() == threadNum) ; // 这里可能死循环,但可以新增超时重试机制来处理
PrintUtils.print("阻塞成功,线程order=" + order);
}
threadPool.shutdown();
}
private static class OrderlyTask implements Runnable {
private CountDownLatch[] latchArr;
private int order; // 执行序号
private AtomicInteger unWaitNum;
OrderlyTask(CountDownLatch[] latchArr, int order, AtomicInteger unWaitNum) {
this.latchArr = latchArr;
this.order = order;
this.unWaitNum = unWaitNum;
}
@Override
public void run() {
try {
unWaitNum.decrementAndGet();
try {
if (latchArr[0] != null) {
latchArr[0].await(); // 在第1个锁存器上阻塞
}
} catch (Exception e) {
unWaitNum.incrementAndGet();
throw e;
}
// 处理业务逻辑
TimeUnit.SECONDS.sleep(3);
// 唤醒在第2个条件变量上阻塞的线程
if (latchArr[1] != null) {
latchArr[1].countDown();
}
} catch (Exception e) {
System.err.println(e);
}
PrintUtils.print("执行完成, 线程order=" + order + ", 线程id=" + Thread.currentThread().getName());
}
}
}
打印结果:
2024-06-09 22:35:13.648 阻塞成功,线程order=5
2024-06-09 22:35:13.651 阻塞成功,线程order=4
2024-06-09 22:35:13.651 阻塞成功,线程order=3
2024-06-09 22:35:13.651 阻塞成功,线程order=2
2024-06-09 22:35:13.651 阻塞成功,线程order=1
2024-06-09 22:35:16.664 执行完成, 线程order=1, 线程id=pool-1-thread-5
2024-06-09 22:35:19.676 执行完成, 线程order=2, 线程id=pool-1-thread-4
2024-06-09 22:35:22.682 执行完成, 线程order=3, 线程id=pool-1-thread-3
2024-06-09 22:35:25.684 执行完成, 线程order=4, 线程id=pool-1-thread-2
2024-06-09 22:35:28.688 执行完成, 线程order=5, 线程id=pool-1-thread-1