diff --git a/CHANGELOG.md b/CHANGELOG.md index 65a4c1b..bf8c699 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Security -## [2.0.0-rc6] - 2021-01-27 +## [2.0.0-rc6] - 2021-01-28 ### Fixed - [2021-01-27] Upped max Apache threadpool size [@carbotaniuman]. - [2021-01-27] Add ability for nodes to specify external ip [@carbotaniuman]. diff --git a/src/main/kotlin/mdnet/Constants.kt b/src/main/kotlin/mdnet/Constants.kt index 43015ed..fbb0c21 100644 --- a/src/main/kotlin/mdnet/Constants.kt +++ b/src/main/kotlin/mdnet/Constants.kt @@ -21,7 +21,7 @@ package mdnet import java.time.Duration object Constants { - const val CLIENT_BUILD = 23 + const val CLIENT_BUILD = 24 @JvmField val MAX_AGE_CACHE: Duration = Duration.ofDays(14) diff --git a/src/main/kotlin/mdnet/ServerManager.kt b/src/main/kotlin/mdnet/ServerManager.kt index 0e495a3..2e7d036 100644 --- a/src/main/kotlin/mdnet/ServerManager.kt +++ b/src/main/kotlin/mdnet/ServerManager.kt @@ -28,6 +28,14 @@ import mdnet.logging.warn import mdnet.metrics.DefaultMicrometerMetrics import mdnet.server.getServer import mdnet.settings.* +import org.apache.hc.client5.http.config.RequestConfig +import org.apache.hc.client5.http.cookie.StandardCookieSpec +import org.apache.hc.client5.http.impl.classic.HttpClients +import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder +import org.apache.hc.core5.util.TimeValue +import org.apache.hc.core5.util.Timeout +import org.http4k.client.ApacheClient +import org.http4k.core.BodyMode import org.http4k.server.Http4kServer import org.slf4j.LoggerFactory import java.util.concurrent.CountDownLatch @@ -61,6 +69,27 @@ class ServerManager( private val executor = Executors.newSingleThreadScheduledExecutor() private val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT) private val statistics = Statistics() + private val connectionManager = PoolingHttpClientConnectionManagerBuilder.create() + .setMaxConnTotal(500) + .setMaxConnPerRoute(500) + .build() + private val apache = ApacheClient( + responseBodyMode = BodyMode.Stream, + client = HttpClients.custom() + .disableConnectionState() + .setDefaultRequestConfig( + RequestConfig.custom() + .setCookieSpec(StandardCookieSpec.IGNORE) + .setConnectTimeout(Timeout.ofSeconds(2)) + .setResponseTimeout(Timeout.ofSeconds(2)) + .setConnectionRequestTimeout(Timeout.ofSeconds(1)) + .build() + ) + .setConnectionManager( + connectionManager + ) + .build() + ) // state that must only be accessed from the thread on the executor private var state: State @@ -170,6 +199,20 @@ class ServerManager( 45, 45, TimeUnit.SECONDS ) + executor.scheduleWithFixedDelay( + { + try { + LOGGER.info { "Closing old Apache HTTP connections" } + + connectionManager.closeExpired() + connectionManager.closeIdle(TimeValue.ofSeconds(30)) + } catch (e: Exception) { + LOGGER.warn(e) { "Old Apache HTTP connection closer failed" } + } + }, + 45, 45, TimeUnit.SECONDS + ) + LOGGER.info { "Image server has started" } } @@ -221,7 +264,8 @@ class ServerManager( settings.serverSettings, settings.metricsSettings, statistics, - registry + registry, + apache, ).start() this.state = Running(server, remoteSettings) diff --git a/src/main/kotlin/mdnet/metrics/GeoIpMetricsFilter.kt b/src/main/kotlin/mdnet/metrics/GeoIpMetricsFilter.kt index a72324c..a69ff2d 100644 --- a/src/main/kotlin/mdnet/metrics/GeoIpMetricsFilter.kt +++ b/src/main/kotlin/mdnet/metrics/GeoIpMetricsFilter.kt @@ -26,6 +26,7 @@ import mdnet.logging.debug import mdnet.logging.warn import org.apache.commons.compress.archivers.tar.TarArchiveInputStream import org.apache.commons.io.IOUtils +import org.http4k.client.ApacheClient import org.http4k.core.Filter import org.http4k.core.HttpHandler import org.http4k.core.Method @@ -90,8 +91,8 @@ class GeoIpMetricsFilterBuilder( private val enableGeoIp: Boolean, private val license: String, private val registry: PrometheusMeterRegistry, - private val client: HttpHandler ) { + val client = ApacheClient() fun build(): GeoIpMetricsFilter { return if (enableGeoIp) { LOGGER.info("GeoIp initialising") diff --git a/src/main/kotlin/mdnet/netty/ApplicationNetty.kt b/src/main/kotlin/mdnet/netty/ApplicationNetty.kt index 9e66c43..1e7f0be 100644 --- a/src/main/kotlin/mdnet/netty/ApplicationNetty.kt +++ b/src/main/kotlin/mdnet/netty/ApplicationNetty.kt @@ -41,7 +41,7 @@ import io.netty.handler.traffic.TrafficCounter import io.netty.incubator.channel.uring.IOUring import io.netty.incubator.channel.uring.IOUringEventLoopGroup import io.netty.incubator.channel.uring.IOUringServerSocketChannel -import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor +import io.netty.util.concurrent.DefaultEventExecutorGroup import io.netty.util.internal.SystemPropertyUtil import mdnet.Constants import mdnet.data.Statistics @@ -70,7 +70,7 @@ sealed class NettyTransport(threads: Int) { abstract val bossGroup: EventLoopGroup abstract val workerGroup: EventLoopGroup abstract val factory: ChannelFactory - val executor = UnorderedThreadPoolEventExecutor( + val executor = DefaultEventExecutorGroup( threads.also { require(threads > 0) { "Threads must be greater than zero" } } diff --git a/src/main/kotlin/mdnet/server/ImageServer.kt b/src/main/kotlin/mdnet/server/ImageServer.kt index d4d95e7..e9d66bb 100644 --- a/src/main/kotlin/mdnet/server/ImageServer.kt +++ b/src/main/kotlin/mdnet/server/ImageServer.kt @@ -44,12 +44,6 @@ import mdnet.security.TweetNaclFast import mdnet.settings.MetricsSettings import mdnet.settings.RemoteSettings import mdnet.settings.ServerSettings -import org.apache.hc.client5.http.config.RequestConfig -import org.apache.hc.client5.http.cookie.StandardCookieSpec -import org.apache.hc.client5.http.impl.classic.HttpClients -import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder -import org.apache.hc.core5.util.Timeout -import org.http4k.client.ApacheClient import org.http4k.core.* import org.http4k.filter.CachingFilters import org.http4k.filter.ClientFilters @@ -77,7 +71,7 @@ private val JACKSON: ObjectMapper = jacksonObjectMapper() class ImageServer( private val storage: ImageStorage, - private val client: HttpHandler, + private val upstream: HttpHandler, registry: PrometheusMeterRegistry ) { private val executor = Executors.newCachedThreadPool() @@ -135,7 +129,7 @@ class ImageServer( } private fun Request.handleCacheMiss(sanitizedUri: String, imageId: String): Response { - val mdResponse = client(Request(Method.GET, sanitizedUri)) + val mdResponse = upstream(Request(Method.GET, sanitizedUri)) if (mdResponse.status != Status.OK) { LOGGER.warn { "Upstream query for $sanitizedUri errored with status ${mdResponse.status}" } @@ -234,36 +228,16 @@ fun getServer( metricsSettings: MetricsSettings, statistics: Statistics, registry: PrometheusMeterRegistry, + client: HttpHandler ): Http4kServer { - val apache = ApacheClient( - responseBodyMode = BodyMode.Stream, - client = HttpClients.custom() - .disableConnectionState() - .setDefaultRequestConfig( - RequestConfig.custom() - .setCookieSpec(StandardCookieSpec.IGNORE) - .setConnectTimeout(Timeout.ofSeconds(2)) - .setResponseTimeout(Timeout.ofSeconds(2)) - .setConnectionRequestTimeout(Timeout.ofSeconds(1)) - .build() - ) - .setConnectionManager( - PoolingHttpClientConnectionManagerBuilder.create() - .setMaxConnTotal(500) - .setMaxConnPerRoute(500) - .build() - ) - .build() - ) - - val client = + val upstream = ClientFilters.SetBaseUriFrom(remoteSettings.imageServer) .then(ClientFilters.MicrometerMetrics.RequestTimer(registry)) - .then(apache) + .then(client) val imageServer = ImageServer( storage = storage, - client = client, + upstream = upstream, registry = registry ) @@ -311,7 +285,7 @@ fun getServer( ).withFilter( ServerFilters.MicrometerMetrics.RequestTimer(registry, labeler = PostTransactionLabeler()) ).withFilter( - GeoIpMetricsFilterBuilder(metricsSettings.enableGeoip, metricsSettings.geoipLicenseKey, registry, apache).build() + GeoIpMetricsFilterBuilder(metricsSettings.enableGeoip, metricsSettings.geoipLicenseKey, registry).build() ) ) .asServer(Netty(remoteSettings.tls!!, serverSettings, statistics))