Rc6 release

- proxy data-saver
- optimize thread count
This commit is contained in:
carbotaniuman 2020-06-08 19:57:50 -05:00
parent 98c4956b80
commit 907e90803d
4 changed files with 20 additions and 69 deletions

View file

@ -7,7 +7,7 @@ plugins {
} }
group = "com.mangadex" group = "com.mangadex"
version = "1.0.0-rc5" version = "1.0.0-rc6"
mainClassName = "mdnet.base.MangaDexClient" mainClassName = "mdnet.base.MangaDexClient"
repositories { repositories {

View file

@ -6,7 +6,4 @@ public class Constants {
public static final int CLIENT_BUILD = 1; public static final int CLIENT_BUILD = 1;
public static final String CLIENT_VERSION = "1.0"; public static final String CLIENT_VERSION = "1.0";
public static final Duration MAX_AGE_CACHE = Duration.ofDays(14); public static final Duration MAX_AGE_CACHE = Duration.ofDays(14);
public static final int MAX_CONCURRENT_CONNECTIONS = 100;
public static final String OVERLOADED_MESSAGE = "This server is experiencing a surge in connections. Please try again later.";
} }

View file

@ -49,6 +49,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
.setConnectionRequestTimeout(3000) .setConnectionRequestTimeout(3000)
.build()) .build())
.setMaxConnTotal(10) .setMaxConnTotal(10)
.setMaxConnPerRoute(10)
.build()) .build())
val app = { dataSaver: Boolean -> val app = { dataSaver: Boolean ->
@ -57,7 +58,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
val fileName = Path.of("fileName")(request) val fileName = Path.of("fileName")(request)
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $chapterHash/$fileName received") LOGGER.trace("Request for ${request.uri} received")
} }
val rc4Bytes = if (dataSaver) { val rc4Bytes = if (dataSaver) {
@ -89,7 +90,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
// our files never change, so it's safe to use the browser cache // our files never change, so it's safe to use the browser cache
if (request.header("If-Modified-Since") != null) { if (request.header("If-Modified-Since") != null) {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $chapterHash/$fileName cached by browser") LOGGER.trace("Request for ${request.uri} cached by browser")
} }
val lastModified = snapshot.getString(2) val lastModified = snapshot.getString(2)
@ -99,7 +100,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
.header("Last-Modified", lastModified) .header("Last-Modified", lastModified)
} else { } else {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $chapterHash/$fileName hit cache") LOGGER.trace("Request for ${request.uri} hit cache")
} }
respondWithImage( respondWithImage(
@ -110,32 +111,32 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
} else { } else {
statistics.get().cacheMisses.incrementAndGet() statistics.get().cacheMisses.incrementAndGet()
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $chapterHash/$fileName missed cache") LOGGER.trace("Request for ${request.uri} missed cache")
} }
val mdResponse = client(Request(Method.GET, "${serverSettings.imageServer}${request.uri}")) val mdResponse = client(Request(Method.GET, "${serverSettings.imageServer}${request.uri}"))
if (mdResponse.status != Status.OK) { if (mdResponse.status != Status.OK) {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Upstream query for $chapterHash/$fileName errored with status {}", mdResponse.status) LOGGER.trace("Upstream query for ${request.uri} errored with status {}", mdResponse.status)
} }
mdResponse.close() mdResponse.close()
Response(mdResponse.status) Response(mdResponse.status)
} else { } else {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Upstream query for $chapterHash/$fileName succeeded") LOGGER.trace("Upstream query for ${request.uri} succeeded")
} }
val contentLength = mdResponse.header("Content-Length")!!
val contentType = mdResponse.header("Content-Type")!! val contentType = mdResponse.header("Content-Type")!!
val lastModified = mdResponse.header("Last-Modified")!! val contentLength = mdResponse.header("Content-Length")
val lastModified = mdResponse.header("Last-Modified")
val editor = cache.edit(cacheId) val editor = cache.edit(cacheId)
// A null editor means that this file is being written to // A null editor means that this file is being written to
// concurrently so we skip the cache process // concurrently so we skip the cache process
if (editor != null) { if (editor != null && contentLength != null && lastModified != null) {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $chapterHash/$fileName is being cached and served") LOGGER.trace("Request for ${request.uri} is being cached and served")
} }
editor.setString(1, contentType) editor.setString(1, contentType)
editor.setString(2, lastModified) editor.setString(2, lastModified)
@ -148,13 +149,13 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
// check that tee gets closed and for exceptions in this lambda // check that tee gets closed and for exceptions in this lambda
if (editor.getLength(0) == contentLength.toLong()) { if (editor.getLength(0) == contentLength.toLong()) {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Cache download $chapterHash/$fileName committed") LOGGER.trace("Cache download ${request.uri} committed")
} }
editor.commit() editor.commit()
} else { } else {
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Cache download $chapterHash/$fileName aborted") LOGGER.trace("Cache download ${request.uri} aborted")
} }
editor.abort() editor.abort()
@ -162,11 +163,13 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
} }
respondWithImage(tee, contentLength, contentType, lastModified) respondWithImage(tee, contentLength, contentType, lastModified)
} else { } else {
editor?.abort()
if (LOGGER.isTraceEnabled) { if (LOGGER.isTraceEnabled) {
LOGGER.trace("Request for $chapterHash/$fileName is being served") LOGGER.trace("Request for ${request.uri} is being served")
} }
respondWithImage(mdResponse.body.stream, contentLength, contentType, lastModified) respondWithImage(mdResponse.body.stream, contentLength ?: "", contentType, lastModified ?: "")
} }
} }
} }
@ -180,8 +183,8 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
.then(addCommonHeaders()) .then(addCommonHeaders())
.then( .then(
routes( routes(
"/data/{chapterHash}/{fileName}" bind Method.GET to app(false) "/data/{chapterHash}/{fileName}" bind Method.GET to app(false),
// "/data-saver/{chapterHash}/{fileName}" bind Method.GET to app(true) "/data-saver/{chapterHash}/{fileName}" bind Method.GET to app(true)
) )
) )
.asServer(Netty(serverSettings.tls, clientSettings, statistics)) .asServer(Netty(serverSettings.tls, clientSettings, statistics))

