java - JUC
1.什么是JUC?
java.util.concurrent
在并发编程中使用的工具类
2.线程 和 进程
进程:一个程序,QQ.exe Music.exe 程序的集合
一个进程往往可以包含多个线程,至少包含一个!
java 默认有几个线程?两个 main GC(垃圾回收)
线程:开了一个进程 Typora ,写字,自动保存(线程负责)
对于java而言:Thread,Runnable,Callable 方式实现多线程
java 不可以开启线程,无法操作硬件,因为它是运行在虚拟机中的,java 是通过本地方法 利用 C 开启线程的。
并发,并行
并发:(多线程操作同一个资源)
- CPU 一核,模拟出来多条线程,天下武功,为快不破,快速交替
并行:(多个人一起走)
- CPU 多核,多个线程可以同时执行
并发是两个队列交替使用一台咖啡机,并行是两个队列同时使用两台咖啡机,如果串行,一个队列使用一台咖啡机。并发和并行都可以是很多个线程,就看这些线程能不能同时被(多个)cpu执行,如果可以说明是并行,而并发是多个线程被(一个)cpu 轮流切换着执行。
public static void main(String[] args) {// 获取CPU的核数 System.out.println(Runtime.getRuntime().availableProcessors());}
并发编程的本质:充分利用CPU的资源
线程的状态
public enum State { //新生NEW,//运行RUNNABLE, //阻塞BLOCKED,//等待,死死的等WAITING, //超时等待TIMED_WAITING, //终止TERMINATED; }
3.Lock 锁
传统 synchronized
public class Test1 { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(() -> { for (int i = 0; i < 60; i ) { ticket.sale(); } }).start(); new Thread(() -> { for (int i = 0; i < 60; i ) { ticket.sale(); } }).start(); new Thread(() -> { for (int i = 0; i < 60; i ) { ticket.sale(); } }).start(); }}class Ticket { private int number = 30; public synchronized void sale() { if (number > 0) { System.out.println(Thread.currentThread().getName() "卖出了" (number--) "票,剩余:" number); } }}
Lock 接口
方法:
- lock() 加锁
- unlock() 解锁
实现类:
- ReentrantLock 可重入锁(常用)
- ReentrantReadWriteLock.ReadLock 读锁
- ReentrantReadWriteLock.WriteLock 写锁
ReentrantLock 类
公平锁:十分公平;先来后到
非公平锁:十分不公平;可以插队
public class Test1 { public static void main(String[] args) { Ticket ticket = new Ticket(); new Thread(() -> { for (int i = 0; i < 60; i ) { ticket.sale(); } }).start(); new Thread(() -> { for (int i = 0; i < 60; i ) { ticket.sale(); } }).start(); new Thread(() -> { for (int i = 0; i < 60; i ) { ticket.sale(); } }).start(); }}class Ticket { private int number = 30; Lock lock = new ReentrantLock(); public void sale() {// 加锁 lock.lock(); try { if (number > 0) { System.out.println(Thread.currentThread().getName() "卖出了" (number--) "票,剩余:" number); } } catch (Exception e) { e.printStackTrace(); } finally {// 解锁 lock.unlock(); } }}
synchronized 和 Lock 区别
- synchronized 内置关键字,Lock 是一个 Java类
- synchronized 无法判断获取锁的状态,Lock 可以判断是否获取到锁
- synchronized 会自动释放锁,Lock 必须要手动解锁!如果不释放,死锁
- synchronized 线程1(获得锁,阻塞),线程2(等待,傻傻的等);Lock 锁就不一定会等待下去
- synchronized 可重入锁,不可以中断的,非公平;Lock,可重入锁,不可以中断的,非公平(可以自己设置);
- synchronized 适合锁少量的代码同步问题;Lock 锁适合锁大量的同步代码!
锁是什么,如何判断锁的是谁?后面会回答
4.生产者 和 消费者 问题
synchronized 版本
// 生产者 消费者问题public class A { public static void main(String[] args) { Data data = new Data(); new Thread(() -> { for (int i = 0; i < 10; i ) { data.increment(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i ) { data.decrement(); } }, "B").start(); }}class Data { private int number = 0; public synchronized void increment() { // 关键点,这里应该使用while循环 if (number != 0) {// 等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number ; System.out.println(Thread.currentThread().getName() "==>" number);// 通知 this.notifyAll(); } public synchronized void decrement() { // 关键点,这里应该使用while循环 if (number == 0) {// 等待 try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } number--; System.out.println(Thread.currentThread().getName() "==>" number);// 通知 this.notifyAll(); }}
问题存在,有A,B,C,D 多个线程呢?虚假唤醒问题
什么是虚假唤醒
假设当前有4个线程分别为A,B,C,D,其中A,B线程是生产者,C,D线程是消费者,当A和B线程生产了一个数据后就去通知消费者去消费,C和D消费掉这一个数据后就通知生产者去生产,数据的大小为1。也就是说正常情况下,数据只会有0和1两种状态,0表示生产者该生产数据了,1表示消费者该消费数据了。
出现问题的场景是这样的:当data为1的时候,线程A和B先后获取锁去生产数据的时候会被阻塞住,然后消费者C或者D消费掉数据后去notifyAll()唤醒了线程A和B,被唤醒的A和B没有再次去判断data状态,就去执行后续增加数据的逻辑了,导致两个生产者都执行了increment(),最终data出现了2这种情况。也就是说线程A和B有一个是不应该被唤醒的却被唤醒了,出现这个问题的关键点在于程序中使用到了if判断,只判断了一次data的状态,应该使用while循环去判断。
JUC 版的生产者和消费者
通过 Lock 找到Condition子类
线程之间除了同步互斥,还要考虑通信。在Java5之前我们的通信方式为:wait 和 notify。那么Condition的优势是支持多路等待,就是我可以定义多个Condition,每个condition控制线程的一条执行通路。传统方式只能是一路等待。传统方式只能是一路等待。我们可以先分析下Java5 Api中的缓冲队列的实现:
假定有一个绑定的缓冲区,它支持 put 和 take 方法。如果试图在空的缓冲区上执行take 操作,则在某一个项变得可用之前,线程将一直阻塞;如果试图在满的缓冲区上执行put 操作,则在有空间变得可用之前,线程将一直阻塞。我们喜欢在单独的等待 set 中保存put 线程和take 线程,这样就可以在缓冲区中的项或空间变得可用时利用最佳规划,一次只通知一个线程。可以使用两个Condition 实例来做到这一点。
class BoundedBuffer { final Lock lock = new ReentrantLock();//实例化一个锁对象 final Condition notFull = lock.newCondition(); //实例化两个condition final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100];//初始化一个长度为100的队列 int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock();//获取锁 try { while (count == items.length) notFull.await();//当计数器count等于队列的长度时,不能在插入,因此等待 items[putptr] = x; //将对象放入putptr索引处 if ( putptr == items.length) putptr = 0;//当索引长度等于队列长度时,将putptr置为0 //原因是,不能越界插入 count;//没放入一个对象就将计数器加1 notEmpty.signal();//一旦插入就唤醒取数据线程 } finally { lock.unlock();//最后释放锁 } } public Object take() throws InterruptedException { lock.lock();//获取锁 try { while (count == 0) //如果计数器等于0那么等待 notEmpty.await(); Object x = items[takeptr]; //取得takeptr索引处对象 if ( takeptr == items.length) takeptr = 0;//当takeptr达到队列长度时,从零开始取 --count;//每取一个讲计数器减1 notFull.signal();//枚取走一个就唤醒存线程 return x; } finally { lock.unlock();//释放锁 } }}
Condition
Condition 实现精准通知唤醒
//A 执行完调用B,B执行完调用C,C执行完调用Apublic class C { public static void main(String[] args) { Data3 data = new Data3(); new Thread(() -> { for (int i = 0; i < 10; i ) { data.printA(); } }, "A").start(); new Thread(() -> { for (int i = 0; i < 10; i ) { data.printB(); } }, "B").start(); new Thread(() -> { for (int i = 0; i < 10; i ) { data.printC(); } }, "C").start(); }}class Data3 { private Lock lock = new ReentrantLock(); private Condition condition1 = lock.newCondition(); private Condition condition2 = lock.newCondition(); private Condition condition3 = lock.newCondition(); private int number = 1; //1A 2B 3C public void printA() { lock.lock(); try { while (number != 1) {// 等待 condition1.await(); } System.out.println(Thread.currentThread().getName() "=>AAAAAAAAAAAA");// 唤醒B,唤醒指定的人,B number = 2; condition2.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printB() { lock.lock(); try { while (number != 2) { condition2.await(); } System.out.println(Thread.currentThread().getName() "=>BBBBBBBBBB"); number = 3; condition3.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } } public void printC() { lock.lock(); try { while (number != 3) { condition3.await(); } System.out.println(Thread.currentThread().getName() "=>CCCCCCCCCCC"); number = 1; condition1.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); } }}
5. 8锁问题
如何判断锁的是谁!永远的知道什么锁,锁到底锁的是谁!
8锁,就是关于锁的8个问题,深刻理解我们的锁
/*1,标准情况下,两个线程先打印 发短信还是打电话? */public class Test { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }).start(); }}class Phone { // synchronized 锁的对象是方法的调用者 //两个方法用的是同一个锁,谁先拿到,谁先执行 public synchronized void sendSms() { System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); }}
/*2,sendSms 延迟4秒,两个线程先打印 发短信还是打电话? */public class Test { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }).start(); }}class Phone { // synchronized 锁的对象是方法的调用者 //两个方法用的是同一个锁,谁先拿到,谁先执行,sendSms方法先拿到,call就会被锁住 public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); }}
/*3,增加了一个普通方法,先发短信还是hello? */public class Test { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.hello(); }).start(); }}class Phone { // synchronized 锁的对象是方法的调用者 //两个方法用的是同一个锁,谁先拿到,谁先执行 public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } //hello 方法没有 synchronized,不受锁影响 public void hello() { System.out.println("hello"); }}
/*4.两个对象,两个同步方法,先执行哪个? */public class Test { public static void main(String[] args) {// 4.两个对象 Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { phone1.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }).start(); }}class Phone { // synchronized 锁的对象是方法的调用者 //两个方法用的是同一个锁,谁先拿到,谁先执行 public synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public synchronized void call() { System.out.println("打电话"); }}
/*5.增加两个静态同步方法,先执行哪个? */public class Test { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }).start(); }}class Phone { // 静态方法,类一加载就有了!Class模板。锁的是Class,只有一个 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public static synchronized void call() { System.out.println("打电话"); } }
/*6.两个对象,也是之前的两个静态方法,先执行哪个? */public class Test { public static void main(String[] args) {// 4.两个对象的class 模板只有一个 Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { phone1.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }).start(); }}class Phone { // synchronized 锁的对象是方法的调用者 //两个方法用的是同一个锁,谁先拿到,谁先执行 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); } public static synchronized void call() { System.out.println("打电话"); }}
/*7.一个普通同步方法,一个静态同步方法,先执行哪个? */public class Test { public static void main(String[] args) { Phone phone = new Phone(); new Thread(() -> { phone.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone.call(); }).start(); }}class Phone {// 静态同步方法 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); }// 普通同步方法 public synchronized void call() { System.out.println("打电话"); }}
/*8.两个对象,一个普通同步方法,一个静态同步方法,先执行哪个? */public class Test { public static void main(String[] args) { Phone phone1 = new Phone(); Phone phone2 = new Phone(); new Thread(() -> { phone1.sendSms(); }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } new Thread(() -> { phone2.call(); }).start(); }}class Phone {// 静态同步方法 public static synchronized void sendSms() { try { TimeUnit.SECONDS.sleep(4); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("发短信"); }// 普通同步方法 public synchronized void call() { System.out.println("打电话"); }}
总结
- 重点要判断锁的是谁。
- synchronized 锁的对象是方法的调用者(方法不是静态的)
- synchronized 锁的对象是Class模板,只有唯一一个Class对象。(方法是静态的)
- 普通方法没有锁
- 普通同步方法 与 静态同步方法 锁的对象是不同的
6. 不安全集合类
List 不安全
public class ListTest { public static void main(String[] args) {// 异常:ConcurrentModificationException 并发修改异常// 并发下的 ArrayList 不安全 /* * 解决方法: * 1,List<String> list = new Vector<>(); * 2,Collections.synchronizedList(); * 3,JUC包的new CopyOnWriteArrayList<>() * */// CopyOnWriteArrayList 写入时复制 COW 计算机程序设计领域的一种优化策略// 多个线程调用时,list,读取时,固定的,写入(覆盖)// 在写入的时候地面覆盖,造成数据问题// CopyOnWriteArrayList 相对于 Vector 的优势:// CopyOnWriteArrayList 使用 Lock ,效率高;Vector 使用synchronized 效率较低。 List<String> list = new CopyOnWriteArrayList<>(); for (int i = 1; i <= 10; i ) { new Thread(() -> { list.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(list); }, String.valueOf(i)).start(); } }}
Set 不安全
public class SetTest { public static void main(String[] args) {// Set<String> set = new HashSet<>();// Set<String> set = Collections.synchronizedSet(new HashSet<>()); Set<String> set = new CopyOnWriteArraySet<>(); for (int i = 0; i < 10; i ) { new Thread(() -> { set.add(UUID.randomUUID().toString().substring(0, 5)); System.out.println(set); }, String.valueOf(i)).start(); } }}
HashSet 底层是什么?
public HashSet(){map = new HashMap<>();}// add set 本质就是map ,key是无法重复的public boolean add(E e) {return map.put(e, PRESENT)==null;}
Map 不安全
public class MapTest { public static void main(String[] args) {// map 是这样使用的么? 高并发下,工作中不用 HashMap// 默认等价于什么? new HashMap<>(16,0.75f);// Map<String, String> map = new HashMap<>(16,0.75f);// Map<String, String> map = new HashMap<>();// Map<String, String> map = Collections.synchronizedMap(new HashMap<>()); Map<String,String> map = new ConcurrentHashMap<>(); for (int i = 0; i < 30; i ) { new Thread(() -> { map.put(Thread.currentThread().getName(), UUID.randomUUID().toString().substring(0, 5)); System.out.println(map); }, String.valueOf(i)).start(); } }}
7. Callable
1,可以有返回值
2,可以抛出异常
3,方法不同 call()
public class CallableTest { public static void main(String[] args) throws ExecutionException, InterruptedException {// 怎么启动 Callable?// new Thread(new Runnable()).start();// new Thread(new FutureTask<V>()).start();// new Thread(new FutureTask<V> (Callable )).start(); MyThread thread = new MyThread();// 适配类 FutureTask<Integer> futureTask = new FutureTask<>(thread); new Thread(futureTask,"A").start(); new Thread(futureTask,"B").start(); //结果会被缓存,效率高// 获取 Callable 返回结果 Integer integer = futureTask.get(); //这个 get1 方法可能会产生阻塞!把他放到最后 System.out.println(integer); }}class MyThread implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("call"); return 1024; }}
细节:
- 有缓存
- 结果可能需要等待,会阻塞!
8.常用辅助类
8.1 CountDownLatch
//计数器public class CountDownLatchDemo { public static void main(String[] args) throws InterruptedException { CountDownLatch countDownLatch = new CountDownLatch(6);// 必须要执行任务的时候,再使用 for (int i = 1; i <= 6; i ) { new Thread(() -> { System.out.println(Thread.currentThread().getName() "Go out"); countDownLatch.countDown(); //-1 }, String.valueOf(i)).start(); } countDownLatch.await(); //等待计数器归零 System.out.println("Close Door!"); }}
原理:
countDownLatch.countDown();
//数量减一countDownLatch.await();
// 等待计数器 归零每次有线程调用 countDown() ,数量减一,假设计数器变为零,
countDownLatch.await()
就会被唤醒,继续执行countDownLatch
这个类使一个线程等待其他线程各自执行完毕后再执行。
是通过一个计数器来实现的,计数器的初始值是线程的数量。每当一个线程执行完毕后,计数器的值就-1,当计数器的值为0时,表示所有线程都执行完毕,然后在闭锁上等待的线程就可以恢复工作了。
8.2 CyclicBarrier
现实生活中我们经常会遇到这样的情景,在进行某个活动前需要等待人全部都齐了才开始。例如吃饭时要等全家人都上座了才动筷子,旅游时要等全部人都到齐了才出发,比赛时要等运动员都上场后才开始。
加法计数器
public class CyclicBarrierTest { public static void main(String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(7, () -> { System.out.println("召唤神龙成功!"); }); for (int i = 1; i <= 7; i ) {// lambda 能操作到 i 么?不能,lambda 本质是 类,除非变量设置为 final final int temp = i; new Thread(() -> { System.out.println(Thread.currentThread().getName() "收集" temp "个龙珠"); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }}
等到有七个线程的时候,再执行 CyclicBarrier 中设置的方法
8.3 Semaphore
Semaphore : 信号量
public class SemaphoreDemo { public static void main(String[] args) {// 线程数量:停车位! 限流! Semaphore semaphore = new Semaphore(3); for (int i = 1; i <= 6; i ) { new Thread(() -> { try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() "抢到车位"); TimeUnit.SECONDS.sleep(2); System.out.println(Thread.currentThread().getName() "离开车位"); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } },String.valueOf(i)).start(); } }}
原理:
- semaphore.acquire(); 获得,假设如果已经满了,等待,等待被释放为止
- semaphore.release(); 释放,会将当前的信号量释放,然后唤醒等待的线程
作用:多个共享资源互斥使用!并发限流,控制最大的线程数!
9. 读写锁
/* * 独占锁(写锁):一次只能被一个线程占有 * 共享锁(读锁) 多个线程可以同时占有 * ReadWriteLock * 读- 读 可以共存 * 读 - 写 可以共存 * 写 - 写 不能共存 */public class ReadWriteLockDemo { public static void main(String[] args) { MyCache1 cache = new MyCache1(); // 写入 for (int i = 1; i <= 5; i ) { final int temp = i; new Thread(() -> { cache.put(temp "", temp ""); }, String.valueOf(i)).start(); }// 读取 for (int i = 1; i <= 5; i ) { final int temp = i; new Thread(() -> { cache.get(temp ""); }, String.valueOf(i)).start(); } }}// 加锁的情况class MyCache1 { private volatile Map<String, Object> map = new HashMap<>(); // 读写锁,更加细粒度的控制 private ReentrantReadWriteLock readWriteLock = new ReentrantReadWriteLock(); // 写入的时候,只希望同时只有一个线程写 public void put(String key, Object value) { readWriteLock.writeLock().lock(); try { System.out.println(Thread.currentThread().getName() "写入" key); map.put(key, value); System.out.println(Thread.currentThread().getName() "写入完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.writeLock().unlock(); } } // 读的时候可以多个线程一起读 public void get(String key) { readWriteLock.readLock().lock(); try { System.out.println(Thread.currentThread().getName() "读取" key); Object o = map.get(key); System.out.println(Thread.currentThread().getName() "读取完毕"); } catch (Exception e) { e.printStackTrace(); } finally { readWriteLock.readLock().unlock(); } }}
10.阻塞队列
阻塞 队列
- 写入:如果队列满了,就必须阻塞等待
- 取出:如果队列是空的,就必须阻塞等待生产
BlockingQueue:阻塞队列
什么情况下我们会使用 阻塞队列:多线程并发处理,线程池
学会使用队列
添加,移除
四组API
方式 | 抛出异常 | 有返回值 | 阻塞 等待 | 超时等待 |
---|---|---|---|---|
添加 | add | offer | put | offer(data,) |
移除 | remove | poll | take | poll(,) |
判断队列队首 | element | peek |
每组 API 的特点:
- 抛出异常
- 不会抛出异常
- 阻塞等待
- 超时等待
10.1 BlockingQueue 四组 API
/** 抛出异常*/public class Test { public static void main(String[] args) { test1(); } public static void test1() {// 队列大小 ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.add("a")); System.out.println(blockingQueue.add("b")); System.out.println(blockingQueue.add("c"));// IllegalStateException 抛出异常。Queue full// System.out.println(blockingQueue.add("d"));// 查看队首元素 System.out.println(blockingQueue.element()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove()); System.out.println(blockingQueue.remove());// 抛出异常 NoSuchElementException// System.out.println(blockingQueue.remove()); }}
/** 有返回值,不抛出异常*/public class Test { public static void main(String[] args) { test2(); } public static void test2(){ ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); System.out.println(blockingQueue.offer("a")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("b")); System.out.println(blockingQueue.offer("d")); // 队列满了,返回结果为false,不抛出异常 // 取出队首元素 System.out.println(blockingQueue.peek()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll());//队列空。返回结果为true,不抛出异常 }}
// 等待,阻塞(一直阻塞)public class Test { public static void main(String[] args) throws InterruptedException {// test1();// test2(); test3(); }// 等待,阻塞(一直阻塞) private static void test3() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); blockingQueue.put("a"); blockingQueue.put("b"); blockingQueue.put("c"); blockingQueue.put("d"); //队列满了,会一直等待 blockingQueue.take(); blockingQueue.take(); blockingQueue.take(); blockingQueue.take(); // 队列空,会一直等待 }}
// 等待,阻塞(等待超时)public class Test { public static void main(String[] args) throws InterruptedException { test4(); } // 等待,阻塞(等待超时) private static void test4() throws InterruptedException { ArrayBlockingQueue blockingQueue = new ArrayBlockingQueue(3); blockingQueue.offer("a"); blockingQueue.offer(""); blockingQueue.offer("c"); blockingQueue.offer("c", 2, TimeUnit.SECONDS); //等待超过2秒就退出 System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); System.out.println(blockingQueue.poll()); blockingQueue.poll(2,TimeUnit.SECONDS); //等待超过2秒就退出 }}
SynchronousQueue 同步队列
没有容量,必须等待取出来之后,才能再往里面放元素
put,take
//同步队列public class SynchronousQueueDemo { public static void main(String[] args) { SynchronousQueue<String> blockingQueue = new SynchronousQueue<>(); //同步队列 new Thread(() -> { try { System.out.println(Thread.currentThread().getName() " PUT 1"); blockingQueue.put("1"); System.out.println(Thread.currentThread().getName() " PUT 2"); blockingQueue.put("2"); System.out.println(Thread.currentThread().getName() " PUT 3"); blockingQueue.put("3"); } catch (InterruptedException e) { e.printStackTrace(); } }, "T1").start(); new Thread(() -> { try { TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() "=>" blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() "=>" blockingQueue.take()); TimeUnit.SECONDS.sleep(3); System.out.println(Thread.currentThread().getName() "=>" blockingQueue.take()); } catch (InterruptedException e) { e.printStackTrace(); } }, "T2").start(); }}
11.线程池
池化技术
线程池:三大方法,七大参数,4种拒接策略
程序的运行,本质:占用系统的资源!优化资源的使用!=> 池化技术
线程池,连接池,内存池,对象池(创建,销毁,十分耗内存)
池化技术:事先准备好一些资源,有人要用,就来我这里拿,用完之后还给我
线程池的好处:
- 降低资源的的消耗
- 提高响应速度
- 方便管理
线程可以复用,可以控制最大并发数,管理线程
线程池:三大方法
//Executors 工具类//使用了线程池之后,使用线程池来创建线程public class Demo1 { public static void main(String[] args) {// Executors.newSingleThreadExecutor();//单个线程// Executors.newFixedThreadPool(5); //固定大小的线程池// Executors.newCachedThreadPool(); //可伸缩的 ExecutorService service = Executors.newSingleThreadExecutor();//单个线程 try { for (int i = 0; i < 10; i ) { service.execute(() -> { System.out.println(Thread.currentThread().getName() " ok"); }); } } finally {// 线程池使用完,程序结束,关闭线程池 service.shutdownNow(); } ExecutorService service1 = Executors.newFixedThreadPool(5);//单个线程 try { for (int i = 0; i < 10; i ) { service1.execute(() -> { System.out.println(Thread.currentThread().getName() " ok"); }); } } finally {// 线程池使用完,程序结束,关闭线程池 service1.shutdownNow(); } ExecutorService service2 = Executors.newCachedThreadPool();//单个线程 try { for (int i = 0; i < 100; i ) { service2.execute(() -> { System.out.println(Thread.currentThread().getName() " ok"); }); } } finally {// 线程池使用完,程序结束,关闭线程池 service2.shutdownNow(); } }}
七大参数
源码分析
public static ExecutorService newSingleThreadExecutor() {return new FinalizableDelegatedExecutorService(new ThreadPoolExecutor(1, 1,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>()));}public static ExecutorService newFixedThreadPool(int nThreads) {return new ThreadPoolExecutor(nThreads, nThreads,0L, TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());}public static ExecutorService newCachedThreadPool() {return new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>());}本质:ThreadPoolExecutor public ThreadPoolExecutor(int corePoolSize, // 核心线程池大小int maximumPoolSize, // 最大核心线程池大小long keepAliveTime, // 超时了没有调用就会释放TimeUnit unit, // 超时单位BlockingQueue<Runnable> workQueue, //阻塞队列ThreadFactory threadFactory, //线程工厂:创建线程的,一般不用动RejectedExecutionHandler handler) { // 拒绝策略 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler;}
手动创建线程池
public class Demo1 { public static void main(String[] args) {// 自定义线程池 ExecutorService threadPool = new ThreadPoolExecutor( 2, 5, 3, TimeUnit.SECONDS, new LinkedBlockingQueue<>(3), Executors.defaultThreadFactory(), /** * AbortPolicy 银行满了,还有人进来,不处理这个人的,抛出异常 * CallerRunsPolicy 哪来的去哪里 * DiscardPolicy 银行满了,还有人进来,不处理这个人的,不抛出异常 * DiscardOldestPolicy 队列满了,尝试去和最早的竞争,也不会抛出异常 */ new ThreadPoolExecutor.DiscardOldestPolicy()); try {// 最大承载:Deque max。再大就会抛出异常 RejectedExecutionException for (int i = 0; i < 8; i ) { threadPool.execute(() -> { System.out.println(Thread.currentThread().getName() " ok"); }); } } catch (Exception e) { e.printStackTrace(); } finally { threadPool.shutdownNow(); } }}
四种拒接策略
/*** AbortPolicy 银行满了,还有人进来,不处理这个人的,抛出异常* CallerRunsPolicy 满了,哪来的去哪里。在旧线程里要创建新线程,返回旧线程,不会创建新的* DiscardPolicy 银行满了,还有人进来,不处理这个人的,不抛出异常* DiscardOldestPolicy 队列满了,尝试去和最早的竞争,也不会抛出异常*/
小结 和 扩展
了解:CPU密集型 和 IO 密集型
/** 线程池最大线程到底该怎么设置?* 1,cpu 密集型 12核 --> 12 个线程 (几核的,就是几)* 2,IO 密集型 判断你的程序中十分耗IO的线程*/// 获取Cpu 核数System.out.println(Runtime.getRuntime().availableProcessors());
CPU 密集型
CPU密集型也叫计算密集型,指的是系统的硬盘、内存性能相对CPU要好很多,此时,系统运作大部分的状况是CPU Loading 100%,CPU要读/写I/O(硬盘/内存),I/O在很短的时间就可以完成,而CPU还有许多运算要处理,CPU Loading很高。IO 密集型
IO密集型指的是系统的CPU性能相对硬盘、内存要好很多,此时,系统运作,大部分的状况是CPU在等I/O (硬盘/内存) 的读/写操作,此时CPU Loading并不高。
12.四大函数式接口
新时代程序员必须掌握:lambda表达式,链式编程,函数式接口,Stream流式计算
函数式接口:只有一个方法的接口;例如:
@FunctionalInterfacepublic interface Runnable { public abstract void run();}//java中有超级多FunctionalInterface//简化编程模型,在新版本的框架底层大量应用//new ArrayList<>().forEach(消费者类的函数式接口);
Function 函数式接口
/* * Function:函数型接口,有一个输入参数,一个输入 * 只要是 函数型接口 可以 用Lambda表达式简化 */public class Demo1 { public static void main(String[] args) { // 工具类:输出输入的集合// Function function = new Function<String, String>() {// @Override// public String apply(String s) {// return s;// }// };// Function<String, String> function = (s) -> { return s;}; Function<String, String> function = s -> { return s;}; System.out.println(function.apply("aaa")); }}
Predicate 断定型函数接口
/* * 断定型接口:有一个输入参数,返回值是 boolean值 */public class Demo2 { public static void main(String[] args) {// 判断是否为空// Predicate predicate = new Predicate<String>() {// @Override// public boolean test(String s) {// return s.isEmpty();// }// };// Predicate<String> predicate = (s) -> {return s.isEmpty();}; Predicate<String> predicate = s -> {return s.isEmpty();}; System.out.println(predicate.test("")); System.out.println(predicate.test("11")); }}
Consumer 消费型函数接口
/* * Consumer:消费型接口:只有输入,没有返回值 */public class demo3 { public static void main(String[] args) {// 只有输入,没有返回值// Consumer<String> consumer = new Consumer<String>() {// @Override// public void accept(String s) {// System.out.println(s);// }// };// Consumer<String> consumer = (s) -> { System.out.println(s); }; Consumer<String> consumer = s -> { System.out.println(s); }; consumer.accept("sss"); }}
Supplier 供给型函数接口
/* * Supplier :供给型函数接口:无输入,有返回值 */public class demo4 { public static void main(String[] args) {// Supplier<String> supplier = new Supplier<String>() {// @Override// public String get() {// return "null";// }// }; Supplier<String> supplier = () -> { return "s"; }; System.out.println(supplier.get()); }}
13. Stream 流式计算
什么是Stream 流式计算?
大数据时代重点:存储 计算
集合,MySQL 本质是存储东西的;
计算都应该交给流来操作!
public class Test { /* * 题目要求: * 现在有5个用户!筛选 * 1,ID 必须是偶数 * 2,年龄必须大于23岁 * 3,用户名转为大写字母 * 4,用户名字母倒着排序 * 5,只输出一个用户 * */ public static void main(String[] args) { User u1 = new User(1, "a", 21); User u2 = new User(2, "b", 22); User u3 = new User(3, "c", 23); User u4 = new User(4, "d", 24); User u5 = new User(6, "e", 25);// 集合就是管存储的 List<User> users = Arrays.asList(u1, u2, u3, u4, u5);// 计算交给 Stream 流 users.stream().filter(u -> { return u.getId() % 2 == 0; }).filter(u -> { return u.getAge() > 23; }).map(u -> { return u.getName().toUpperCase(); }).sorted((uu1, uu2) -> { return uu2.compareTo(uu1); }).limit(1).forEach(System.out::println); }}
Stream
Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行非常复杂的查找、过滤和映射数据等操作。Stream API 提供了一种高效且易于使用的处理数据的方式。
特点:
1 . 不是数据结构,不会保存数据。
2. 不会修改原来的数据源,它会将操作后的数据保存到另外一个对象中。
3. 惰性求值,流在中间处理过程中,只是对操作进行了记录,并不会立即执行,需要等到执行终止操作的时候才会进行实际的计算。
14. ForkJoin
什么是 ForkJoin
ForkJoin 在 JDK1.7 ,并行执行任务!效率高。大数据量
类似于大数据:Map Reduce(把大任务拆分为小)
ForkJoin特点:工作窃取
这个里面维护的都是双端队列
/* * 如何使用 forkJoin * 1,forkJoinPool 通过它来执行 * 2,计算任务 forkJoinPool.execute(ForkJoinTask task) * 3,计算类要继承 ForkJoinTask */public class ForkJoinDemo extends RecursiveTask<Long> { private Long start; private Long end;// 临界值 private Long temp = 10000L; public ForkJoinDemo(Long start, Long end) { this.start = start; this.end = end; } @Override protected Long compute() { if ((end - start) < temp) { Long sum = 0L; for (Long i = start; i < end; i ) { sum = i; }// System.out.println(sum); return sum; } else { //递归 long middle = (start end) / 2; //中间值 ForkJoinDemo task1 = new ForkJoinDemo(start, middle); task1.fork(); // 拆分任务,把任务压入线程队列 ForkJoinDemo task2 = new ForkJoinDemo(middle 1, end); task2.fork(); // 拆分任务,把任务压入线程队列 return task1.join() task2.join(); } }}
public class Test { public static void main(String[] args) throws ExecutionException, InterruptedException { test3(); }// 低级的计算方式,效率满 public static void test1() { long sum = 0L; long start = System.currentTimeMillis(); for (long i = 1L; i <= 10_0000_0000; i ) { sum = i; } long end = System.currentTimeMillis(); System.out.println("sum=" sum "时间:" (end - start)); }// 使用 ForkJoinPool 计算。效率提升 public static void test2() throws ExecutionException, InterruptedException { long start = System.currentTimeMillis(); ForkJoinPool forkJoinPool = new ForkJoinPool(); ForkJoinTask<Long> task = new ForkJoinDemo(0L, 10_0000_0000L); ForkJoinTask<Long> submit = forkJoinPool.submit(task); //提交任务,有结果返回 Long sum = submit.get(); long end = System.currentTimeMillis(); System.out.println("sum=" sum "时间:" (end - start)); }// 使用Stream 并行流,效率最高 public static void test3() { long start = System.currentTimeMillis();// Stream 并行流// IntStream// DoubleStream long sum = LongStream.rangeClosed(0L, 10_0000_0000L).parallel().reduce(0, Long::sum); long end = System.currentTimeMillis(); System.out.println("sum=" sum "时间:" (end - start)); }}
15.异步回调
CompletableFuture
// 异步调用 类似 Ajax// 成功回调// 失败回调public class demo1 { public static void main(String[] args) throws ExecutionException, InterruptedException {// 没有返回值的 runAsync 异步回调 CompletableFuture<Void> completableFuture = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread().getName() "runAsync"); }); System.out.println("1111"); completableFuture.get(); // 获取执行结果// 有返回值的 supplyAsync CompletableFuture<Integer> objectCompletableFuture = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread().getName() "runAsync => Integer"); return 1024; }); System.out.println(objectCompletableFuture.whenComplete((t, u) -> { System.out.println("t ==> " t); //正常的返回结果 System.out.println("u ==> " u); //错误信息 }).exceptionally(e -> { System.out.println(e.getMessage()); return 233; //可以获取到错误的返回结果 }).get()); }}
16.JMM
请你谈谈对 Volatile 的理解
Volatile 是 Java 虚拟机提高 轻量级的同步机制
- 保证可见性
- 不保证原子性
- 禁止指令重排
什么是JMM?
JMM : Java 内存模型,不存在的东西,概念,约定!
关于 JMM 的一些同步的约定:
- 线程解锁前,必须把共享变量立刻刷回主存
- 线程加锁前,必须读取主存中的最新值到工作内存中!
- 加锁和解锁是同一把锁
线程 工作内存,主内存
8种操作:
- lock (锁定):作用于主内存的变量,把一个变量标识为线程独占状态
- unlock (解锁):作用于主内存的变量,它把一个处于锁定状态的变量释放出来,释放后的变量才可以被其他线程锁定
- read (读取):作用于主内存变量,它把一个变量的值从主内存传输到线程的工作内存中,以便随后的load动作使用
- load (载入):作用于工作内存的变量,它把read操作从主存中变量放入工作内存中
- use (使用):作用于工作内存中的变量,它把工作内存中的变量传输给执行引擎,每当虚拟机遇到一个需要使用到变量的值,就会使用到这个指令
- assign (赋值):作用于工作内存中的变量,它把一个从执行引擎中接受到的值放入工作内存的变量副本中
- store (存储):作用于主内存中的变量,它把一个从工作内存中一个变量的值传送到主内存中,以便后续的write使用
- write (写入):作用于主内存中的变量,它把store操作从工作内存中得到的变量的值放入主内存的变量中
JMM对这八种指令的使用,制定了如下规则:
- 不允许read和load、store和write操作之一单独出现。即使用了read必须load,使用了store必须write
- 不允许线程丢弃他最近的assign操作,即工作变量的数据改变了之后,必须告知主存
- 不允许一个线程将没有assign的数据从工作内存同步回主内存
- 一个新的变量必须在主内存中诞生,不允许工作内存直接使用一个未被初始化的变量。就是怼变量实施use、store操作之前,必须经过assign和load操作
- 一个变量同一时间只有一个线程能对其进行lock。多次lock后,必须执行相同次数的unlock才能解锁
- 如果对一个变量进行lock操作,会清空所有工作内存中此变量的值,在执行引擎使用这个变量前,必须重新load或assign操作初始化变量的值
- 如果一个变量没有被lock,就不能对其进行unlock操作。也不能unlock一个被其他线程锁住的变量
- 对一个变量进行unlock操作之前,必须把此变量同步回主内存
//死循环,另一个线程不知道main线程的 num 值发生改变public class JMMDemo { private static int num = 0; public static void main(String[] args) { new Thread(() -> { while (num == 0){ } }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } num = 1; System.out.println(num); }}
问题:程序不知道主内存的值已经被修改过了!
17.Volatile
1,保证可见性
public class JMMDemo { // 不加 volatile 程序就会死循环,另一个线程感知不到num的变化// 加了 volatile 可以保证可见性 private volatile static int num = 0; public static void main(String[] args) { new Thread(() -> { while (num == 0){ } }).start(); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } num = 1; System.out.println(num); }}
2,不保证原子性
原子性:不可分割
线程A在执行任务的时候,不能被打扰的,也不能被分割。要么同时成功,要么同时失败
public class VolatileAtomDemo { // volatile不保证原子性 // 原子性:保证数据一致性、完整性 volatile int number = 0; public void addPlusPlus() { number ; } public static void main(String[] args) { VolatileAtomDemo volatileAtomDemo = new VolatileAtomDemo(); for (int j = 0; j < 20; j ) { new Thread(() -> { for (int i = 0; i < 1000; i ) { volatileAtomDemo.addPlusPlus(); } }, String.valueOf(j)).start(); } // 后台默认两个线程:一个是main线程,一个是gc线程 while (Thread.activeCount() > 2) { Thread.yield(); } // 如果volatile保证原子性的话,最终的结果应该是20000 // 但是每次程序执行结果都不等于20000 System.out.println(Thread.currentThread().getName() "\tfinal number result = " volatileAtomDemo.number); }}
volatile不保证原子性原理分析
number 操作对应的字节码:
number 被拆分成3个指令
执行GETFIELD拿到主内存中的原始值number
执行IADD进行加1操作
执行PUTFIELD把工作内存中的值写回主内存中
当多个线程并发执行PUTFIELD指令的时候,会出现写回主内存覆盖问题,所以才会导致最终结果不为20000,volatile不能保证原子性。
使用原子类,解决 原子性问题
public class demo2 { // volatile 不保证原子性// 原子类的 Integer private volatile static AtomicInteger num = new AtomicInteger(); public static void add() { // num ; AtomicInteger的 1方法 num.getAndIncrement(); //不是一个原子性操作 } public static void main(String[] args) { for (int i = 0; i < 20; i ) { new Thread(() -> { for (int i1 = 0; i1 < 1000; i1 ) { add(); } }).start(); } while (Thread.activeCount() > 2) { //main gc// 使当前线程由执行状态,变成为就绪状态,让出cpu时间,// 在下一个线程执行时候,此线程有可能被执行,也有可能没有被执行。 Thread.yield(); } System.out.println(Thread.currentThread().getName() " " num); }}
方法前加synchronized解决
public synchronized void addPlusPlus() { number ;}
加锁解决
// 使用锁保证数据原子性Lock lock = new ReentrantLock();public void addPlusPlus() { lock.lock(); number ; lock.unlock();}
这些类的底层都直接和操作系统挂钩!在内存中修改值!Unsafe类是一个很特殊的类的存在!
指令重排
什么是 指令重排:你写的程序,计算机并不是按照你写的那样去执行的。
源代码 --> 编译器优化的重排 -->指令并行也可能会重排 --> 内存系统也会重排 --> 执行
处理器在进行指令重排的时候,考虑:数据之间的依赖性
int x = 1; //1int y = 2; //2x = x 5; //3y = y * x; //4我们所期望的:1234 但是有可能会 2134 1324,可不可能 是 4123? 不会
volatile 可以避免指令重排
内存屏障:CPU指令。作用:
- 保证特定的操作的执行顺序
- 可以保证某些变量的内存可见性(利用这些特性 volatile 实现了可见性)
Volatile 是可以保持可见性,不能保证原子性,由于内存屏障,可以保证避免指令重排的现象产生!
18. 彻底玩转单例模式
饿汉式 DCL懒汉式,深究!
饿汉式
//饿汉式单例public class Hungry { private Hungry() { } // 可能会浪费空间 private byte[] data1 = new byte[1024 * 1024]; private byte[] data2 = new byte[1024 * 1024]; private byte[] data3 = new byte[1024 * 1024]; private byte[] data4 = new byte[1024 * 1024]; private final static Hungry HUNGRY = new Hungry(); public static Hungry getInstance() { return HUNGRY; }}
懒汉式
//懒汉式单例// 并发下有问题public class LazyMan { private LazyMan() { } private static LazyMan lazyMan; public static LazyMan getInstance() { if (lazyMan == null) { lazyMan = new LazyMan(); } return lazyMan; }}
// 改进后的懒汉式public class LazyMan { private LazyMan() { }// 避免指令重排 private volatile static LazyMan lazyMan;// 双重检查锁模式的 懒汉式单例 DCL懒汉式 public static LazyMan getInstance() { if (lazyMan == null) { synchronized (LazyMan.class) { if (lazyMan == null) { lazyMan = new LazyMan(); /* * 1,分配内存空间 * 2,执行构造方法,初始化对象 * 3,把这个对象指向这个空间 */ } } } return lazyMan; }}
public class LazyMan { //加密,防止破环单例 private static boolean qinjiang = false; private LazyMan() {// 避免别人使用反射破坏单例模式 synchronized (LazyMan.class) { if (qinjiang == false) { qinjiang = true; } else { throw new RuntimeException("不要试图使用反射破坏异常!"); } } } // 避免指令重排 private volatile static LazyMan lazyMan; // 双重检查锁模式的 懒汉式单例 DCL懒汉式 public static LazyMan getInstance() { if (lazyMan == null) { synchronized (LazyMan.class) { if (lazyMan == null) { lazyMan = new LazyMan(); /* * 1,分配内存空间 * 2,执行构造方法,初始化对象 * 3,把这个对象指向这个空间 */ } } } return lazyMan; } // 反射可以破坏单例! public static void main(String[] args) throws Exception { Constructor<LazyMan> declaredConstructor = LazyMan.class.getDeclaredConstructor(null); declaredConstructor.setAccessible(true); LazyMan lazyMan2 = declaredConstructor.newInstance(); Field qinjiang = LazyMan.class.getDeclaredField("qinjiang"); qinjiang.set(lazyMan2, false); System.out.println(lazyMan2); }}
静态内部类
public class Holder { private Holder() { } public static Holder getInstance() { return InnerClass.HOLDER; } public static class InnerClass { private static final Holder HOLDER = new Holder(); }}
单例不安全,反射可以破解单例模式
19.深入理解 CAS
什么是 CAS?
CAS是英文单词Compare and Swap的缩写,翻译过来就是比较并替换。
CAS机制中使用了3个基本操作数:内存地址V,旧的预期值A,要修改的新值B。
更新一个变量的时候,只有当变量的预期值A和内存地址V当中的实际值相同时,才会将内存地址V对应的值修改为B。如果不是就一直循环
Unsafe 类
CAS 缺点:
- 循环会耗时
- 一次性只能保证一个共享变量的原子性
- ABA 问题
CAS:ABA问题(狸猫换太子)
在 Unsafe 这个类中,它用 cas 保证原子性问题,但同时也引发了新的问题;
ABA,一句话,狸猫换太子,举个例子。(V,内存值,A旧的预期值,B,要求个更新值);
举例:
有两个线程,同时操作一个变量,线程1执行时间比线程2执行时间长,线程2执行快
线程1读取值,此时读到的值是A,这时候线程被挂起,
线程2也读到值,并将A修改为X,然后又做了操作,X又改为Z,最后又将Z改为A;线程2交出执行权;
线程1此时拿到执行权了,此时进行compareAndSwap,发现内存值和期望值是一样,于是正常执行,
但是内存值在这期间已经被操作过;
这就是ABA问题;
public class CASDemo { //CAS compareAndSet : 比较并交换! public static void main(String[] args) { AtomicInteger atomicInteger = new AtomicInteger(2020);// 期望,更新// public final boolean compareAndSet(int expect, int update)// 如果我期望的值达到了,那么就更新,否则,就不更新,CAS 是CPU的并发原语!// ========== 捣乱的线程 ============ atomicInteger.compareAndSet(2020, 2021); System.out.println(atomicInteger.get()); atomicInteger.compareAndSet(2021, 2020); System.out.println(atomicInteger.get());// ========== 期望的线程 ============ atomicInteger.compareAndSet(2021, 6666); System.out.println(atomicInteger.get()); }}
20.原子引用
解决ABA问题,引入原子引用!对应的思想,乐观锁
Integer 使用了对象缓存机制,默认范围是 -128 ~ 127,推荐使用静态工厂方法 valueOf 获取对象实例,而不是 new ,因为 valueOf 使用缓存,而 new 一定会创建新的对象分配新的内存空间(超过默认范围,就一定会分配新的内存空间)
解决ABA最简单的方案就是给值加一个修改版本号,每次值变化,都会修改它版本号,CAS操作时都对比此版本号。
public class CASDemo { //CAS compareAndSet : 比较并交换! public static void main(String[] args) {// AtomicInteger atomicInteger = new AtomicInteger(2020);// AtomicStampedReference 注意:如果泛型是一个包装类,注意对象的引用问题 AtomicStampedReference<Integer> atomicInteger = new AtomicStampedReference<>(1, 1); new Thread(() -> { int stamp = atomicInteger.getStamp(); //获得版本号 System.out.println("a1=>" stamp); // try {// TimeUnit.SECONDS.sleep(2);// } catch (InterruptedException e) {// e.printStackTrace();// }// 如果参数太大,比如 2020,那么就会出错,因为Integer 缓存的问题,2020 和 2020 不是同一个对象 atomicInteger.compareAndSet(1, 2, atomicInteger.getStamp(), atomicInteger.getStamp() 1); System.out.println("a2=>" atomicInteger.getStamp()); atomicInteger.compareAndSet(2, 1, atomicInteger.getStamp(), atomicInteger.getStamp() 1); System.out.println("a3=>" atomicInteger.getStamp()); }, "a").start(); //乐观锁的原理相同 new Thread(() -> { int stamp = atomicInteger.getStamp(); //获得版本号 System.out.println("b1=>" stamp); try { TimeUnit.SECONDS.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } atomicInteger.compareAndSet(1,6, atomicInteger.getStamp(),atomicInteger.getStamp() 1); System.out.println("b2=>" atomicInteger.getStamp()); }, "b").start(); }}
21. 各种锁的理解
21.1 公平锁,非公平锁
公平锁 指在分配锁前检查是否有线程在排队等待获取该锁,优先将锁分配给排队时间最长的线程
非公平锁 指在分配锁时不考虑线程排队等待的情况,直接尝试获取锁,在获取不到锁时再排到队尾等待
因为公平锁需要在多核的情况下维护一个锁线程等待队列,基于该队列进行锁的分配,因此效率比非公平锁低很多,java中的synchronized时非公平锁,ReentranLock默认的lock方法采用的时非公平锁。
public ReentrantLock() {sync = new NonfairSync();}public ReentrantLock(boolean fair) {sync = fair ? new FairSync() : new NonfairSync();}
21.2 可重入锁
不可重入锁
所谓不可重入锁,即若当前线程执行某个方法已经获取了该锁,那么在方法中尝试再次获取锁时,就会获取不到被阻塞。我们尝试设计一个不可重入锁:
public class Lock{ private boolean isLocked = false; public synchronized void lock() throws InterruptedException{ while(isLocked){ wait(); } isLocked = true; } public synchronized void unlock(){ isLocked = false; notify(); } }
使用该锁:
public class Count{ Lock lock = new Lock(); public void print(){ lock.lock(); doAdd(); lock.unlock(); } public void doAdd(){ lock.lock(); //do something lock.unlock(); }}
当前线程执行print()方法首先获取lock,接下来执行doAdd()方法就无法执行doAdd()中的逻辑,必须先释放锁。这个例子很好的说明了不可重入锁。
可重入锁(递归锁)
ReentrantLock翻译过来为可重入锁,它的可重入性表现在同一个线程可以多次获得锁,而不同线程依然不可多次获得锁。我们设计一种可重入锁
public class Lock{ boolean isLocked = false; Thread lockedBy = null; int lockedCount = 0; public synchronized void lock() throws InterruptedException{ Thread thread = Thread.currentThread(); while(isLocked && lockedBy != thread){ wait(); } isLocked = true; lockedCount ; lockedBy = thread; } public synchronized void unlock(){ if(Thread.currentThread() == this.lockedBy){ lockedCount--; if(lockedCount == 0){ isLocked = false; notify(); } } }}
所谓可重入,意味着线程可以进入它已经拥有的锁的同步代码块儿。
我们设计两个线程调用print()方法,第一个线程调用print()方法获取锁,进入lock()方法,由于初始lockedBy是null,所以不会进入while而挂起当前线程,而是是增量lockedCount并记录lockBy为第一个线程。接着第一个线程进入doAdd()方法,由于同一进程,所以不会进入while而挂起,接着增量lockedCount,当第二个线程尝试lock,由于isLocked=true,所以他不会获取该锁,直到第一个线程调用两次unlock()将lockCount递减为0,才将标记为isLocked设置为false
21.3 自旋锁
自旋锁(spin lock)是一种非阻塞锁,也就是说,如果某线程需要获取锁,但该锁已经被其他线程占用时,该线程不会被挂起,而是在不断的消耗CPU的时间,不停的试图获取锁。
自旋锁是计算机科学用于多线程同步的一种锁,线程反复检查锁变量是否可用。由于线程在这一过程中保持执行,因此是一种忙等待。
自旋锁避免了进程上下文的调度开销,因此对于线程只会阻塞很短时间的场合是有效的。
public class SpinlockDemo { AtomicReference<Thread> atomicReference = new AtomicReference<>();// 加锁 public void myLock(){ Thread thread = Thread.currentThread(); System.out.println(thread.getName() "=> myLock");// 自旋锁。一直获取锁 while (!atomicReference.compareAndSet(null,thread)){ } }// 解锁 public void myUnLock(){ Thread thread = Thread.currentThread(); System.out.println(thread.getName() "=> myUnLock"); atomicReference.compareAndSet(thread,null); }}
过多占用CPU的资源,如果锁持有者线程A一直长时间的持有锁处理自己的逻辑,那么这个线程B就会一直循环等待过度占用cpu资源
递归使用可能会造成死锁,不过这种场景一般写不出来
21.4 死锁
死锁是什么?
死锁测试,如何排除死锁
public class DeaLock { public static void main(String[] args) { Makeup g1 = new Makeup(0, "灰姑凉"); Makeup g2 = new Makeup(1, "白雪公主"); g1.start(); g2.start(); }}class Lipstick {}class Mirror {}class Makeup extends Thread { static Lipstick lipstick = new Lipstick(); static Mirror mirror = new Mirror(); int choice; String girlName; Makeup(int choice, String girlName) { this.choice = choice; this.girlName = girlName; } @Override public void run() { try { makeup(); } catch (InterruptedException e) { e.printStackTrace(); } } private void makeup() throws InterruptedException { if (choice == 0) { synchronized (lipstick) { System.out.println(this.girlName "获得口红的锁"); Thread.sleep(1000); synchronized (mirror) { System.out.println(this.girlName "获得镜子的锁"); } } } else { synchronized (mirror) { System.out.println(this.girlName "获得镜子的锁"); Thread.sleep(1000); synchronized (lipstick) { System.out.println(this.girlName "获得镜口号的锁"); } } } }}
解决问题
- 使用
jps -l
定位进程号 - 使用
jstack 进程号
查看进程信息