NioEventLoopGroup 源码分析
1. 在阅读源码时做了一定的注释,并且做了一些测试分析源码内的执行流程,由于博客篇幅有限。为了方便 IDE 查看、跟踪、调试 代码,所以在 github 上提供 netty 的源码、详细的注释及测试用例。欢迎大家 star、fork ! 2. 由于个人水平有限,对源码的分析理解可能存在偏差或不透彻的地方还请大家在评论区指出,谢谢!
从今天开始,就准备进军 ne tty 了,主要的想法是看看 netty4 中一些比较重要的实现,也就是能经常出现在我们面前的东西。主要是: 线程池、通道、管道、编解码器、以及常用的工具类。
然后现在看源码应该不会像之前的 jdk 那么细致了,主要是看了一个类以后就发现 netty 对代码封装太强了,基本一个功能可能封装了七八个类去实现,很多的抽象类但是这些抽象类中的功能还非常的多。所以说主要看这个流程,以及里面写的比较好的代码或者比较新的思想会仔细的去看看。具体的子字段,每个方法不可能做到那么细致。
好,正式开始 netty 源码征战 !
1. 基本思路 这里首先讲一下结论,也就是先说我看这个类的源码整理出来的思路,主要就是因为这些类太杂,一个功能在好几个类中才完全实现。
我们在 new 一个 worker/boss 线程的时候一般是采用的直接使用的无参的构造方法,但是无参的构造方法他创建的线程池的大小是我们 CPU 核心的 2 倍。紧接着就需要 new 这么多个线程放到线程池里面,这里的线程池采用的数据结构是一个数组存放的,每一个线程需要设置一个任务队列,显然任务队列使用的是一个阻塞队列,这里实际采用的是 LinkedBlockQueue
,然后回想一下在 jdk 中的线程池是不是还有一个比较重要的参数就是线程工厂,对的!这里也有这个东西,他是需要我们手动传入的,但是如果不传则会使用一个默认的线程工厂,里面有一个 newThread
方法,这个方法实现基本和 jdk 中的实现一模一样,就是创建一个级别为 5 的非 Daemon 线程。对这就是我们在创建一个线程池时候完成的全部工作!
好现在来具体说一下,我们每次创建的是 NioEventLoopGroup
但是他又继承了 n 个类才实现了线程池,也就是线程池的祖先是 ScheduledExecutorService
是 jdk 中的线程池的一个接口,其中里面最重要的数据结构就是一个 children 数组,用来装线程的。
然后具体的线程他也是进行了封装的,也就是我们常看到的 NioEventLoop
。这个类里面有两个比较重要的结构:taskQueue 和 thread 。很明显这个非常类似 jdk 中的线程池。
2. NioEventLoopGroup 线程池分析 首先要创建线程池,传入的线程数为 0,他是一直在调用 this()
最后追溯到 super(nThreads,threadFactory,selectorProvider)
也就是使用了 MultithreadEventLoopGroup
的构造方法,在这一步确定了当传入的线程数为 0 时应该设置的线程数为 CPU 核心的两倍。然后再次上调,调用了 MultithreadEventExecutorGroup
的构造方法,在这里才是真正的开始了线程池的初始化。
首先设置了线程池工厂,然后初始化 chooser ,接着创建 n 个线程放到 children 数组中,最后设置线程中断的监听事件。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 protected MultithreadEventExecutorGroup (int nThreads, ThreadFactory threadFactory, Object... args) { if (nThreads <= 0 ) { throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)" , nThreads)); } if (threadFactory == null ) { threadFactory = newDefaultThreadFactory(); } children = new SingleThreadEventExecutor[nThreads]; if (isPowerOfTwo(children.length)) { chooser = new PowerOfTwoEventExecutorChooser(); } else { chooser = new GenericEventExecutorChooser(); } for (int i = 0 ; i < nThreads; i ++) { boolean success = false ; try { children[i] = newChild(threadFactory, args); success = true ; } catch (Exception e) { throw new IllegalStateException("failed to create a child event loop" , e); } finally { if (!success) { for (int j = 0 ; j < i; j ++) { children[j].shutdownGracefully(); } for (int j = 0 ; j < i; j ++) { EventExecutor e = children[j]; try { while (!e.isTerminated()) { e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS); } } catch (InterruptedException interrupted) { Thread.currentThread().interrupt(); break ; } } } } } final FutureListener<Object> terminationListener = new FutureListener<Object>() { @Override public void operationComplete (Future<Object> future) throws Exception { if (terminatedChildren.incrementAndGet() == children.length) { terminationFuture.setSuccess(null ); } } }; for (EventExecutor e: children) { e.terminationFuture().addListener(terminationListener); } }
其中有一个 if 分支用来初始化 chooser ,这个 chooser 就是用来选择使用哪个线程来执行哪些操作的。这里用到了判断一个数是否为 2 的次幂的一个方法 isPowerOfTwo()
实现比较有意思,贴出来。
1 2 3 private static boolean isPowerOfTwo (int val) { return (val & -val) == val; }
接下来目光要转向 newChild(threadFactory, args)
,因为在这个类里面这个方法是抽象的,在 NioEventLoopGroup
得到了实现。其实看到了也非常的简单粗暴,直接 new 了一个 NioEventLoop
,接下来就应该分析这个线程的包装类了。
1 2 3 4 5 6 @Override protected EventExecutor newChild ( ThreadFactory threadFactory, Object... args) throws Exception { return new NioEventLoop(this , threadFactory, (SelectorProvider) args[0 ]); }
3. NioEventLoop 线程分析 上面已经看到了,newChild
方法就是 new 了一个 NioEventLoop
。所以有必要好好看看这个线程包装类。
这个类的构造方法是调用了父类 SingleThreadEventLoop
的构造,接着继续上调 SingleThreadEventExecutor
构造,在这个类中才真正的实现了线程的构造。里面就做了两件事 :
new 了一个新的线程,新的线程还分配了一个任务,任务的内容就是调用本类中的一个 run 方法,在 NioEventLoop
中实现。
设置任务队列为 LinkedBlockQueue
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 protected SingleThreadEventExecutor (EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) { if (threadFactory == null ) { throw new NullPointerException("threadFactory" ); } this .parent = parent; this .addTaskWakesUp = addTaskWakesUp; thread = threadFactory.newThread(new Runnable() { @Override public void run () { boolean success = false ; updateLastExecutionTime(); try { SingleThreadEventExecutor.this .run(); success = true ; } catch (Throwable t) { logger.warn("Unexpected exception from an event executor: " , t); } finally { for (;;) { int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this ); if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet( SingleThreadEventExecutor.this , oldState, ST_SHUTTING_DOWN)) { break ; } } if (success && gracefulShutdownStartTime == 0 ) { logger.error( "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " + SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " + "before run() implementation terminates." ); } try { for (;;) { if (confirmShutdown()) { break ; } } } finally { try { cleanup(); } finally { STATE_UPDATER.set(SingleThreadEventExecutor.this , ST_TERMINATED); threadLock.release(); if (!taskQueue.isEmpty()) { logger.warn("An event executor terminated with non-empty task queue (" + taskQueue.size() + ')' ); } terminationFuture.setSuccess(null ); } } } } }); taskQueue = newTaskQueue(); }
然后看一下他要执行的 run 方法在 NioEventLoop
中得到了实现。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 @Override protected void run () { for (;;) { boolean oldWakenUp = wakenUp.getAndSet(false ); try { if (hasTasks()) { selectNow(); } else { select(oldWakenUp); if (wakenUp.get()) { selector.wakeup(); } } cancelledKeys = 0 ; needsToSelectAgain = false ; final int ioRatio = this .ioRatio; if (ioRatio == 100 ) { processSelectedKeys(); runAllTasks(); } else { final long ioStartTime = System.nanoTime(); processSelectedKeys(); final long ioTime = System.nanoTime() - ioStartTime; runAllTasks(ioTime * (100 - ioRatio) / ioRatio); } if (isShuttingDown()) { closeAll(); if (confirmShutdown()) { break ; } } } catch (Throwable t) { logger.warn("Unexpected exception in the selector loop." , t); try { Thread.sleep(1000 ); } catch (InterruptedException e) { } } } }
紧接着就是分析这个 run 方法,也就是线程在被创建之后进行的一系列操作。里面主要做了三件事:
进行 select
处理 selectedKeys
唤醒队列中所有的任务
上面的操作都是在一个循环里面一直执行的,所以说 NioEventLoop
这个线程的作用就只有一个那就是:进行任务处理。在这个线程被 new 出来时我们就给他分配了线程的任务就是永不停歇的进行上面的操作。
上面的过程说的是有线程安全问题,也就是如果我们过早的把 wakenUp 设置为 true,我们的 select 就会苏醒过来,而其他的线程不清楚这种状态想要设置为 wakenUp 的时候都会失败,导致 select 休眠。主要感觉有点是因为这个东西不是线程间可见的,要是采用 volatile 可能就会解决这个问题,但是 wakenUp 是 final 的不能使用 volatile 关键字修饰。所以作者采用的解决方案就是再次手动唤醒,防止由于其他线程并发设置 wakenUp 的值导致的不必要的休眠。
然后要说一下 select 方法,这个方法的调用主要因为在队列中没有任务,所以就暂时不用 select ,这个方法里面做的就是自旋的去 select ,没有任务就 等待一段时间再去 select。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 private void select (boolean oldWakenUp) throws IOException { Selector selector = this .selector; try { int selectCnt = 0 ; long currentTimeNanos = System.nanoTime(); long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos); for (;;) { long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L ) / 1000000L ; if (timeoutMillis <= 0 ) { if (selectCnt == 0 ) { selector.selectNow(); selectCnt = 1 ; } break ; } int selectedKeys = selector.select(timeoutMillis); selectCnt ++; if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) { break ; } if (Thread.interrupted()) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely because " + "Thread.currentThread().interrupt() was called. Use " + "NioEventLoop.shutdownGracefully() to shutdown the NioEventLoop." ); } selectCnt = 1 ; break ; } long time = System.nanoTime(); if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) { selectCnt = 1 ; } else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 && selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) { logger.warn( "Selector.select() returned prematurely {} times in a row; rebuilding selector." , selectCnt); rebuildSelector(); selector = this .selector; selector.selectNow(); selectCnt = 1 ; break ; } currentTimeNanos = time; } if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS) { if (logger.isDebugEnabled()) { logger.debug("Selector.select() returned prematurely {} times in a row." , selectCnt - 1 ); } } } catch (CancelledKeyException e) { if (logger.isDebugEnabled()) { logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector - JDK bug?" , e); } } }
接着就是 processSelectedKeys();
和runAllTasks();
这两个方法,前一个方法不说就是和我们写 Nio 的时候的步骤差不多,遍历 selectedKeys 处理,然后 runAllTasks()
执行所有的任务的 run 方法。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 protected boolean runAllTasks () { fetchFromDelayedQueue(); Runnable task = pollTask(); if (task == null ) { return false ; } for (;;) { try { task.run(); } catch (Throwable t) { logger.warn("A task raised an exception." , t); } task = pollTask(); if (task == null ) { lastExecutionTime = ScheduledFutureTask.nanoTime(); return true ; } } }
4. 总结 好了其实到这里线程池其实分析的已经差不多了,对于很多的细节问题并没有仔细的去看,单丝我们清楚流程以及里面的结构基本就差不多了。
在 NioEventLoopGroup
中包装了 NioEventLoop
线程任务。具体包装在了 children 数组中,然后使用 newThread 工厂创建线程,接着给线程分配任务,任务就是进行 select 操作。
我的博客即将搬运同步至腾讯云+社区,邀请大家一同入驻:https://cloud.tencent.com/developer/support-plan