线程池的正确打开方式

/ Java / 没有评论 / 0浏览
欢迎关注微信公众号:程序员小圈圈
原文首发于:www.zhangruibin.com
本文出自于:RebornChang的博客
转载请标明出处^_^

线程池的正确打开方式

线程,线程池,多线程,锁,老生常谈的知识点了,涉及很多知识,本节为笔者自己整理的知识点,方便学习记忆,分享出来仅供参考。

线程相关

线程与进程

线程是什么,进程是什么,这是很多初学者的疑问,当然老鸟大神请忽略,这里就简单的比喻下。 如果说把进程比作一条单向路,那线程就是这条路上的通道,比如高速公路单向四通道,可以看成单进程,四线程。 那么所谓的进程线程落到我们电脑硬件上是按什么划分的呢? 我们买电脑的时候,经常看到电脑参数配置里面有这样的介绍: 双核4线,八核16线之类的,这个核就是指的CPU数,几线,就是同时处理的最大线程数,说是八核十六线程的电脑,在使用起来可以达到最大并发16线程,但是,那只是使用起来,究其根本,还是执行的八核八线,只是说在处理速度跟运行效果上,可以达到使用级别的八核十六线,很简单的一个例子,电脑的任务管理器,试试看你的电脑能同时打开几个。

线程的创建和运行方式有几种

如果您是撸了两三年代码的老鸟,这里就可以忽略了。 笼统的来说,线程的创建方式分为四种。 笔者先说下自己知道的,然后具体分为几种,仁者见仁智者见智吧。 第一种,大家公认的,就是继承Thread类进行线程的创建; 第二种,也是公认的,就是实现Runnable接口; 第三种,实现Callable接口通过FutureTask包装器来创建Thread线程; 第四种,通过线程池创建线程;

//lambda写法
Executors.newFixedThreadPool(5).execute(() ->{});

线程的生命周期及线程运行状态分为几种

上面说了线程的创建方式,那这里就得赘述下现成的生命周期,和运行状态了。 首先说下,什么是线程的生命周期。 人的生命周期是出生到死亡,那线程的生命周期也是这样的,从线程的创建,到线程的销毁,就是一个完整的生命周期。说细一点,那就是:新建,就绪,运行,阻塞以及死亡。

线程池及参数支持

说过线程,那不可避免的深入点,就是线程池相关的知识了。

什么是线程池

线程池就是提前创建若干个线程,如果有任务需要处理,线程池里的线程就会处理任务,处理完之后线程并不会被销毁,而是等待下一个任务。由于创建和销毁线程都是消耗系统资源的,所以当你想要频繁的创建和销毁线程的时候就可以考虑使用线程池来提升系统的性能。 那线程池怎么创建呢?

//定义一个有5个线程的线程池

ExecutorService fixedTh = Executors.newFixedThreadPool(5);
ExecutorService singleTh = Executors.newSingleThreadExecutor();
ExecutorService cachedTh = Executors.newCachedThreadPool();

上面那些创建方式是最简单的,也是安全性和实用性最低的,当高并发的时候,容易引发OOM,因为那几种线程池的参数是固定的,作为一个代码人,我们要时刻 感觉,固定的,就是非最好的。 因为给定的参数并不一定适合我们自己的生产研发环境,所以,我们要自己设置参数。

线程池的常用参数及解释

参数有几个?截止到JDK1.8是7个常用参数:

  1. corePoolSize:线程池中核心线程数的最大值

  2. maximumPoolSize:线程池中能拥有最多线程数

  3. workQueue:用于缓存任务的阻塞队列我们现在通过向线程池添加新的任务来说明着三者之间的关系。 (1)如果没有空闲的线程执行该任务且当前运行的线程数少于corePoolSize,则添加新的线程执行该任务。 (2) 如果没有空闲的线程执行该任务且当前的线程数等于corePoolSize同时阻塞队列未满,则将任务入队列,而不添加新的线程。 (3)如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数小于maximumPoolSize,则创建新的线程执行任务。 (4)如果没有空闲的线程执行该任务且阻塞队列已满同时池中的线程数等于maximumPoolSize,则根据构造函数中的handler指定的策略来拒绝新的任务。 注意,线程池并没有标记哪个线程是核心线程,哪个是非核心线程,线程池只关心核心线程的数量。

  4. keepAliveTime:表示空闲线程的存活时间。

  5. TimeUnitunit:表示keepAliveTime的单位。

TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小时
TimeUnit.MINUTES;           //分钟
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微秒
TimeUnit.NANOSECONDS;       //纳秒
  1. workQueue 阻塞队列,存储等待执行的任务。参数有ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue可选。
  2. handler:表示当workQueue已满,且池中的线程数达到maximumPoolSize时,线程池拒绝添加新任务时采取的策略。
ThreadPoolExecutor.AbortPolicy:丢弃任务并抛出RejectedExecutionException异常。 
ThreadPoolExecutor.DiscardPolicy:也是丢弃任务,但是不抛出异常。 
ThreadPoolExecutor.DiscardOldestPolicy:丢弃队列最前面的任务,然后重新尝试执行任务(重复此过程)
ThreadPoolExecutor.CallerRunsPolicy:由调用线程处理该任务

线程池的几个参数都是通俗易懂的,这里我们着重说下java里面的队列,虽然很少用,但是的确是一个不可忽略的知识点。

Java 队列

什么是队列

Queue: 基本上,一个队列就是一个先入先出(FIFO)的数据结构与List、Set同一级别,都是继承了Collection接口。LinkedList实现了Deque接 口。

Java队列有几种

对于队列,我们将其划分为两种:非阻塞队列,以及阻塞队列。

队列的操作

这里说队列的操作,我们指的是阻塞队列的操作。 阻塞队列是在JDK1.5的时候开始加入,1.6的时候加入了LinkedBlockingDeque,到1.7的时候加入了LinkedTransferQueue。 上文所述的七种阻塞队列的五种都是在1.5的时候加入的,他们有公用的操作方法:

 * add 增加一个元素 如果队列已满,则抛出一个IIIegaISlabEepeplian异常  * remove 移除并返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常  * element 返回队列头部的元素 如果队列为空,则抛出一个NoSuchElementException异常

简易线城池的创建

第一种就是直接使用Executors进行创建,这种是最low的,当然,如果只是简单的使用没问题,不适合大规模高并发情况,容易引起OOM。

//定义一个有5个线程的线程池

ExecutorService fixedTh = Executors.newFixedThreadPool(5);
ExecutorService singleTh = Executors.newSingleThreadExecutor();
ExecutorService cachedTh = Executors.newCachedThreadPool();

第二种就是有参线城池的创建

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
        TimeUnit.SECONDS, blockingDeque, threadFactory, customRejectedExecutionHandler);

然后使用threadPoolExecutor.submit进行任务的提交。 我们所说的规范化管理线程池,就是使用的这种方式。

为什么要规范化管理线程及线程池

既然上面说了线程池的简单创建,以及线程池的参数,还有创建方式,那为什么要规范化管理线程及线程池呢? 简单的来说,就是为了生产排查以及方便调度还有更重要的,就是自定义可控性。 比如购物车线程池,就是单独的购物车线程池,跟订单线程池之类的名字和handle方式不同,这样当线程在某部分出现异常,通过自定义打印的日志,我们就可以知道在哪部分哪个线程因为什么出现了异常,这样我们就可以快速的对问题进行定位进而解决问题。 那上面那些线程池的创建形式我们就不用了,我们用下面的这种。

线程池的打开方式 (参考)

这里笔者使用一个demo,多线程读取Excel来演示怎样做一个比较晚上的线城池管理: 1.消费者线程管理CustomThreadFactory.java

import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * @ClassName CustomThreadFactory
 * @Description TODO
  * @Author zhrb
 * @Date 2019/9/20 16:01
 * @Version
  */ public class CustomThreadFactory implements ThreadFactory{
    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
    private final ThreadGroup group;
    private final AtomicInteger threadNumber = new AtomicInteger(1);
    private final String namePrefix;

    public CustomThreadFactory() {
        SecurityManager securityManager = System.getSecurityManager();
        group = (securityManager != null) ? securityManager.getThreadGroup() :
                Thread.currentThread().getThreadGroup();
        namePrefix = "custom-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
    }

    //自定义创建线程方式
  @Override
  public Thread newThread(Runnable r) {
        Thread t = new Thread(group,r,
                namePrefix + threadNumber.getAndIncrement(),0 );
        //判断是否是守护进程
  if (t.isDaemon()){
            t.setDaemon(false);
        }
        //判断线程优先级
  if (t.getPriority() != Thread.NORM_PRIORITY){
            t.setPriority(Thread.NORM_PRIORITY);
        }
        return t;
    }
}

