1
0
Fork 1
mirror of https://gitlab.com/mangadex-pub/mangadex_at_home.git synced 2024-01-19 02:48:37 +00:00

Fix threadpool mechanics

This commit is contained in:
carbotaniuman 2021-01-27 08:51:26 -06:00
parent 86a5094cd3
commit 57a3ea4f05
4 changed files with 40 additions and 17 deletions

View file

@ -17,6 +17,13 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
### Security ### Security
## [2.0.0-rc4] - 2021-01-27
### Changed
- [2021-01-27] Added `threads` back with new semantics [@carbotaniuman].
### Fixed
- [2021-01-27] Fix regression with Prometheus metrics [@carbotaniuman].
## [2.0.0-rc3] - 2021-01-26 ## [2.0.0-rc3] - 2021-01-26
### Changed ### Changed
- [2021-01-26] Removed `threads` options in settings [@carbotaniuman]. - [2021-01-26] Removed `threads` options in settings [@carbotaniuman].

View file

@ -69,3 +69,8 @@ server_settings:
# Maximum mebibytes per hour of images to server # Maximum mebibytes per hour of images to server
# Setting this to 0 disables the limiter # Setting this to 0 disables the limiter
max_mebibytes_per_hour: 0 max_mebibytes_per_hour: 0
# Number of threads for Netty worker pool
# Don't touch this if you know what you're doing
# 0 defaults to (2 * your available processors)
# https://docs.oracle.com/en/java/javase/11/docs/api/java.base/java/lang/Runtime.html#availableProcessors()
threads: 0

View file

@ -41,6 +41,7 @@ import io.netty.handler.traffic.TrafficCounter
import io.netty.incubator.channel.uring.IOUring import io.netty.incubator.channel.uring.IOUring
import io.netty.incubator.channel.uring.IOUringEventLoopGroup import io.netty.incubator.channel.uring.IOUringEventLoopGroup
import io.netty.incubator.channel.uring.IOUringServerSocketChannel import io.netty.incubator.channel.uring.IOUringServerSocketChannel
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor
import io.netty.util.internal.SystemPropertyUtil import io.netty.util.internal.SystemPropertyUtil
import mdnet.Constants import mdnet.Constants
import mdnet.data.Statistics import mdnet.data.Statistics
@ -65,43 +66,52 @@ import java.security.cert.X509Certificate
import java.util.Locale import java.util.Locale
import javax.net.ssl.SSLException import javax.net.ssl.SSLException
interface NettyTransport { sealed class NettyTransport(threads: Int) {
val bossGroup: EventLoopGroup abstract val bossGroup: EventLoopGroup
val workerGroup: EventLoopGroup abstract val workerGroup: EventLoopGroup
val factory: ChannelFactory<ServerChannel> abstract val factory: ChannelFactory<ServerChannel>
val executor = UnorderedThreadPoolEventExecutor(
threads.also {
require(threads > 0) { "Threads must be greater than zero" }
}
)
fun shutdownGracefully() { fun shutdownGracefully() {
bossGroup.shutdownGracefully() bossGroup.shutdownGracefully()
workerGroup.shutdownGracefully() workerGroup.shutdownGracefully()
executor.shutdownGracefully()
} }
private class NioTransport : NettyTransport { private class NioTransport(threads: Int) : NettyTransport(threads) {
override val bossGroup = NioEventLoopGroup(1) override val bossGroup = NioEventLoopGroup(1)
override val workerGroup = NioEventLoopGroup() override val workerGroup = NioEventLoopGroup(8)
override val factory = ChannelFactory<ServerChannel> { NioServerSocketChannel() } override val factory = ChannelFactory<ServerChannel> { NioServerSocketChannel() }
} }
private class EpollTransport : NettyTransport { private class EpollTransport(threads: Int) : NettyTransport(threads) {
override val bossGroup = EpollEventLoopGroup(1) override val bossGroup = EpollEventLoopGroup(1)
override val workerGroup = EpollEventLoopGroup() override val workerGroup = EpollEventLoopGroup(8)
override val factory = ChannelFactory<ServerChannel> { EpollServerSocketChannel() } override val factory = ChannelFactory<ServerChannel> { EpollServerSocketChannel() }
} }
private class IOUringTransport : NettyTransport { private class IOUringTransport(threads: Int) : NettyTransport(threads) {
override val bossGroup = IOUringEventLoopGroup(1) override val bossGroup = IOUringEventLoopGroup(1)
override val workerGroup = IOUringEventLoopGroup() override val workerGroup = IOUringEventLoopGroup(8)
override val factory = ChannelFactory<ServerChannel> { IOUringServerSocketChannel() } override val factory = ChannelFactory<ServerChannel> { IOUringServerSocketChannel() }
} }
companion object { companion object {
private val LOGGER = LoggerFactory.getLogger(NettyTransport::class.java) private val LOGGER = LoggerFactory.getLogger(NettyTransport::class.java)
private fun defaultNumThreads() = Runtime.getRuntime().availableProcessors() * 2
fun bestForPlatform(): NettyTransport { fun bestForPlatform(threads: Int): NettyTransport {
val name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim { it <= ' ' } val name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim { it <= ' ' }
val threadsToUse = if (threads == 0) defaultNumThreads() else threads
if (name.startsWith("linux")) { if (name.startsWith("linux")) {
if (IOUring.isAvailable()) { if (IOUring.isAvailable()) {
LOGGER.info("Using IOUring transport") LOGGER.info("Using IOUring transport")
return IOUringTransport() return IOUringTransport(threadsToUse)
} else { } else {
LOGGER.info(IOUring.unavailabilityCause()) { LOGGER.info(IOUring.unavailabilityCause()) {
"IOUring transport not available" "IOUring transport not available"
@ -110,7 +120,7 @@ interface NettyTransport {
if (Epoll.isAvailable()) { if (Epoll.isAvailable()) {
LOGGER.info("Using Epoll transport") LOGGER.info("Using Epoll transport")
return EpollTransport() return EpollTransport(threadsToUse)
} else { } else {
LOGGER.info(Epoll.unavailabilityCause()) { LOGGER.info(Epoll.unavailabilityCause()) {
"Epoll transport not available" "Epoll transport not available"
@ -119,7 +129,7 @@ interface NettyTransport {
} }
LOGGER.info("Using Nio transport") LOGGER.info("Using Nio transport")
return NioTransport() return NioTransport(threadsToUse)
} }
} }
} }
@ -130,13 +140,13 @@ class Netty(
private val statistics: Statistics private val statistics: Statistics
) : ServerConfig { ) : ServerConfig {
override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer {
private val transport = NettyTransport.bestForPlatform() private val transport = NettyTransport.bestForPlatform(serverSettings.threads)
private lateinit var closeFuture: ChannelFuture private lateinit var closeFuture: ChannelFuture
private lateinit var address: InetSocketAddress private lateinit var address: InetSocketAddress
private val burstLimiter = object : GlobalTrafficShapingHandler( private val burstLimiter = object : GlobalTrafficShapingHandler(
transport.workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 50 transport.workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 100
) { ) {
override fun doAccounting(counter: TrafficCounter) { override fun doAccounting(counter: TrafficCounter) {
statistics.bytesSent.getAndAccumulate(counter.cumulativeWrittenBytes()) { a, b -> a + b } statistics.bytesSent.getAndAccumulate(counter.cumulativeWrittenBytes()) { a, b -> a + b }
@ -176,7 +186,7 @@ class Netty(
) )
ch.pipeline().addLast("streamer", ChunkedWriteHandler()) ch.pipeline().addLast("streamer", ChunkedWriteHandler())
ch.pipeline().addLast("handler", Http4kChannelHandler(httpHandler)) ch.pipeline().addLast(transport.executor, "handler", Http4kChannelHandler(httpHandler))
ch.pipeline().addLast( ch.pipeline().addLast(
"exceptions", "exceptions",

View file

@ -41,6 +41,7 @@ data class ServerSettings(
val externalMaxKilobitsPerSecond: Long = 0, val externalMaxKilobitsPerSecond: Long = 0,
val maxMebibytesPerHour: Long = 0, val maxMebibytesPerHour: Long = 0,
val port: Int = 443, val port: Int = 443,
val threads: Int = 0,
) )
@JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy::class) @JsonNaming(PropertyNamingStrategies.SnakeCaseStrategy::class)