mirror of
https://gitlab.com/mangadex-pub/mangadex_at_home.git
synced 2024-01-19 02:48:37 +00:00
Add HTTP connection pruner
This commit is contained in:
parent
378e606537
commit
994587e616
|
@ -17,7 +17,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
|
||||||
|
|
||||||
### Security
|
### Security
|
||||||
|
|
||||||
## [2.0.0-rc6] - 2021-01-27
|
## [2.0.0-rc6] - 2021-01-28
|
||||||
### Fixed
|
### Fixed
|
||||||
- [2021-01-27] Upped max Apache threadpool size [@carbotaniuman].
|
- [2021-01-27] Upped max Apache threadpool size [@carbotaniuman].
|
||||||
- [2021-01-27] Add ability for nodes to specify external ip [@carbotaniuman].
|
- [2021-01-27] Add ability for nodes to specify external ip [@carbotaniuman].
|
||||||
|
|
|
@ -21,7 +21,7 @@ package mdnet
|
||||||
import java.time.Duration
|
import java.time.Duration
|
||||||
|
|
||||||
object Constants {
|
object Constants {
|
||||||
const val CLIENT_BUILD = 23
|
const val CLIENT_BUILD = 24
|
||||||
|
|
||||||
@JvmField val MAX_AGE_CACHE: Duration = Duration.ofDays(14)
|
@JvmField val MAX_AGE_CACHE: Duration = Duration.ofDays(14)
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,14 @@ import mdnet.logging.warn
|
||||||
import mdnet.metrics.DefaultMicrometerMetrics
|
import mdnet.metrics.DefaultMicrometerMetrics
|
||||||
import mdnet.server.getServer
|
import mdnet.server.getServer
|
||||||
import mdnet.settings.*
|
import mdnet.settings.*
|
||||||
|
import org.apache.hc.client5.http.config.RequestConfig
|
||||||
|
import org.apache.hc.client5.http.cookie.StandardCookieSpec
|
||||||
|
import org.apache.hc.client5.http.impl.classic.HttpClients
|
||||||
|
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder
|
||||||
|
import org.apache.hc.core5.util.TimeValue
|
||||||
|
import org.apache.hc.core5.util.Timeout
|
||||||
|
import org.http4k.client.ApacheClient
|
||||||
|
import org.http4k.core.BodyMode
|
||||||
import org.http4k.server.Http4kServer
|
import org.http4k.server.Http4kServer
|
||||||
import org.slf4j.LoggerFactory
|
import org.slf4j.LoggerFactory
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
|
@ -61,6 +69,27 @@ class ServerManager(
|
||||||
private val executor = Executors.newSingleThreadScheduledExecutor()
|
private val executor = Executors.newSingleThreadScheduledExecutor()
|
||||||
private val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
|
private val registry = PrometheusMeterRegistry(PrometheusConfig.DEFAULT)
|
||||||
private val statistics = Statistics()
|
private val statistics = Statistics()
|
||||||
|
private val connectionManager = PoolingHttpClientConnectionManagerBuilder.create()
|
||||||
|
.setMaxConnTotal(500)
|
||||||
|
.setMaxConnPerRoute(500)
|
||||||
|
.build()
|
||||||
|
private val apache = ApacheClient(
|
||||||
|
responseBodyMode = BodyMode.Stream,
|
||||||
|
client = HttpClients.custom()
|
||||||
|
.disableConnectionState()
|
||||||
|
.setDefaultRequestConfig(
|
||||||
|
RequestConfig.custom()
|
||||||
|
.setCookieSpec(StandardCookieSpec.IGNORE)
|
||||||
|
.setConnectTimeout(Timeout.ofSeconds(2))
|
||||||
|
.setResponseTimeout(Timeout.ofSeconds(2))
|
||||||
|
.setConnectionRequestTimeout(Timeout.ofSeconds(1))
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
.setConnectionManager(
|
||||||
|
connectionManager
|
||||||
|
)
|
||||||
|
.build()
|
||||||
|
)
|
||||||
|
|
||||||
// state that must only be accessed from the thread on the executor
|
// state that must only be accessed from the thread on the executor
|
||||||
private var state: State
|
private var state: State
|
||||||
|
@ -170,6 +199,20 @@ class ServerManager(
|
||||||
45, 45, TimeUnit.SECONDS
|
45, 45, TimeUnit.SECONDS
|
||||||
)
|
)
|
||||||
|
|
||||||
|
executor.scheduleWithFixedDelay(
|
||||||
|
{
|
||||||
|
try {
|
||||||
|
LOGGER.info { "Closing old Apache HTTP connections" }
|
||||||
|
|
||||||
|
connectionManager.closeExpired()
|
||||||
|
connectionManager.closeIdle(TimeValue.ofSeconds(30))
|
||||||
|
} catch (e: Exception) {
|
||||||
|
LOGGER.warn(e) { "Old Apache HTTP connection closer failed" }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
45, 45, TimeUnit.SECONDS
|
||||||
|
)
|
||||||
|
|
||||||
LOGGER.info { "Image server has started" }
|
LOGGER.info { "Image server has started" }
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -221,7 +264,8 @@ class ServerManager(
|
||||||
settings.serverSettings,
|
settings.serverSettings,
|
||||||
settings.metricsSettings,
|
settings.metricsSettings,
|
||||||
statistics,
|
statistics,
|
||||||
registry
|
registry,
|
||||||
|
apache,
|
||||||
).start()
|
).start()
|
||||||
|
|
||||||
this.state = Running(server, remoteSettings)
|
this.state = Running(server, remoteSettings)
|
||||||
|
|
|
@ -26,6 +26,7 @@ import mdnet.logging.debug
|
||||||
import mdnet.logging.warn
|
import mdnet.logging.warn
|
||||||
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
|
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream
|
||||||
import org.apache.commons.io.IOUtils
|
import org.apache.commons.io.IOUtils
|
||||||
|
import org.http4k.client.ApacheClient
|
||||||
import org.http4k.core.Filter
|
import org.http4k.core.Filter
|
||||||
import org.http4k.core.HttpHandler
|
import org.http4k.core.HttpHandler
|
||||||
import org.http4k.core.Method
|
import org.http4k.core.Method
|
||||||
|
@ -90,8 +91,8 @@ class GeoIpMetricsFilterBuilder(
|
||||||
private val enableGeoIp: Boolean,
|
private val enableGeoIp: Boolean,
|
||||||
private val license: String,
|
private val license: String,
|
||||||
private val registry: PrometheusMeterRegistry,
|
private val registry: PrometheusMeterRegistry,
|
||||||
private val client: HttpHandler
|
|
||||||
) {
|
) {
|
||||||
|
val client = ApacheClient()
|
||||||
fun build(): GeoIpMetricsFilter {
|
fun build(): GeoIpMetricsFilter {
|
||||||
return if (enableGeoIp) {
|
return if (enableGeoIp) {
|
||||||
LOGGER.info("GeoIp initialising")
|
LOGGER.info("GeoIp initialising")
|
||||||
|
|
|
@ -41,7 +41,7 @@ import io.netty.handler.traffic.TrafficCounter
|
||||||
import io.netty.incubator.channel.uring.IOUring
|
import io.netty.incubator.channel.uring.IOUring
|
||||||
import io.netty.incubator.channel.uring.IOUringEventLoopGroup
|
import io.netty.incubator.channel.uring.IOUringEventLoopGroup
|
||||||
import io.netty.incubator.channel.uring.IOUringServerSocketChannel
|
import io.netty.incubator.channel.uring.IOUringServerSocketChannel
|
||||||
import io.netty.util.concurrent.UnorderedThreadPoolEventExecutor
|
import io.netty.util.concurrent.DefaultEventExecutorGroup
|
||||||
import io.netty.util.internal.SystemPropertyUtil
|
import io.netty.util.internal.SystemPropertyUtil
|
||||||
import mdnet.Constants
|
import mdnet.Constants
|
||||||
import mdnet.data.Statistics
|
import mdnet.data.Statistics
|
||||||
|
@ -70,7 +70,7 @@ sealed class NettyTransport(threads: Int) {
|
||||||
abstract val bossGroup: EventLoopGroup
|
abstract val bossGroup: EventLoopGroup
|
||||||
abstract val workerGroup: EventLoopGroup
|
abstract val workerGroup: EventLoopGroup
|
||||||
abstract val factory: ChannelFactory<ServerChannel>
|
abstract val factory: ChannelFactory<ServerChannel>
|
||||||
val executor = UnorderedThreadPoolEventExecutor(
|
val executor = DefaultEventExecutorGroup(
|
||||||
threads.also {
|
threads.also {
|
||||||
require(threads > 0) { "Threads must be greater than zero" }
|
require(threads > 0) { "Threads must be greater than zero" }
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,12 +44,6 @@ import mdnet.security.TweetNaclFast
|
||||||
import mdnet.settings.MetricsSettings
|
import mdnet.settings.MetricsSettings
|
||||||
import mdnet.settings.RemoteSettings
|
import mdnet.settings.RemoteSettings
|
||||||
import mdnet.settings.ServerSettings
|
import mdnet.settings.ServerSettings
|
||||||
import org.apache.hc.client5.http.config.RequestConfig
|
|
||||||
import org.apache.hc.client5.http.cookie.StandardCookieSpec
|
|
||||||
import org.apache.hc.client5.http.impl.classic.HttpClients
|
|
||||||
import org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManagerBuilder
|
|
||||||
import org.apache.hc.core5.util.Timeout
|
|
||||||
import org.http4k.client.ApacheClient
|
|
||||||
import org.http4k.core.*
|
import org.http4k.core.*
|
||||||
import org.http4k.filter.CachingFilters
|
import org.http4k.filter.CachingFilters
|
||||||
import org.http4k.filter.ClientFilters
|
import org.http4k.filter.ClientFilters
|
||||||
|
@ -77,7 +71,7 @@ private val JACKSON: ObjectMapper = jacksonObjectMapper()
|
||||||
|
|
||||||
class ImageServer(
|
class ImageServer(
|
||||||
private val storage: ImageStorage,
|
private val storage: ImageStorage,
|
||||||
private val client: HttpHandler,
|
private val upstream: HttpHandler,
|
||||||
registry: PrometheusMeterRegistry
|
registry: PrometheusMeterRegistry
|
||||||
) {
|
) {
|
||||||
private val executor = Executors.newCachedThreadPool()
|
private val executor = Executors.newCachedThreadPool()
|
||||||
|
@ -135,7 +129,7 @@ class ImageServer(
|
||||||
}
|
}
|
||||||
|
|
||||||
private fun Request.handleCacheMiss(sanitizedUri: String, imageId: String): Response {
|
private fun Request.handleCacheMiss(sanitizedUri: String, imageId: String): Response {
|
||||||
val mdResponse = client(Request(Method.GET, sanitizedUri))
|
val mdResponse = upstream(Request(Method.GET, sanitizedUri))
|
||||||
|
|
||||||
if (mdResponse.status != Status.OK) {
|
if (mdResponse.status != Status.OK) {
|
||||||
LOGGER.warn { "Upstream query for $sanitizedUri errored with status ${mdResponse.status}" }
|
LOGGER.warn { "Upstream query for $sanitizedUri errored with status ${mdResponse.status}" }
|
||||||
|
@ -234,36 +228,16 @@ fun getServer(
|
||||||
metricsSettings: MetricsSettings,
|
metricsSettings: MetricsSettings,
|
||||||
statistics: Statistics,
|
statistics: Statistics,
|
||||||
registry: PrometheusMeterRegistry,
|
registry: PrometheusMeterRegistry,
|
||||||
|
client: HttpHandler
|
||||||
): Http4kServer {
|
): Http4kServer {
|
||||||
val apache = ApacheClient(
|
val upstream =
|
||||||
responseBodyMode = BodyMode.Stream,
|
|
||||||
client = HttpClients.custom()
|
|
||||||
.disableConnectionState()
|
|
||||||
.setDefaultRequestConfig(
|
|
||||||
RequestConfig.custom()
|
|
||||||
.setCookieSpec(StandardCookieSpec.IGNORE)
|
|
||||||
.setConnectTimeout(Timeout.ofSeconds(2))
|
|
||||||
.setResponseTimeout(Timeout.ofSeconds(2))
|
|
||||||
.setConnectionRequestTimeout(Timeout.ofSeconds(1))
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
.setConnectionManager(
|
|
||||||
PoolingHttpClientConnectionManagerBuilder.create()
|
|
||||||
.setMaxConnTotal(500)
|
|
||||||
.setMaxConnPerRoute(500)
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
.build()
|
|
||||||
)
|
|
||||||
|
|
||||||
val client =
|
|
||||||
ClientFilters.SetBaseUriFrom(remoteSettings.imageServer)
|
ClientFilters.SetBaseUriFrom(remoteSettings.imageServer)
|
||||||
.then(ClientFilters.MicrometerMetrics.RequestTimer(registry))
|
.then(ClientFilters.MicrometerMetrics.RequestTimer(registry))
|
||||||
.then(apache)
|
.then(client)
|
||||||
|
|
||||||
val imageServer = ImageServer(
|
val imageServer = ImageServer(
|
||||||
storage = storage,
|
storage = storage,
|
||||||
client = client,
|
upstream = upstream,
|
||||||
registry = registry
|
registry = registry
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -311,7 +285,7 @@ fun getServer(
|
||||||
).withFilter(
|
).withFilter(
|
||||||
ServerFilters.MicrometerMetrics.RequestTimer(registry, labeler = PostTransactionLabeler())
|
ServerFilters.MicrometerMetrics.RequestTimer(registry, labeler = PostTransactionLabeler())
|
||||||
).withFilter(
|
).withFilter(
|
||||||
GeoIpMetricsFilterBuilder(metricsSettings.enableGeoip, metricsSettings.geoipLicenseKey, registry, apache).build()
|
GeoIpMetricsFilterBuilder(metricsSettings.enableGeoip, metricsSettings.geoipLicenseKey, registry).build()
|
||||||
)
|
)
|
||||||
)
|
)
|
||||||
.asServer(Netty(remoteSettings.tls!!, serverSettings, statistics))
|
.asServer(Netty(remoteSettings.tls!!, serverSettings, statistics))
|
||||||
|
|
Loading…
Reference in a new issue