From 907e90803d234d7ad13ce87c250b1f7976b8c669 Mon Sep 17 00:00:00 2001 From: carbotaniuman <41451839+carbotaniuman@users.noreply.github.com> Date: Mon, 8 Jun 2020 19:57:50 -0500 Subject: [PATCH] Rc6 release - proxy data-saver - optimize thread count --- build.gradle | 2 +- src/main/java/mdnet/base/Constants.java | 3 -- src/main/kotlin/mdnet/base/Application.kt | 35 ++++++++-------- src/main/kotlin/mdnet/base/Netty.kt | 49 ----------------------- 4 files changed, 20 insertions(+), 69 deletions(-) diff --git a/build.gradle b/build.gradle index 6b4c63f..9bfa63e 100644 --- a/build.gradle +++ b/build.gradle @@ -7,7 +7,7 @@ plugins { } group = "com.mangadex" -version = "1.0.0-rc5" +version = "1.0.0-rc6" mainClassName = "mdnet.base.MangaDexClient" repositories { diff --git a/src/main/java/mdnet/base/Constants.java b/src/main/java/mdnet/base/Constants.java index 4805455..c3cb073 100644 --- a/src/main/java/mdnet/base/Constants.java +++ b/src/main/java/mdnet/base/Constants.java @@ -6,7 +6,4 @@ public class Constants { public static final int CLIENT_BUILD = 1; public static final String CLIENT_VERSION = "1.0"; public static final Duration MAX_AGE_CACHE = Duration.ofDays(14); - - public static final int MAX_CONCURRENT_CONNECTIONS = 100; - public static final String OVERLOADED_MESSAGE = "This server is experiencing a surge in connections. Please try again later."; } diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index 2703acf..019eb5d 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -49,6 +49,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting .setConnectionRequestTimeout(3000) .build()) .setMaxConnTotal(10) + .setMaxConnPerRoute(10) .build()) val app = { dataSaver: Boolean -> @@ -57,7 +58,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting val fileName = Path.of("fileName")(request) if (LOGGER.isTraceEnabled) { - LOGGER.trace("Request for $chapterHash/$fileName received") + LOGGER.trace("Request for ${request.uri} received") } val rc4Bytes = if (dataSaver) { @@ -89,7 +90,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting // our files never change, so it's safe to use the browser cache if (request.header("If-Modified-Since") != null) { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Request for $chapterHash/$fileName cached by browser") + LOGGER.trace("Request for ${request.uri} cached by browser") } val lastModified = snapshot.getString(2) @@ -99,7 +100,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting .header("Last-Modified", lastModified) } else { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Request for $chapterHash/$fileName hit cache") + LOGGER.trace("Request for ${request.uri} hit cache") } respondWithImage( @@ -110,32 +111,32 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting } else { statistics.get().cacheMisses.incrementAndGet() if (LOGGER.isTraceEnabled) { - LOGGER.trace("Request for $chapterHash/$fileName missed cache") + LOGGER.trace("Request for ${request.uri} missed cache") } val mdResponse = client(Request(Method.GET, "${serverSettings.imageServer}${request.uri}")) if (mdResponse.status != Status.OK) { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Upstream query for $chapterHash/$fileName errored with status {}", mdResponse.status) + LOGGER.trace("Upstream query for ${request.uri} errored with status {}", mdResponse.status) } mdResponse.close() Response(mdResponse.status) } else { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Upstream query for $chapterHash/$fileName succeeded") + LOGGER.trace("Upstream query for ${request.uri} succeeded") } - val contentLength = mdResponse.header("Content-Length")!! val contentType = mdResponse.header("Content-Type")!! - val lastModified = mdResponse.header("Last-Modified")!! + val contentLength = mdResponse.header("Content-Length") + val lastModified = mdResponse.header("Last-Modified") val editor = cache.edit(cacheId) // A null editor means that this file is being written to // concurrently so we skip the cache process - if (editor != null) { + if (editor != null && contentLength != null && lastModified != null) { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Request for $chapterHash/$fileName is being cached and served") + LOGGER.trace("Request for ${request.uri} is being cached and served") } editor.setString(1, contentType) editor.setString(2, lastModified) @@ -148,13 +149,13 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting // check that tee gets closed and for exceptions in this lambda if (editor.getLength(0) == contentLength.toLong()) { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Cache download $chapterHash/$fileName committed") + LOGGER.trace("Cache download ${request.uri} committed") } editor.commit() } else { if (LOGGER.isTraceEnabled) { - LOGGER.trace("Cache download $chapterHash/$fileName aborted") + LOGGER.trace("Cache download ${request.uri} aborted") } editor.abort() @@ -162,11 +163,13 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting } respondWithImage(tee, contentLength, contentType, lastModified) } else { + editor?.abort() + if (LOGGER.isTraceEnabled) { - LOGGER.trace("Request for $chapterHash/$fileName is being served") + LOGGER.trace("Request for ${request.uri} is being served") } - respondWithImage(mdResponse.body.stream, contentLength, contentType, lastModified) + respondWithImage(mdResponse.body.stream, contentLength ?: "", contentType, lastModified ?: "") } } } @@ -180,8 +183,8 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting .then(addCommonHeaders()) .then( routes( - "/data/{chapterHash}/{fileName}" bind Method.GET to app(false) -// "/data-saver/{chapterHash}/{fileName}" bind Method.GET to app(true) + "/data/{chapterHash}/{fileName}" bind Method.GET to app(false), + "/data-saver/{chapterHash}/{fileName}" bind Method.GET to app(true) ) ) .asServer(Netty(serverSettings.tls, clientSettings, statistics)) diff --git a/src/main/kotlin/mdnet/base/Netty.kt b/src/main/kotlin/mdnet/base/Netty.kt index 703d3fd..4966b29 100644 --- a/src/main/kotlin/mdnet/base/Netty.kt +++ b/src/main/kotlin/mdnet/base/Netty.kt @@ -1,10 +1,8 @@ package mdnet.base import io.netty.bootstrap.ServerBootstrap -import io.netty.buffer.Unpooled import io.netty.channel.ChannelFactory import io.netty.channel.ChannelFuture -import io.netty.channel.ChannelHandler.Sharable import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInitializer @@ -14,16 +12,10 @@ import io.netty.channel.nio.NioEventLoopGroup import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.handler.codec.DecoderException -import io.netty.handler.codec.http.DefaultFullHttpResponse -import io.netty.handler.codec.http.HttpHeaderNames import io.netty.handler.codec.http.HttpObjectAggregator -import io.netty.handler.codec.http.HttpResponseStatus import io.netty.handler.codec.http.HttpServerCodec -import io.netty.handler.codec.http.HttpUtil -import io.netty.handler.codec.http.HttpVersion import io.netty.handler.ssl.OptionalSslHandler import io.netty.handler.ssl.SslContextBuilder -import io.netty.handler.ssl.SslHandler import io.netty.handler.stream.ChunkedWriteHandler import io.netty.handler.traffic.GlobalTrafficShapingHandler import io.netty.handler.traffic.TrafficCounter @@ -33,51 +25,12 @@ import org.http4k.server.Http4kServer import org.http4k.server.ServerConfig import org.slf4j.LoggerFactory import java.net.InetSocketAddress -import java.nio.charset.StandardCharsets import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicReference import javax.net.ssl.SSLException private val LOGGER = LoggerFactory.getLogger("Application") -@Sharable -class ConnectionCounter : ChannelInboundHandlerAdapter() { - private val connections = AtomicInteger() - - override fun channelActive(ctx: ChannelHandlerContext) { - val sslHandler = ctx.pipeline()[SslHandler::class.java] - - if (sslHandler != null) { - sslHandler.handshakeFuture().addListener { - handleConnection(ctx) - } - } else { - handleConnection(ctx) - } - } - - private fun handleConnection(ctx: ChannelHandlerContext) { - if (connections.incrementAndGet() <= Constants.MAX_CONCURRENT_CONNECTIONS) { - super.channelActive(ctx) - } else { - val response = Unpooled.copiedBuffer(Constants.OVERLOADED_MESSAGE, StandardCharsets.UTF_8) - val res = - DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, response) - res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8") - HttpUtil.setContentLength(res, response.readableBytes().toLong()) - - ctx.writeAndFlush(res) - ctx.close() - } - } - - override fun channelInactive(ctx: ChannelHandlerContext) { - super.channelInactive(ctx) - connections.decrementAndGet() - } -} - class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference) : ServerConfig { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { private val masterGroup = NioEventLoopGroup() @@ -85,7 +38,6 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress - private val counter = ConnectionCounter() private val burstLimiter = object : GlobalTrafficShapingHandler( workerGroup, 1024 * clientSettings.maxBurstRateKibPerSecond, 0, 50) { override fun doAccounting(counter: TrafficCounter) { @@ -105,7 +57,6 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ch.pipeline().addLast("ssl", OptionalSslHandler(sslContext)) ch.pipeline().addLast("codec", HttpServerCodec()) - ch.pipeline().addLast("counter", counter) ch.pipeline().addLast("aggregator", HttpObjectAggregator(65536)) ch.pipeline().addLast("burstLimiter", burstLimiter) ch.pipeline().addLast("streamer", ChunkedWriteHandler())