1
0
Fork 1
mirror of https://gitlab.com/mangadex-pub/mangadex_at_home.git synced 2024-01-19 02:48:37 +00:00
mangadex_at_home/src/main/kotlin/mdnet/netty/ApplicationNetty.kt

217 lines
9.1 KiB
Kotlin
Raw Normal View History

2020-06-22 17:02:36 +00:00
/*
Mangadex@Home
Copyright (c) 2020, MangaDex Network
This file is part of MangaDex@Home.
MangaDex@Home is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
MangaDex@Home is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
*/
2021-01-24 04:55:11 +00:00
package mdnet.netty
2020-06-06 22:52:25 +00:00
import io.netty.bootstrap.ServerBootstrap
import io.netty.channel.*
2021-01-24 18:05:05 +00:00
import io.netty.channel.epoll.Epoll
import io.netty.channel.epoll.EpollEventLoopGroup
import io.netty.channel.epoll.EpollServerSocketChannel
2020-06-06 22:52:25 +00:00
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.http.*
2020-06-06 22:52:25 +00:00
import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.stream.ChunkedWriteHandler
2020-07-04 17:14:51 +00:00
import io.netty.handler.timeout.ReadTimeoutException
import io.netty.handler.timeout.ReadTimeoutHandler
2020-07-04 17:14:51 +00:00
import io.netty.handler.timeout.WriteTimeoutException
import io.netty.handler.timeout.WriteTimeoutHandler
2020-06-06 22:52:25 +00:00
import io.netty.handler.traffic.GlobalTrafficShapingHandler
import io.netty.handler.traffic.TrafficCounter
2021-01-24 18:05:05 +00:00
import io.netty.incubator.channel.uring.IOUring
import io.netty.incubator.channel.uring.IOUringEventLoopGroup
import io.netty.incubator.channel.uring.IOUringServerSocketChannel
2021-01-24 04:55:11 +00:00
import io.netty.util.concurrent.DefaultEventExecutorGroup
import mdnet.Constants
import mdnet.data.Statistics
import mdnet.logging.info
import mdnet.logging.trace
import mdnet.settings.ServerSettings
import mdnet.settings.TlsCert
import org.http4k.core.HttpHandler
import org.http4k.server.Http4kChannelHandler
import org.http4k.server.Http4kServer
import org.http4k.server.ServerConfig
import org.slf4j.LoggerFactory
import java.io.ByteArrayInputStream
import java.io.IOException
import java.io.InputStream
2020-06-06 22:52:25 +00:00
import java.net.InetSocketAddress
2020-06-22 17:08:46 +00:00
import java.net.SocketException
import java.security.PrivateKey
import java.security.cert.CertificateFactory
import java.security.cert.X509Certificate
2020-06-06 22:52:25 +00:00
import java.util.concurrent.atomic.AtomicReference
import javax.net.ssl.SSLException
2020-06-06 22:52:25 +00:00
2021-01-24 18:05:05 +00:00
/**
 * Bundles the two event loop groups and the server channel factory that belong
 * to one Netty I/O transport (io_uring, epoll or NIO).
 *
 * Instances are obtained through [NettyTransport.bestForPlatform].
 */
interface NettyTransport {
    /** Event loop group exposed as the "master" group of this transport. */
    val masterGroup: EventLoopGroup

    /** Event loop group exposed as the "worker" group of this transport. */
    val workerGroup: EventLoopGroup

    /** Creates the server channel type matching this transport's event loop groups. */
    val factory: ChannelFactory<ServerChannel>

    /** Requests a graceful shutdown of the master group first, then the worker group. */
    fun shutdownGracefully() {
        listOf(masterGroup, workerGroup).forEach { group ->
            group.shutdownGracefully()
        }
    }

    /** Transport backed by Java NIO; used when no native transport is available. */
    private class NioTransport : NettyTransport {
        override val masterGroup: EventLoopGroup = NioEventLoopGroup()
        override val workerGroup: EventLoopGroup = NioEventLoopGroup()
        override val factory = ChannelFactory<ServerChannel> { NioServerSocketChannel() }
    }

    /** Transport backed by Netty's native epoll implementation. */
    private class EpollTransport : NettyTransport {
        override val masterGroup: EventLoopGroup = EpollEventLoopGroup()
        override val workerGroup: EventLoopGroup = EpollEventLoopGroup()
        override val factory = ChannelFactory<ServerChannel> { EpollServerSocketChannel() }
    }