2.自定义异常处理逻辑类CustomRejectedExecutionHandler.java

import  lombok.extern.slf4j.Slf4j;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * @ClassName CustomRejectedExecutionHandler
 * @Description TODO
  * @Author zhrb
 * @Date 2019/9/20 16:16
 * @Version
  */ @Slf4j public class CustomRejectedExecutionHandler implements RejectedExecutionHandler {

    @Override
  public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        log.error("向线程中添加任务被拒绝...");
        //1.当前线程执行
  r.run();

        //2.异常抛出
  throw new RejectedExecutionException("Task " + r.toString() +
        " rejected from" + executor.toString());

    }
}

3.多线程写入Excel,使用CyclicBarrier TestMyselfThreadPool.java

import com.zhrb.testDemo.thread.poiReadExcel.ContentValueEntity;
import com.zhrb.testDemo.thread.poiReadExcel.PoiWrite;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.ss.usermodel.WorkbookFactory;

import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.*;

/**
 * @ClassName TestMyselfThreadPool
 * @Description TODO
  * @Author zhrb
 * @Date 2019/9/20 16:20
 * @Version
  */ public class TestMyselfThreadPool {
    private final static String fileName = "163672.xls";

    private final static String pathName = "C:\\Users\\Administrator\\Desktop\\";

    private final static File file = new File(pathName + fileName);

    public static void main(String[] args) {
        //factory形式
  CustomThreadFactory threadFactory = new CustomThreadFactory();
        //异常处理逻辑
  CustomRejectedExecutionHandler customRejectedExecutionHandler = new CustomRejectedExecutionHandler();
        //工作队列形式,要定义有界的,否则线程量多的话会OOM
  BlockingDeque<Runnable> blockingDeque = new LinkedBlockingDeque<>();
        //创建线程池管理
  ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(4,8,10,
                TimeUnit.SECONDS, blockingDeque, threadFactory, customRejectedExecutionHandler);
        //使用自定义线程池去读取文件
  operaterExcel2(threadPoolExecutor);
    }

    private static void operaterExcel2(ThreadPoolExecutor es){

        FileInputStream fileInputStream = null;
        Workbook wb = null;
        Sheet sheet = null;
        //创建的线程数
  int threadNum = 4;
        //存放最终结果的map
  final Map<String,List<ContentValueEntity>> map = new HashMap<>(threadNum);
        // 使用线程池进行线程管理
 //final ExecutorService es = Executors.newCachedThreadPool();    try {
            //把文件读入输入流
  fileInputStream = new FileInputStream(file);
            /**创建工作簿,这一个对象代表着对应的一个Excel文件
 * 用 WorkbookFactory 创建workbook * 就 不用判断 2003(xls)(HSSFWorkbook) * 与2007(slsx)(XSSFWorkbook) * */  wb = WorkbookFactory.create(fileInputStream);
            //读取sheet
  sheet = wb.getSheetAt(0);
            //第一行数据(第一行是列名,所以开始行+1)
  int first = sheet.getFirstRowNum()+1;
            //最后一行
  int last = sheet.getLastRowNum();
            //全部的行数
  int totalNum = last - first + 1;
            //每个线程处理的行数
  int numOfThread = totalNum/threadNum;
            // int lastThreadNum = totalNum - (threadNum-1)*numOfThread;   //获取开始时间  final long startTime=System.currentTimeMillis();

            /**回环栅栏,通过它可以实现让一组线程等待至某个状态之后再全部同时执行。
 叫做回环是因为当所有等待线程都被释放以后,CyclicBarrier可以被重用 *  假若有若干个线程都要进行写数据操作, *  并且只有所有线程都完成写数据操作之后, *  这些线程才能继续做后面的事情, *  此时就可以利用CyclicBarrier了: */  CyclicBarrier cyclicBarrier = new CyclicBarrier(threadNum, new Runnable() {
                @Override
  public void run() {
                    //当所有线程执行完毕后,便执行此方法
  List<ContentValueEntity> list1 = map.get("1");
                    List<ContentValueEntity> list2 = map.get("2");
                    List<ContentValueEntity> list3 = map.get("3");
                    List<ContentValueEntity> list4 = map.get("4");
                    List<ContentValueEntity> combineList = new ArrayList<>();
                    combineList.addAll(list1);
                    combineList.addAll(list2);
                    combineList.addAll(list3);
                    combineList.addAll(list4);
                    //获取结束时间
  long endTime=System.currentTimeMillis();
                    //程序运行时间
  System.out.println("程序运行时间: "+(endTime-startTime)+"ms");

                    System.out.println("ResultList====="+combineList);

                    es.shutdown();
                }
            });
            //这里提交线程去执行,有两个方法,1、submit 2、excuse
  Future<Map<String,List<ContentValueEntity>>> data1 = es.submit(new PoiWrite(cyclicBarrier, sheet, 1, numOfThread,"1",
                    map));
            Future<Map<String,List<ContentValueEntity>>> data2 = es.submit(new PoiWrite(cyclicBarrier, sheet, numOfThread+1, 2*numOfThread,"2",
                    map));
            Future<Map<String,List<ContentValueEntity>>> data3 = es.submit(new PoiWrite(cyclicBarrier, sheet, 2*numOfThread+1, 3*numOfThread,"3",
                    map));
            Future<Map<String,List<ContentValueEntity>>> data4 = es.submit(new PoiWrite(cyclicBarrier, sheet, 3*numOfThread+1, totalNum,"4",
                    map));
        }  catch (Exception e){
            e.printStackTrace();
        }
    }
}

