1
0
Fork 1
mirror of https://gitlab.com/mangadex-pub/mangadex_at_home.git synced 2024-01-19 02:48:37 +00:00

Merge branch 'state-machine' into 'master'

Refactor graceful shutdown

See merge request mangadex/mangadex_at_home!36
This commit is contained in:
Amos Ng 2020-06-22 03:49:17 +00:00
commit 3215b67825
9 changed files with 312 additions and 291 deletions

View file

@ -6,6 +6,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## [Unreleased]
### Added
- [2020-06-20] Added `graceful_shutdown_wait_seconds` client setting
### Changed

View file

@ -1,279 +0,0 @@
package mdnet.base;
import ch.qos.logback.classic.LoggerContext;
import mdnet.base.settings.ClientSettings;
import mdnet.base.server.ApplicationKt;
import mdnet.base.server.WebUiKt;
import mdnet.base.settings.ServerSettings;
import mdnet.cache.DiskLruCache;
import mdnet.cache.HeaderMismatchException;
import org.http4k.server.Http4kServer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.*;
import java.time.Instant;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import static mdnet.base.Constants.JACKSON;
public class MangaDexClient {
private final static Logger LOGGER = LoggerFactory.getLogger(MangaDexClient.class);
private final ScheduledExecutorService executorService = Executors.newSingleThreadScheduledExecutor();
private final ServerHandler serverHandler;
private final ClientSettings clientSettings;
private final Map<Instant, Statistics> statsMap = Collections
.synchronizedMap(new LinkedHashMap<Instant, Statistics>(240) {
@Override
protected boolean removeEldestEntry(Map.Entry eldest) {
return this.size() > 240;
}
});
private final AtomicReference<Statistics> statistics;
private final AtomicBoolean isHandled;
private ServerSettings serverSettings;
private Http4kServer engine; // if this is null, then the server has shutdown
private Http4kServer webUi;
private DiskLruCache cache;
// these variables are for runLoop();
private int counter = 0;
private long lastBytesSent = 0;
// a non-negative number here means we are shutting down
private int gracefulCounter = -1;
private Runnable gracefulAction;
public MangaDexClient(ClientSettings clientSettings) {
this.clientSettings = clientSettings;
this.serverHandler = new ServerHandler(clientSettings);
this.statistics = new AtomicReference<>();
this.isHandled = new AtomicBoolean();
try {
cache = DiskLruCache.open(new File("cache"), 1, 1,
clientSettings.getMaxCacheSizeInMebibytes() * 1024 * 1024 /* MiB to bytes */);
DiskLruCache.Snapshot snapshot = cache.get("statistics");
if (snapshot != null) {
statistics.set(JACKSON.readValue(snapshot.getInputStream(0), Statistics.class));
snapshot.close();
} else {
statistics.set(new Statistics());
}
} catch (HeaderMismatchException e) {
LOGGER.warn("Cache version may be outdated - remove if necessary");
Main.dieWithError(e);
} catch (IOException e) {
LOGGER.warn("Cache version may be corrupt - remove if necessary");
Main.dieWithError(e);
}
}
public void runLoop() {
loginAndStartServer();
if (serverSettings.getLatestBuild() > Constants.CLIENT_BUILD) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Outdated build detected! Latest: {}, Current: {}", serverSettings.getLatestBuild(),
Constants.CLIENT_BUILD);
}
}
lastBytesSent = statistics.get().getBytesSent();
statsMap.put(Instant.now(), statistics.get());
if (clientSettings.getWebSettings() != null) {
webUi = WebUiKt.getUiServer(clientSettings.getWebSettings(), statistics, statsMap);
webUi.start();
}
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Mangadex@Home Client initialization completed successfully. Starting normal operation.");
}
executorService.scheduleWithFixedDelay(() -> {
try {
// Converting from 15 seconds loop to 45 second loop
if (counter / 3 == 80) {
counter = 0;
lastBytesSent = statistics.get().getBytesSent();
if (engine == null) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Restarting server stopped due to hourly bandwidth limit");
}
loginAndStartServer();
}
} else {
counter++;
}
if (gracefulCounter == 0) {
logout();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Waiting another 15 seconds for graceful shutdown ({} out of {} tries)",
gracefulCounter + 1, 4);
}
gracefulCounter++;
} else if (gracefulCounter > 0) {
if (!isHandled.get() || gracefulCounter == 4 || engine == null) {
if (LOGGER.isInfoEnabled()) {
if (!isHandled.get()) {
LOGGER.info("No requests received, shutting down");
} else {
LOGGER.info("Max tries attempted, shutting down");
}
}
if (engine != null) {
stopServer();
}
if (gracefulAction != null) {
gracefulAction.run();
}
// reset variables
gracefulCounter = -1;
gracefulAction = null;
} else {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Waiting another 15 seconds for graceful shutdown ({} out of {} tries)",
gracefulCounter + 1, 4);
}
gracefulCounter++;
}
isHandled.set(false);
} else {
if (counter % 3 == 0) {
pingControl();
}
updateStats();
}
} catch (Exception e) {
LOGGER.warn("statistics update failed", e);
}
}, 15, 15, TimeUnit.SECONDS);
}
private void pingControl() {
// if the server is offline then don't try and refresh certs
if (engine == null) {
return;
}
long currentBytesSent = statistics.get().getBytesSent() - lastBytesSent;
if (clientSettings.getMaxMebibytesPerHour() != 0
&& clientSettings.getMaxMebibytesPerHour() * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Shutting down server as hourly bandwidth limit reached");
}
// Give enough time for graceful shutdown
if (240 - counter > 3) {
LOGGER.info("Graceful shutdown started");
gracefulCounter = 0;
}
}
ServerSettings n = serverHandler.pingControl(serverSettings);
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Server settings received: {}", n);
}
if (n != null) {
if (n.getLatestBuild() > Constants.CLIENT_BUILD) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Outdated build detected! Latest: {}, Current: {}", n.getLatestBuild(),
Constants.CLIENT_BUILD);
}
}
if (n.getTls() != null || !n.getImageServer().equals(serverSettings.getImageServer())) {
// certificates or upstream url must have changed, restart webserver
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Doing internal restart of HTTP server to refresh certs/upstream URL");
}
LOGGER.info("Graceful shutdown started");
gracefulCounter = 0;
gracefulAction = this::loginAndStartServer;
}
}
}
private void updateStats() throws IOException {
statistics.updateAndGet(n -> n.copy(n.getRequestsServed(), n.getCacheHits(), n.getCacheMisses(),
n.getBrowserCached(), n.getBytesSent(), cache.size()));
statsMap.put(Instant.now(), statistics.get());
DiskLruCache.Editor editor = cache.edit("statistics");
if (editor != null) {
JACKSON.writeValue(editor.newOutputStream(0), statistics.get());
editor.commit();
}
}
private void loginAndStartServer() {
serverSettings = serverHandler.loginToControl();
if (serverSettings == null) {
Main.dieWithError("Failed to get a login response from server - check API secret for validity");
}
engine = ApplicationKt.getServer(cache, serverSettings, clientSettings, statistics, isHandled);
engine.start();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Internal HTTP server was successfully started");
}
}
private void logout() {
serverHandler.logoutFromControl();
}
private void stopServer() {
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Shutting down HTTP server");
}
engine.stop();
if (LOGGER.isInfoEnabled()) {
LOGGER.info("Internal HTTP server has gracefully shut down");
}
engine = null;
}
public void shutdown() {
LOGGER.info("Graceful shutdown started");
gracefulCounter = 0;
AtomicBoolean readyToExit = new AtomicBoolean(false);
gracefulAction = () -> {
if (webUi != null) {
webUi.close();
}
try {
cache.close();
} catch (IOException e) {
LOGGER.error("Cache failed to close", e);
}
readyToExit.set(true);
};
while (!readyToExit.get()) {
}
executorService.shutdown();
LoggerContext loggerContext = (LoggerContext) LoggerFactory.getILoggerFactory();
loggerContext.stop();
}
}

