From 25cfc536746c3a26d1d851484a84ae9d729176db Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Thu, 11 Jun 2020 23:34:42 +0800 Subject: [PATCH 01/10] Added missing connection closer(?) --- src/main/kotlin/mdnet/base/Application.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index 4fb3094..e45c2c8 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -185,6 +185,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting editor.abort() } + mdResponse.close() } respondWithImage(tee, contentLength, contentType, lastModified) } else { From 5dbe29c8e94deb117b8450922c463a2757c63d45 Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 00:55:09 +0800 Subject: [PATCH 02/10] Shifted closing of pool connections better --- src/main/kotlin/mdnet/base/Application.kt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index e45c2c8..d270e45 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -185,8 +185,8 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting editor.abort() } - mdResponse.close() } + mdResponse.close() respondWithImage(tee, contentLength, contentType, lastModified) } else { editor?.abort() @@ -194,7 +194,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting if (LOGGER.isTraceEnabled) { LOGGER.trace("Request for $sanitizedUri is being served") } - + mdResponse.close() respondWithImage(mdResponse.body.stream, contentLength, contentType, lastModified) } } From da47b86d97d24828835a53ef6018ba7502252bee Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 00:55:47 +0800 Subject: [PATCH 03/10] Make Netty naive-multithreaded too Hopefully we can replace this that intelligently scales to a user setting next time... --- src/main/kotlin/mdnet/base/Application.kt | 2 +- src/main/kotlin/mdnet/base/Netty.kt | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index d270e45..a5e771f 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -39,7 +39,7 @@ import javax.crypto.CipherOutputStream import javax.crypto.spec.SecretKeySpec private val LOGGER = LoggerFactory.getLogger("Application") -private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() * 30 / 2 +private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() * 30 fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSettings: ClientSettings, statistics: AtomicReference): Http4kServer { val executor = Executors.newCachedThreadPool() diff --git a/src/main/kotlin/mdnet/base/Netty.kt b/src/main/kotlin/mdnet/base/Netty.kt index 2cbde77..2846c92 100644 --- a/src/main/kotlin/mdnet/base/Netty.kt +++ b/src/main/kotlin/mdnet/base/Netty.kt @@ -38,8 +38,8 @@ private val LOGGER = LoggerFactory.getLogger("Application") class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference) : ServerConfig { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { - private val masterGroup = NioEventLoopGroup() - private val workerGroup = NioEventLoopGroup() + private val masterGroup = NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 30 / 2) + private val workerGroup = NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 30 / 2) private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress From a1130d204ccb65be3adc640621940b6436bcaac2 Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 01:35:12 +0800 Subject: [PATCH 04/10] Slight typo. --- src/main/kotlin/mdnet/base/Application.kt | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index a5e771f..7f5c0b6 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -186,7 +186,6 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting editor.abort() } } - mdResponse.close() respondWithImage(tee, contentLength, contentType, lastModified) } else { editor?.abort() @@ -194,9 +193,9 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting if (LOGGER.isTraceEnabled) { LOGGER.trace("Request for $sanitizedUri is being served") } - mdResponse.close() respondWithImage(mdResponse.body.stream, contentLength, contentType, lastModified) } + mdResponse.close() } } } From c87a0c00415260aae3051a35142010cd4b731b46 Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 02:46:48 +0800 Subject: [PATCH 05/10] Change ApacheClient to maximum open sockets allowed --- src/main/kotlin/mdnet/base/Application.kt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index 7f5c0b6..d3b3dc2 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -39,7 +39,7 @@ import javax.crypto.CipherOutputStream import javax.crypto.spec.SecretKeySpec private val LOGGER = LoggerFactory.getLogger("Application") -private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() * 30 +private val THREADS_TO_ALLOCATE = 65535 // Have it at the maximum open sockets a user can have in most modern OSes. No reason to limit this, just limit it at the Netty side. fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSettings: ClientSettings, statistics: AtomicReference): Http4kServer { val executor = Executors.newCachedThreadPool() From ac682e98c337ec157ca13bd80bcc62779e8e99dd Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 02:47:43 +0800 Subject: [PATCH 06/10] Made more changes to the numbers used for threading --- src/main/kotlin/mdnet/base/Application.kt | 4 +--- src/main/kotlin/mdnet/base/Netty.kt | 9 +++++++-- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index d3b3dc2..4c7e8c9 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -45,7 +45,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting val executor = Executors.newCachedThreadPool() if (LOGGER.isInfoEnabled) { - LOGGER.info("Starting ApacheClient with {} threads", THREADS_TO_ALLOCATE) + LOGGER.info("Starting image retriever") } val client = ApacheClient(responseBodyMode = BodyMode.Stream, client = HttpClients.custom() @@ -144,7 +144,6 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting if (LOGGER.isTraceEnabled) { LOGGER.trace("Upstream query for $sanitizedUri errored with status {}", mdResponse.status) } - mdResponse.close() Response(mdResponse.status) } else { if (LOGGER.isTraceEnabled) { @@ -195,7 +194,6 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting } respondWithImage(mdResponse.body.stream, contentLength, contentType, lastModified) } - mdResponse.close() } } } diff --git a/src/main/kotlin/mdnet/base/Netty.kt b/src/main/kotlin/mdnet/base/Netty.kt index 2846c92..070cdb2 100644 --- a/src/main/kotlin/mdnet/base/Netty.kt +++ b/src/main/kotlin/mdnet/base/Netty.kt @@ -35,11 +35,12 @@ import java.util.concurrent.atomic.AtomicReference import javax.net.ssl.SSLException private val LOGGER = LoggerFactory.getLogger("Application") +private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() * 32 / 2 class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference) : ServerConfig { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { - private val masterGroup = NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 30 / 2) - private val workerGroup = NioEventLoopGroup(Runtime.getRuntime().availableProcessors() * 30 / 2) + private val masterGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE) + private val workerGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE) private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress @@ -52,6 +53,10 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: } override fun start(): Http4kServer = apply { + if (LOGGER.isInfoEnabled) { + LOGGER.info("Starting webserver with {} threads", THREADS_TO_ALLOCATE) + } + val (mainCert, chainCert) = getX509Certs(tls.certificate) val sslContext = SslContextBuilder .forServer(getPrivateKey(tls.privateKey), mainCert, chainCert) From 40be05e4d494ca9c3673222f3c96849cc966b7fd Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 03:10:00 +0800 Subject: [PATCH 07/10] Made threads-per-cpu configurable for moar speed --- dev/settings.json | 3 ++- src/main/java/mdnet/base/ClientSettings.java | 11 +++++++++-- src/main/kotlin/mdnet/base/Netty.kt | 8 ++++---- 3 files changed, 15 insertions(+), 7 deletions(-) diff --git a/dev/settings.json b/dev/settings.json index 7bdb768..515d8e4 100644 --- a/dev/settings.json +++ b/dev/settings.json @@ -3,5 +3,6 @@ "max_cache_size_mib": 2048, "client_port": 8080, "max_burst_rate_kib_per_second": 100, - "max_bandwidth_mib_per_hour": 1 + "max_bandwidth_mib_per_hour": 1, + "threads_per_cpu": 32 } diff --git a/src/main/java/mdnet/base/ClientSettings.java b/src/main/java/mdnet/base/ClientSettings.java index d84687a..de17707 100644 --- a/src/main/java/mdnet/base/ClientSettings.java +++ b/src/main/java/mdnet/base/ClientSettings.java @@ -16,14 +16,17 @@ public final class ClientSettings { private final int clientPort; @SerializedName("client_secret") private final String clientSecret; + @SerializedName("threads_per_cpu") + private final int threadsPerCPU; public ClientSettings(long maxCacheSizeMib, long maxBandwidthMibPerHour, long maxBurstRateKibPerSecond, - int clientPort, String clientSecret) { + int clientPort, String clientSecret, int threadsPerCPU) { this.maxCacheSizeMib = maxCacheSizeMib; this.maxBandwidthMibPerHour = maxBandwidthMibPerHour; this.maxBurstRateKibPerSecond = maxBurstRateKibPerSecond; this.clientPort = clientPort; this.clientSecret = Objects.requireNonNull(clientSecret); + this.threadsPerCPU = threadsPerCPU; } public long getMaxCacheSizeMib() { @@ -46,11 +49,15 @@ public final class ClientSettings { return clientSecret; } + public int getThreadsPerCPU() { + return threadsPerCPU; + } + @Override public String toString() { return "ClientSettings{" + "maxCacheSizeMib=" + maxCacheSizeMib + ", maxBandwidthMibPerHour=" + maxBandwidthMibPerHour + ", maxBurstRateKibPerSecond=" + maxBurstRateKibPerSecond + ", clientPort=" - + clientPort + ", clientSecret='" + "" + '\'' + '}'; + + clientPort + ", clientSecret='" + "" + '\'' + ", threadsPerCPU=" + threadsPerCPU + "}"; } public static boolean isSecretValid(String clientSecret) { diff --git a/src/main/kotlin/mdnet/base/Netty.kt b/src/main/kotlin/mdnet/base/Netty.kt index 070cdb2..36dace2 100644 --- a/src/main/kotlin/mdnet/base/Netty.kt +++ b/src/main/kotlin/mdnet/base/Netty.kt @@ -35,12 +35,12 @@ import java.util.concurrent.atomic.AtomicReference import javax.net.ssl.SSLException private val LOGGER = LoggerFactory.getLogger("Application") -private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() * 32 / 2 +private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference) : ServerConfig { override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { - private val masterGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE) - private val workerGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE) + private val masterGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE * clientSettings.getThreadsPerCPU()) + private val workerGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE * clientSettings.getThreadsPerCPU()) private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress @@ -54,7 +54,7 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: override fun start(): Http4kServer = apply { if (LOGGER.isInfoEnabled) { - LOGGER.info("Starting webserver with {} threads", THREADS_TO_ALLOCATE) + LOGGER.info("Starting webserver with {} threads", THREADS_TO_ALLOCATE * clientSettings.getThreadsPerCPU()) } val (mainCert, chainCert) = getX509Certs(tls.certificate) From 522c96f70f79e66f9fddc510c5a47b6ab2183fec Mon Sep 17 00:00:00 2001 From: Amos Ng Date: Fri, 12 Jun 2020 03:18:39 +0800 Subject: [PATCH 08/10] Added back mdResponse.close() Accidentally deleted it, whoops! --- src/main/kotlin/mdnet/base/Application.kt | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/kotlin/mdnet/base/Application.kt b/src/main/kotlin/mdnet/base/Application.kt index 4c7e8c9..9afd854 100644 --- a/src/main/kotlin/mdnet/base/Application.kt +++ b/src/main/kotlin/mdnet/base/Application.kt @@ -144,6 +144,7 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting if (LOGGER.isTraceEnabled) { LOGGER.trace("Upstream query for $sanitizedUri errored with status {}", mdResponse.status) } + mdResponse.close() Response(mdResponse.status) } else { if (LOGGER.isTraceEnabled) { From a70381eea4b81db941a9297aeac3bad925d70986 Mon Sep 17 00:00:00 2001 From: carbotaniuman <41451839+carbotaniuman@users.noreply.github.com> Date: Thu, 11 Jun 2020 15:02:07 -0500 Subject: [PATCH 09/10] Fix nits --- src/main/java/mdnet/base/ClientSettings.java | 12 ++++++------ src/main/kotlin/mdnet/base/Netty.kt | 10 ++++++---- 2 files changed, 12 insertions(+), 10 deletions(-) diff --git a/src/main/java/mdnet/base/ClientSettings.java b/src/main/java/mdnet/base/ClientSettings.java index de17707..fe21099 100644 --- a/src/main/java/mdnet/base/ClientSettings.java +++ b/src/main/java/mdnet/base/ClientSettings.java @@ -17,16 +17,16 @@ public final class ClientSettings { @SerializedName("client_secret") private final String clientSecret; @SerializedName("threads_per_cpu") - private final int threadsPerCPU; + private final int threadsPerCpu; public ClientSettings(long maxCacheSizeMib, long maxBandwidthMibPerHour, long maxBurstRateKibPerSecond, - int clientPort, String clientSecret, int threadsPerCPU) { + int clientPort, String clientSecret, int threadsPerCpu) { this.maxCacheSizeMib = maxCacheSizeMib; this.maxBandwidthMibPerHour = maxBandwidthMibPerHour; this.maxBurstRateKibPerSecond = maxBurstRateKibPerSecond; this.clientPort = clientPort; this.clientSecret = Objects.requireNonNull(clientSecret); - this.threadsPerCPU = threadsPerCPU; + this.threadsPerCpu = threadsPerCpu; } public long getMaxCacheSizeMib() { @@ -49,15 +49,15 @@ public final class ClientSettings { return clientSecret; } - public int getThreadsPerCPU() { - return threadsPerCPU; + public int getThreadsPerCpu() { + return threadsPerCpu; } @Override public String toString() { return "ClientSettings{" + "maxCacheSizeMib=" + maxCacheSizeMib + ", maxBandwidthMibPerHour=" + maxBandwidthMibPerHour + ", maxBurstRateKibPerSecond=" + maxBurstRateKibPerSecond + ", clientPort=" - + clientPort + ", clientSecret='" + "" + '\'' + ", threadsPerCPU=" + threadsPerCPU + "}"; + + clientPort + ", clientSecret='" + "" + '\'' + ", threadsPerCPU=" + threadsPerCpu + "}"; } public static boolean isSecretValid(String clientSecret) { diff --git a/src/main/kotlin/mdnet/base/Netty.kt b/src/main/kotlin/mdnet/base/Netty.kt index 36dace2..7efb315 100644 --- a/src/main/kotlin/mdnet/base/Netty.kt +++ b/src/main/kotlin/mdnet/base/Netty.kt @@ -35,12 +35,14 @@ import java.util.concurrent.atomic.AtomicReference import javax.net.ssl.SSLException private val LOGGER = LoggerFactory.getLogger("Application") -private val THREADS_TO_ALLOCATE = Runtime.getRuntime().availableProcessors() class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference) : ServerConfig { + private val threadsToAllocate: Int + get() = Runtime.getRuntime().availableProcessors() * clientSettings.threadsPerCpu + override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer { - private val masterGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE * clientSettings.getThreadsPerCPU()) - private val workerGroup = NioEventLoopGroup(THREADS_TO_ALLOCATE * clientSettings.getThreadsPerCPU()) + private val masterGroup = NioEventLoopGroup(threadsToAllocate) + private val workerGroup = NioEventLoopGroup(threadsToAllocate) private lateinit var closeFuture: ChannelFuture private lateinit var address: InetSocketAddress @@ -54,7 +56,7 @@ class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: override fun start(): Http4kServer = apply { if (LOGGER.isInfoEnabled) { - LOGGER.info("Starting webserver with {} threads", THREADS_TO_ALLOCATE * clientSettings.getThreadsPerCPU()) + LOGGER.info("Starting webserver with {} threads", threadsToAllocate) } val (mainCert, chainCert) = getX509Certs(tls.certificate) From ce1fc9232258f5755631ac4b5a5e2ccba4b36306 Mon Sep 17 00:00:00 2001 From: carbotaniuman <41451839+carbotaniuman@users.noreply.github.com> Date: Thu, 11 Jun 2020 15:13:45 -0500 Subject: [PATCH 10/10] Clean stuff up --- src/main/java/mdnet/base/ClientSettings.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/mdnet/base/ClientSettings.java b/src/main/java/mdnet/base/ClientSettings.java index fe21099..8ce89fc 100644 --- a/src/main/java/mdnet/base/ClientSettings.java +++ b/src/main/java/mdnet/base/ClientSettings.java @@ -57,7 +57,7 @@ public final class ClientSettings { public String toString() { return "ClientSettings{" + "maxCacheSizeMib=" + maxCacheSizeMib + ", maxBandwidthMibPerHour=" + maxBandwidthMibPerHour + ", maxBurstRateKibPerSecond=" + maxBurstRateKibPerSecond + ", clientPort=" - + clientPort + ", clientSecret='" + "" + '\'' + ", threadsPerCPU=" + threadsPerCpu + "}"; + + clientPort + ", clientSecret='" + clientSecret + '\'' + ", threadsPerCpu=" + threadsPerCpu + '}'; } public static boolean isSecretValid(String clientSecret) {