多线程2
单例模式
单例模式是一种创建只能存在一个实例的类的设计模式。它确保类只有一个实例,并提供一个全局访问点来获取该实例。
class Singleton1{ private static Singleton1 instance = new Singleton1(); //私有化构造方法,防止外部实例化 private Singleton1(){} public static Singleton1 getInstance(){ return instance; } }
进行多线程环境下测试:
Thread t1 = new Thread(() ->{ Singleton1 s1 = Singleton1.getInstance(); System.out.println(s1); }); Thread t2 = new Thread(() ->{ Singleton1 s2 = Singleton1.getInstance(); System.out.println(s2); }); t1.start(); t2.start();
运行结果:
初始版(线程不安全):
class Singleton{ private static Singleton instance; //创建一个私有的构造方法,防止外部代码直接实例化 private Singleton(){} public static Singleton getInstance(){ if(instance == null){ instance = new Singleton(); } return instance; } }
多线程环境下进行测试:
Thread t1 = new Thread(() ->{ Singleton s1 = Singleton.getInstance(); System.out.println(s1); }); Thread t2 = new Thread(() ->{ Singleton s2 = Singleton.getInstance(); System.out.println(s2); }); t1.start(); t2.start();
运行结果:
两次结果不一致,说明创建了两个不同的实例。
对其进行优化,使其变得线程安全:
class Singleton{ private volatile static Singleton instance; //创建一个私有的构造方法,防止外部代码直接实例化 private Singleton(){} public static Singleton getInstance(){ if (instance == null) { synchronized (Singleton.class) { if(instance == null){ instance = new Singleton(); } } } return instance; } }
多线程下运行结果:
说明:
阻塞队列
除了需要满足先进先出的规则外,还要满足:
1)如果队列为空,执行出队列就会阻塞,直到别的线程往队列中添加元素(队列不为空)为止;
2)如果队列满了,执行入队操作也会阻塞,阻塞到另一个线程从队列中拿走元素(队列不满);
示例:
public class ThreadDemo1 { public static void main(String[] args) throws InterruptedException { //阻塞队列 BlockingQueue queue = new LinkedBlockingQueue(); queue.put("hello"); String str = queue.take(); //str = queue.take(); System.out.println(str); } }
阻塞队列中只有使用put()和take()函数才能起到阻塞的作用。
生产者消费者模型
使用生产者消费者模型的两大好处:
1)解耦合 2)削峰填谷,使程序平稳运行
我们可以使用阻塞队列来实现生产者消费者模型,生产者将生产好的产品放到阻塞队列中,消费者从阻塞队列中取产品,从而实现解耦合和平稳运行。
代码示例:
public class ThreadDemo1 { //利用阻塞队列来实现一个生产者消费者模型 public static void main(String[] args) { BlockingQueue queue = new LinkedBlockingQueue(); Thread consumer = new Thread(() -> { while (true) { try { Thread.sleep(1000); System.out.println("消费元素:" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }); consumer.start(); Thread producer = new Thread(() -> { int count = 0; while (true) { try { queue.put(count); System.out.println("生产元素:" + count); Thread.sleep(500); } catch (InterruptedException e) { e.printStackTrace(); } count++; } }); producer.start(); } }
运行结果:
可以看到生产和消费均按照顺序执行。
实现一个自己的阻塞队列:
使用数组实现:
class MyBlockedQueue{ private int[] nums = new int[4]; private int size = 0; private int head = 0; private int tail = 0; public synchronized void put(int value) throws InterruptedException { //如果队列满,无法插入 while(size == nums.length){ this.wait(); } nums[tail] = value; tail++; if(tail >= nums.length){ tail = 0; } size++; this.notify(); } public synchronized Integer take() throws InterruptedException { //队列为空,无法出队 while(size == 0){ this.wait(); } int value = nums[head]; head++; if(head >= nums.length){ head = 0; } size--; this.notify(); return value; } }
通过生产者消费者模型测试当前实现的阻塞队列:
class MyBlockedQueue{ private int[] nums = new int[1000]; private int size = 0; private int head = 0; private int tail = 0; public synchronized void put(int value) throws InterruptedException { //如果队列满,无法插入 while(size == nums.length){ this.wait(); } nums[tail] = value; tail++; if(tail >= nums.length){ tail = 0; } size++; this.notify(); } public synchronized Integer take() throws InterruptedException { //队列为空,无法出队 while(size == 0){ this.wait(); } int value = nums[head]; head++; if(head >= nums.length){ head = 0; } size--; this.notify(); return value; } } public class ThreadDemo2 { public static void main(String[] args) { MyBlockedQueue queue = new MyBlockedQueue(); Thread consumer = new Thread(() -> { while (true) { try { Thread.sleep(500); System.out.println("消费元素:" + queue.take()); } catch (InterruptedException e) { e.printStackTrace(); } } }); consumer.start(); Thread producer = new Thread(() -> { int count = 0; while (true) { try { queue.put(count); System.out.println("生产元素:" + count); } catch (InterruptedException e) { e.printStackTrace(); } count++; } }); producer.start(); } }
可以看到,当生产1000个元素后,对列满,此时消费一个才生产一个。
定时器
功能描述:等待一定的时间后去执行一个指定的任务。
public class ThreadDemo3 { public static void main(String[] args) { Timer timer = new Timer(); System.out.println(LocalDateTime.now()); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("程序开始执行"); System.out.println(LocalDateTime.now()); } },3000); } }
运行结果:
实现一个自己的定时器:
先来看系统原有的定时器功能:
public class ThreadDemo6 { //实现一个自己的定时器 public static void main(String[] args) { Timer timer = new Timer(); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("执行任务1"); } },3000); timer.schedule(new TimerTask() { @Override public void run() { System.out.println("执行任务2"); } },1000); } }
可以传入多个任务,任务按时间排序,时间段的先执行,因此我们需要一个优先级阻塞队列来存放任务。
class MyTask implements Comparable{ //用来表示具体的任务 private Runnable runnable; //用来存放要倒计时的时间 private long time; public MyTask(Runnable runnable, long time) { this.runnable = runnable; this.time = time; } public void run() { runnable.run(); } public long getTime() { return time; } @Override public int compareTo(MyTask o) { return (int)(this.time - o.getTime()); } } class MyTimer{ //定义一个线程来扫描 Thread t = null; PriorityBlockingQueue queue = new PriorityBlockingQueue(); public MyTimer(){ t = new Thread(() -> { while(true){ //取出队首元素,判断是否到时间 try { synchronized (this) { MyTask task = queue.take(); long curTime = System.currentTimeMillis(); if(task.getTime() > curTime){ queue.put(task); this.wait(task.getTime() - curTime); }else{ task.run(); } } } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } public void schedule(Runnable runnable, long after){ MyTask task = new MyTask(runnable,System.currentTimeMillis()+after); queue.put(task); synchronized (this){ this.notify(); } } }
工厂模式
使用普通方法来代替构造方法,创建对象。
考虑这样一个场景,我们有一个类,要用来描述平面上的一个点:
class Point{ //通过笛卡尔坐标系来描述 public Point(double x, double y){} //通过极坐标来描述 public Point(double r,double a){} }
此时我们会发现,两个构造方法参数也完全相同,无法构成重载,因此为了解决这类问题,我们提出了工厂模式:
class PointFactory{ public static Point makePointByXY(double x, double y){} public static Point makePointByRA(double x, double y){} }
此时,我们就可以通过对普通方法进行不同的命名即可实现多种方式的构造。
线程池
通过线程池创建线程比普通线程的开销更小,原因如下:
资源管理:线程池管理线程的生命周期,包括创建、销毁和重用线程。因此,在线程池中,创建新线程的开销相对较小。而直接使用new操作符创建线程时,每次都需要手动管理线程的创建和销毁,这会增加额外的开销。
线程重用:线程池中的线程可以被重用,而不是在每次需要执行任务时都创建一个新线程。通过重用线程,避免了频繁创建和销毁线程的开销,提高了线程的利用率。
控制并发数:线程池可以限制并发线程的数量,可以根据系统的资源情况和任务的需求来动态调整并发线程的数量,防止线程过多导致系统资源耗尽和性能下降。
队列管理:线程池通常使用队列来管理待执行的任务。当线程池中的线程都在执行任务时,新的任务可以暂时放置在队列中,等待线程空闲时再进行执行。这种方式能够避免任务丢失,提高系统的可靠性和稳定性。
通过系统的内置函数来使用常量池:
public class ThreadDemo7 { public static void main(String[] args) { //构造出一个具有十个线程的线程池 ExecutorService pool = Executors.newFixedThreadPool(10); for (int i = 0; i < 1000; i++) { //变量捕获 int n = i; pool.submit(new Runnable() { @Override public void run() { System.out.println("hello " + n); } }); } } }
变量捕获是指在匿名函数或闭包中引用外部作用域中的变量。当内部函数通过引用外部作用域中的变量时,会创建对该变量的引用关系,以便在内部函数执行时可以访问和操作该变量。在JDK1.8之前,要求外部作用域中的变量必须由final修饰,1.8之后只有外部代码中没有对该变量进行修改,也可以捕获。
Java中常用的线程池创建线程的函数有以下几种:
newFixedThreadPool(int nThreads):创建一个固定大小的线程池,其中线程数量固定为指定的nThreads。线程池中的线程数始终保持不变,即使线程处于空闲状态。
newCachedThreadPool():创建一个可缓存的线程池,线程池的大小可以根据需要进行自动扩展。如果当前有可用的空闲线程,则重用空闲线程;如果没有可用的空闲线程,则创建一个新线程。如果线程在60秒内没有使用,则被终止并从线程池中移除。
newSingleThreadExecutor():创建一个单线程的线程池,保证所有任务按顺序执行。当该线程执行完任务后,会立即创建一个新线程来处理下一个任务。
newScheduledThreadPool(int corePoolSize):创建一个固定大小的线程池,可以进行定时或周期性任务的调度执行。可以指定核心线程池的大小,其他任务会在需要时进行创建。
ThreadPoolExecutor是Java中用于创建和管理线程池的强大类。
它是ExecutorService接口的一个实现,提供了更灵活和可定制化的线程池功能。下面是ThreadPoolExecutor类的详细介绍:
ThreadPoolExecutor类的构造函数:
public ThreadPoolExecutor( int corePoolSize, // 核心线程池大小 int maximumPoolSize, // 最大线程池大小 long keepAliveTime, // 非核心线程的闲置时间 TimeUnit unit, // keepAliveTime 参数的时间单位 BlockingQueue workQueue, // 用于存放任务的阻塞队列 ThreadFactory threadFactory // 线程工厂,用于创建新线程 );
主要参数说明:
corePoolSize:线程池中核心线程数的大小,核心线程会一直存活,即使没有任务需要执行。
maximumPoolSize:线程池中最大线程数的大小,包括核心线程和非核心线程。
keepAliveTime:非核心线程的闲置时间,超过该时间则会被回收(从理论上来说,
maximumPoolSize - corePoolSize 个线程可以被回收)。
unit:闲置时间的单位,可以是秒、毫秒等。
workQueue:用于存放待执行任务的阻塞队列,可以是ArrayBlockingQueue、LinkedBlockingQueue、
SynchronousQueue等。
threadFactory:线程工厂,用于创建新线程。
RejectedExecutionHandler handler:当线程池中的任务队列已满或者线程池已经关闭时,无法继续添加新的任务到线程池中,这时就需要根据设定的拒绝策略来处理这些无法执行的任务。主要有以下四个拒绝策略:
1) AbortPolicy(默认):直接抛出RejectedExecutionException异常,阻止系统正常工作。
RejectedExecutionHandler handler = new ThreadPoolExecutor.AbortPolicy();
2) CallerRunsPolicy:线程池调用execute方法的线程直接运行被拒绝的任务。
RejectedExecutionHandler handler = new ThreadPoolExecutor.CallerRunsPolicy();
3)DiscardPolicy:直接丢弃被拒绝的任务(抛弃新任务),没有任何异常抛出。
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardPolicy();
4)DiscardOldestPolicy:丢弃任务队列中最老的未处理任务(抛弃老任务),并尝试再次提交被拒绝的任务。
RejectedExecutionHandler handler = new ThreadPoolExecutor.DiscardOldestPolicy();
ThreadPoolExecutor类常用的方法:
execute(Runnable command):提交任务到线程池执行。
shutdown():平滑关闭线程池,等待所有任务执行完成后终止。
shutdownNow():立即关闭线程池,尝试停止所有正在执行的任务,并返回等待执行的任务列表。
getActiveCount():获取线程池中当前活动的线程数。
getCompletedTaskCount():获取线程池已完成的任务数。
getTaskCount():获取线程池总共提交的任务数。
实现一个自己的简单线程池:
//实现一个自己的线程池 class MyThreadPool{ //用来存放任务 private static BlockingQueue queue = new LinkedBlockingQueue(); public MyThreadPool(int n){ for (int i = 0; i { while (true) { try { Runnable runnable = queue.take(); runnable.run(); } catch (InterruptedException e) { e.printStackTrace(); } } }); t.start(); } } public void submit(Runnable runnable){ //注册任务 try { queue.put(runnable); } catch (InterruptedException e) { e.printStackTrace(); } } }
常见的锁策略:
乐观锁 VS 悲观锁
乐观锁:预测锁竞争不会很激烈,需要做的工作相对较少;乐观锁采取乐观的态度,认为并发访问的线程之间很少会发生冲突,因此在读取数据时不会加锁。当要更新数据时,它会先比较当前的数据与自己持有的数据是否一致,如果一致则进行更新,若不一致则说明该数据已经被其他线程更改过,当前操作失败,可能需要重新尝试。乐观锁常用的实现方式是使用版本号或时间戳来标识数据的版本。乐观锁的优点是不会引起线程阻塞,适合读多写少的场景,可以提高并发性能。但是乐观锁对于并发冲突的处理需要进行重试,可能会增加系统的复杂性。
悲观锁:预测锁竞争十分激烈,需要做的工作很多;悲观锁采取悲观的态度,认为并发访问的线程之间会发生冲突,因此在读取和更新数据时都会加上锁,保证同一时间只有一个线程可以访问和修改数据。悲观锁通过独占锁的方式来实现,如使用 synchronized 关键字或 Lock 接口的实现类。悲观锁的优点是简单可靠,适合写多读少或者写读都比较复杂的场景,可以避免并发冲突,保证数据的一致性。但是悲观锁需要在访问数据时进行加锁和解锁的操作,可能会引起线程阻塞,降低并发性能。
轻量级锁 VS 重量级锁
轻量级锁:开销小,效率高
重量级锁:开销大,效率低
自旋锁 VS 挂起等待锁
自旋锁:自旋锁是一种非阻塞的锁机制,当一个线程尝试获取自旋锁时,如果发现锁已经被其他线程占用,该线程会一直忙等待(自旋)直到锁被释放。因此,自旋锁适用于对共享资源的短期占用或者短期等待的情况。
挂起等待锁:挂起等待锁使用的是线程阻塞的机制,当一个线程尝试获取锁时发现锁已被其他线程占用时,该线程会被挂起等待,直到锁被释放后才会被唤醒恢复执行。线程的挂起和唤醒是由操作系统内核来实现的,当线程被挂起时,它会交出CPU资源给其他线程使用。
互斥锁 VS 读写锁
互斥锁:提供加锁和解锁两个操作,如果一个线程加锁了,另一个线程也尝试加锁,就会阻塞等待。
读写锁:提供了三种操作,针对读加锁,针对写加速,解锁,读锁和读锁之间没有互斥,写锁和写锁之间存在互斥,读锁和写锁之间存在互斥。
公平锁 VS 非公平锁
公平锁:当前对象解锁后,由等待时间最长的线程获取当前对象;
非公平锁:当前对象解锁后,其余线程自由竞争,谁抢到谁获取。
Synchronized对号入座:
Synchronized既是一个悲观锁也是一个乐观锁;其默认是一个乐观锁,当锁竞争加剧的时候会转为悲观锁。
Synchronized既是轻量级锁也是重量级锁,默认情况下是轻量级锁,当锁竞争加剧时转为重量级锁。
Synchronized这里的轻量级锁是根据自旋锁的方式实现的,这里的重量级锁是根据挂起等待锁的方式实现的;
Synchronized是可重入锁,是非公平锁,不是读写锁,而是互斥锁。
CAS(compare and swap): 比较交换
我们假设内存中的原始数据V,预期的数据A,要修改的数据为B:
1)比较A和V是否相等;
2)如果相等,则将B写入V。(交换)
3)返回操作是否成功。
CAS的应用场景:
1)实现原子类
通过Java标准库中的函数来进行演示:
public class ThreadDemo1 { public static void main(String[] args) { AtomicInteger count = new AtomicInteger(); Thread t1 = new Thread(() ->{ for (int i = 0; i { for (int i = 0; i < 5000; i++) { count.getAndIncrement(); } }); t1.start(); t2.start(); try { t1.join(); t2.join(); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(count.get()); } }
2)实现自旋锁
伪代码如下:
public class SpinLock{ private Thread owner = null; public void Lock(){ //通过CAS判断当前锁是否被某个线程持有 //如果已经被别的线程持有,则自旋等待 //如果锁为空,那么就把owner设置为当前尝试加锁的线程 while(!CAS(this.owner,null,Thread.currentThread())){ } } public void UnLock(){ this.owner = null; } }
3)CAS的典型问题:ABA问题
ABA问题发生在以下情况下:
线程A读取某个共享变量的值为A。
线程A被阻塞或者挂起,使得线程B有机会进行操作。
线程B将共享变量的值从A改为B,再改为A。
然后,线程A被唤醒,继续执行,通过CAS操作检查共享变量的值为A,不发现变化,认为共享变量没有被其他线程修改过。
在这个过程中,虽然共享变量的值经历了从A到B再到A的变化,但是对于线程A来说,它并没有意识到这个变化。这就是ABA问题。
为了解决ABA问题,Java提供了AtomicStampedReference类。它可以对引用对象进行原子操作,并且可以保留一个版本号,以避免ABA问题。AtomicStampedReference利用了标记来记录引用对象是否发生改变,当进行CAS操作时,除了要比较引用对象的值,还要比较标记的版本号。
Synchronized原理:
synchronized内部有一些优化机制,使得锁更加高效:
加锁过程可能会经历以下四个过程:
1)无锁
2)偏向锁
3)轻量级锁
4)重量级锁
synchronized()一开始并不会真的加锁,而是设置一个偏向锁(一个标记),当发生锁竞争时才会真正的进行加锁,此时synchronized是通过自旋锁的方式进行加锁的,如果迟迟拿不到锁,就会将自旋锁升级成为挂起等待锁。
编译器的智能判定,看当前代码是否真的需要加锁,如果不需要,程序员对其加锁了,那么编译器就会消除锁。
锁对象所包含的代码量的多少决定了锁的粗细,通常情况下,我们希望锁细一些比较好,但是如果几次加锁和解锁之间的间隙特别短的时候,就可以进行锁粗化,从而避免频繁的加锁解锁的开销。
认识Callable接口
Runnable接口用来描述一个任务,描述的任务没有返回值。
Callable接口也是用来描述一个任务,但是描述的任务具有返回值。
代码示例:
public class ThreadDemo1 { public static void main(String[] args) throws ExecutionException, InterruptedException { Callable callable = new Callable() { @Override public Integer call() throws Exception { int count = 0; for (int i = 0; i 0; 执行一次V操作,0 -> 1; 如果已经进行了一次P操作,继续进行P操作,则会进行阻塞等待,这类似于锁的操作,因此锁可以被视为一种特殊的信号量,二元信号量,信号量是锁的一搬表达。 public class ThreadDemo4 { public static void main(String[] args) throws InterruptedException { Semaphore semaphore = new Semaphore(3); semaphore.acquire(); System.out.println("执行了一次 P 操作!"); semaphore.acquire(); System.out.println("执行了一次 P 操作!"); semaphore.acquire(); System.out.println("执行了一次 P 操作!"); semaphore.release(); semaphore.acquire(); System.out.println("执行了一次 P 操作!"); } } 通过信号量也可以实现类似于锁的效果,具体使用根据业务场景确定。 认识 CountDownLatch 假设有一场跑步比赛,开始的时间是确定的,就是裁判信号枪响起的时刻,而结束的时间是不确定的,由最后完成比赛的一名选手来确定。 CountDownLatch 的构造方法,需要指定一个选手数; countDown()表示选手冲过了终点; await()方法由主线程来调用,表示所有选手均冲过了终点,只要有人没有完成比赛就一直等待。 多线程使用哈希表【重点、考点】 HashMap 是线程不安全的 HashTable 是线程安全的 不过,更推荐使用的是使用ConcurrentHashMap 经典面试题:ConcurrentHashMap 和 HashTable 相比,做了哪些优化???? 1. 最大的优化之处:ConcurrentHashMap 相比于 HashTable 大大减小了锁冲突的概率, HashTable 是直接在方法上加锁,相当于只要操作哈希表上的任意元素,都会产生加锁,极大的增加了锁冲突的可能性;ConcurrentHashMap 的做法是,每个链表都有自己的锁,也就是使用每个链表的头结点作为锁对象,以此减少锁冲突。 2. ConcurrentHashMap 做了一个激进的操作,只针对写操作加锁,而不对读操作加锁。 3..ConcurrentHashMap内部充分的使用了CAS,通过这个来进一步的削减加锁操作的数目。 4.针对扩容,采取了“化整为零”的策略,扩容时每次迁移一部分数据,而不是整个迁移。