View file

@ -250,8 +250,7 @@ public final class DiskLruCache implements Closeable {
|| !Integer.toString(valueCount).equals(valueCountString) || !"".equals(blank)) {
throw new HeaderMismatchException(
new String[]{magic, version, appVersionString, valueCountString, blank},
new String[]{MAGIC, VERSION_1, Integer.toString(appVersion), Integer.toString(valueCount), ""}
);
new String[]{MAGIC, VERSION_1, Integer.toString(appVersion), Integer.toString(valueCount), ""});
}
int lineCount = 0;

View file

@ -4,10 +4,9 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import java.time.Duration
object Constants {
const val CLIENT_BUILD = 11
const val CLIENT_BUILD = 12
const val CLIENT_VERSION = "1.0"
const val WEBUI_VERSION = "0.1.1"
val MAX_AGE_CACHE: Duration = Duration.ofDays(14)
@JvmField
val JACKSON = jacksonObjectMapper()
}

View file

@ -30,7 +30,7 @@ object Main {
dieWithError("Expected one argument: path to config file, or nothing")
}
val settings: ClientSettings = try {
val settings = try {
JACKSON.readValue<ClientSettings>(FileReader(file))
} catch (e: UnrecognizedPropertyException) {
dieWithError("'${e.propertyName}' is not a valid setting")
@ -55,7 +55,6 @@ object Main {
client.runLoop()
}
@JvmStatic
fun dieWithError(e: Throwable): Nothing {
if (LOGGER.isErrorEnabled) {
LOGGER.error("Critical Error", e)
@ -64,7 +63,6 @@ object Main {
exitProcess(1)
}
@JvmStatic
fun dieWithError(error: String): Nothing {
if (LOGGER.isErrorEnabled) {
LOGGER.error("Critical Error: {}", error)

View file

@ -0,0 +1,299 @@
/* ktlint-disable no-wildcard-imports */
package mdnet.base
import ch.qos.logback.classic.LoggerContext
import com.fasterxml.jackson.module.kotlin.readValue
import mdnet.base.Constants.JACKSON
import mdnet.base.Main.dieWithError
import mdnet.base.server.getServer
import mdnet.base.server.getUiServer
import mdnet.base.settings.ClientSettings
import mdnet.base.settings.ServerSettings
import mdnet.cache.DiskLruCache
import mdnet.cache.HeaderMismatchException
import org.http4k.server.Http4kServer
import org.slf4j.LoggerFactory
import java.io.File
import java.io.IOException
import java.time.Instant
import java.util.*
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicReference
sealed class State
// server is not running
object Uninitialized : State()
// server has shut down
object Shutdown : State()
// server is in the process of shutting down
data class GracefulShutdown(val lastRunning: Running, val counts: Int = 0, val nextState: State = Uninitialized, val action: () -> Unit = {}) : State()
// server is currently running
data class Running(val server: Http4kServer, val settings: ServerSettings) : State()
class MangaDexClient(private val clientSettings: ClientSettings) {
// this must remain singlethreaded because of how the state mechanism works
private val executorService = Executors.newSingleThreadScheduledExecutor()
// state must only be accessed from the thread on the executorService
private var state: State = Uninitialized
private val serverHandler: ServerHandler = ServerHandler(clientSettings)
private val statsMap: MutableMap<Instant, Statistics> = Collections
.synchronizedMap(object : LinkedHashMap<Instant, Statistics>(240) {
override fun removeEldestEntry(eldest: Map.Entry<Instant, Statistics>): Boolean {
return this.size > 240
}
})
private val statistics: AtomicReference<Statistics> = AtomicReference(Statistics())
private val isHandled: AtomicBoolean = AtomicBoolean(false)
private var webUi: Http4kServer? = null
private val cache: DiskLruCache
init {
try {
cache = DiskLruCache.open(
File("cache"), 1, 1,
clientSettings.maxCacheSizeInMebibytes * 1024 * 1024 /* MiB to bytes */
)
cache.get("statistics")?.use {
statistics.set(JACKSON.readValue<Statistics>(it.getInputStream(0)))
}
} catch (e: HeaderMismatchException) {
LOGGER.warn("Cache version may be outdated - remove if necessary")
dieWithError(e)
} catch (e: IOException) {
LOGGER.warn("Cache version may be corrupt - remove if necessary")
dieWithError(e)
}
}
fun runLoop() {
loginAndStartServer()
statsMap[Instant.now()] = statistics.get()
if (clientSettings.webSettings != null) {
webUi = getUiServer(clientSettings.webSettings, statistics, statsMap)
webUi!!.start()
}
if (LOGGER.isInfoEnabled) {
LOGGER.info("Mangadex@Home Client initialized. Starting normal operation.")
}
executorService.scheduleAtFixedRate({
try {
statistics.updateAndGet {
it.copy(bytesOnDisk = cache.size())
}
statsMap[Instant.now()] = statistics.get()
val editor = cache.edit("statistics")
if (editor != null) {
JACKSON.writeValue(editor.newOutputStream(0), statistics.get())
editor.commit()
}
} catch (e: Exception) {
LOGGER.warn("Statistics update failed", e)
}
}, 15, 15, TimeUnit.SECONDS)
var lastBytesSent = statistics.get().bytesSent
executorService.scheduleAtFixedRate({
try {
lastBytesSent = statistics.get().bytesSent
val state = this.state
if (state is GracefulShutdown) {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Aborting graceful shutdown started due to hourly bandwidth limit")
}
this.state = state.lastRunning
}
if (state is Uninitialized) {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Restarting server stopped due to hourly bandwidth limit")
}
loginAndStartServer()
}
} catch (e: Exception) {
LOGGER.warn("Hourly bandwidth check failed", e)
}
}, 1, 1, TimeUnit.HOURS)
val timesToWait = clientSettings.gracefulShutdownWaitSeconds / 15
executorService.scheduleAtFixedRate({
try {
val state = this.state
if (state is GracefulShutdown) {
when {
state.counts == 0 -> {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Starting graceful shutdown")
}
logout()
isHandled.set(false)
this.state = state.copy(counts = state.counts + 1)
}
state.counts == timesToWait || !isHandled.get() -> {
if (LOGGER.isInfoEnabled) {
if (!isHandled.get()) {
LOGGER.info("No requests received, shutting down")
} else {
LOGGER.info("Max tries attempted (${state.counts} out of $timesToWait), shutting down")
}
}
stopServer(state.nextState)
state.action()
}
else -> {
if (LOGGER.isInfoEnabled) {
LOGGER.info(
"Waiting another 15 seconds for graceful shutdown (${state.counts} out of $timesToWait)"
)
}
isHandled.set(false)
this.state = state.copy(counts = state.counts + 1)
}
}
}
} catch (e: Exception) {
LOGGER.warn("Main loop failed", e)
}
}, 15, 15, TimeUnit.SECONDS)
executorService.scheduleWithFixedDelay({
try {
val state = this.state
if (state is Running) {
val currentBytesSent = statistics.get().bytesSent - lastBytesSent
if (clientSettings.maxMebibytesPerHour != 0L && clientSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) {
if (LOGGER.isInfoEnabled) {
LOGGER.info("Shutting down server as hourly bandwidth limit reached")
}
this.state = GracefulShutdown(lastRunning = state)
}
pingControl()
}
} catch (e: Exception) {
LOGGER.warn("Main loop failed", e)
}
}, 45, 45, TimeUnit.SECONDS)
}
private fun pingControl() {
val state = this.state as Running
val newSettings = serverHandler.pingControl(state.settings)
if (LOGGER.isInfoEnabled) {
LOGGER.info("Server settings received: {}", newSettings)
}
if (newSettings != null) {
if (newSettings.latestBuild > Constants.CLIENT_BUILD) {
if (LOGGER.isWarnEnabled) {
LOGGER.warn(
"Outdated build detected! Latest: {}, Current: {}", newSettings.latestBuild,
Constants.CLIENT_BUILD
)
}
}
if (newSettings.tls != null || newSettings.imageServer != state.settings.imageServer) {
// certificates or upstream url must have changed, restart webserver
if (LOGGER.isInfoEnabled) {
LOGGER.info("Doing internal restart of HTTP server to refresh certs/upstream URL")
}
this.state = GracefulShutdown(lastRunning = state) {
loginAndStartServer()
}
}
}
}
private fun loginAndStartServer() {
this.state as Uninitialized
val serverSettings = serverHandler.loginToControl()
?: dieWithError("Failed to get a login response from server - check API secret for validity")
val server = getServer(cache, serverSettings, clientSettings, statistics, isHandled).start()
if (serverSettings.latestBuild > Constants.CLIENT_BUILD) {
if (LOGGER.isWarnEnabled) {
LOGGER.warn(
"Outdated build detected! Latest: {}, Current: {}", serverSettings.latestBuild,
Constants.CLIENT_BUILD
)
}
}
state = Running(server, serverSettings)
if (LOGGER.isInfoEnabled) {
LOGGER.info("Internal HTTP server was successfully started")
}
}
private fun logout() {
serverHandler.logoutFromControl()
}
private fun stopServer(nextState: State = Uninitialized) {
val state = this.state.let {
when (it) {
is Running ->
it
is GracefulShutdown ->
it.lastRunning
else ->
throw AssertionError()
}
}
if (LOGGER.isInfoEnabled) {
LOGGER.info("Shutting down HTTP server")
}
state.server.stop()
if (LOGGER.isInfoEnabled) {
LOGGER.info("Internal HTTP server has shut down")
}
this.state = nextState
}
fun shutdown() {
LOGGER.info("Mangadex@Home Client stopping")
val latch = CountDownLatch(1)
executorService.schedule({
val state = this.state
if (state is Running) {
this.state = GracefulShutdown(state, nextState = Shutdown) {
latch.countDown()
}
latch.await()
} else if (state is GracefulShutdown) {
this.state = state.copy(nextState = Shutdown) {
latch.countDown()
}
} else if (state is Uninitialized || state is Shutdown) {
this.state = Shutdown
latch.countDown()
}
}, 0, TimeUnit.SECONDS)
latch.await()
webUi?.close()
try {
cache.close()
} catch (e: IOException) {
LOGGER.error("Cache failed to close", e)
}
executorService.shutdown()
LOGGER.info("Mangadex@Home Client stopped")
(LoggerFactory.getILoggerFactory() as LoggerContext).stop()
}
companion object {
private val LOGGER = LoggerFactory.getLogger(MangaDexClient::class.java)
}
}

View file

@ -31,8 +31,7 @@ import javax.crypto.CipherInputStream
import javax.crypto.CipherOutputStream
import javax.crypto.spec.SecretKeySpec
private const val THREADS_TO_ALLOCATE = 262144 // 2**18 // Honestly, no reason to not just let 'er rip. Inactive connections will expire on their own :D
private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java)
private const val THREADS_TO_ALLOCATE = 262144 // 2**18
class ImageServer(private val cache: DiskLruCache, private val statistics: AtomicReference<Statistics>, private val upstreamUrl: String, private val database: Database, private val handled: AtomicBoolean) {
init {
@ -252,6 +251,10 @@ class ImageServer(private val cache: DiskLruCache, private val statistics: Atomi
}
}
.header("X-Cache", if (cached) "HIT" else "MISS")
companion object {
private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java)
}
}
private fun getRc4(key: ByteArray): Cipher {

View file

@ -13,6 +13,7 @@ data class ClientSettings(
val clientPort: Int = 443,
@field:Secret val clientSecret: String = "PASTE-YOUR-SECRET-HERE",
val threads: Int = 4,
val gracefulShutdownWaitSeconds: Int = 60,
val webSettings: WebSettings? = null
)