百度360必应搜狗淘宝本站头条
当前位置:网站首页 > 技术流 > 正文

线程同步工具类(线程同步作用)

citgpt 2024-08-01 13:31 8 浏览 0 评论

同步工具类主要包括闭锁(如CountDownLatch),栅栏(如CyclicBarrier),信号量(如Semaphore)和阻塞队列(如LinkedBlockingQueue)等;

使用同步工具类可以协调线程的控制流;

线程同步工具类(线程同步作用)

同步工具类封装了一些状态,这些状态决定线程是继续执行还是等待,此外同步工具类还提供了修改状态的方法;

下面将简单介绍以上同步工具类;


闭锁

可以让一个线程等待一组事件发生后(不一定要线程结束)继续执行;

以CountDownLatch为例,内部包含一个计数器,一开始初始化为一个整数(事件个数),发生一个事件后,调用countDown方法,计数器减1,await用于等待计数器为0后继续执行当前线程;

举个例子如下,main线程等待其它子线程的事件发生后继续执行main线程:

package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class TaskTest implements Runnable {
 private CountDownLatch latch;
 private int sleepTime;
 /**
 * 
 */
 public TaskTest(int sleepTime, CountDownLatch latch) {
 this.sleepTime = sleepTime;
 this.latch = latch;
 }
 /**
 * @see java.lang.Runnable#run()
 */
 @Override
 public void run() {
 try {
 CountDownLatchTest.print(" is running。");
 TimeUnit.MILLISECONDS.sleep(sleepTime);
 CountDownLatchTest.print(" finished。");
 //计数器减减
 latch.countDown();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}
public class CountDownLatchTest {
 public static void main(String[] args) {
 int count = 10;
 final CountDownLatch latch = new CountDownLatch(count);
 ExecutorService es = Executors.newFixedThreadPool(count);
 for (int i = 0; i < count; i++) {
 es.execute(new TaskTest((i + 1) * 1000, latch));
 }
 try {
 CountDownLatchTest.print(" waiting...");
 //主线程等待其它事件发生
 latch.await();
 //其它事件已发生,继续执行主线程
 CountDownLatchTest.print(" continue。。。");
 } catch (InterruptedException e) {
 e.printStackTrace();
 } finally {
 es.shutdown();
 }
 }
 
 public static void print(String str){
 SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
 System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
 }
}

结果打印如下:

[09:41:43]pool-1-thread-1 is running。

[09:41:43]pool-1-thread-6 is running。

[09:41:43]main waiting...

[09:41:43]pool-1-thread-10 is running。

[09:41:43]pool-1-thread-4 is running。

[09:41:43]pool-1-thread-5 is running。

[09:41:43]pool-1-thread-2 is running。

[09:41:43]pool-1-thread-3 is running。

[09:41:43]pool-1-thread-7 is running。

[09:41:43]pool-1-thread-8 is running。

[09:41:43]pool-1-thread-9 is running。

[09:41:44]pool-1-thread-1 finished。

[09:41:45]pool-1-thread-2 finished。

[09:41:46]pool-1-thread-3 finished。

[09:41:47]pool-1-thread-4 finished。

[09:41:48]pool-1-thread-5 finished。

[09:41:49]pool-1-thread-6 finished。

[09:41:50]pool-1-thread-7 finished。

[09:41:51]pool-1-thread-8 finished。

[09:41:52]pool-1-thread-9 finished。

[09:41:53]pool-1-thread-10 finished。

[09:41:53]main continue。。。

此外,FutureTask也可用作闭锁,其get方法会等待任务完成后返回结果,否则一直阻塞直到任务完成;


信号量

控制同时执行某个指定操作的数量,常用于实现资源池,如数据库连接池,线程池...

以Semaphore为例,其内部维护一组资源,可以通过构造函数指定数目,其它线程在执行的时候,可以通过acquire方法获取资源,有的话,继续执行(使用结束后释放资源),没有资源的话将阻塞直到有其它线程调用release方法释放资源;

举个例子,如下代码,十个线程竞争三个资源,一开始有三个线程可以直接运行,剩下的七个线程只能阻塞等到其它线程使用资源完毕才能执行;

package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
public class SemaphoreTest {
 
 public static void print(String str){
 SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
 System.out.println("[" + dfdate.format(new Date()) + "]" + Thread.currentThread().getName() + str);
 }
 
 public static void main(String[] args) {
 // 线程数目
 int threadCount = 10;
 // 资源数目
 Semaphore semaphore = new Semaphore(3);
 
 ExecutorService es = Executors.newFixedThreadPool(threadCount);
 // 启动若干线程
 for (int i = 0; i < threadCount; i++)
 es.execute(new ConsumeResourceTask((i + 1) * 1000, semaphore));
 }
}
class ConsumeResourceTask implements Runnable {
 private Semaphore semaphore;
 private int sleepTime;
 /**
 * 
 */
 public ConsumeResourceTask(int sleepTime, Semaphore semaphore) {
 this.sleepTime = sleepTime;
 this.semaphore = semaphore;
 }
 public void run() {
 try {
 //获取资源
 semaphore.acquire();
 SemaphoreTest.print(" 占用一个资源...");
 TimeUnit.MILLISECONDS.sleep(sleepTime);
 SemaphoreTest.print(" 资源使用结束,释放资源");
 //释放资源
 semaphore.release();
 } catch (InterruptedException e) {
 e.printStackTrace();
 }
 }
}
[10:30:11]pool-1-thread-1 占用一个资源...
[10:30:11]pool-1-thread-2 占用一个资源...
[10:30:11]pool-1-thread-3 占用一个资源...
[10:30:12]pool-1-thread-1 资源使用结束,释放资源
[10:30:12]pool-1-thread-4 占用一个资源...
[10:30:13]pool-1-thread-2 资源使用结束,释放资源
[10:30:13]pool-1-thread-5 占用一个资源...
[10:30:14]pool-1-thread-3 资源使用结束,释放资源
[10:30:14]pool-1-thread-8 占用一个资源...
[10:30:16]pool-1-thread-4 资源使用结束,释放资源
[10:30:16]pool-1-thread-6 占用一个资源...
[10:30:18]pool-1-thread-5 资源使用结束,释放资源
[10:30:18]pool-1-thread-9 占用一个资源...
[10:30:22]pool-1-thread-8 资源使用结束,释放资源
[10:30:22]pool-1-thread-7 占用一个资源...
[10:30:22]pool-1-thread-6 资源使用结束,释放资源
[10:30:22]pool-1-thread-10 占用一个资源...
[10:30:27]pool-1-thread-9 资源使用结束,释放资源
[10:30:29]pool-1-thread-7 资源使用结束,释放资源
[10:30:32]pool-1-thread-10 资源使用结束,释放资源


栅栏

栅栏用于等待其它线程,且会阻塞自己当前线程;

所有线程必须同时到达栅栏位置后,才能继续执行;

举个例子如下:

package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
class CyclicBarrierTaskTest implements Runnable {
 private CyclicBarrier cyclicBarrier;
 private int timeout;
 public CyclicBarrierTaskTest(CyclicBarrier cyclicBarrier, int timeout) {
 this.cyclicBarrier = cyclicBarrier;
 this.timeout = timeout;
 }
 @Override
 public void run() {
 TestCyclicBarrier.print(" 正在running...");
 try {
 TimeUnit.MILLISECONDS.sleep(timeout);
 TestCyclicBarrier.print(" 到达栅栏处,等待其它线程到达");
 cyclicBarrier.await();
 } catch (InterruptedException e) {
 e.printStackTrace();
 } catch (BrokenBarrierException e) {
 e.printStackTrace();
 }
 TestCyclicBarrier.print(" 所有线程到达栅栏处,继续执行各自线程任务...");
 }
}
public class TestCyclicBarrier {
 public static void print(String str) {
 SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
 System.out.println("[" + dfdate.format(new Date()) + "]"
 + Thread.currentThread().getName() + str);
 }
 public static void main(String[] args) {
 int count = 5;
 
 ExecutorService es = Executors.newFixedThreadPool(count);
 CyclicBarrier barrier = new CyclicBarrier(count, new Runnable() {
 @Override
 public void run() {
 TestCyclicBarrier.print(" 所有线程到达栅栏处,可以在此做一些处理...");
 }
 });
 for (int i = 0; i < count; i++)
 es.execute(new CyclicBarrierTaskTest(barrier, (i + 1) * 1000));
 }
}
[11:07:00]pool-1-thread-2 正在running...
[11:07:00]pool-1-thread-1 正在running...
[11:07:00]pool-1-thread-5 正在running...
[11:07:00]pool-1-thread-3 正在running...
[11:07:00]pool-1-thread-4 正在running...
[11:07:01]pool-1-thread-1 到达栅栏处,等待其它线程到达
[11:07:02]pool-1-thread-2 到达栅栏处,等待其它线程到达
[11:07:03]pool-1-thread-3 到达栅栏处,等待其它线程到达
[11:07:04]pool-1-thread-4 到达栅栏处,等待其它线程到达
[11:07:05]pool-1-thread-5 到达栅栏处,等待其它线程到达
[11:07:05]pool-1-thread-5 所有线程到达栅栏处,可以在此做一些处理...
[11:07:05]pool-1-thread-1 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-2 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-5 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-3 所有线程到达栅栏处,继续执行各自线程任务...
[11:07:05]pool-1-thread-4 所有线程到达栅栏处,继续执行各自线程任务...


阻塞队列

阻塞队列提供了可阻塞的入队和出对操作,如果队列满了,入队操作将阻塞直到有空间可用,如果队列空了,出队操作将阻塞直到有元素可用;

队列可以为有界和无界队列,无界队列不会满,因此入队操作将不会阻塞;

下面将使用阻塞队列LinkedBlockingQueue举个生产者-消费者例子,生产者每隔1秒生产1个产品,然后有6个消费者在消费产品,可以发现,每隔1秒,只有一个消费者能够获取到产品消费,其它线程只能等待...

如下代码:

package concurrency;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
//生产者
public class Producer implements Runnable {
 private final BlockingQueue<String> fileQueue;
 public Producer(BlockingQueue<String> queue) {
 this.fileQueue = queue;
 }
 public void run() {
 try {
 while (true) {
 TimeUnit.MILLISECONDS.sleep(1000);
 String produce = this.produce();
 System.out.println(Thread.currentThread() + "生产:" + produce);
 fileQueue.put(produce);
 }
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 }
 }
 public String produce() {
 SimpleDateFormat dfdate = new SimpleDateFormat("HH:mm:ss");
 return dfdate.format(new Date());
 }
 public static void main(String[] args) {
 BlockingQueue<String> queue = new LinkedBlockingQueue<String>(10);
 for (int i = 0; i < 1; i++) {
 new Thread(new Producer(queue)).start();
 }
 for (int i = 0; i < 6; i++) {
 new Thread(new Consumer(queue)).start();
 }
 }
}
// 消费者
class Consumer implements Runnable {
 private final BlockingQueue<String> queue;
 public Consumer(BlockingQueue<String> queue) {
 this.queue = queue;
 }
 public void run() {
 try {
 while (true) {
 TimeUnit.MILLISECONDS.sleep(1000);
 System.out.println(Thread.currentThread() + "prepare 消费");
 System.out.println(Thread.currentThread() + "starting:"
 + queue.take());
 System.out.println(Thread.currentThread() + "end 消费");
 }
 } catch (InterruptedException e) {
 Thread.currentThread().interrupt();
 }
 }
}
Thread[Thread-1,5,main]prepare 消费
Thread[Thread-3,5,main]prepare 消费
Thread[Thread-4,5,main]prepare 消费
Thread[Thread-2,5,main]prepare 消费
Thread[Thread-6,5,main]prepare 消费
Thread[Thread-5,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:36
Thread[Thread-1,5,main]starting:11:36:36
Thread[Thread-1,5,main]end 消费
Thread[Thread-1,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:37
Thread[Thread-4,5,main]starting:11:36:37
Thread[Thread-4,5,main]end 消费
Thread[Thread-4,5,main]prepare 消费
Thread[Thread-0,5,main]生产:11:36:38
Thread[Thread-3,5,main]starting:11:36:38
Thread[Thread-3,5,main]end 消费
...

相关推荐

js中arguments详解

一、简介了解arguments这个对象之前先来认识一下javascript的一些功能:其实Javascript并没有重载函数的功能,但是Arguments对象能够模拟重载。Javascrip中每个函数...

firewall-cmd 常用命令

目录firewalldzone说明firewallzone内容说明firewall-cmd常用参数firewall-cmd常用命令常用命令 回到顶部firewalldzone...

epel-release 是什么

EPEL-release(ExtraPackagesforEnterpriseLinux)是一个软件仓库,它为企业级Linux发行版(如CentOS、RHEL等)提供额外的软件包。以下是关于E...

FullGC详解  什么是 JVM 的 GC
FullGC详解 什么是 JVM 的 GC

前言:背景:一、什么是JVM的GC?JVM(JavaVirtualMachine)。JVM是Java程序的虚拟机,是一种实现Java语言的解...

2024-10-26 08:50 citgpt

使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
  • 使用Spire.Doc组件利用模板导出Word文档
跨域(CrossOrigin)

1.介绍  1)跨域问题:跨域问题是在网络中,当一个网络的运行脚本(通常时JavaScript)试图访问另一个网络的资源时,如果这两个网络的端口、协议和域名不一致时就会出现跨域问题。    通俗讲...

微服务架构和分布式架构的区别

1、含义不同微服务架构:微服务架构风格是一种将一个单一应用程序开发为一组小型服务的方法,每个服务运行在自己的进程中,服务间通信采用轻量级通信机制(通常用HTTP资源API)。这些服务围绕业务能力构建并...

深入理解与应用CSS clip-path 属性
深入理解与应用CSS clip-path 属性

clip-pathclip-path是什么clip-path 是一个CSS属性,允许开发者创建一个剪切区域,从而决定元素的哪些部分可见,哪些部分会被隐...

2024-10-25 11:51 citgpt

HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
  • HCNP Routing&Switching之OSPF LSA类型(二)
Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
  • Redis和Memcached的区别详解
Request.ServerVariables 大全

Request.ServerVariables("Url")返回服务器地址Request.ServerVariables("Path_Info")客户端提供的路...

python操作Kafka

目录一、python操作kafka1.python使用kafka生产者2.python使用kafka消费者3.使用docker中的kafka二、python操作kafka细...

Runtime.getRuntime().exec详解

Runtime.getRuntime().exec详解概述Runtime.getRuntime().exec用于调用外部可执行程序或系统命令,并重定向外部程序的标准输入、标准输出和标准错误到缓冲池。...

promise.all详解 promise.all是干什么的
promise.all详解 promise.all是干什么的

promise.all详解promise.all中所有的请求成功了,走.then(),在.then()中能得到一个数组,数组中是每个请求resolve抛出的结果...

2024-10-24 16:21 citgpt

Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解
  • Content-Length和Transfer-Encoding详解

取消回复欢迎 发表评论: