1
0
Fork 1
mirror of https://gitlab.com/mangadex-pub/mangadex_at_home.git synced 2024-01-19 02:48:37 +00:00
mangadex_at_home/src/main/kotlin/mdnet/netty/ApplicationNetty.kt

239 lines
10 KiB
Kotlin
Raw Normal View History

2020-06-22 17:02:36 +00:00
/*
Mangadex@Home
Copyright (c) 2020, MangaDex Network
This file is part of MangaDex@Home.
MangaDex@Home is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MangaDex@Home is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
2021-01-25 02:25:49 +00:00
*/
2021-01-24 04:55:11 +00:00
package mdnet.netty
2020-06-06 22:52:25 +00:00
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
2021-01-24 18:05:05 +00:00
import io.netty.channel.epoll.Epoll
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
2020-06-06 22:52:25 +00:00
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.DecoderException
2021-01-26 16:15:50 +00:00
import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpServerKeepAliveHandler
2020-06-06 22:52:25 +00:00
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler
2020-07-04 17:14:51 +00:00
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.ReadTimeoutHandler
2020-07-04 17:14:51 +00:00
import io.netty.handler.timeout.WriteTimeoutException
import io.netty.handler.timeout.WriteTimeoutHandler
2020-06-06 22:52:25 +00:00
import io.netty.handler.traffic.GlobalTrafficShapingHandler
import io.netty.handler.traffic.TrafficCounter
2021-01-24 18:05:05 +00:00
import io.netty.incubator.channel.uring.IOUring
import io.netty.incubator.channel.uring.IOUringEventLoopGroup
import io.netty.incubator.channel.uring.IOUringServerSocketChannel
2021-01-28 14:17:24 +00:00
import io.netty.util.concurrent.DefaultEventExecutorGroup
2021-01-26 16:15:50 +00:00
import io.netty.util.internal.SystemPropertyUtil
2021-01-24 04:55:11 +00:00
import mdnet.Constants
import mdnet.data.Statistics
import mdnet.logging.info
import mdnet.logging.trace
2021-01-26 16:15:50 +00:00
import mdnet.logging.warn
2021-01-24 04:55:11 +00:00
import mdnet.settings.ServerSettings
import mdnet.settings.TlsCert
import org.http4k.core.HttpHandler
import org.http4k.server.Http4kChannelHandler
import org.http4k.server.Http4kServer
import org.http4k.server.ServerConfig
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.IOException
import java.io.InputStream
2020-06-06 22:52:25 +00:00
import java.net.InetSocketAddress
2020-06-22 17:08:46 +00:00
import java.net.SocketException
import java.security.PrivateKey
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
2021-01-26 16:15:50 +00:00
import java.util.Locale
import javax.net.ssl.SSLException
2020-06-06 22:52:25 +00:00
2021-01-27 14:51:26 +00:00
sealed class NettyTransport(threads: Int) {
abstract val bossGroup: EventLoopGroup
abstract val workerGroup: EventLoopGroup
abstract val factory: ChannelFactory<ServerChannel>
2021-01-28 14:17:24 +00:00
val executor = DefaultEventExecutorGroup(
2021-01-27 14:51:26 +00:00
threads.also {
require(threads > 0) { "Threads must be greater than zero" }
}
)
2021-01-24 18:05:05 +00:00
fun shutdownGracefully() {
2021-02-11 15:11:03 +00:00
bossGroup.shutdownGracefully().sync()
workerGroup.shutdownGracefully().sync()
executor.shutdownGracefully().sync()
2021-01-24 18:05:05 +00:00
}
2021-01-27 14:51:26 +00:00
private class NioTransport(threads: Int) : NettyTransport(threads) {
2021-01-26 16:15:50 +00:00
override val bossGroup = NioEventLoopGroup(1)
2021-01-27 14:51:26 +00:00
override val workerGroup = NioEventLoopGroup(8)
2021-01-24 18:05:05 +00:00
override val factory = ChannelFactory<ServerChannel> { NioServerSocketChannel() }
}
2021-01-27 14:51:26 +00:00
private class EpollTransport(threads: Int) : NettyTransport(threads) {
2021-01-26 16:15:50 +00:00
override val bossGroup = EpollEventLoopGroup(1)
2021-01-27 14:51:26 +00:00
override val workerGroup = EpollEventLoopGroup(8)
2021-01-24 18:05:05 +00:00
override val factory = ChannelFactory<ServerChannel> { EpollServerSocketChannel() }
}
2021-01-27 14:51:26 +00:00
private class IOUringTransport(threads: Int) : NettyTransport(threads) {
2021-01-26 16:15:50 +00:00
override val bossGroup = IOUringEventLoopGroup(1)
2021-01-27 14:51:26 +00:00
override val workerGroup = IOUringEventLoopGroup(8)
2021-01-24 18:05:05 +00:00
override val factory = ChannelFactory<ServerChannel> { IOUringServerSocketChannel() }
}
companion object {
private val LOGGER = LoggerFactory.getLogger(NettyTransport::class.java)
2021-01-27 14:51:26 +00:00
private fun defaultNumThreads() = Runtime.getRuntime().availableProcessors() * 2
2021-01-24 18:05:05 +00:00
2021-01-27 14:51:26 +00:00
fun bestForPlatform(threads: Int): NettyTransport {
2021-01-26 16:15:50 +00:00
val name = SystemPropertyUtil.get("os.name").toLowerCase(Locale.UK).trim { it <= ' ' }
2021-01-27 14:51:26 +00:00
val threadsToUse = if (threads == 0) defaultNumThreads() else threads
2021-01-27 19:36:49 +00:00
LOGGER.info { "Choosing a transport using $threadsToUse threads" }
2021-01-27 19:33:28 +00:00
2021-01-26 16:15:50 +00:00
if (name.startsWith("linux")) {
2021-02-11 15:11:03 +00:00
if (!SystemPropertyUtil.get("no-iouring").toBoolean()) {
if (IOUring.isAvailable()) {
LOGGER.info { "Using IOUring transport" }
return IOUringTransport(threadsToUse)
} else {
LOGGER.info(IOUring.unavailabilityCause()) {
2021-02-23 17:18:47 +00:00
"IOUring transport not available (this may be normal)"
2021-02-11 15:11:03 +00:00
}
2021-01-26 16:15:50 +00:00
}
2021-01-24 18:05:05 +00:00
}
2021-02-11 15:11:03 +00:00
if (!SystemPropertyUtil.get("no-epoll").toBoolean()) {
if (Epoll.isAvailable()) {
LOGGER.info { "Using Epoll transport" }
return EpollTransport(threadsToUse)
} else {
LOGGER.info(Epoll.unavailabilityCause()) {
2021-02-23 17:18:47 +00:00
"Epoll transport not available (this may be normal)"
2021-02-11 15:11:03 +00:00
}
2021-01-26 16:15:50 +00:00
}
2021-01-24 18:05:05 +00:00
}
}
2021-01-27 19:33:28 +00:00
LOGGER.info { "Using Nio transport" }
2021-01-27 14:51:26 +00:00
return NioTransport(threadsToUse)
2021-01-24 18:05:05 +00:00
}
}
}
2021-01-26 16:15:50 +00:00
class Netty(
private val tls: TlsCert,
private val serverSettings: ServerSettings,
private val statistics: Statistics
) : ServerConfig {
2020-06-06 22:52:25 +00:00
override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer {
2021-01-27 14:51:26 +00:00
private val transport = NettyTransport.bestForPlatform(serverSettings.threads)
2021-01-24 04:55:11 +00:00
2021-02-11 15:11:03 +00:00
private lateinit var channel: Channel
2020-06-06 22:52:25 +00:00
private val burstLimiter = object : GlobalTrafficShapingHandler(
2021-01-27 14:51:26 +00:00
transport.workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 100
2021-01-24 04:55:11 +00:00
) {
2020-06-06 22:52:25 +00:00
override fun doAccounting(counter: TrafficCounter) {
2021-01-26 16:15:50 +00:00
statistics.bytesSent.getAndAccumulate(counter.cumulativeWrittenBytes()) { a, b -> a + b }
counter.resetCumulativeTime()
2020-06-06 22:52:25 +00:00
}
}
override fun start(): Http4kServer = apply {
2021-01-26 16:15:50 +00:00
LOGGER.info { "Starting Netty!" }
2020-06-27 18:15:49 +00:00
val certs = getX509Certs(tls.certificate)
val sslContext = SslContextBuilder
2021-01-24 04:55:11 +00:00
.forServer(getPrivateKey(tls.privateKey), certs)
.protocols("TLSv1.3", "TLSv1.2", "TLSv1.1", "TLSv1")
.build()
2020-06-06 22:52:25 +00:00
val bootstrap = ServerBootstrap()
2021-01-26 16:15:50 +00:00
bootstrap.group(transport.bossGroup, transport.workerGroup)
2021-01-24 18:05:05 +00:00
.channelFactory(transport.factory)
2021-01-24 04:55:11 +00:00
.childHandler(object : ChannelInitializer<SocketChannel>() {
public override fun initChannel(ch: SocketChannel) {
ch.pipeline().addLast("ssl", sslContext.newHandler(ch.alloc()))
2020-06-06 22:52:25 +00:00
2021-01-24 04:55:11 +00:00
ch.pipeline().addLast("codec", HttpServerCodec())
ch.pipeline().addLast("keepAlive", HttpServerKeepAliveHandler())
ch.pipeline().addLast("aggregator", HttpObjectAggregator(65536))
2020-06-09 19:29:33 +00:00
2021-01-24 04:55:11 +00:00
ch.pipeline().addLast("burstLimiter", burstLimiter)
2021-01-26 16:15:50 +00:00
ch.pipeline().addLast(
"readTimeoutHandler",
ReadTimeoutHandler(Constants.MAX_READ_TIME_SECONDS)
)
ch.pipeline().addLast(
"writeTimeoutHandler",
WriteTimeoutHandler(Constants.MAX_WRITE_TIME_SECONDS)
)
2021-01-24 04:55:11 +00:00
ch.pipeline().addLast("streamer", ChunkedWriteHandler())
2021-01-27 14:51:26 +00:00
ch.pipeline().addLast(transport.executor, "handler", Http4kChannelHandler(httpHandler))
2021-01-24 04:55:11 +00:00
ch.pipeline().addLast(
"exceptions",
object : ChannelInboundHandlerAdapter() {
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
if (cause is SSLException || (cause is DecoderException && cause.cause is SSLException)) {
2021-01-26 16:15:50 +00:00
LOGGER.trace(cause) { "Ignored invalid SSL connection" }
2020-06-22 17:08:46 +00:00
} else if (cause is IOException || cause is SocketException) {
2021-01-26 16:15:50 +00:00
LOGGER.trace(cause) { "User (downloader) abruptly closed the connection" }
2020-07-04 17:14:51 +00:00
} else if (cause !is ReadTimeoutException && cause !is WriteTimeoutException) {
2021-01-26 16:15:50 +00:00
LOGGER.warn(cause) { "Exception in pipeline" }
}
}
2021-01-24 04:55:11 +00:00
}
)
}
})
.option(ChannelOption.SO_BACKLOG, 1000)
.childOption(ChannelOption.SO_KEEPALIVE, true)
2021-02-11 15:11:03 +00:00
channel = bootstrap.bind(InetSocketAddress(serverSettings.hostname, serverSettings.port)).sync().channel()
2020-06-06 22:52:25 +00:00
}
override fun stop() = apply {
2021-02-21 17:59:11 +00:00
channel.close().sync()
2021-01-24 18:05:05 +00:00
transport.shutdownGracefully()
2020-06-06 22:52:25 +00:00
}
2021-01-26 16:15:50 +00:00
override fun port(): Int = serverSettings.port
2021-01-24 04:55:11 +00:00
}
companion object {
private val LOGGER = LoggerFactory.getLogger(Netty::class.java)
2020-06-06 22:52:25 +00:00
}
}
2020-06-27 18:15:49 +00:00
fun getX509Certs(certificates: String): Collection<X509Certificate> {
val targetStream: InputStream = ByteArrayInputStream(certificates.toByteArray())
2020-06-27 18:15:49 +00:00
@Suppress("unchecked_cast")
return CertificateFactory.getInstance("X509").generateCertificates(targetStream) as Collection<X509Certificate>
}
fun getPrivateKey(privateKey: String): PrivateKey {
return loadKey(privateKey)!!
2020-06-09 19:21:29 +00:00
}