V2EX = way to explore
V2EX 是一个关于分享和探索的地方
现在注册
已注册用户请  登录
• 请不要在回答技术问题时复制粘贴 AI 生成的内容
yanhomlin
V2EX  ›  程序员

学到就是赚到,面试加分项之 WebServer 线程池管理!

  •  
  •   yanhomlin ·
    yanhom1314 · 2022-08-16 10:18:55 +08:00 · 1726 次点击
    这是一个创建于 859 天前的主题,其中的信息可能已经有所发展或是发生改变。

    大家好,这篇文章我们来讨论一个话题,怎么去管理 SpringBoot 内置的三大 WebServer ( Tomcat 、Jetty 、Undertow )的线程池,包括监控告警、动态调参。

    不管是应对越来越卷的面试考察,还是自己项目日常性能调优,绝对有用,学到就是赚到,搬好椅子,开始我们的分析之旅。


    写在前面

    要想去管理第三方组件的线程池,首先肯定要对这些组件有一定的熟悉度,了解整个请求的一个处理过程,找到对应处理请求的线程池,这些线程池不一定是 JUC 包下的 ThreadPoolExecutor 类,也可能是组件自己实现的线程池,但是基本原理都差不多。

    Tomcat 、Jetty 、Undertow 这三个 WebServer 都是这样,他们并没有直接使用 JUC 提供的线程池实现,而是自己实现了一套,或者扩展了 JUC 的实现。翻源码找到相应的目标线程池后,然后看有没有暴露 public 方法供我们调用获取,如果没有就需要考虑通过反射来获取了。

    下面我们来简单分析下这三大 WebServer 的线程池内部实现。


    Tomcat 内部线程池的实现

    • Tomcat 内部线程池没有直接使用 JUC 下的 ThreadPoolExecutor ,而是选择继承 JUC 下的 Executor 体系类,然后重写 execute() 等方法,不同版本有差异。

    1.继承 JUC 原生 ThreadPoolExecutor ( 9.0.50 版本及以下),并覆写了一些方法,主要 execute() 和 afterExecute()

    2.继承 JUC 的 AbstractExecutorService ( 9.0.51 版本及以上),代码基本是拷贝 JUC 的 ThreadPoolExecutor ,也相应的微调了 execute() 方法执行流程

    注意 Tomcat 实现的线程池类名称也叫 ThreadPoolExecutor ,名字跟 JUC 下的是一样的,Tomcat 的 ThreadPoolExecutor 类 execute() 方法如下:

    public void execute(Runnable command, long timeout, TimeUnit unit) {
            submittedCount.incrementAndGet();
            try {
                super.execute(command);
            } catch (RejectedExecutionException rx) {
                if (super.getQueue() instanceof TaskQueue) {
                    final TaskQueue queue = (TaskQueue)super.getQueue();
                    try {
                        if (!queue.force(command, timeout, unit)) {
                            submittedCount.decrementAndGet();
                            throw new RejectedExecutionException(sm.getString("threadPoolExecutor.queueFull"));
                        }
                    } catch (InterruptedException x) {
                        submittedCount.decrementAndGet();
                        throw new RejectedExecutionException(x);
                    }
                } else {
                    submittedCount.decrementAndGet();
                    throw rx;
                }
    
            }
        }
    

    可以看出他是先调用父类的 execute()方法,然后捕获 RejectedExecutionException 异常,再去判断如果任务队列类型是 TaskQueue ,则尝试将任务添加到任务队列中,如果添加失败,证明队列已满,然后再执行拒绝策略,此处 submittedCount 是一个原子变量,记录提交到此线程池但未执行完成的任务数(主要在下面要提到的 TaskQueue 队列的 offer()方法用),为什么要这样设计呢?继续往下看!

    • Tomcat 定义了阻塞队列 TaskQueue 继承自 LinkedBlockingQueue ,该队列主要重写了 offer()方法。
     @Override
        public boolean offer(Runnable o) {
            //we can't do any checks
            if (parent==null) return super.offer(o);
            //we are maxed out on threads, simply queue the object
            if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
            //we have idle threads, just add it to the queue
            if (parent.getSubmittedCount()<=(parent.getPoolSize())) return super.offer(o);
            //if we have less threads than maximum force creation of a new thread
            if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
            //if we reached here, we need to add it to the queue
            return super.offer(o);
        }
    

    可以看到他在入队之前做了几个判断,这里的 parent 就是所属的线程池对象

    1.如果 parent 为 null ,直接调用父类 offer 方法入队

    2.如果当前线程数等于最大线程数,则直接调用父类 offer()方法入队

    3.如果当前未执行的任务数量小于等于当前线程数,仔细思考下,是不是说明有空闲的线程呢,那么直接调用父类 offer()入队后就马上有线程去执行它

    4.如果当前线程数小于最大线程数量,则直接返回 false ,然后回到 JUC 线程池的执行流程回想下,是不是就去添加新线程去执行任务了呢

    5.其他情况都直接入队

    • 因为 Tomcat 线程池主要是来做 IO 任务的,做这一切的目的主要也是为了以最小代价的改动更好的支持 IO 密集型的场景,JUC 自带的线程池主要是适合于 CPU 密集型的场景,可以回想一下 JUC 原生线程池 ThreadPoolExecutor#execute()方法的执行流程

    1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

    2.如果当前线程数大于核心线程数且队列没满,则将任务放入任务队列等待执行

    3.如果当前当前线程池数大于核心线程池,小于最大线程数,且任务队列已满,则创建新的线程执行提交的任务

    4.如果当前线程数等于最大线程数,且队列已满,则拒绝该任务

    可以看出当当前线程数大于核心线程数时,JUC 原生线程池首先是把任务放到队列里等待执行,而不是先创建线程执行。

    如果 Tomcat 接收的请求数量大于核心线程数,请求就会被放到队列中,等待核心线程处理,这样会降低请求的总体处理速度,所以 Tomcat 并没有使用 JUC 原生线程池,利用 TaskQueue 的 offer()方法巧妙的修改了 JUC 线程池的执行流程,改写后 Tomcat 线程池执行流程如下:

    1.判断如果当前线程数小于核心线程池,则新建一个线程来处理提交的任务

    2.如果当前当前线程池数大于核心线程池,小于最大线程数,则创建新的线程执行提交的任务

    3.如果当前线程数等于最大线程数,则将任务放入任务队列等待执行

    4.如果队列已满,则执行拒绝策略

    • Tomcat 核心线程池有对应的获取方法,获取方式如下
        public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
            TomcatWebServer tomcatWebServer = (TomcatWebServer) webServer;
            return new ExecutorWrapper(POOL_NAME,
                    tomcatWebServer.getTomcat().getConnector().getProtocolHandler().getExecutor());
        }
    
    • 想要动态调整 Tomcat 线程池的线程参数,可以在引入相应依赖后,在配置文件中添加以下配置就行,配置文件完整示例看官网
    spring:
      dynamic:
        tp:
          // 省略其他配置项
          tomcatTp:    # tomcat webserver 线程池配置
            corePoolSize: 100
            maximumPoolSize: 200
            keepAliveTime: 60
    

    Tomcat 线程池就介绍到这里吧,通过以上的一些介绍想必大家对 Tomcat 线程池执行任务的流程应该比较清楚了吧。


    Jetty 内部线程池的实现

    • Jetty 内部线程池,定义了一个继承自 Executor 的 ThreadPool 顶级接口,实现类有以下几个

    • 内部主要使用 QueuedThreadPool 这个实现类,该线程池执行流程就不在详细解读了,感兴趣的可以自己去看源码,核心思想都差不多,围绕核心线程数、最大线程数、任务队列三个参数入手,跟 Tocmat 比对着来看,其实也挺简单的。
    public void execute(Runnable job)
        {
            // Determine if we need to start a thread, use and idle thread or just queue this job
            int startThread;
            while (true)
            {
                // Get the atomic counts
                long counts = _counts.get();
    
                // Get the number of threads started (might not yet be running)
                int threads = AtomicBiInteger.getHi(counts);
                if (threads == Integer.MIN_VALUE)
                    throw new RejectedExecutionException(job.toString());
    
                // Get the number of truly idle threads. This count is reduced by the
                // job queue size so that any threads that are idle but are about to take
                // a job from the queue are not counted.
                int idle = AtomicBiInteger.getLo(counts);
    
                // Start a thread if we have insufficient idle threads to meet demand
                // and we are not at max threads.
                startThread = (idle <= 0 && threads < _maxThreads) ? 1 : 0;
    
                // The job will be run by an idle thread when available
                if (!_counts.compareAndSet(counts, threads + startThread, idle + startThread - 1))
                    continue;
    
                break;
            }
    
            if (!_jobs.offer(job))
            {
                // reverse our changes to _counts.
                if (addCounts(-startThread, 1 - startThread))
                    LOG.warn("{} rejected {}", this, job);
                throw new RejectedExecutionException(job.toString());
            }
    
            if (LOG.isDebugEnabled())
                LOG.debug("queue {} startThread={}", job, startThread);
    
            // Start a thread if one was needed
            while (startThread-- > 0)
                startThread();
        }
    
    • Jetty 线程池有提供 public 的获取方法,获取方式如下
        public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
            JettyWebServer jettyWebServer = (JettyWebServer) webServer;
            return new ExecutorWrapper(POOL_NAME, jettyWebServer.getServer().getThreadPool());
        }
    
    • 想要动态调整 Jetty 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,配置文件完整示例官网介绍
    spring:
      dynamic:
        tp:
          // 省略其他配置项
          jettyTp:    # jetty weberver 线程池配置
            corePoolSize: 100
            maximumPoolSize: 200
    

    Undertow 内部线程池的实现

    • Undertow 因为其性能彪悍,轻量,现在用的还是挺多的,wildfly (前身 Jboss )从 8 开始内部默认的 WebServer 用 Undertow 了,之前是 Tomcat 吧。了解 Undertow 的小伙伴应该知道,他底层是基于 XNIO 框架( 3.X 之前)来做的,这也是 Jboss 开发的一款基于 java nio 的优秀网络框架。但 Undertow 宣布从 3.0 开始底层网络框架要切换成 Netty 了,官方给的原因是说起网络编程,Netty 已经是事实上标准,用 Netty 的好处远大于 XNIO 能提供的,所以让我们期待 3.0 的发布吧,只可惜三年前就宣布了,至今也没动静,不知道是夭折了还是咋的,说实话,改动也挺大的,看啥时候发布吧,以下的介绍是基于 Undertow 2.x 版本来的

    • Undertow 内部是定义了一个叫 TaskPool 的线程池顶级接口,该接口有如图所示的几个实现。其实这几个实现类都是采用组合装饰的方式,内部都维护一个 JUC 的 Executor 体系类或者维护 Jboss 提供的 EnhancedQueueExecutor 类(也继承 JUC ExecutorService 类),执行流程可以自己去分析。

    • 具体的创建代码如下,根据外部是否传入,如果有传入则用外部传入的类,如果没有,根据参数设置内部创建一个,具体是用 JUC 的 ThreadPoolExecutor 还是 Jboss 的 EnhancedQueueExecutor ,根据配置参数选择

    • Undertow 线程池没有提供 public 的获取方法,所以通过反射来获取,获取方式如下
        public ExecutorWrapper doGetExecutorWrapper(WebServer webServer) {
    
            UndertowWebServer undertowWebServer = (UndertowWebServer) webServer;
            val undertow = (Undertow) ReflectionUtil.getFieldValue(UndertowWebServer.class, "undertow", undertowWebServer);
            if (Objects.isNull(undertow)) {
                return null;
            }
            return new ExecutorWrapper(POOL_NAME, undertow.getWorker());
        }
    
    • 想要动态调整 Undertow 线程池的线程参数,可以在引入 DynamicTp 依赖后,在配置文件中添加以下配置就行,配置文件完整示例看官网
    spring:
      dynamic:
        tp:
          // 省略其他配置项
          undertowTp:      # undertow webserver 线程池配置
            corePoolSize: 100
            maximumPoolSize: 200
            keepAliveTime: 60
    

    总结

    以上介绍了 Tomcat 、Jetty 、Undertow 三大 WebServer 内置线程池的一些情况,重点介绍了 Tomcat 的,篇幅有限,其他两个感兴趣可以自己分析,原理都差不多。同时也介绍了基于 DynamicTp 怎么动态调整线程池的参数,当我们做 WebServer 性能调优时,能动态调整参数真的是非常好用的。

    再次欢迎大家使用 DynamicTp 框架,一起完善项目。


    关于 DynamicTp 框架

    DynamicTp 是一个基于配置中心实现的轻量级动态线程池管理工具,主要功能可以总结为 动态调参、通知报警、运行监控、三方包线程池管理等几大类。

    经过几个版本迭代,目前最新版本 v1.0.7 具有以下特性

    特性

    • 代码零侵入:所有配置都放在配置中心,对业务代码零侵入

    • 轻量简单:基于 springboot 实现,引入 starter ,接入只需简单 4 步就可完成,顺利 3 分钟搞定

    • 高可扩展:框架核心功能都提供 SPI 接口供用户自定义个性化实现(配置中心、配置文件解析、通知告警、监控数据采集、任务包装等等)

    • 线上大规模应用:参考美团线程池实践,美团内部已经有该理论成熟的应用经验

    • 多平台通知报警:提供多种报警维度(配置变更通知、活性报警、容量阈值报警、拒绝触发报警、任务执行或等待超时报警),已支持企业微信、钉钉、飞书报警,同时提供 SPI 接口可自定义扩展实现

    • 监控:定时采集线程池指标数据,支持通过 MicroMeter 、JsonLog 日志输出、Endpoint 三种方式,可通过 SPI 接口自定义扩展实现

    • 任务增强:提供任务包装功能,实现 TaskWrapper 接口即可,如 TtlTaskWrapper 可以支持线程池上下文信息传递,以及给任务设置标识 id ,方便问题追踪

    • 兼容性:JUC 普通线程池也可以被框架监控,@Bean 定义时加 @DynamicTp 注解即可

    • 可靠性:框架提供的线程池实现 Spring 生命周期方法,可以在 Spring 容器关闭前尽可能多的处理队列中的任务

    • 多模式:参考 Tomcat 线程池提供了 IO 密集型场景使用的 EagerDtpExecutor 线程池

    • 支持多配置中心:基于主流配置中心实现线程池参数动态调整,实时生效,已支持 Nacos 、Apollo 、Zookeeper 、Consul ,同时也提供 SPI 接口可自定义扩展实现

    • 中间件线程池管理:集成管理常用第三方组件的线程池,已集成 Tomcat 、Jetty 、Undertow 、Dubbo 、RocketMq 、Hystrix 等组件的线程池管理(调参、监控报警)


    项目地址

    目前累计 1.6k star ,感谢你的 star ,欢迎 pr ,业务之余一起给开源贡献一份力量

    官网https://dynamictp.cn

    gitee 地址https://gitee.com/dromara/dynamic-tp

    github 地址https://github.com/dromara/dynamic-tp


    加入社群

    看到这儿,请给项目一个 star,你的支持是我们前进的动力!

    使用过程中有任何问题,或者对项目有什么想法或者建议,可以加入社群,跟群友一起交流讨论。

    微信群已满 200 人,可以关注微信公众号,加我个人微信拉群(备注:dynamic-tp )。

    wechat.jpeg

    1 条回复    2022-08-16 11:33:31 +08:00
    lmshl
        1
    lmshl  
       2022-08-16 11:33:31 +08:00   ❤️ 3
    别动态线程池了,还不如把 CompletableFuture 用对用好,动态线程池纯粹是先把代码写屎,再在屎上雕花的不必要方案。
    JVM 往保守了说有 fiber ,CompletableFuture 可以用,往激进了说还有 kotlin suspend ,展望未来还可以上 loom ,不管哪条路都没有动态线程池的活路。
    关于   ·   帮助文档   ·   博客   ·   API   ·   FAQ   ·   实用小工具   ·   2857 人在线   最高记录 6679   ·     Select Language
    创意工作者们的社区
    World is powered by solitude
    VERSION: 3.9.8.5 · 23ms · UTC 14:16 · PVG 22:16 · LAX 06:16 · JFK 09:16
    Developed with CodeLauncher
    ♥ Do have faith in what you're doing.