From 6efff4ce404317f9dd7c51e2f197cab3cb30f0a8 Mon Sep 17 00:00:00 2001 From: carbotaniuman <41451839+carbotaniuman@users.noreply.github.com> Date: Sun, 21 Jun 2020 14:49:10 -0500 Subject: [PATCH] Refactor graceful shutdown --- CHANGELOG.md | 1 + src/main/java/mdnet/base/MangaDexClient.java | 279 ----------------- src/main/java/mdnet/cache/DiskLruCache.java | 3 +- .../mdnet/cache/HeaderMismatchException.java | 6 +- src/main/kotlin/mdnet/base/Constants.kt | 3 +- src/main/kotlin/mdnet/base/Main.kt | 4 +- src/main/kotlin/mdnet/base/MangaDexClient.kt | 287 ++++++++++++++++++ .../kotlin/mdnet/base/server/ImageServer.kt | 7 +- .../mdnet/base/settings/ClientSettings.kt | 1 + 9 files changed, 300 insertions(+), 291 deletions(-) delete mode 100644 src/main/java/mdnet/base/MangaDexClient.java create mode 100644 src/main/kotlin/mdnet/base/MangaDexClient.kt diff --git a/CHANGELOG.md b/CHANGELOG.md index a258762..2b7439f 100755 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ## [Unreleased] ### Added +- [2020-06-20] Added `graceful_shutdown_wait_seconds` client setting ### Changed diff --git a/src/main/java/mdnet/base/MangaDexClient.java b/src/main/java/mdnet/base/MangaDexClient.java deleted file mode 100644 index 4c390f2..0000000 --- a/src/main/java/mdnet/base/MangaDexClient.java +++ /dev/null @@ -1,279 +0,0 @@ -package mdnet.base; - -import ch.qos.logback.classic.LoggerContext; -import mdnet.base.settings.ClientSettings; -import mdnet.base.server.ApplicationKt; -import mdnet.base.server.WebUiKt; -import mdnet.base.settings.ServerSettings; -import mdnet.cache.DiskLruCache; -import mdnet.cache.HeaderMismatchException; -import org.http4k.server.Http4kServer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.*; -import java.time.Instant; -import java.util.Collections; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; - -import static mdnet.base.Constants.JACKSON; - -public class MangaDexClient { - private final static Logger LOGGER = LoggerFactory.getLogger(MangaDexClient.class); - - private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor(); - private final ServerHandler serverHandler; - private final ClientSettings clientSettings; - - private final Map statsMap = Collections - .synchronizedMap(new LinkedHashMap(240) { - @Override - protected boolean removeEldestEntry(Map.Entry eldest) { - return this.size() > 240; - } - }); - private final AtomicReference statistics; - private final AtomicBoolean isHandled; - - private ServerSettings serverSettings; - private Http4kServer engine; // if this is null, then the server has shutdown - private Http4kServer webUi; - private DiskLruCache cache; - - // these variables are for runLoop(); - private int counter = 0; - private long lastBytesSent = 0; - // a non-negative number here means we are shutting down - private int gracefulCounter = -1; - private Runnable gracefulAction; - - public MangaDexClient(ClientSettings clientSettings) { - this.clientSettings = clientSettings; - this.serverHandler = new ServerHandler(clientSettings); - this.statistics = new AtomicReference<>(); - this.isHandled = new AtomicBoolean(); - - try { - cache = DiskLruCache.open(new File("cache"), 1, 1, - clientSettings.getMaxCacheSizeInMebibytes() * 1024 * 1024 /* MiB to bytes */); - - DiskLruCache.Snapshot snapshot = cache.get("statistics"); - if (snapshot != null) { - statistics.set(JACKSON.readValue(snapshot.getInputStream(0), Statistics.class)); - snapshot.close(); - } else { - statistics.set(new Statistics()); - } - } catch (HeaderMismatchException e) { - LOGGER.warn("Cache version may be outdated - remove if necessary"); - Main.dieWithError(e); - } catch (IOException e) { - LOGGER.warn("Cache version may be corrupt - remove if necessary"); - Main.dieWithError(e); - } - } - - public void runLoop() { - loginAndStartServer(); - if (serverSettings.getLatestBuild() > Constants.CLIENT_BUILD) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Outdated build detected! Latest: {}, Current: {}", serverSettings.getLatestBuild(), - Constants.CLIENT_BUILD); - } - } - - lastBytesSent = statistics.get().getBytesSent(); - statsMap.put(Instant.now(), statistics.get()); - - if (clientSettings.getWebSettings() != null) { - webUi = WebUiKt.getUiServer(clientSettings.getWebSettings(), statistics, statsMap); - webUi.start(); - } - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Mangadex@Home Client initialization completed successfully. Starting normal operation."); - } - - executorService.scheduleWithFixedDelay(() -> { - try { - // Converting from 15 seconds loop to 45 second loop - if (counter / 3 == 80) { - counter = 0; - lastBytesSent = statistics.get().getBytesSent(); - - if (engine == null) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Restarting server stopped due to hourly bandwidth limit"); - } - - loginAndStartServer(); - } - } else { - counter++; - } - - if (gracefulCounter == 0) { - logout(); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Waiting another 15 seconds for graceful shutdown ({} out of {} tries)", - gracefulCounter + 1, 4); - } - gracefulCounter++; - } else if (gracefulCounter > 0) { - if (!isHandled.get() || gracefulCounter == 4 || engine == null) { - if (LOGGER.isInfoEnabled()) { - if (!isHandled.get()) { - LOGGER.info("No requests received, shutting down"); - } else { - LOGGER.info("Max tries attempted, shutting down"); - } - } - - if (engine != null) { - stopServer(); - } - if (gracefulAction != null) { - gracefulAction.run(); - } - - // reset variables - gracefulCounter = -1; - gracefulAction = null; - } else { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Waiting another 15 seconds for graceful shutdown ({} out of {} tries)", - gracefulCounter + 1, 4); - } - gracefulCounter++; - } - isHandled.set(false); - } else { - if (counter % 3 == 0) { - pingControl(); - } - updateStats(); - } - - } catch (Exception e) { - LOGGER.warn("statistics update failed", e); - } - - }, 15, 15, TimeUnit.SECONDS); - } - - private void pingControl() { - // if the server is offline then don't try and refresh certs - if (engine == null) { - return; - } - - long currentBytesSent = statistics.get().getBytesSent() - lastBytesSent; - if (clientSettings.getMaxMebibytesPerHour() != 0 - && clientSettings.getMaxMebibytesPerHour() * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Shutting down server as hourly bandwidth limit reached"); - } - - // Give enough time for graceful shutdown - if (240 - counter > 3) { - LOGGER.info("Graceful shutdown started"); - gracefulCounter = 0; - } - } - - ServerSettings n = serverHandler.pingControl(serverSettings); - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Server settings received: {}", n); - } - - if (n != null) { - if (n.getLatestBuild() > Constants.CLIENT_BUILD) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Outdated build detected! Latest: {}, Current: {}", n.getLatestBuild(), - Constants.CLIENT_BUILD); - } - } - - if (n.getTls() != null || !n.getImageServer().equals(serverSettings.getImageServer())) { - // certificates or upstream url must have changed, restart webserver - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Doing internal restart of HTTP server to refresh certs/upstream URL"); - } - - LOGGER.info("Graceful shutdown started"); - gracefulCounter = 0; - gracefulAction = this::loginAndStartServer; - } - } - } - - private void updateStats() throws IOException { - statistics.updateAndGet(n -> n.copy(n.getRequestsServed(), n.getCacheHits(), n.getCacheMisses(), - n.getBrowserCached(), n.getBytesSent(), cache.size())); - - statsMap.put(Instant.now(), statistics.get()); - - DiskLruCache.Editor editor = cache.edit("statistics"); - if (editor != null) { - JACKSON.writeValue(editor.newOutputStream(0), statistics.get()); - editor.commit(); - } - } - - private void loginAndStartServer() { - serverSettings = serverHandler.loginToControl(); - if (serverSettings == null) { - Main.dieWithError("Failed to get a login response from server - check API secret for validity"); - } - engine = ApplicationKt.getServer(cache, serverSettings, clientSettings, statistics, isHandled); - engine.start(); - - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Internal HTTP server was successfully started"); - } - } - - private void logout() { - serverHandler.logoutFromControl(); - } - - private void stopServer() { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Shutting down HTTP server"); - } - engine.stop(); - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Internal HTTP server has gracefully shut down"); - } - engine = null; - } - - public void shutdown() { - LOGGER.info("Graceful shutdown started"); - gracefulCounter = 0; - AtomicBoolean readyToExit = new AtomicBoolean(false); - gracefulAction = () -> { - if (webUi != null) { - webUi.close(); - } - try { - cache.close(); - } catch (IOException e) { - LOGGER.error("Cache failed to close", e); - } - readyToExit.set(true); - }; - while (!readyToExit.get()) { - } - executorService.shutdown(); - LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory(); - loggerContext.stop(); - } -} diff --git a/src/main/java/mdnet/cache/DiskLruCache.java b/src/main/java/mdnet/cache/DiskLruCache.java index 4cd4625..0fcd052 100644 --- a/src/main/java/mdnet/cache/DiskLruCache.java +++ b/src/main/java/mdnet/cache/DiskLruCache.java @@ -250,8 +250,7 @@ public final class DiskLruCache implements Closeable { || !Integer.toString(valueCount).equals(valueCountString) || !"".equals(blank)) { throw new HeaderMismatchException( new String[]{magic, version, appVersionString, valueCountString, blank}, - new String[]{MAGIC, VERSION_1, Integer.toString(appVersion), Integer.toString(valueCount), ""} - ); + new String[]{MAGIC, VERSION_1, Integer.toString(appVersion), Integer.toString(valueCount), ""}); } int lineCount = 0; diff --git a/src/main/java/mdnet/cache/HeaderMismatchException.java b/src/main/java/mdnet/cache/HeaderMismatchException.java index 754e569..663cc3b 100644 --- a/src/main/java/mdnet/cache/HeaderMismatchException.java +++ b/src/main/java/mdnet/cache/HeaderMismatchException.java @@ -4,7 +4,7 @@ import java.io.IOException; import java.util.Arrays; public class HeaderMismatchException extends IOException { - public HeaderMismatchException(String[] actual, String[] expected) { - super("expected header " + Arrays.toString(expected) + ", found " + Arrays.toString(actual)); - } + public HeaderMismatchException(String[] actual, String[] expected) { + super("expected header " + Arrays.toString(expected) + ", found " + Arrays.toString(actual)); + } } diff --git a/src/main/kotlin/mdnet/base/Constants.kt b/src/main/kotlin/mdnet/base/Constants.kt index e4d8187..c1076ef 100644 --- a/src/main/kotlin/mdnet/base/Constants.kt +++ b/src/main/kotlin/mdnet/base/Constants.kt @@ -4,10 +4,9 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper import java.time.Duration object Constants { - const val CLIENT_BUILD = 11 + const val CLIENT_BUILD = 12 const val CLIENT_VERSION = "1.0" const val WEBUI_VERSION = "0.1.1" val MAX_AGE_CACHE: Duration = Duration.ofDays(14) - @JvmField val JACKSON = jacksonObjectMapper() } diff --git a/src/main/kotlin/mdnet/base/Main.kt b/src/main/kotlin/mdnet/base/Main.kt index 8aa2fae..85cdee0 100644 --- a/src/main/kotlin/mdnet/base/Main.kt +++ b/src/main/kotlin/mdnet/base/Main.kt @@ -30,7 +30,7 @@ object Main { dieWithError("Expected one argument: path to config file, or nothing") } - val settings: ClientSettings = try { + val settings = try { JACKSON.readValue(FileReader(file)) } catch (e: UnrecognizedPropertyException) { dieWithError("'${e.propertyName}' is not a valid setting") @@ -55,7 +55,6 @@ object Main { client.runLoop() } - @JvmStatic fun dieWithError(e: Throwable): Nothing { if (LOGGER.isErrorEnabled) { LOGGER.error("Critical Error", e) @@ -64,7 +63,6 @@ object Main { exitProcess(1) } - @JvmStatic fun dieWithError(error: String): Nothing { if (LOGGER.isErrorEnabled) { LOGGER.error("Critical Error: {}", error) diff --git a/src/main/kotlin/mdnet/base/MangaDexClient.kt b/src/main/kotlin/mdnet/base/MangaDexClient.kt new file mode 100644 index 0000000..e476058 --- /dev/null +++ b/src/main/kotlin/mdnet/base/MangaDexClient.kt @@ -0,0 +1,287 @@ +/* ktlint-disable no-wildcard-imports */ +package mdnet.base + +import ch.qos.logback.classic.LoggerContext +import com.fasterxml.jackson.module.kotlin.readValue +import mdnet.base.Constants.JACKSON +import mdnet.base.Main.dieWithError +import mdnet.base.server.getServer +import mdnet.base.server.getUiServer +import mdnet.base.settings.ClientSettings +import mdnet.base.settings.ServerSettings +import mdnet.cache.DiskLruCache +import mdnet.cache.HeaderMismatchException +import org.http4k.server.Http4kServer +import org.slf4j.LoggerFactory +import java.io.File +import java.io.IOException +import java.time.Instant +import java.util.* +import java.util.concurrent.CountDownLatch +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit +import java.util.concurrent.atomic.AtomicBoolean +import java.util.concurrent.atomic.AtomicReference + +sealed class State +// server is not running +object Uninitialized : State() +// server has shut down +object Shutdown : State() +// server is in the process of shutting down +data class GracefulShutdown(val lastRunning: Running, val counts: Int = 0, val nextState: State = Uninitialized, val action: () -> Unit = {}) : State() +// server is currently running +data class Running(val server: Http4kServer, val settings: ServerSettings) : State() + +class MangaDexClient(private val clientSettings: ClientSettings) { + // this must remain singlethreaded because of how the state mechanism works + private val executorService = Executors.newSingleThreadScheduledExecutor() + private var state: State = Uninitialized + + private val serverHandler: ServerHandler = ServerHandler(clientSettings) + private val statsMap: MutableMap = Collections + .synchronizedMap(object : LinkedHashMap(240) { + override fun removeEldestEntry(eldest: Map.Entry): Boolean { + return this.size > 240 + } + }) + private val statistics: AtomicReference = AtomicReference(Statistics()) + private val isHandled: AtomicBoolean = AtomicBoolean(false) + private var webUi: Http4kServer? = null + private val cache: DiskLruCache + + init { + try { + cache = DiskLruCache.open( + File("cache"), 1, 1, + clientSettings.maxCacheSizeInMebibytes * 1024 * 1024 /* MiB to bytes */ + ) + cache.get("statistics")?.use { + statistics.set(JACKSON.readValue(it.getInputStream(0))) + } + } catch (e: HeaderMismatchException) { + LOGGER.warn("Cache version may be outdated - remove if necessary") + dieWithError(e) + } catch (e: IOException) { + LOGGER.warn("Cache version may be corrupt - remove if necessary") + dieWithError(e) + } + } + + fun runLoop() { + loginAndStartServer() + statsMap[Instant.now()] = statistics.get() + + if (clientSettings.webSettings != null) { + webUi = getUiServer(clientSettings.webSettings, statistics, statsMap) + webUi!!.start() + } + if (LOGGER.isInfoEnabled) { + LOGGER.info("Mangadex@Home Client initialized. Starting normal operation.") + } + + executorService.scheduleAtFixedRate({ + try { + statistics.updateAndGet { + it.copy(bytesOnDisk = cache.size()) + } + statsMap[Instant.now()] = statistics.get() + val editor = cache.edit("statistics") + if (editor != null) { + JACKSON.writeValue(editor.newOutputStream(0), statistics.get()) + editor.commit() + } + } catch (e: Exception) { + LOGGER.warn("Statistics update failed", e) + } + }, 15, 15, TimeUnit.SECONDS) + + var lastBytesSent = statistics.get().bytesSent + executorService.scheduleAtFixedRate({ + try { + lastBytesSent = statistics.get().bytesSent + + val state = this.state + if (state is GracefulShutdown) { + if (LOGGER.isInfoEnabled) { + LOGGER.info("Aborting graceful shutdown started due to hourly bandwidth limit") + } + this.state = state.lastRunning + } + if (state is Uninitialized) { + if (LOGGER.isInfoEnabled) { + LOGGER.info("Restarting server stopped due to hourly bandwidth limit") + } + loginAndStartServer() + } + } catch (e: Exception) { + LOGGER.warn("Hourly bandwidth check failed", e) + } + }, 1, 1, TimeUnit.HOURS) + + val timesToWait = clientSettings.gracefulShutdownWaitSeconds / 15 + executorService.scheduleAtFixedRate({ + try { + val state = this.state + if (state is GracefulShutdown) { + when { + state.counts == 0 -> { + if (LOGGER.isInfoEnabled) { + LOGGER.info("Starting graceful shutdown") + } + logout() + isHandled.set(false) + this.state = state.copy(counts = state.counts + 1) + } + state.counts == timesToWait || !isHandled.get() -> { + if (LOGGER.isInfoEnabled) { + if (!isHandled.get()) { + LOGGER.info("No requests received, shutting down") + } else { + LOGGER.info("Max tries attempted (${state.counts} out of $timesToWait), shutting down") + } + } + + stopServer(state.nextState) + state.action() + } + else -> { + if (LOGGER.isInfoEnabled) { + LOGGER.info( + "Waiting another 15 seconds for graceful shutdown (${state.counts} out of $timesToWait)" + ) + } + isHandled.set(false) + this.state = state.copy(counts = state.counts + 1) + } + } + } + } catch (e: Exception) { + LOGGER.warn("Main loop failed", e) + } + }, 15, 15, TimeUnit.SECONDS) + + executorService.scheduleWithFixedDelay({ + try { + val state = this.state + if (state is Running) { + val currentBytesSent = statistics.get().bytesSent - lastBytesSent + if (clientSettings.maxMebibytesPerHour != 0L && clientSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) { + if (LOGGER.isInfoEnabled) { + LOGGER.info("Shutting down server as hourly bandwidth limit reached") + } + this.state = GracefulShutdown(lastRunning = state) + } + + pingControl() + } + } catch (e: Exception) { + LOGGER.warn("Main loop failed", e) + } + }, 45, 45, TimeUnit.SECONDS) + } + + private fun pingControl() { + val state = this.state as Running + + val newSettings = serverHandler.pingControl(state.settings) + if (LOGGER.isInfoEnabled) { + LOGGER.info("Server settings received: {}", newSettings) + } + if (newSettings != null) { + if (newSettings.latestBuild > Constants.CLIENT_BUILD) { + if (LOGGER.isWarnEnabled) { + LOGGER.warn( + "Outdated build detected! Latest: {}, Current: {}", newSettings.latestBuild, + Constants.CLIENT_BUILD + ) + } + } + if (newSettings.tls != null || newSettings.imageServer != state.settings.imageServer) { + // certificates or upstream url must have changed, restart webserver + if (LOGGER.isInfoEnabled) { + LOGGER.info("Doing internal restart of HTTP server to refresh certs/upstream URL") + } + this.state = GracefulShutdown(lastRunning = state) { + loginAndStartServer() + } + } + } + } + + private fun loginAndStartServer() { + this.state as Uninitialized + + val serverSettings = serverHandler.loginToControl() + ?: dieWithError("Failed to get a login response from server - check API secret for validity") + val server = getServer(cache, serverSettings, clientSettings, statistics, isHandled).start() + + if (serverSettings.latestBuild > Constants.CLIENT_BUILD) { + if (LOGGER.isWarnEnabled) { + LOGGER.warn( + "Outdated build detected! Latest: {}, Current: {}", serverSettings.latestBuild, + Constants.CLIENT_BUILD + ) + } + } + + state = Running(server, serverSettings) + if (LOGGER.isInfoEnabled) { + LOGGER.info("Internal HTTP server was successfully started") + } + } + + private fun logout() { + serverHandler.logoutFromControl() + } + + private fun stopServer(nextState: State = Uninitialized) { + val state = this.state.let { + when (it) { + is Running -> + it + is GracefulShutdown -> + it.lastRunning + else -> + throw AssertionError() + } + } + + if (LOGGER.isInfoEnabled) { + LOGGER.info("Shutting down HTTP server") + } + state.server.stop() + if (LOGGER.isInfoEnabled) { + LOGGER.info("Internal HTTP server has shut down") + } + this.state = nextState + } + + fun shutdown() { + LOGGER.info("Mangadex@Home Client stopping") + + val state = this.state + if (state is Running) { + val latch = CountDownLatch(1) + + this.state = GracefulShutdown(state, nextState = Shutdown) { + webUi?.close() + try { + cache.close() + } catch (e: IOException) { + LOGGER.error("Cache failed to close", e) + } + latch.countDown() + } + latch.await() + } + executorService.shutdown() + LOGGER.info("Mangadex@Home Client stopped") + + (LoggerFactory.getILoggerFactory() as LoggerContext).stop() + } + + companion object { + private val LOGGER = LoggerFactory.getLogger(MangaDexClient::class.java) + } +} diff --git a/src/main/kotlin/mdnet/base/server/ImageServer.kt b/src/main/kotlin/mdnet/base/server/ImageServer.kt index aeeb661..b425870 100644 --- a/src/main/kotlin/mdnet/base/server/ImageServer.kt +++ b/src/main/kotlin/mdnet/base/server/ImageServer.kt @@ -31,8 +31,7 @@ import javax.crypto.CipherInputStream import javax.crypto.CipherOutputStream import javax.crypto.spec.SecretKeySpec -private const val THREADS_TO_ALLOCATE = 262144 // 2**18 // Honestly, no reason to not just let 'er rip. Inactive connections will expire on their own :D -private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java) +private const val THREADS_TO_ALLOCATE = 262144 // 2**18 class ImageServer(private val cache: DiskLruCache, private val statistics: AtomicReference, private val upstreamUrl: String, private val database: Database, private val handled: AtomicBoolean) { init { @@ -252,6 +251,10 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi } } .header("X-Cache", if (cached) "HIT" else "MISS") + + companion object { + private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java) + } } private fun getRc4(key: ByteArray): Cipher { diff --git a/src/main/kotlin/mdnet/base/settings/ClientSettings.kt b/src/main/kotlin/mdnet/base/settings/ClientSettings.kt index 469a3cb..1c1ece1 100644 --- a/src/main/kotlin/mdnet/base/settings/ClientSettings.kt +++ b/src/main/kotlin/mdnet/base/settings/ClientSettings.kt @@ -13,6 +13,7 @@ data class ClientSettings( val clientPort: Int = 443, @field:Secret val clientSecret: String = "PASTE-YOUR-SECRET-HERE", val threads: Int = 4, + val gracefulShutdownWaitSeconds: Int = 60, val webSettings: WebSettings? = null )