4.PoiWrite.java

import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CyclicBarrier;

/**
 * @ClassName PoiWrite
 * @Author zhrb
 * @Date 2018/11/20 上午10:53
 */ public class PoiWrite implements Callable {

    private CyclicBarrier cyclicBarrier;
    private Sheet sheet;
    private int start;
    private int end;
    private String key;
    private Map<String,List<ContentValueEntity>> map;

    public PoiWrite(CyclicBarrier cyclicBarrier, Sheet sheet, int start, int end, String key, Map<String, List<ContentValueEntity>> map) {
        this.cyclicBarrier = cyclicBarrier;
        this.sheet = sheet;
        this.start = start;
        this.end = end;
        this.key = key;
        this.map = map;
    }

    @Override
  public Map<String, List<ContentValueEntity>> call() throws Exception {
        List<ContentValueEntity> list = new ArrayList<>();
        for (int rowNum = this.start;rowNum<=this.end;rowNum++){
            Row row = sheet.getRow(rowNum);
            if (row == null){
                continue;
            }
            ContentValueEntity entity = new ContentValueEntity();
            for (int cellNum = 0;cellNum<=5;cellNum++){
                Cell cell = row.getCell(cellNum);
                if (cell == null){
                    continue;
                }
                if (cellNum == 0){
                    entity.setNowTime(cell.getDateCellValue());
                }else if (cellNum == 1){
                    entity.setSecondValue((int) cell.getNumericCellValue());
                }else if (cellNum == 2){
                    entity.setThirdValue(cell.getStringCellValue());
                }else if (cellNum == 3){
                    entity.setFourthValue(cell.getBooleanCellValue());
                }else if (cellNum == 4){
                    entity.setFivthValue((int) cell.getNumericCellValue());
                }else{
                    entity.setFourthValue(cell.getBooleanCellValue());
                }
            }
            list.add(entity);
        }
        this.map.put(key,list);
        /**线程或者任务等待至barrier状态
 *         */
  this.cyclicBarrier.await();
        return this.map;
    }
}

这篇幅已经有点长了,多线程的三种执行方式,CyclicBarrier,CountDownLatch,Semaphore,这里就不多说了,有兴趣的同学可以自行研究,可能笔者后面的文章会写,敬请期待。