并发编程 生产者和消费者模式
一个生产环境中、生产者和消费者在同一时间段内共享同一块缓冲区、生产者负责向缓冲区添加数据、消费者负责从缓冲区取出数据
卖汉堡 汉堡类
1 2 3 4 5 6 7 8 9 10 11 12 13 public class Hamburger { private int id; @Override public String toString () { return "Hamburger{" + "id=" + id + '}' ; } public Hamburger (int id) { this .id = id; } }
容器类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public class Container { public Hamburger[] array = new Hamburger[6 ]; public int index = 0 ; public synchronized void push (Hamburger hamburger) { while (index == array.length){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this .notify(); array[index] = hamburger; index++; System.out.println("生产了一个汉堡" + hamburger); } public synchronized Hamburger pop () { while (index == 0 ){ try { this .wait(); } catch (InterruptedException e) { e.printStackTrace(); } } this .notify(); index--; System.out.println("取出了一个汉堡" + array[index]); return array[index]; } }
生产者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 public class Producer { private Container container; public Producer (Container container) { this .container = container; } public synchronized void producer () { for (int i = 0 ; i < 30 ; i++){ Hamburger hamburger = new Hamburger(i); this .container.push(hamburger); try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } } }
消费者类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public class Consumer { private Container container; public Consumer (Container container) { this .container = container; } public void consumer () { for (int i = 0 ; i < 30 ; i++){ this .container.pop(); } try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); } } }
测试类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 public class Test { public static void main (String[] args) { Container container = new Container(); Producer producer = new Producer(container); Consumer consumer = new Consumer(container); new Thread(()->{ producer.producer(); }).start(); new Thread(()->{ consumer.consumer(); }).start(); } }
必须使用 while 判断,不能用 if,因为 if 会存在线程虚假唤醒的问题,虚假唤醒就是一些 wait 方法会在除了 notify 的其他情况被唤醒,不是真正的唤醒,使用 while 完成多重检测,避免这一问题
高并发 并发和并行的区别
并发(concurrency):多线程同时操作同一个资源、但并不是真正的同时操作、而是交替操作、单核CPU的情况下、资源按时间段分配给多个线程 并行(parallelism):是真正的多个线程同时执行、多核CPU、每个线程使用一个CPU资源来运行
高并发标准
QPS:每秒响应的HTTP请求数量 吞吐量:单位时间内处理的请求数 平均响应时间:系统对一个请求作出响应的评价时间 并发用户数:同时称在正常使用系统的用户人数 QPS = 并发数/平均响应时间
提高系统并发能力的方式 垂直扩展
提升单机的硬件设备 提升单机的架构性能
水平扩展
集群 分布式 站点层扩展 服务层扩展 数据层扩展
并发编程
一种描述使系统允许多个任务可以在重叠的时间段内执行的设计结构、不是指多个任务在同一时间段内执行、而是指系统具备处理多个任务在同一时间段内同时执行的能力
JUC
JDK提供给的一个工具包、用来帮助开发者完成Java并发编程
进程和线程 java默认线程数为2个
main主线程 GC垃圾回收机制
java本身无法开启线程、因为java无法操作硬件、只能通过调用本地方法
1 private native void start0 () ;
实现多线程的几种方法
继承Thread类 实现Runnable接口 实现Callable接口
实现Runnable接口和Callable接口的区别
Runnable的run方法没有返回值 Callable的call方法有返回值
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public class MyCallable implements Callable <String > { @Override public String call () throws Exception { System.out.println("这是Callable" ); return "Hello Callable" ; } } public static void main (String[] args) { MyCallable myCallable = new MyCallable(); FutureTask<String> future = new FutureTask(myCallable); Thread thread = new Thread(future); thread.start(); try { String value = future.get(); System.out.println(value); } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); } }
Callable流程分析 1 2 3 4 5 6 7 8 9 public interface Callable <V > { V call () throws Exception ; }
1 2 3 4 5 6 public FutureTask (Callable<V> callable) { if (callable == null ) throw new NullPointerException(); this .callable = callable; this .state = NEW; }
1 public class FutureTask <V > implements RunnableFuture <V > { }
1 2 3 4 5 6 7 public interface RunnableFuture <V > extends Runnable , Future <V > { void run () ; }
1 2 3 public Thread (Runnable target) { init(null , target, "Thread-" + nextThreadNum(), 0 ); }
sleep和wait
sleep:让当前线程休眠、不会释放锁 wait:让访问当前对象的线程休眠、会释放锁
synchronized锁
synchronized修饰非静态方法、锁定方法的调用者 synchronized修饰静态方法、锁定的是类 synchronized静态方法和实例方法同时存在、静态方法锁定的是类、实例方法锁定的是对象
synchronized锁与Lock锁的区别
synchronized 自动上锁,自动释放锁,Lock 手动上锁,手动释放锁 synchronized 无法判断是否获取到了锁,Lock 可以判断是否拿到了锁 synchronized 拿不到锁就会一直等待,Lock 不一定会一直等待 synchronized 是 Java 关键字,Lock 是接口 synchronized 是非公平锁,Lock 可以设置是否为公平锁
Lock锁
使用 Lock 锁、就不能通过 wait 和 notify 来暂停线程和唤醒线程、而应该使用 Condition 的 await 和 signal 来暂停和唤醒线程
ConcurrentModificationException并发访问异常 例
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 public static void main (String[] args) { List list = new ArrayList(); for (int i = 0 ; i < 10 ; i++) { new Thread(()->{ try { TimeUnit.MILLISECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); }finally { list.add("xiaobo" ); System.out.println(list); } }).start(); } }
解决方法 Vector 1 List list = new Vector();
List.add vector.add
Collections.synchronizedList 1 List<String> list = Collections.synchronizedList(new ArrayList<>());
JUC:CopyOnWriteArrayList 1 List<String> list = new CopyOnWriteArrayList<>();
CopyOnwrite写时复制、当我们向一个容器中添加元素时、不直接给容器添加、而是现将容器复制一份、向新的容器中添加数据、添加完成后、再将原来的容器引用指向新的容器
JUC 工具类 计数器
当两个线程同时执行时、如果要确保一个线程优先执行、可以使用计时器、让一个线程先执行、再让另一个线程执行
CountDownLatch减法计数器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public static void main (String[] args) { CountDownLatch countDownLatch = new CountDownLatch(100 ); new Thread(()->{ for (int i = 0 ; i < 100 ; i++) { System.out.println("---------------xiaobo" ); countDownLatch.countDown(); } }).start(); try { countDownLatch.await(); } catch (InterruptedException e) { e.printStackTrace(); }finally { new Thread(()->{ for (int i = 0 ; i < 100 ; i++) { System.out.println("wyb---------------" ); } }).start(); } }
new CountDownLatch(100)、coutDown()、await()需要联合使用、必须保证计数器清零、所以coutDown() 的调用次数必须大于等于构造函数的参数值
CyclicBarrier加法计数器 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 public static void main (String[] args) { CyclicBarrier cyclicBarrier = new CyclicBarrier(100 ,()->{ System.out.println("执行完毕、允许通过" ); }); for (int i = 0 ; i < 100 ; i++) { final int temp = i; new Thread(() -> { System.out.println("-->" + temp); try { cyclicBarrier.await(); } catch (InterruptedException e) { e.printStackTrace(); } catch (BrokenBarrierException e) { e.printStackTrace(); } }).start(); } }
CyclicBarrier设置一个值、await唤醒其他线程、当await唤醒的线程数达到Cyclicbarrier指定的值时、CyclicBarrier所设置的线程被执行 lambda表达式里面不能去访问没有final修饰的变量
Semaphore记数信号量
主要使用它来完成限流操作,限制可以访问某些资源的线程数量 三个操作:初始化 new Semaphore(5); 获取许可 semaphore.acquire(); 释放 semaphore.release();
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 public static void main (String[] args) { Semaphore semaphore = new Semaphore(5 ); for (int i = 0 ; i < 20 ; i++) { new Thread(()->{ try { semaphore.acquire(); System.out.println(Thread.currentThread().getName() + "开始访问" ); TimeUnit.SECONDS.sleep(2 ); System.out.println(Thread.currentThread().getName() + "------退出访问" ); } catch (InterruptedException e) { e.printStackTrace(); }finally { semaphore.release(); } }).start(); } }
首先获取信号量、获取到资源才可以执行、执行完毕后需要释放资源、留给下一个线程
读写锁
接口ReadwriteLock、实现类是ReentrantReadWriteLock 可以多线程同时读、但是同一时间内只能有一个线程进行写入操作 写入锁又称独占锁、只能被一个线程占用 读取锁又称共享锁、多个线程可以同时占用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 public class Cache { private Map<Integer,String> map = new HashMap<>(); private ReadWriteLock readWriteLock = new ReentrantReadWriteLock(); public void write (Integer key,String value) { readWriteLock.writeLock().lock(); System.out.println(key + "开始写入" ); map.put(key,value); System.out.println(key + "写入完毕" ); readWriteLock.writeLock().unlock(); } public void read (Integer key) { readWriteLock.readLock().lock(); System.out.println(key + "开始读取" ); map.get(key); System.out.println(key + "读取完毕" ); readWriteLock.readLock().unlock(); } } public static void main (String[] args) { Cache cache = new Cache(); for (int i = 0 ; i < 5 ; i++) { final int temp = i; new Thread(()->{ cache.write(temp,String.valueOf(temp)); }).start(); } for (int i = 0 ; i < 5 ; i++) { final int temp = i; new Thread(()->{ cache.read(temp); }).start(); } }
线程池
预先创建好一定数量的线程对象、存入缓冲池中、需要用的时候直接从缓冲池中取出、用完之后不要销毁、还回到缓冲池中、为了提高资源的利用率
优势
提高线程的利用率 提高响应速度 便于统一管理线程对象 可控制最大并发数
设计思想
核心池的大小 线程池的最大容量 等待队列 拒绝策略
基本流程
1、线程池初始化的时候创建一定数量的线程对象 2、如果缓冲池中没有空闲的线程对象、则新来的任务进入等待队列 3、如果缓冲池中没有空闲的线程对象、等待队列也已经填满、可以申请再创建一定数量的新线程对象、直到到达线程池的最大值、这时候如果还有新的任务进来、只能选择拒绝。
底层ThreadPoolExecutor
工具类的Executors 底层都是ThreadPoolExecutor
ThreadPoolExecutor核心参数(7个)
corePoolSize:核心池的大小 maximumPoolSize:线程池的最大容量 keepAliveTime:线程存活时间(在没有任务可执行的情况下)、必须是线程池中的数量大于corePoolSize、才生效 TimeUnit:存活时间单位 BlockingQueue:等待队列、存储等待执行的任务 ThreadFactory:线程工厂、用来创建线程对象 RejectedExecutionHandler:拒绝策略
4种拒绝策略
AbortPolicy:直接抛出异常 DiscardPolicy:放弃任务,不抛出异常 DiscardOldestPolicy:尝试与等待队列中最前面的任务去争夺,不抛出异常 CallerRunsPolicy:谁调用谁处理
JUC工具类提供的线程池 单例线程池 1 ExecutorService executorService = Executors.newSingleThreadExecutor();
指定数量线程池 1 ExecutorService executorService = Executors.newFixedThreadPool(5 );
缓存线程池 1 ExecutorService executorService = Executors.newCachedThreadPool();
自定义线程池 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 public static void main (String[] args) { ExecutorService executorService = null ; executorService = new ThreadPoolExecutor( 2 , 3 , 1L , TimeUnit.SECONDS, new ArrayBlockingQueue<>(2 ), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy() ); for (int i = 0 ; i < 6 ; i++) { executorService.execute(()->{ try { TimeUnit.SECONDS.sleep(1 ); } catch (InterruptedException e) { e.printStackTrace(); }finally { System.out.println(Thread.currentThread().getName() + "开始办理业务" ); } }); } executorService.shutdown(); }
Volatile关键字
Volatile 是 JVM 提供的轻量级同步机制、可见性、主内存对象线程可见
一个线程执行完任务之后,会把变量存回到主内存中,并且从主内存中读取当前最新的值,如果是一个空的任务,则不会重新读取主内存中的值
递归
函数直接或者间接调用函数本身
需要满足条件
一个父问题可以拆分成若干个子问题,并且若干子问题的结果汇总起来就是父问题的答案 父问题和子问题,解题思路必须完全一致,只是数据规模不同 存在终止条件、问题在不断拆分的同时,一定要在某个节点终止拆分
正确的开始、微小的长进、然后持续、嘿、我是小博、带你一起看我目之所及的世界……