    /** Transport backed by Netty's incubator io_uring implementation. */
    private class IOUringTransport : NettyTransport {
        override val masterGroup: EventLoopGroup = IOUringEventLoopGroup()
        override val workerGroup: EventLoopGroup = IOUringEventLoopGroup()
        override val factory = ChannelFactory<ServerChannel> { IOUringServerSocketChannel() }
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(NettyTransport::class.java)

        /**
         * Picks a transport in order of preference: io_uring, then epoll, then NIO.
         *
         * Each native transport that is skipped has its unavailability cause logged
         * at info level before the next candidate is tried.
         *
         * @return a newly constructed transport; its event loop groups are created eagerly
         */
        fun bestForPlatform(): NettyTransport {
            if (IOUring.isAvailable()) {
                LOGGER.info("Using IOUring transport")
                return IOUringTransport()
            }
            LOGGER.info(IOUring.unavailabilityCause()) { "IOUring transport not available" }

            if (Epoll.isAvailable()) {
                LOGGER.info("Using Epoll transport")
                return EpollTransport()
            }
            LOGGER.info(Epoll.unavailabilityCause()) { "Epoll transport not available" }

            // Neither native transport could be loaded: fall back to plain NIO.
            LOGGER.info("Using Nio transport")
            return NioTransport()
        }
    }
}
2020-08-11 19:12:01 +00:00
/**
 * http4k [ServerConfig] that serves an [HttpHandler] over TLS with Netty.
 *
 * @param tls source of the certificate chain and private key used to build the server SSL context
 * @param serverSettings supplies the bind hostname and port, the handler thread count and the
 * outbound bandwidth limit (`maxKilobitsPerSecond`)
 * @param statistics shared statistics holder; its `bytesSent` field is increased every time the
 * traffic shaper runs its accounting callback
 */
