From 57a3ea4f053877641182ad5b216ad1266d7060cb Mon Sep 17 00:00:00 2001 From: carbotaniuman <41451839+carbotaniuman@users.noreply.github.com> Date: Wed, 27 Jan 2021 08:51:26 -0600 Subject: [PATCH] Fix threadpool mechanics --- CHANGELOG.md | 7 +++ settings.sample.yaml | 5 +++ .../kotlin/mdnet/netty/ApplicationNetty.kt | 44 ++++++++++++------- .../kotlin/mdnet/settings/ClientSettings.kt | 1 + 4 files changed, 40 insertions(+), 17 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index b49df0c..16e9ea8 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security +## [2.0.0-rc4] - 2021-01-27 +### Changed +- [2021-01-27] Added `threads` back with new semantics [@carbotaniuman]. + +### Fixed +- [2021-01-27] Fix regression with Prometheus metrics [@carbotaniuman]. + ## [2.0.0-rc3] - 2021-01-26 ### Changed - [2021-01-26] Removed `threads` options in settings [@carbotaniuman]. diff --git a/settings.sample.yaml b/settings.sample.yaml index e52ddbe..fbb653b 100644 --- a/settings.sample.yaml +++ b/settings.sample.yaml @@ -69,3 +69,8 @@ server_settings: # Maximum mebibytes per hour of images to server # Setting this to 0 disables the limiter max_mebibytes_per_hour: 0 + # Number of threads for Netty worker pool + # Don't touch this if you know what you're doing + # 0 defaults to (2 * your available processors) + # https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Runtime.html#availableProcessors() + threads: 0 diff --git a/src/main/kotlin/mdnet/netty/ApplicationNetty.kt b/src/main/kotlin/mdnet/netty/ApplicationNetty.kt index 2a8ffe8..95f0c25 100644 --- a/src/main/kotlin/mdnet/netty/ApplicationNetty.kt +++ b/src/main/kotlin/mdnet/netty/ApplicationNetty.kt @@ -41,6 +41,7 @@ import io.netty.handler.traffic.TrafficCounter import io.netty.incubator.channel.uring.IOUring import io.netty.incubator.channel.uring.IOUringEventLoopGroup import io.netty.incubator.channel.uring.IOUringServerSocketChannel +import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor import io.netty.util.internal.SystemPropertyUtil import mdnet.Constants import mdnet.data.Statistics @@ -65,43 +66,52 @@ import java.security.cert.X509Certificate import java.util.Locale import javax.net.ssl.SSLException -interface NettyTransport { - val bossGroup: EventLoopGroup - val workerGroup: EventLoopGroup - val factory: ChannelFactory +sealed class NettyTransport(threads: Int) { + abstract val bossGroup: EventLoopGroup + abstract val workerGroup: EventLoopGroup + abstract val factory: ChannelFactory + val executor = UnorderedThreadPoolEventExecutor( + threads.also { + require(threads > 0) { "Threads must be greater than zero" } + } + ) fun shutdownGracefully() { bossGroup.shutdownGracefully() workerGroup.shutdownGracefully() + executor.shutdownGracefully() } - private class NioTransport : NettyTransport { + private class NioTransport(threads: Int) : NettyTransport(threads) { override val bossGroup = NioEventLoopGroup(1) - override val workerGroup = NioEventLoopGroup() + override val workerGroup = NioEventLoopGroup(8) override val factory = ChannelFactory { NioServerSocketChannel() } } - private class EpollTransport : NettyTransport { + private class EpollTransport(threads: Int) : NettyTransport(threads) { override val bossGroup = EpollEventLoopGroup(1) - override val workerGroup = EpollEventLoopGroup() + override val workerGroup = EpollEventLoopGroup(8) override val factory = ChannelFactory { EpollServerSocketChannel() } } - private class IOUringTransport : NettyTransport { + private class IOUringTransport(threads: Int) : NettyTransport(threads) { override val bossGroup = IOUringEventLoopGroup(1) - override val workerGroup = IOUringEventLoopGroup() + override val workerGroup = IOUringEventLoopGroup(8) override val factory = ChannelFactory { IOUringServerSocketChannel() } } companion object { private val LOGGER = LoggerFactory.getLogger(NettyTransport::class.java) + private fun defaultNumThreads() = Runtime.getRuntime().availableProcessors() * 2 - fun bestForPlatform(): NettyTransport { + fun bestForPlatform(threads: Int): NettyTransport { val name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim { it <= ' ' } + + val threadsToUse = if (threads == 0) defaultNumThreads() else threads if (name.startsWith("linux")) { if (IOUring.isAvailable()) { LOGGER.info("Using IOUring transport") - return IOUringTransport() + return IOUringTransport(threadsToUse) } else { LOGGER.info(IOUring.unavailabilityCause()) { "IOUring transport not available" @@ -110,7 +120,7 @@ interface NettyTransport { if (Epoll.isAvailable()) { LOGGER.info("Using Epoll transport") - return EpollTransport() + return EpollTransport(threadsToUse) } else { LOGGER.info(Epoll.unavailabilityCause()) { "Epoll transport not available" @@ -119,7 +129,7 @@ interface NettyTransport { } LOGGER.info("Using Nio transport") - return NioTransport() + return NioTransport(threadsToUse) } } } @@ -130,13 +140,13 @@ class Netty( private val statistics: Statistics ) : ServerConfig { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { - private val transport = NettyTransport.bestForPlatform() + private val transport = NettyTransport.bestForPlatform(serverSettings.threads) private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress private val burstLimiter = object : GlobalTrafficShapingHandler( - transport.workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 50 + transport.workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 100 ) { override fun doAccounting(counter: TrafficCounter) { statistics.bytesSent.getAndAccumulate(counter.cumulativeWrittenBytes()) { a, b -> a + b } @@ -176,7 +186,7 @@ class Netty( ) ch.pipeline().addLast("streamer", ChunkedWriteHandler()) - ch.pipeline().addLast("handler", Http4kChannelHandler(httpHandler)) + ch.pipeline().addLast(transport.executor, "handler", Http4kChannelHandler(httpHandler)) ch.pipeline().addLast( "exceptions", diff --git a/src/main/kotlin/mdnet/settings/ClientSettings.kt b/src/main/kotlin/mdnet/settings/ClientSettings.kt index 4232a72..ffb7701 100644 --- a/src/main/kotlin/mdnet/settings/ClientSettings.kt +++ b/src/main/kotlin/mdnet/settings/ClientSettings.kt @@ -41,6 +41,7 @@ data class ServerSettings( val externalMaxKilobitsPerSecond: Long = 0, val maxMebibytesPerHour: Long = 0, val port: Int = 443, + val threads: Int = 0, ) @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy::class)