View file

@ -1,10 +1,8 @@
package mdnet.base package mdnet.base
import io.netty.bootstrap.ServerBootstrap import io.netty.bootstrap.ServerBootstrap
import io.netty.buffer.Unpooled
import io.netty.channel.ChannelFactory import io.netty.channel.ChannelFactory
import io.netty.channel.ChannelFuture import io.netty.channel.ChannelFuture
import io.netty.channel.ChannelHandler.Sharable
import io.netty.channel.ChannelHandlerContext import io.netty.channel.ChannelHandlerContext
import io.netty.channel.ChannelInboundHandlerAdapter import io.netty.channel.ChannelInboundHandlerAdapter
import io.netty.channel.ChannelInitializer import io.netty.channel.ChannelInitializer
@ -14,16 +12,10 @@ import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.SocketChannel import io.netty.channel.socket.SocketChannel
import io.netty.channel.socket.nio.NioServerSocketChannel import io.netty.channel.socket.nio.NioServerSocketChannel
import io.netty.handler.codec.DecoderException import io.netty.handler.codec.DecoderException
import io.netty.handler.codec.http.DefaultFullHttpResponse
import io.netty.handler.codec.http.HttpHeaderNames
import io.netty.handler.codec.http.HttpObjectAggregator import io.netty.handler.codec.http.HttpObjectAggregator
import io.netty.handler.codec.http.HttpResponseStatus
import io.netty.handler.codec.http.HttpServerCodec import io.netty.handler.codec.http.HttpServerCodec
import io.netty.handler.codec.http.HttpUtil
import io.netty.handler.codec.http.HttpVersion
import io.netty.handler.ssl.OptionalSslHandler import io.netty.handler.ssl.OptionalSslHandler
import io.netty.handler.ssl.SslContextBuilder import io.netty.handler.ssl.SslContextBuilder
import io.netty.handler.ssl.SslHandler
import io.netty.handler.stream.ChunkedWriteHandler import io.netty.handler.stream.ChunkedWriteHandler
import io.netty.handler.traffic.GlobalTrafficShapingHandler import io.netty.handler.traffic.GlobalTrafficShapingHandler
import io.netty.handler.traffic.TrafficCounter import io.netty.handler.traffic.TrafficCounter
@ -33,51 +25,12 @@ import org.http4k.server.Http4kServer
import org.http4k.server.ServerConfig import org.http4k.server.ServerConfig
import org.slf4j.LoggerFactory import org.slf4j.LoggerFactory
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.nio.charset.StandardCharsets
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import javax.net.ssl.SSLException import javax.net.ssl.SSLException
private val LOGGER = LoggerFactory.getLogger("Application") private val LOGGER = LoggerFactory.getLogger("Application")
@Sharable
class ConnectionCounter : ChannelInboundHandlerAdapter() {
private val connections = AtomicInteger()
override fun channelActive(ctx: ChannelHandlerContext) {
val sslHandler = ctx.pipeline()[SslHandler::class.java]
if (sslHandler != null) {
sslHandler.handshakeFuture().addListener {
handleConnection(ctx)
}
} else {
handleConnection(ctx)
}
}
private fun handleConnection(ctx: ChannelHandlerContext) {
if (connections.incrementAndGet() <= Constants.MAX_CONCURRENT_CONNECTIONS) {
super.channelActive(ctx)
} else {
val response = Unpooled.copiedBuffer(Constants.OVERLOADED_MESSAGE, StandardCharsets.UTF_8)
val res =
DefaultFullHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.SERVICE_UNAVAILABLE, response)
res.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html; charset=UTF-8")
HttpUtil.setContentLength(res, response.readableBytes().toLong())
ctx.writeAndFlush(res)
ctx.close()
}
}
override fun channelInactive(ctx: ChannelHandlerContext) {
super.channelInactive(ctx)
connections.decrementAndGet()
}
}
class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference<Statistics>) : ServerConfig { class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference<Statistics>) : ServerConfig {
override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer {
private val masterGroup = NioEventLoopGroup() private val masterGroup = NioEventLoopGroup()
@ -85,7 +38,6 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings:
private lateinit var closeFuture: ChannelFuture private lateinit var closeFuture: ChannelFuture
private lateinit var address: InetSocketAddress private lateinit var address: InetSocketAddress
private val counter = ConnectionCounter()
private val burstLimiter = object : GlobalTrafficShapingHandler( private val burstLimiter = object : GlobalTrafficShapingHandler(
workerGroup, 1024 * clientSettings.maxBurstRateKibPerSecond, 0, 50) { workerGroup, 1024 * clientSettings.maxBurstRateKibPerSecond, 0, 50) {
override fun doAccounting(counter: TrafficCounter) { override fun doAccounting(counter: TrafficCounter) {
@ -105,7 +57,6 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings:
ch.pipeline().addLast("ssl", OptionalSslHandler(sslContext)) ch.pipeline().addLast("ssl", OptionalSslHandler(sslContext))
ch.pipeline().addLast("codec", HttpServerCodec()) ch.pipeline().addLast("codec", HttpServerCodec())
ch.pipeline().addLast("counter", counter)
ch.pipeline().addLast("aggregator", HttpObjectAggregator(65536)) ch.pipeline().addLast("aggregator", HttpObjectAggregator(65536))
ch.pipeline().addLast("burstLimiter", burstLimiter) ch.pipeline().addLast("burstLimiter", burstLimiter)
ch.pipeline().addLast("streamer", ChunkedWriteHandler()) ch.pipeline().addLast("streamer", ChunkedWriteHandler())