class Netty(private val tls: TlsCert, private val serverSettings: ServerSettings, private val statistics: AtomicReference<Statistics>) : ServerConfig {
    /**
     * Builds a server around [httpHandler]. The transport (event loop groups) and the handler
     * executor are created here, not in `start()`, so `stop()` must be called to release them
     * even if the server was never started successfully.
     */
    override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer {
        private val transport = NettyTransport.bestForPlatform()

        // Runs the http4k handler off the I/O event loops.
        private val executor = DefaultEventExecutorGroup(serverSettings.threads)

        // Both are assigned only once bind() has succeeded in start().
        private lateinit var closeFuture: ChannelFuture
        private lateinit var address: InetSocketAddress

        // One shaper instance is added to every channel pipeline.
        // The write limit converts kilobits/s to bytes/s; the remaining arguments (0, 50) are
        // passed through unchanged as the read limit and check interval.
        private val burstLimiter = object : GlobalTrafficShapingHandler(
            transport.workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 50
        ) {
            override fun doAccounting(counter: TrafficCounter) {
                // Read the counter once, outside the update function: getAndUpdate may re-apply
                // its function when it races with another writer, so the function should be pure.
                val written = counter.cumulativeWrittenBytes()
                statistics.getAndUpdate {
                    it.copy(bytesSent = it.bytesSent + written)
                }
                counter.resetCumulativeTime()
            }
        }

        /**
         * Builds the TLS context, wires the channel pipeline and binds to the configured
         * hostname and port, blocking until the bind completes.
         */
        override fun start(): Http4kServer = apply {
            LOGGER.info { "Starting Netty with ${serverSettings.threads} threads" }

            val certs = getX509Certs(tls.certificate)
            // NOTE(review): TLSv1 and TLSv1.1 are still enabled here; RFC 8996 deprecates both.
            // Left unchanged because dropping them could cut off existing clients.
            val sslContext = SslContextBuilder
                .forServer(getPrivateKey(tls.privateKey), certs)
                .protocols("TLSv1.3", "TLSv1.2", "TLSv1.1", "TLSv1")
                .build()

            val bootstrap = ServerBootstrap()
            bootstrap.group(transport.masterGroup, transport.workerGroup)
                .channelFactory(transport.factory)
                .childHandler(object : ChannelInitializer<SocketChannel>() {
                    public override fun initChannel(ch: SocketChannel) {
                        ch.pipeline().addLast("ssl", sslContext.newHandler(ch.alloc()))

                        ch.pipeline().addLast("codec", HttpServerCodec())
                        ch.pipeline().addLast("keepAlive", HttpServerKeepAliveHandler())
                        ch.pipeline().addLast("aggregator", HttpObjectAggregator(65536))

                        ch.pipeline().addLast("burstLimiter", burstLimiter)

                        ch.pipeline().addLast("readTimeoutHandler", ReadTimeoutHandler(Constants.MAX_READ_TIME_SECONDS))
                        ch.pipeline().addLast("writeTimeoutHandler", WriteTimeoutHandler(Constants.MAX_WRITE_TIME_SECONDS))

                        ch.pipeline().addLast("streamer", ChunkedWriteHandler())
                        // The application handler runs on the dedicated executor group.
                        ch.pipeline().addLast(executor, "handler", Http4kChannelHandler(httpHandler))

                        ch.pipeline().addLast(
                            "exceptions",
                            object : ChannelInboundHandlerAdapter() {
                                override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
                                    if (cause is SSLException || (cause is DecoderException && cause.cause is SSLException)) {
                                        // Failed TLS handshakes are only logged at trace level.
                                        LOGGER.trace { "Ignored invalid SSL connection" }
                                        LOGGER.trace(cause) { "Exception in pipeline" }
                                    } else if (cause is IOException || cause is SocketException) {
                                        // SocketException is an IOException subtype; the explicit
                                        // check is kept for readability only.
                                        LOGGER.info { "User (downloader) abruptly closed the connection" }
                                        LOGGER.trace(cause) { "Exception in pipeline" }
                                    } else if (cause !is ReadTimeoutException && cause !is WriteTimeoutException) {
                                        // Read/write timeouts are dropped silently; anything else
                                        // is propagated further down the pipeline.
                                        ctx.fireExceptionCaught(cause)
                                    }
                                }
                            }
                        )
                    }
                })
                .option(ChannelOption.SO_BACKLOG, 1000)
                .childOption(ChannelOption.SO_KEEPALIVE, true)

            val channel = bootstrap.bind(InetSocketAddress(serverSettings.hostname, serverSettings.port)).sync().channel()
            address = channel.localAddress() as InetSocketAddress
            closeFuture = channel.closeFuture()
        }

        /**
         * Shuts down the transport's event loop groups and the handler executor.
         *
         * Safe to call when `start()` was never invoked or failed before the bind completed:
         * the close future is only touched if it was actually assigned, so the thread pools
         * created in the initializers are always released.
         */
        override fun stop() = apply {
            if (::closeFuture.isInitialized) {
                closeFuture.cancel(false)
            }
            transport.shutdownGracefully()
            executor.shutdownGracefully()
        }

        /**
         * Returns the configured port when it is positive; otherwise the port of the bound
         * local address, which is only available after a successful `start()`.
         */
        override fun port(): Int = if (serverSettings.port > 0) serverSettings.port else address.port
    }

    companion object {
        private val LOGGER = LoggerFactory.getLogger(Netty::class.java)
    }
}
2020-06-27 18:15:49 +00:00
/**
 * Parses every X.509 certificate contained in [certificates].
 *
 * The text is converted to bytes with Kotlin's default `toByteArray()` charset and handed to
 * the JDK `CertificateFactory` registered under the name "X509".
 *
 * @param certificates textual encoding of one or more certificates
 * @return the parsed certificates, in the order produced by the factory
 */
fun getX509Certs(certificates: String): Collection<X509Certificate> {
    val x509Factory = CertificateFactory.getInstance("X509")
    val encoded: InputStream = ByteArrayInputStream(certificates.toByteArray())

    // generateCertificates is declared to return Collection<out Certificate>; the elements
    // are expected to be X509Certificate because an X.509 factory was requested.
    @Suppress("unchecked_cast")
    return x509Factory.generateCertificates(encoded) as Collection<X509Certificate>
}
/**
 * Converts the textual [privateKey] into a [PrivateKey] by delegating to `loadKey`
 * (defined outside this part of the file).
 *
 * @throws NullPointerException if `loadKey` returns null for the given input
 */
fun getPrivateKey(privateKey: String): PrivateKey = loadKey(privateKey)!!