mirror of
https://gitlab.com/mangadex-pub/mangadex_at_home.git
synced 2024-01-19 02:48:37 +00:00
268 lines
10 KiB
Kotlin
268 lines
10 KiB
Kotlin
package mdnet.base
|
|
|
|
import com.fasterxml.jackson.core.JsonProcessingException
|
|
import com.fasterxml.jackson.databind.ObjectMapper
|
|
import com.fasterxml.jackson.databind.SerializationFeature
|
|
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
|
import com.fasterxml.jackson.module.kotlin.readValue
|
|
import java.time.Instant
|
|
import java.util.Collections
|
|
import java.util.LinkedHashMap
|
|
import java.util.concurrent.CountDownLatch
|
|
import java.util.concurrent.Executors
|
|
import java.util.concurrent.TimeUnit
|
|
import java.util.concurrent.atomic.AtomicBoolean
|
|
import java.util.concurrent.atomic.AtomicReference
|
|
import mdnet.base.data.Statistics
|
|
import mdnet.base.server.getServer
|
|
import mdnet.base.settings.DevSettings
|
|
import mdnet.base.settings.RemoteSettings
|
|
import mdnet.base.settings.ServerSettings
|
|
import mdnet.cache.DiskLruCache
|
|
import org.http4k.server.Http4kServer
|
|
import org.jetbrains.exposed.sql.Database
|
|
import org.slf4j.LoggerFactory
|
|
|
|
sealed class State
|
|
// server is not running
|
|
data class Uninitialized(val serverSettings: ServerSettings, val devSettings: DevSettings) : State()
|
|
// server has shut down
|
|
object Shutdown : State()
|
|
// server is in the process of stopping
|
|
data class GracefulStop(val lastRunning: Running, val counts: Int = 0, val nextState: State = Uninitialized(lastRunning.serverSettings, lastRunning.devSettings), val action: () -> Unit = {}) : State()
|
|
// server is currently running
|
|
data class Running(val server: Http4kServer, val settings: RemoteSettings, val serverSettings: ServerSettings, val devSettings: DevSettings) : State()
|
|
|
|
class ServerManager(serverSettings: ServerSettings, devSettings: DevSettings, maxCacheSizeInMebibytes: Long, private val cache: DiskLruCache, private val database: Database) {
|
|
// this must remain single-threaded because of how the state mechanism works
|
|
private val executor = Executors.newSingleThreadScheduledExecutor()
|
|
|
|
// state that must only be accessed from the thread on the executor
|
|
private var state: State
|
|
private var serverHandler: ServerHandler
|
|
// end protected state
|
|
|
|
val statsMap: MutableMap<Instant, Statistics> = Collections
|
|
.synchronizedMap(object : LinkedHashMap<Instant, Statistics>(240) {
|
|
override fun removeEldestEntry(eldest: Map.Entry<Instant, Statistics>): Boolean {
|
|
return this.size > 240
|
|
}
|
|
})
|
|
val statistics: AtomicReference<Statistics> = AtomicReference(
|
|
Statistics()
|
|
)
|
|
|
|
private val isHandled: AtomicBoolean = AtomicBoolean(false)
|
|
|
|
init {
|
|
state = Uninitialized(serverSettings, devSettings)
|
|
serverHandler = ServerHandler(serverSettings, devSettings, maxCacheSizeInMebibytes)
|
|
|
|
cache.get("statistics")?.use {
|
|
try {
|
|
statistics.set(JACKSON.readValue<Statistics>(it.getInputStream(0)))
|
|
} catch (_: JsonProcessingException) {
|
|
cache.remove("statistics")
|
|
}
|
|
}
|
|
}
|
|
|
|
fun start() {
|
|
LOGGER.info { "Image server starting" }
|
|
loginAndStartServer()
|
|
statsMap[Instant.now()] = statistics.get()
|
|
|
|
executor.scheduleAtFixedRate({
|
|
try {
|
|
if (state is Running || state is GracefulStop || state is Uninitialized) {
|
|
statistics.updateAndGet {
|
|
it.copy(bytesOnDisk = cache.size())
|
|
}
|
|
statsMap[Instant.now()] = statistics.get()
|
|
val editor = cache.edit("statistics")
|
|
if (editor != null) {
|
|
JACKSON.writeValue(editor.newOutputStream(0), statistics.get())
|
|
editor.commit()
|
|
}
|
|
}
|
|
} catch (e: Exception) {
|
|
LOGGER.warn(e) { "Statistics update failed" }
|
|
}
|
|
}, 15, 15, TimeUnit.SECONDS)
|
|
|
|
var lastBytesSent = statistics.get().bytesSent
|
|
executor.scheduleAtFixedRate({
|
|
try {
|
|
lastBytesSent = statistics.get().bytesSent
|
|
|
|
val state = this.state
|
|
if (state is GracefulStop) {
|
|
LOGGER.info { "Aborting graceful shutdown started due to hourly bandwidth limit" }
|
|
|
|
this.state = state.lastRunning
|
|
}
|
|
if (state is Uninitialized) {
|
|
LOGGER.info { "Restarting server stopped due to hourly bandwidth limit" }
|
|
|
|
loginAndStartServer()
|
|
}
|
|
} catch (e: Exception) {
|
|
LOGGER.warn(e) { "Hourly bandwidth check failed" }
|
|
}
|
|
}, 1, 1, TimeUnit.HOURS)
|
|
|
|
executor.scheduleAtFixedRate({
|
|
try {
|
|
val state = this.state
|
|
if (state is GracefulStop) {
|
|
val timesToWait = state.lastRunning.serverSettings.gracefulShutdownWaitSeconds / 15
|
|
when {
|
|
state.counts == 0 -> {
|
|
LOGGER.info { "Starting graceful stop" }
|
|
|
|
logout()
|
|
isHandled.set(false)
|
|
this.state = state.copy(counts = state.counts + 1)
|
|
}
|
|
state.counts == timesToWait || !isHandled.get() -> {
|
|
if (!isHandled.get()) {
|
|
LOGGER.info { "No requests received, stopping" }
|
|
} else {
|
|
LOGGER.info { "Max tries attempted (${state.counts} out of $timesToWait), shutting down" }
|
|
}
|
|
|
|
stopServer(state.nextState)
|
|
state.action()
|
|
}
|
|
else -> {
|
|
LOGGER.info {
|
|
"Waiting another 15 seconds for graceful stop (${state.counts} out of $timesToWait)"
|
|
}
|
|
|
|
isHandled.set(false)
|
|
this.state = state.copy(counts = state.counts + 1)
|
|
}
|
|
}
|
|
}
|
|
} catch (e: Exception) {
|
|
LOGGER.warn(e) { "Main loop failed" }
|
|
}
|
|
}, 15, 15, TimeUnit.SECONDS)
|
|
|
|
executor.scheduleWithFixedDelay({
|
|
try {
|
|
val state = this.state
|
|
if (state is Running) {
|
|
val currentBytesSent = statistics.get().bytesSent - lastBytesSent
|
|
if (state.serverSettings.maxMebibytesPerHour != 0L && state.serverSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) {
|
|
LOGGER.info { "Stopping image server as hourly bandwidth limit reached" }
|
|
|
|
this.state = GracefulStop(lastRunning = state)
|
|
} else {
|
|
pingControl()
|
|
}
|
|
}
|
|
} catch (e: Exception) {
|
|
LOGGER.warn(e) { "Graceful shutdown checker failed" }
|
|
}
|
|
}, 45, 45, TimeUnit.SECONDS)
|
|
|
|
LOGGER.info { "Image server has started" }
|
|
}
|
|
|
|
private fun pingControl() {
|
|
val state = this.state as Running
|
|
|
|
val newSettings = serverHandler.pingControl(state.settings)
|
|
if (newSettings != null) {
|
|
LOGGER.info { "Server settings received: $newSettings" }
|
|
|
|
if (newSettings.latestBuild > Constants.CLIENT_BUILD) {
|
|
LOGGER.warn {
|
|
"Outdated build detected! Latest: ${newSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
|
|
}
|
|
}
|
|
if (newSettings.tls != null || newSettings.imageServer != state.settings.imageServer) {
|
|
// certificates or upstream url must have changed, restart webserver
|
|
LOGGER.info { "Doing internal restart of HTTP server to refresh certs/upstream URL" }
|
|
|
|
this.state = GracefulStop(lastRunning = state) {
|
|
loginAndStartServer()
|
|
}
|
|
}
|
|
} else {
|
|
LOGGER.info { "Server ping failed - ignoring" }
|
|
}
|
|
}
|
|
|
|
private fun loginAndStartServer() {
|
|
val state = this.state as Uninitialized
|
|
|
|
val remoteSettings = serverHandler.loginToControl()
|
|
?: Main.dieWithError("Failed to get a login response from server - check API secret for validity")
|
|
val server = getServer(cache, database, remoteSettings, state.serverSettings, statistics, isHandled).start()
|
|
|
|
if (remoteSettings.latestBuild > Constants.CLIENT_BUILD) {
|
|
LOGGER.warn {
|
|
"Outdated build detected! Latest: ${remoteSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
|
|
}
|
|
}
|
|
|
|
this.state = Running(server, remoteSettings, state.serverSettings, state.devSettings)
|
|
LOGGER.info { "Internal HTTP server was successfully started" }
|
|
}
|
|
|
|
private fun logout() {
|
|
serverHandler.logoutFromControl()
|
|
}
|
|
|
|
private fun stopServer(nextState: State) {
|
|
val state = this.state.let {
|
|
when (it) {
|
|
is Running ->
|
|
it
|
|
is GracefulStop ->
|
|
it.lastRunning
|
|
else ->
|
|
throw AssertionError()
|
|
}
|
|
}
|
|
|
|
LOGGER.info { "Image server stopping" }
|
|
state.server.stop()
|
|
LOGGER.info { "Image server has stopped" }
|
|
|
|
this.state = nextState
|
|
}
|
|
|
|
fun shutdown() {
|
|
LOGGER.info { "Image server shutting down" }
|
|
|
|
val latch = CountDownLatch(1)
|
|
executor.schedule({
|
|
val state = this.state
|
|
if (state is Running) {
|
|
this.state = GracefulStop(state, nextState = Shutdown) {
|
|
latch.countDown()
|
|
}
|
|
} else if (state is GracefulStop) {
|
|
this.state = state.copy(nextState = Shutdown) {
|
|
latch.countDown()
|
|
}
|
|
} else if (state is Uninitialized || state is Shutdown) {
|
|
this.state = Shutdown
|
|
latch.countDown()
|
|
}
|
|
}, 0, TimeUnit.SECONDS)
|
|
latch.await()
|
|
|
|
executor.shutdown()
|
|
LOGGER.info { "Image server has shut down" }
|
|
}
|
|
|
|
companion object {
|
|
private val LOGGER = LoggerFactory.getLogger(ServerManager::class.java)
|
|
private val JACKSON: ObjectMapper = jacksonObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
|
|
}
|
|
}
|