Massive refactor - separate settings reload/webui and the image server
This commit is contained in:
parent
bf4192d584
commit
398ab05788
|
@ -56,7 +56,10 @@ object Main {
|
|||
}
|
||||
|
||||
val client = MangaDexClient(file)
|
||||
Runtime.getRuntime().addShutdownHook(Thread { client.shutdown() })
|
||||
Runtime.getRuntime().addShutdownHook(Thread {
|
||||
client.shutdown()
|
||||
(LoggerFactory.getILoggerFactory() as LoggerContext).stop()
|
||||
})
|
||||
client.runLoop()
|
||||
}
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
|
|||
/* ktlint-disable no-wildcard-imports */
|
||||
package mdnet.base
|
||||
|
||||
import ch.qos.logback.classic.LoggerContext
|
||||
import com.fasterxml.jackson.core.JsonParser
|
||||
import com.fasterxml.jackson.core.JsonProcessingException
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.exc.UnrecognizedPropertyException
|
||||
|
@ -27,349 +27,106 @@ import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
|||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import java.io.File
|
||||
import java.io.FileReader
|
||||
import java.io.FileWriter
|
||||
import java.io.IOException
|
||||
import java.time.Instant
|
||||
import java.util.*
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.lang.AssertionError
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import java.util.regex.Pattern
|
||||
import mdnet.base.Main.dieWithError
|
||||
import mdnet.base.data.Statistics
|
||||
import mdnet.base.server.getServer
|
||||
import mdnet.base.server.getUiServer
|
||||
import mdnet.base.settings.ClientSettings
|
||||
import mdnet.base.settings.ServerSettings
|
||||
import mdnet.base.settings.*
|
||||
import mdnet.cache.DiskLruCache
|
||||
import mdnet.cache.HeaderMismatchException
|
||||
import org.http4k.server.Http4kServer
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
private const val CLIENT_KEY_LENGTH = 52
|
||||
|
||||
// Exception class to handle when Client Settings have invalid values
|
||||
class ClientSettingsException(message: String) : Exception(message)
|
||||
|
||||
sealed class State
|
||||
// server is not running
|
||||
data class Uninitialized(val clientSettings: ClientSettings) : State()
|
||||
// server has shut down
|
||||
object Shutdown : State()
|
||||
// server is in the process of shutting down
|
||||
data class GracefulShutdown(val lastRunning: Running, val counts: Int = 0, val nextState: State = Uninitialized(lastRunning.clientSettings), val action: () -> Unit = {}) : State()
|
||||
// server is currently running
|
||||
data class Running(val server: Http4kServer, val settings: ServerSettings, val clientSettings: ClientSettings) : State()
|
||||
|
||||
class MangaDexClient(private val clientSettingsFile: String) {
|
||||
// this must remain singlethreaded because of how the state mechanism works
|
||||
private val executorService = Executors.newSingleThreadScheduledExecutor()
|
||||
// state must only be accessed from the thread on the executorService
|
||||
private var state: State
|
||||
|
||||
private var serverHandler: ServerHandler
|
||||
private val statsMap: MutableMap<Instant, Statistics> = Collections
|
||||
.synchronizedMap(object : LinkedHashMap<Instant, Statistics>(240) {
|
||||
override fun removeEldestEntry(eldest: Map.Entry<Instant, Statistics>): Boolean {
|
||||
return this.size > 240
|
||||
}
|
||||
})
|
||||
private val statistics: AtomicReference<Statistics> = AtomicReference(
|
||||
Statistics()
|
||||
)
|
||||
private val isHandled: AtomicBoolean = AtomicBoolean(false)
|
||||
private var webUi: Http4kServer? = null
|
||||
// just for scheduling one task, so single-threaded
|
||||
private val executor = Executors.newSingleThreadScheduledExecutor()
|
||||
private val cache: DiskLruCache
|
||||
private var settings: ClientSettings
|
||||
|
||||
private var imageServer: ServerManager? = null
|
||||
private var webUi: Http4kServer? = null
|
||||
|
||||
init {
|
||||
// Read ClientSettings
|
||||
val clientSettings = try {
|
||||
settings = try {
|
||||
readClientSettings()
|
||||
} catch (e: UnrecognizedPropertyException) {
|
||||
dieWithError("'${e.propertyName}' is not a valid setting")
|
||||
} catch (e: JsonProcessingException) {
|
||||
dieWithError(e)
|
||||
} catch (ignored: IOException) {
|
||||
ClientSettings().also {
|
||||
LOGGER.warn { "Settings file $clientSettingsFile not found, generating file" }
|
||||
try {
|
||||
FileWriter(clientSettingsFile).use { writer -> JACKSON.writeValue(writer, it) }
|
||||
} catch (e: IOException) {
|
||||
dieWithError(e)
|
||||
}
|
||||
}
|
||||
} catch (e: ClientSettingsException) {
|
||||
dieWithError(e)
|
||||
} catch (e: IOException) {
|
||||
dieWithError(e)
|
||||
}
|
||||
|
||||
// Initialize things that depend on Client Settings
|
||||
LOGGER.info { "Client settings loaded: $clientSettings" }
|
||||
state = Uninitialized(clientSettings)
|
||||
serverHandler = ServerHandler(clientSettings)
|
||||
|
||||
// Initialize everything else
|
||||
try {
|
||||
cache = DiskLruCache.open(
|
||||
File("cache"), 1, 1,
|
||||
(clientSettings.maxCacheSizeInMebibytes * 1024 * 1024 * 0.8).toLong() /* MiB to bytes */
|
||||
(settings.maxCacheSizeInMebibytes * 1024 * 1024 * 0.8).toLong() /* MiB to bytes */
|
||||
)
|
||||
cache.get("statistics")?.use {
|
||||
try {
|
||||
statistics.set(JACKSON.readValue<Statistics>(it.getInputStream(0)))
|
||||
} catch (_: JsonProcessingException) {
|
||||
cache.remove("statistics")
|
||||
}
|
||||
}
|
||||
} catch (e: HeaderMismatchException) {
|
||||
LOGGER.warn { "Cache version may be outdated - remove if necessary" }
|
||||
dieWithError(e)
|
||||
} catch (e: IOException) {
|
||||
LOGGER.warn { "Cache may be corrupt - remove if necessary" }
|
||||
dieWithError(e)
|
||||
}
|
||||
}
|
||||
|
||||
fun runLoop() {
|
||||
loginAndStartServer()
|
||||
statsMap[Instant.now()] = statistics.get()
|
||||
startWebUi()
|
||||
LOGGER.info { "Mangadex@Home Client initialized. Starting normal operation." }
|
||||
LOGGER.info { "Mangadex@Home Client initialized - starting normal operation." }
|
||||
|
||||
executorService.scheduleAtFixedRate({
|
||||
executor.scheduleWithFixedDelay({
|
||||
try {
|
||||
if (state is Running || state is GracefulShutdown || state is Uninitialized) {
|
||||
statistics.updateAndGet {
|
||||
it.copy(bytesOnDisk = cache.size())
|
||||
}
|
||||
statsMap[Instant.now()] = statistics.get()
|
||||
val editor = cache.edit("statistics")
|
||||
if (editor != null) {
|
||||
JACKSON.writeValue(editor.newOutputStream(0), statistics.get())
|
||||
editor.commit()
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Statistics update failed" }
|
||||
}
|
||||
}, 15, 15, TimeUnit.SECONDS)
|
||||
|
||||
var lastBytesSent = statistics.get().bytesSent
|
||||
executorService.scheduleAtFixedRate({
|
||||
try {
|
||||
lastBytesSent = statistics.get().bytesSent
|
||||
|
||||
val state = this.state
|
||||
if (state is GracefulShutdown) {
|
||||
LOGGER.info { "Aborting graceful shutdown started due to hourly bandwidth limit" }
|
||||
|
||||
this.state = state.lastRunning
|
||||
}
|
||||
if (state is Uninitialized) {
|
||||
LOGGER.info { "Restarting server stopped due to hourly bandwidth limit" }
|
||||
|
||||
loginAndStartServer()
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Hourly bandwidth check failed" }
|
||||
}
|
||||
}, 1, 1, TimeUnit.HOURS)
|
||||
|
||||
executorService.scheduleAtFixedRate({
|
||||
try {
|
||||
val state = this.state
|
||||
if (state is GracefulShutdown) {
|
||||
val timesToWait = state.lastRunning.clientSettings.gracefulShutdownWaitSeconds / 15
|
||||
when {
|
||||
state.counts == 0 -> {
|
||||
LOGGER.info { "Starting graceful shutdown" }
|
||||
|
||||
logout()
|
||||
isHandled.set(false)
|
||||
this.state = state.copy(counts = state.counts + 1)
|
||||
}
|
||||
state.counts == timesToWait || !isHandled.get() -> {
|
||||
if (!isHandled.get()) {
|
||||
LOGGER.info { "No requests received, shutting down" }
|
||||
} else {
|
||||
LOGGER.info { "Max tries attempted (${state.counts} out of $timesToWait), shutting down" }
|
||||
}
|
||||
|
||||
stopServer(state.nextState)
|
||||
state.action()
|
||||
}
|
||||
else -> {
|
||||
LOGGER.info {
|
||||
"Waiting another 15 seconds for graceful shutdown (${state.counts} out of $timesToWait)"
|
||||
}
|
||||
|
||||
isHandled.set(false)
|
||||
this.state = state.copy(counts = state.counts + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Main loop failed" }
|
||||
}
|
||||
}, 15, 15, TimeUnit.SECONDS)
|
||||
|
||||
executorService.scheduleWithFixedDelay({
|
||||
try {
|
||||
val state = this.state
|
||||
if (state is Running) {
|
||||
val currentBytesSent = statistics.get().bytesSent - lastBytesSent
|
||||
if (state.clientSettings.maxMebibytesPerHour != 0L && state.clientSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) {
|
||||
LOGGER.info { "Shutting down server as hourly bandwidth limit reached" }
|
||||
|
||||
this.state = GracefulShutdown(lastRunning = state)
|
||||
} else {
|
||||
pingControl()
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Graceful shutdown checker failed" }
|
||||
}
|
||||
}, 45, 45, TimeUnit.SECONDS)
|
||||
|
||||
// Check every minute to see if client settings have changed
|
||||
executorService.scheduleWithFixedDelay({
|
||||
try {
|
||||
val state = this.state
|
||||
if (state is Running) {
|
||||
reloadClientSettings()
|
||||
}
|
||||
reloadClientSettings()
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Reload of ClientSettings failed" }
|
||||
}
|
||||
}, 60, 60, TimeUnit.SECONDS)
|
||||
}, 1, 1, TimeUnit.MINUTES)
|
||||
}
|
||||
|
||||
private fun pingControl() {
|
||||
val state = this.state as Running
|
||||
|
||||
val newSettings = serverHandler.pingControl(state.settings)
|
||||
if (newSettings != null) {
|
||||
LOGGER.info { "Server settings received: $newSettings" }
|
||||
|
||||
if (newSettings.latestBuild > Constants.CLIENT_BUILD) {
|
||||
LOGGER.warn {
|
||||
"Outdated build detected! Latest: ${newSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
|
||||
}
|
||||
}
|
||||
if (newSettings.tls != null || newSettings.imageServer != state.settings.imageServer) {
|
||||
// certificates or upstream url must have changed, restart webserver
|
||||
LOGGER.info { "Doing internal restart of HTTP server to refresh certs/upstream URL" }
|
||||
|
||||
this.state = GracefulShutdown(lastRunning = state) {
|
||||
loginAndStartServer()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOGGER.info { "Server ping failed - ignoring" }
|
||||
}
|
||||
}
|
||||
|
||||
private fun loginAndStartServer() {
|
||||
val state = this.state as Uninitialized
|
||||
|
||||
val serverSettings = serverHandler.loginToControl()
|
||||
?: dieWithError("Failed to get a login response from server - check API secret for validity")
|
||||
val server = getServer(cache, serverSettings, state.clientSettings, statistics, isHandled).start()
|
||||
|
||||
if (serverSettings.latestBuild > Constants.CLIENT_BUILD) {
|
||||
LOGGER.warn {
|
||||
"Outdated build detected! Latest: ${serverSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
|
||||
}
|
||||
}
|
||||
|
||||
this.state = Running(server, serverSettings, state.clientSettings)
|
||||
LOGGER.info { "Internal HTTP server was successfully started" }
|
||||
}
|
||||
|
||||
private fun logout() {
|
||||
serverHandler.logoutFromControl()
|
||||
}
|
||||
|
||||
private fun stopServer(nextState: State) {
|
||||
val state = this.state.let {
|
||||
when (it) {
|
||||
is Running ->
|
||||
it
|
||||
is GracefulShutdown ->
|
||||
it.lastRunning
|
||||
else ->
|
||||
throw AssertionError()
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.info { "Shutting down HTTP server" }
|
||||
state.server.stop()
|
||||
LOGGER.info { "Internal HTTP server has shut down" }
|
||||
|
||||
this.state = nextState
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the WebUI if the ClientSettings demand it.
|
||||
* This method checks if the WebUI is needed,
|
||||
*/
|
||||
// Precondition: settings must be filled with up-to-date settings and `imageServer` must not be null
|
||||
private fun startWebUi() {
|
||||
val state = this.state
|
||||
// Grab the client settings if available
|
||||
val clientSettings = state.let {
|
||||
when (it) {
|
||||
is Running ->
|
||||
it.clientSettings
|
||||
is Uninitialized ->
|
||||
it.clientSettings
|
||||
else ->
|
||||
null
|
||||
}
|
||||
}
|
||||
settings.webSettings?.let { webSettings ->
|
||||
val imageServer = requireNotNull(imageServer)
|
||||
|
||||
// Only start the Web UI if the settings demand it
|
||||
if (clientSettings?.webSettings != null) {
|
||||
webUi = getUiServer(clientSettings.webSettings, statistics, statsMap)
|
||||
webUi!!.start()
|
||||
if (webUi != null) throw AssertionError()
|
||||
webUi = getUiServer(webSettings, imageServer.statistics, imageServer.statsMap).also {
|
||||
it.start()
|
||||
}
|
||||
LOGGER.info { "WebUI was successfully started" }
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Shutdowns the MangaDexClient
|
||||
*/
|
||||
fun shutdown() {
|
||||
LOGGER.info { "Mangadex@Home Client stopping" }
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
executorService.schedule({
|
||||
val state = this.state
|
||||
if (state is Running) {
|
||||
this.state = GracefulShutdown(state, nextState = Shutdown) {
|
||||
latch.countDown()
|
||||
}
|
||||
} else if (state is GracefulShutdown) {
|
||||
this.state = state.copy(nextState = Shutdown) {
|
||||
latch.countDown()
|
||||
}
|
||||
} else if (state is Uninitialized || state is Shutdown) {
|
||||
this.state = Shutdown
|
||||
latch.countDown()
|
||||
}
|
||||
}, 0, TimeUnit.SECONDS)
|
||||
latch.await()
|
||||
|
||||
webUi?.close()
|
||||
try {
|
||||
cache.close()
|
||||
} catch (e: IOException) {
|
||||
LOGGER.error(e) { "Cache failed to close" }
|
||||
// Precondition: settings must be filled with up-to-date settings
|
||||
private fun startImageServer() {
|
||||
if (imageServer != null) throw AssertionError()
|
||||
imageServer = ServerManager(settings.serverSettings, settings.devSettings, settings.maxCacheSizeInMebibytes, cache).also {
|
||||
it.start()
|
||||
}
|
||||
LOGGER.info { "Server manager was successfully started" }
|
||||
}
|
||||
|
||||
executorService.shutdown()
|
||||
LOGGER.info { "Mangadex@Home Client stopped" }
|
||||
private fun stopImageServer() {
|
||||
requireNotNull(imageServer).shutdown()
|
||||
LOGGER.info { "Server manager was successfully stopped" }
|
||||
}
|
||||
|
||||
(LoggerFactory.getILoggerFactory() as LoggerContext).stop()
|
||||
private fun stopWebUi() {
|
||||
requireNotNull(webUi).stop()
|
||||
LOGGER.info { "Server manager was successfully stopped" }
|
||||
}
|
||||
|
||||
fun shutdown() {
|
||||
LOGGER.info { "Mangadex@Home Client shutting down" }
|
||||
stopWebUi()
|
||||
stopImageServer()
|
||||
LOGGER.info { "Mangadex@Home Client has shut down" }
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -377,102 +134,81 @@ class MangaDexClient(private val clientSettingsFile: String) {
|
|||
* Web UI and/or the server if needed
|
||||
*/
|
||||
private fun reloadClientSettings() {
|
||||
val state = this.state as Running
|
||||
LOGGER.info { "Reloading client settings" }
|
||||
LOGGER.info { "Checking client settings" }
|
||||
try {
|
||||
val newSettings = readClientSettings()
|
||||
|
||||
if (newSettings == state.clientSettings) {
|
||||
LOGGER.info { "Client Settings have not changed" }
|
||||
if (newSettings == settings) {
|
||||
LOGGER.info { "Client settings unchanged" }
|
||||
return
|
||||
}
|
||||
|
||||
cache.maxSize = (newSettings.maxCacheSizeInMebibytes * 1024 * 1024 * 0.8).toLong()
|
||||
|
||||
// Setting loaded without issue. Figure out
|
||||
// if there are changes that require a restart
|
||||
val restartServer = newSettings.clientSecret != state.clientSettings.clientSecret ||
|
||||
newSettings.clientHostname != state.clientSettings.clientHostname ||
|
||||
newSettings.clientPort != state.clientSettings.clientPort ||
|
||||
newSettings.clientExternalPort != state.clientSettings.clientExternalPort ||
|
||||
newSettings.threads != state.clientSettings.threads ||
|
||||
newSettings.devSettings?.isDev != state.clientSettings.devSettings?.isDev
|
||||
val stopWebUi = newSettings.webSettings != state.clientSettings.webSettings ||
|
||||
newSettings.webSettings?.uiPort != state.clientSettings.webSettings?.uiPort ||
|
||||
newSettings.webSettings?.uiHostname != state.clientSettings.webSettings?.uiHostname
|
||||
val startWebUi = (stopWebUi && newSettings.webSettings != null)
|
||||
val restartServer = newSettings.serverSettings != settings.serverSettings ||
|
||||
newSettings.devSettings != settings.devSettings
|
||||
|
||||
val stopWebUi = restartServer || newSettings.webSettings != settings.webSettings
|
||||
val startWebUi = stopWebUi && newSettings.webSettings != null
|
||||
|
||||
// Stop the the WebUI if needed
|
||||
if (stopWebUi) {
|
||||
LOGGER.info { "Stopping WebUI to reload ClientSettings" }
|
||||
webUi?.close()
|
||||
webUi = null
|
||||
stopWebUi()
|
||||
}
|
||||
|
||||
if (restartServer) {
|
||||
// If we are restarting the server
|
||||
// We must do it gracefully and set
|
||||
// the new settings later
|
||||
LOGGER.info { "Stopping Server to reload ClientSettings" }
|
||||
|
||||
this.state = GracefulShutdown(state, nextState = Uninitialized(clientSettings = newSettings), action = {
|
||||
serverHandler = ServerHandler(newSettings)
|
||||
LOGGER.info { "Reloaded ClientSettings: $newSettings" }
|
||||
|
||||
LOGGER.info { "Starting Server after reloading ClientSettings" }
|
||||
loginAndStartServer()
|
||||
})
|
||||
} else {
|
||||
// If we aren't restarting the server
|
||||
// We can update the settings now
|
||||
this.state = state.copy(clientSettings = newSettings)
|
||||
serverHandler.settings = newSettings
|
||||
LOGGER.info { "Reloaded ClientSettings: $newSettings" }
|
||||
stopImageServer()
|
||||
startImageServer()
|
||||
}
|
||||
|
||||
// Start the WebUI if we had to stop it
|
||||
// and still want it
|
||||
if (startWebUi) {
|
||||
LOGGER.info { "Starting WebUI after reloading ClientSettings" }
|
||||
startWebUi()
|
||||
LOGGER.info { "Started WebUI after reloading ClientSettings" }
|
||||
}
|
||||
} catch (e: UnrecognizedPropertyException) {
|
||||
LOGGER.warn { "Settings file is invalid: '$e.propertyName' is not a valid setting" }
|
||||
} catch (e: JsonProcessingException) {
|
||||
LOGGER.warn { "Settings file is invalid: $e.message" }
|
||||
} catch (e: IOException) {
|
||||
LOGGER.warn { "Settings file is could not be found: $e.message" }
|
||||
} catch (e: ClientSettingsException) {
|
||||
LOGGER.warn { "Can't reload client settings: $e.message" }
|
||||
LOGGER.warn { "Settings file is invalid: $e.message" }
|
||||
} catch (e: IOException) {
|
||||
LOGGER.warn { "Error loading settings file: $e.message" }
|
||||
}
|
||||
}
|
||||
|
||||
private fun validateSettings(settings: ClientSettings) {
|
||||
if (!isSecretValid(settings.clientSecret)) throw ClientSettingsException("Config Error: API Secret is invalid, must be 52 alphanumeric characters")
|
||||
if (settings.clientPort == 0) {
|
||||
throw ClientSettingsException("Config Error: Invalid port number")
|
||||
}
|
||||
if (settings.clientPort in Constants.RESTRICTED_PORTS) {
|
||||
throw ClientSettingsException("Config Error: Unsafe port number")
|
||||
}
|
||||
if (settings.maxCacheSizeInMebibytes < 1024) {
|
||||
throw ClientSettingsException("Config Error: Invalid max cache size, must be >= 1024 MiB (1GiB)")
|
||||
}
|
||||
if (settings.threads < 4) {
|
||||
throw ClientSettingsException("Config Error: Invalid number of threads, must be >= 4")
|
||||
|
||||
fun isSecretValid(clientSecret: String): Boolean {
|
||||
return Pattern.matches("^[a-zA-Z0-9]{$CLIENT_KEY_LENGTH}$", clientSecret)
|
||||
}
|
||||
if (settings.maxMebibytesPerHour < 0) {
|
||||
throw ClientSettingsException("Config Error: Max bandwidth must be >= 0")
|
||||
|
||||
settings.serverSettings.let {
|
||||
if (!isSecretValid(it.clientSecret)) {
|
||||
throw ClientSettingsException("Config Error: API Secret is invalid, must be 52 alphanumeric characters")
|
||||
}
|
||||
if (it.clientPort == 0) {
|
||||
throw ClientSettingsException("Config Error: Invalid port number")
|
||||
}
|
||||
if (it.clientPort in Constants.RESTRICTED_PORTS) {
|
||||
throw ClientSettingsException("Config Error: Unsafe port number")
|
||||
}
|
||||
if (it.threads < 4) {
|
||||
throw ClientSettingsException("Config Error: Invalid number of threads, must be >= 4")
|
||||
}
|
||||
if (it.maxMebibytesPerHour < 0) {
|
||||
throw ClientSettingsException("Config Error: Max bandwidth must be >= 0")
|
||||
}
|
||||
if (it.maxKilobitsPerSecond < 0) {
|
||||
throw ClientSettingsException("Config Error: Max burst rate must be >= 0")
|
||||
}
|
||||
if (it.gracefulShutdownWaitSeconds < 15) {
|
||||
throw ClientSettingsException("Config Error: Graceful shutdown wait must be >= 15")
|
||||
}
|
||||
}
|
||||
if (settings.maxKilobitsPerSecond < 0) {
|
||||
throw ClientSettingsException("Config Error: Max burst rate must be >= 0")
|
||||
}
|
||||
if (settings.gracefulShutdownWaitSeconds < 15) {
|
||||
throw ClientSettingsException("Config Error: Graceful shutdown wait must be >= 15")
|
||||
}
|
||||
if (settings.webSettings != null) {
|
||||
if (settings.webSettings.uiPort == 0) {
|
||||
settings.webSettings?.let {
|
||||
if (it.uiPort == 0) {
|
||||
throw ClientSettingsException("Config Error: Invalid UI port number")
|
||||
}
|
||||
}
|
||||
|
@ -482,12 +218,9 @@ class MangaDexClient(private val clientSettingsFile: String) {
|
|||
return JACKSON.readValue<ClientSettings>(FileReader(clientSettingsFile)).apply(::validateSettings)
|
||||
}
|
||||
|
||||
private fun isSecretValid(clientSecret: String): Boolean {
|
||||
return Pattern.matches("^[a-zA-Z0-9]*$CLIENT_KEY_LENGTH}$", clientSecret)
|
||||
}
|
||||
|
||||
companion object {
|
||||
private const val CLIENT_KEY_LENGTH = 52
|
||||
private val LOGGER = LoggerFactory.getLogger(MangaDexClient::class.java)
|
||||
private val JACKSON: ObjectMapper = jacksonObjectMapper()
|
||||
private val JACKSON: ObjectMapper = jacksonObjectMapper().configure(JsonParser.Feature.ALLOW_COMMENTS, true)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,7 +22,8 @@ import com.fasterxml.jackson.databind.DeserializationFeature
|
|||
import com.fasterxml.jackson.module.kotlin.KotlinModule
|
||||
import java.net.InetAddress
|
||||
import mdnet.base.ServerHandlerJackson.auto
|
||||
import mdnet.base.settings.ClientSettings
|
||||
import mdnet.base.settings.DevSettings
|
||||
import mdnet.base.settings.RemoteSettings
|
||||
import mdnet.base.settings.ServerSettings
|
||||
import org.apache.http.client.config.RequestConfig
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
|
@ -43,13 +44,13 @@ object ServerHandlerJackson : ConfigurableJackson(
|
|||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
)
|
||||
|
||||
class ServerHandler(var settings: ClientSettings) {
|
||||
class ServerHandler(private val serverSettings: ServerSettings, private val devSettings: DevSettings, private val maxCacheSizeInMebibytes: Long) {
|
||||
private val client = ApacheClient(client = HttpClients.custom()
|
||||
.setDefaultRequestConfig(
|
||||
RequestConfig.custom()
|
||||
.apply {
|
||||
if (settings.clientHostname != "0.0.0.0") {
|
||||
setLocalAddress(InetAddress.getByName(settings.clientHostname))
|
||||
if (serverSettings.clientHostname != "0.0.0.0") {
|
||||
setLocalAddress(InetAddress.getByName(serverSettings.clientHostname))
|
||||
}
|
||||
}
|
||||
.build())
|
||||
|
@ -58,7 +59,7 @@ class ServerHandler(var settings: ClientSettings) {
|
|||
fun logoutFromControl(): Boolean {
|
||||
LOGGER.info { "Disconnecting from the control server" }
|
||||
val params = mapOf<String, Any>(
|
||||
"secret" to settings.clientSecret
|
||||
"secret" to serverSettings.clientSecret
|
||||
)
|
||||
|
||||
val request = STRING_ANY_MAP_LENS(params, Request(Method.POST, getServerAddress() + "stop"))
|
||||
|
@ -69,16 +70,16 @@ class ServerHandler(var settings: ClientSettings) {
|
|||
|
||||
private fun getPingParams(tlsCreatedAt: String? = null): Map<String, Any> =
|
||||
mapOf<String, Any>(
|
||||
"secret" to settings.clientSecret,
|
||||
"secret" to serverSettings.clientSecret,
|
||||
"port" to let {
|
||||
if (settings.clientExternalPort != 0) {
|
||||
settings.clientExternalPort
|
||||
if (serverSettings.clientExternalPort != 0) {
|
||||
serverSettings.clientExternalPort
|
||||
} else {
|
||||
settings.clientPort
|
||||
serverSettings.clientPort
|
||||
}
|
||||
},
|
||||
"disk_space" to settings.maxCacheSizeInMebibytes * 1024 * 1024,
|
||||
"network_speed" to settings.maxKilobitsPerSecond * 1000 / 8,
|
||||
"disk_space" to maxCacheSizeInMebibytes * 1024 * 1024,
|
||||
"network_speed" to serverSettings.maxKilobitsPerSecond * 1000 / 8,
|
||||
"build_version" to Constants.CLIENT_BUILD
|
||||
).let {
|
||||
if (tlsCreatedAt != null) {
|
||||
|
@ -88,7 +89,7 @@ class ServerHandler(var settings: ClientSettings) {
|
|||
}
|
||||
}
|
||||
|
||||
fun loginToControl(): ServerSettings? {
|
||||
fun loginToControl(): RemoteSettings? {
|
||||
LOGGER.info { "Connecting to the control server" }
|
||||
|
||||
val request = STRING_ANY_MAP_LENS(getPingParams(), Request(Method.POST, getServerAddress() + "ping"))
|
||||
|
@ -101,7 +102,7 @@ class ServerHandler(var settings: ClientSettings) {
|
|||
}
|
||||
}
|
||||
|
||||
fun pingControl(old: ServerSettings): ServerSettings? {
|
||||
fun pingControl(old: RemoteSettings): RemoteSettings? {
|
||||
LOGGER.info { "Pinging the control server" }
|
||||
|
||||
val request = STRING_ANY_MAP_LENS(getPingParams(old.tls!!.createdAt), Request(Method.POST, getServerAddress() + "ping"))
|
||||
|
@ -115,7 +116,7 @@ class ServerHandler(var settings: ClientSettings) {
|
|||
}
|
||||
|
||||
private fun getServerAddress(): String {
|
||||
return if (settings.devSettings?.isDev != true)
|
||||
return if (!devSettings.isDev)
|
||||
SERVER_ADDRESS
|
||||
else
|
||||
SERVER_ADDRESS_DEV
|
||||
|
@ -124,7 +125,7 @@ class ServerHandler(var settings: ClientSettings) {
|
|||
companion object {
|
||||
private val LOGGER = LoggerFactory.getLogger(ServerHandler::class.java)
|
||||
private val STRING_ANY_MAP_LENS = Body.auto<Map<String, Any>>().toLens()
|
||||
private val SERVER_SETTINGS_LENS = Body.auto<ServerSettings>().toLens()
|
||||
private val SERVER_SETTINGS_LENS = Body.auto<RemoteSettings>().toLens()
|
||||
private const val SERVER_ADDRESS = "https://api.mangadex.network/"
|
||||
private const val SERVER_ADDRESS_DEV = "https://mangadex-test.net/"
|
||||
}
|
||||
|
|
270
src/main/kotlin/mdnet/base/ServerManager.kt
Normal file
270
src/main/kotlin/mdnet/base/ServerManager.kt
Normal file
|
@ -0,0 +1,270 @@
|
|||
package mdnet.base
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException
|
||||
import com.fasterxml.jackson.databind.ObjectMapper
|
||||
import com.fasterxml.jackson.databind.SerializationFeature
|
||||
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
|
||||
import com.fasterxml.jackson.module.kotlin.readValue
|
||||
import java.io.IOException
|
||||
import java.time.Instant
|
||||
import java.util.Collections
|
||||
import java.util.LinkedHashMap
|
||||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import mdnet.base.data.Statistics
|
||||
import mdnet.base.server.getServer
|
||||
import mdnet.base.settings.DevSettings
|
||||
import mdnet.base.settings.RemoteSettings
|
||||
import mdnet.base.settings.ServerSettings
|
||||
import mdnet.cache.DiskLruCache
|
||||
import org.http4k.server.Http4kServer
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
sealed class State
|
||||
// server is not running
|
||||
data class Uninitialized(val serverSettings: ServerSettings, val devSettings: DevSettings) : State()
|
||||
// server has shut down
|
||||
object Shutdown : State()
|
||||
// server is in the process of stopping
|
||||
data class GracefulStop(val lastRunning: Running, val counts: Int = 0, val nextState: State = Uninitialized(lastRunning.serverSettings, lastRunning.devSettings), val action: () -> Unit = {}) : State()
|
||||
// server is currently running
|
||||
data class Running(val server: Http4kServer, val settings: RemoteSettings, val serverSettings: ServerSettings, val devSettings: DevSettings) : State()
|
||||
|
||||
class ServerManager(serverSettings: ServerSettings, devSettings: DevSettings, maxCacheSizeInMebibytes: Long, private val cache: DiskLruCache) {
|
||||
// this must remain single-threaded because of how the state mechanism works
|
||||
private val executor = Executors.newSingleThreadScheduledExecutor()
|
||||
|
||||
// state that must only be accessed from the thread on the executorService
|
||||
private var state: State
|
||||
private var serverHandler: ServerHandler
|
||||
// end protected state
|
||||
|
||||
val statsMap: MutableMap<Instant, Statistics> = Collections
|
||||
.synchronizedMap(object : LinkedHashMap<Instant, Statistics>(240) {
|
||||
override fun removeEldestEntry(eldest: Map.Entry<Instant, Statistics>): Boolean {
|
||||
return this.size > 240
|
||||
}
|
||||
})
|
||||
val statistics: AtomicReference<Statistics> = AtomicReference(
|
||||
Statistics()
|
||||
)
|
||||
|
||||
private val isHandled: AtomicBoolean = AtomicBoolean(false)
|
||||
|
||||
init {
|
||||
state = Uninitialized(serverSettings, devSettings)
|
||||
serverHandler = ServerHandler(serverSettings, devSettings, maxCacheSizeInMebibytes)
|
||||
|
||||
cache.get("statistics")?.use {
|
||||
try {
|
||||
statistics.set(JACKSON.readValue<Statistics>(it.getInputStream(0)))
|
||||
} catch (_: JsonProcessingException) {
|
||||
cache.remove("statistics")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun start() {
|
||||
loginAndStartServer()
|
||||
statsMap[Instant.now()] = statistics.get()
|
||||
|
||||
executor.scheduleAtFixedRate({
|
||||
try {
|
||||
if (state is Running || state is GracefulStop || state is Uninitialized) {
|
||||
statistics.updateAndGet {
|
||||
it.copy(bytesOnDisk = cache.size())
|
||||
}
|
||||
statsMap[Instant.now()] = statistics.get()
|
||||
val editor = cache.edit("statistics")
|
||||
if (editor != null) {
|
||||
JACKSON.writeValue(editor.newOutputStream(0), statistics.get())
|
||||
editor.commit()
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Statistics update failed" }
|
||||
}
|
||||
}, 15, 15, TimeUnit.SECONDS)
|
||||
|
||||
var lastBytesSent = statistics.get().bytesSent
|
||||
executor.scheduleAtFixedRate({
|
||||
try {
|
||||
lastBytesSent = statistics.get().bytesSent
|
||||
|
||||
val state = this.state
|
||||
if (state is GracefulStop) {
|
||||
LOGGER.info { "Aborting graceful shutdown started due to hourly bandwidth limit" }
|
||||
|
||||
this.state = state.lastRunning
|
||||
}
|
||||
if (state is Uninitialized) {
|
||||
LOGGER.info { "Restarting server stopped due to hourly bandwidth limit" }
|
||||
|
||||
loginAndStartServer()
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Hourly bandwidth check failed" }
|
||||
}
|
||||
}, 1, 1, TimeUnit.HOURS)
|
||||
|
||||
executor.scheduleAtFixedRate({
|
||||
try {
|
||||
val state = this.state
|
||||
if (state is GracefulStop) {
|
||||
val timesToWait = state.lastRunning.serverSettings.gracefulShutdownWaitSeconds / 15
|
||||
when {
|
||||
state.counts == 0 -> {
|
||||
LOGGER.info { "Starting graceful stop" }
|
||||
|
||||
logout()
|
||||
isHandled.set(false)
|
||||
this.state = state.copy(counts = state.counts + 1)
|
||||
}
|
||||
state.counts == timesToWait || !isHandled.get() -> {
|
||||
if (!isHandled.get()) {
|
||||
LOGGER.info { "No requests received, stopping" }
|
||||
} else {
|
||||
LOGGER.info { "Max tries attempted (${state.counts} out of $timesToWait), shutting down" }
|
||||
}
|
||||
|
||||
stopServer(state.nextState)
|
||||
state.action()
|
||||
}
|
||||
else -> {
|
||||
LOGGER.info {
|
||||
"Waiting another 15 seconds for graceful stop (${state.counts} out of $timesToWait)"
|
||||
}
|
||||
|
||||
isHandled.set(false)
|
||||
this.state = state.copy(counts = state.counts + 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Main loop failed" }
|
||||
}
|
||||
}, 15, 15, TimeUnit.SECONDS)
|
||||
|
||||
executor.scheduleWithFixedDelay({
|
||||
try {
|
||||
val state = this.state
|
||||
if (state is Running) {
|
||||
val currentBytesSent = statistics.get().bytesSent - lastBytesSent
|
||||
if (state.serverSettings.maxMebibytesPerHour != 0L && state.serverSettings.maxMebibytesPerHour * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) {
|
||||
LOGGER.info { "Stopping image server as hourly bandwidth limit reached" }
|
||||
|
||||
this.state = GracefulStop(lastRunning = state)
|
||||
} else {
|
||||
pingControl()
|
||||
}
|
||||
}
|
||||
} catch (e: Exception) {
|
||||
LOGGER.warn(e) { "Graceful shutdown checker failed" }
|
||||
}
|
||||
}, 45, 45, TimeUnit.SECONDS)
|
||||
}
|
||||
|
||||
private fun pingControl() {
|
||||
val state = this.state as Running
|
||||
|
||||
val newSettings = serverHandler.pingControl(state.settings)
|
||||
if (newSettings != null) {
|
||||
LOGGER.info { "Server settings received: $newSettings" }
|
||||
|
||||
if (newSettings.latestBuild > Constants.CLIENT_BUILD) {
|
||||
LOGGER.warn {
|
||||
"Outdated build detected! Latest: ${newSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
|
||||
}
|
||||
}
|
||||
if (newSettings.tls != null || newSettings.imageServer != state.settings.imageServer) {
|
||||
// certificates or upstream url must have changed, restart webserver
|
||||
LOGGER.info { "Doing internal restart of HTTP server to refresh certs/upstream URL" }
|
||||
|
||||
this.state = GracefulStop(lastRunning = state) {
|
||||
loginAndStartServer()
|
||||
}
|
||||
}
|
||||
} else {
|
||||
LOGGER.info { "Server ping failed - ignoring" }
|
||||
}
|
||||
}
|
||||
|
||||
private fun loginAndStartServer() {
|
||||
val state = this.state as Uninitialized
|
||||
|
||||
val remoteSettings = serverHandler.loginToControl()
|
||||
?: Main.dieWithError("Failed to get a login response from server - check API secret for validity")
|
||||
val server = getServer(cache, remoteSettings, state.serverSettings, statistics, isHandled).start()
|
||||
|
||||
if (remoteSettings.latestBuild > Constants.CLIENT_BUILD) {
|
||||
LOGGER.warn {
|
||||
"Outdated build detected! Latest: ${remoteSettings.latestBuild}, Current: ${Constants.CLIENT_BUILD}"
|
||||
}
|
||||
}
|
||||
|
||||
this.state = Running(server, remoteSettings, state.serverSettings, state.devSettings)
|
||||
LOGGER.info { "Internal HTTP server was successfully started" }
|
||||
}
|
||||
|
||||
private fun logout() {
|
||||
serverHandler.logoutFromControl()
|
||||
}
|
||||
|
||||
private fun stopServer(nextState: State) {
|
||||
val state = this.state.let {
|
||||
when (it) {
|
||||
is Running ->
|
||||
it
|
||||
is GracefulStop ->
|
||||
it.lastRunning
|
||||
else ->
|
||||
throw AssertionError()
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.info { "Image server stopping" }
|
||||
state.server.stop()
|
||||
LOGGER.info { "Image server has stopped" }
|
||||
|
||||
this.state = nextState
|
||||
}
|
||||
|
||||
fun shutdown() {
|
||||
LOGGER.info { "Image server shutting down" }
|
||||
|
||||
val latch = CountDownLatch(1)
|
||||
executor.schedule({
|
||||
val state = this.state
|
||||
if (state is Running) {
|
||||
this.state = GracefulStop(state, nextState = Shutdown) {
|
||||
latch.countDown()
|
||||
}
|
||||
} else if (state is GracefulStop) {
|
||||
this.state = state.copy(nextState = Shutdown) {
|
||||
latch.countDown()
|
||||
}
|
||||
} else if (state is Uninitialized || state is Shutdown) {
|
||||
this.state = Shutdown
|
||||
latch.countDown()
|
||||
}
|
||||
}, 0, TimeUnit.SECONDS)
|
||||
latch.await()
|
||||
|
||||
try {
|
||||
cache.close()
|
||||
} catch (e: IOException) {
|
||||
LOGGER.error(e) { "Cache failed to close" }
|
||||
}
|
||||
|
||||
executor.shutdown()
|
||||
LOGGER.info { "Image server has shut down" }
|
||||
}
|
||||
|
||||
companion object {
|
||||
private val LOGGER = LoggerFactory.getLogger(ServerManager::class.java)
|
||||
private val JACKSON: ObjectMapper = jacksonObjectMapper().enable(SerializationFeature.INDENT_OUTPUT)
|
||||
}
|
||||
}
|
|
@ -48,7 +48,7 @@ import javax.net.ssl.SSLException
|
|||
import mdnet.base.Constants
|
||||
import mdnet.base.data.Statistics
|
||||
import mdnet.base.info
|
||||
import mdnet.base.settings.ClientSettings
|
||||
import mdnet.base.settings.ServerSettings
|
||||
import mdnet.base.settings.TlsCert
|
||||
import mdnet.base.trace
|
||||
import org.http4k.core.HttpHandler
|
||||
|
@ -59,15 +59,15 @@ import org.slf4j.LoggerFactory
|
|||
|
||||
private val LOGGER = LoggerFactory.getLogger("Application")
|
||||
|
||||
class Netty(private val tls: TlsCert, private val clientSettings: ClientSettings, private val statistics: AtomicReference<Statistics>) : ServerConfig {
|
||||
class Netty(private val tls: TlsCert, internal val serverSettings: ServerSettings, private val statistics: AtomicReference<Statistics>) : ServerConfig {
|
||||
override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer {
|
||||
private val masterGroup = NioEventLoopGroup(clientSettings.threads)
|
||||
private val workerGroup = NioEventLoopGroup(clientSettings.threads)
|
||||
private val masterGroup = NioEventLoopGroup(serverSettings.threads)
|
||||
private val workerGroup = NioEventLoopGroup(serverSettings.threads)
|
||||
private lateinit var closeFuture: ChannelFuture
|
||||
private lateinit var address: InetSocketAddress
|
||||
|
||||
private val burstLimiter = object : GlobalTrafficShapingHandler(
|
||||
workerGroup, clientSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 50) {
|
||||
workerGroup, serverSettings.maxKilobitsPerSecond * 1000L / 8L, 0, 50) {
|
||||
override fun doAccounting(counter: TrafficCounter) {
|
||||
statistics.getAndUpdate {
|
||||
it.copy(bytesSent = it.bytesSent + counter.cumulativeWrittenBytes())
|
||||
|
@ -77,7 +77,7 @@ class Netty(private val tls: TlsCert, private val clientSettings: ClientSettings
|
|||
}
|
||||
|
||||
override fun start(): Http4kServer = apply {
|
||||
LOGGER.info { "Starting Netty with ${clientSettings.threads} threads" }
|
||||
LOGGER.info { "Starting Netty with ${serverSettings.threads} threads" }
|
||||
|
||||
val certs = getX509Certs(tls.certificate)
|
||||
val sslContext = SslContextBuilder
|
||||
|
@ -104,7 +104,7 @@ class Netty(private val tls: TlsCert, private val clientSettings: ClientSettings
|
|||
ch.pipeline().addLast("streamer", ChunkedWriteHandler())
|
||||
ch.pipeline().addLast("handler", Http4kChannelHandler(httpHandler))
|
||||
|
||||
ch.pipeline().addLast("handle_ssl", object : ChannelInboundHandlerAdapter() {
|
||||
ch.pipeline().addLast("exceptions", object : ChannelInboundHandlerAdapter() {
|
||||
override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) {
|
||||
if (cause is SSLException || (cause is DecoderException && cause.cause is SSLException)) {
|
||||
LOGGER.trace { "Ignored invalid SSL connection" }
|
||||
|
@ -121,7 +121,7 @@ class Netty(private val tls: TlsCert, private val clientSettings: ClientSettings
|
|||
.option(ChannelOption.SO_BACKLOG, 1000)
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true)
|
||||
|
||||
val channel = bootstrap.bind(InetSocketAddress(clientSettings.clientHostname, clientSettings.clientPort)).sync().channel()
|
||||
val channel = bootstrap.bind(InetSocketAddress(serverSettings.clientHostname, serverSettings.clientPort)).sync().channel()
|
||||
address = channel.localAddress() as InetSocketAddress
|
||||
closeFuture = channel.closeFuture()
|
||||
}
|
||||
|
@ -132,7 +132,7 @@ class Netty(private val tls: TlsCert, private val clientSettings: ClientSettings
|
|||
closeFuture.sync()
|
||||
}
|
||||
|
||||
override fun port(): Int = if (clientSettings.clientPort > 0) clientSettings.clientPort else address.port
|
||||
override fun port(): Int = if (serverSettings.clientPort > 0) serverSettings.clientPort else address.port
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -32,7 +32,7 @@ private const val PKCS_1_PEM_FOOTER = "-----END RSA PRIVATE KEY-----"
|
|||
private const val PKCS_8_PEM_HEADER = "-----BEGIN PRIVATE KEY-----"
|
||||
private const val PKCS_8_PEM_FOOTER = "-----END PRIVATE KEY-----"
|
||||
|
||||
fun loadKey(keyDataString: String): PrivateKey? {
|
||||
internal fun loadKey(keyDataString: String): PrivateKey? {
|
||||
if (keyDataString.contains(PKCS_1_PEM_HEADER)) {
|
||||
val fixedString = keyDataString.replace(PKCS_1_PEM_HEADER, "").replace(
|
||||
PKCS_1_PEM_FOOTER, "")
|
||||
|
|
|
@ -1,115 +0,0 @@
|
|||
/*
|
||||
Mangadex@Home
|
||||
Copyright (c) 2020, MangaDex Network
|
||||
This file is part of MangaDex@Home.
|
||||
|
||||
MangaDex@Home is free software: you can redistribute it and/or modify
|
||||
it under the terms of the GNU General Public License as published by
|
||||
the Free Software Foundation, either version 3 of the License, or
|
||||
(at your option) any later version.
|
||||
|
||||
MangaDex@Home is distributed in the hope that it will be useful,
|
||||
but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
GNU General Public License for more details.
|
||||
|
||||
You should have received a copy of the GNU General Public License
|
||||
along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
/* ktlint-disable no-wildcard-imports */
|
||||
package mdnet.base.server
|
||||
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import mdnet.base.data.Statistics
|
||||
import mdnet.base.info
|
||||
import mdnet.base.netty.Netty
|
||||
import mdnet.base.settings.ClientSettings
|
||||
import mdnet.base.settings.ServerSettings
|
||||
import mdnet.cache.DiskLruCache
|
||||
import org.apache.http.client.config.CookieSpecs
|
||||
import org.apache.http.client.config.RequestConfig
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.http4k.client.ApacheClient
|
||||
import org.http4k.core.*
|
||||
import org.http4k.filter.ServerFilters
|
||||
import org.http4k.routing.bind
|
||||
import org.http4k.routing.routes
|
||||
import org.http4k.server.Http4kServer
|
||||
import org.http4k.server.asServer
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
private val LOGGER = LoggerFactory.getLogger("Application")
|
||||
|
||||
fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSettings: ClientSettings, statistics: AtomicReference<Statistics>, isHandled: AtomicBoolean): Http4kServer {
|
||||
val database = Database.connect("jdbc:sqlite:cache/data.db", "org.sqlite.JDBC")
|
||||
val client = ApacheClient(responseBodyMode = BodyMode.Stream, client = HttpClients.custom()
|
||||
.disableConnectionState()
|
||||
.setDefaultRequestConfig(
|
||||
RequestConfig.custom()
|
||||
.setCookieSpec(CookieSpecs.IGNORE_COOKIES)
|
||||
.setConnectTimeout(3000)
|
||||
.setSocketTimeout(3000)
|
||||
.setConnectionRequestTimeout(3000)
|
||||
.build())
|
||||
.setMaxConnTotal(3000)
|
||||
.setMaxConnPerRoute(3000)
|
||||
.build())
|
||||
|
||||
val imageServer = ImageServer(cache, database, statistics, serverSettings, client)
|
||||
|
||||
return timeRequest()
|
||||
.then(catchAllHideDetails())
|
||||
.then(ServerFilters.CatchLensFailure)
|
||||
.then(setHandled(isHandled))
|
||||
.then(addCommonHeaders())
|
||||
.then(
|
||||
routes(
|
||||
"/data/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(dataSaver = false),
|
||||
"/data-saver/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(dataSaver = true),
|
||||
"/{token}/data/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(
|
||||
dataSaver = false,
|
||||
tokenized = true
|
||||
),
|
||||
"/{token}/data-saver/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(
|
||||
dataSaver = true,
|
||||
tokenized = true
|
||||
)
|
||||
)
|
||||
)
|
||||
.asServer(Netty(serverSettings.tls!!, clientSettings, statistics))
|
||||
}
|
||||
|
||||
fun setHandled(isHandled: AtomicBoolean): Filter {
|
||||
return Filter { next: HttpHandler ->
|
||||
{
|
||||
isHandled.set(true)
|
||||
next(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun timeRequest(): Filter {
|
||||
return Filter { next: HttpHandler ->
|
||||
{ request: Request ->
|
||||
val cleanedUri = request.uri.path.let {
|
||||
if (it.startsWith("/data")) {
|
||||
it
|
||||
} else {
|
||||
it.replaceBefore("/data", "/{token}")
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.info { "Request for $cleanedUri received from ${request.source?.address}" }
|
||||
|
||||
val start = System.currentTimeMillis()
|
||||
val response = next(request)
|
||||
val latency = System.currentTimeMillis() - start
|
||||
|
||||
LOGGER.info { "Request for $cleanedUri completed (TTFB) in ${latency}ms" }
|
||||
|
||||
response.header("X-Time-Taken", latency.toString())
|
||||
}
|
||||
}
|
||||
}
|
|
@ -32,40 +32,52 @@ import java.io.BufferedInputStream
|
|||
import java.io.BufferedOutputStream
|
||||
import java.io.File
|
||||
import java.io.InputStream
|
||||
import java.security.MessageDigest
|
||||
import java.time.Clock
|
||||
import java.time.OffsetDateTime
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import javax.crypto.Cipher
|
||||
import javax.crypto.CipherInputStream
|
||||
import javax.crypto.CipherOutputStream
|
||||
import javax.crypto.spec.SecretKeySpec
|
||||
import mdnet.base.Constants
|
||||
import mdnet.base.data.ImageData
|
||||
import mdnet.base.data.ImageDatum
|
||||
import mdnet.base.data.Statistics
|
||||
import mdnet.base.data.Token
|
||||
import mdnet.base.info
|
||||
import mdnet.base.netty.Netty
|
||||
import mdnet.base.settings.RemoteSettings
|
||||
import mdnet.base.settings.ServerSettings
|
||||
import mdnet.base.trace
|
||||
import mdnet.base.warn
|
||||
import mdnet.cache.CachingInputStream
|
||||
import mdnet.cache.DiskLruCache
|
||||
import org.apache.http.client.config.CookieSpecs
|
||||
import org.apache.http.client.config.RequestConfig
|
||||
import org.apache.http.impl.client.HttpClients
|
||||
import org.http4k.client.ApacheClient
|
||||
import org.http4k.core.*
|
||||
import org.http4k.filter.CachingFilters
|
||||
import org.http4k.filter.ServerFilters
|
||||
import org.http4k.lens.Path
|
||||
import org.http4k.routing.bind
|
||||
import org.http4k.routing.routes
|
||||
import org.http4k.server.Http4kServer
|
||||
import org.http4k.server.asServer
|
||||
import org.jetbrains.exposed.sql.Database
|
||||
import org.jetbrains.exposed.sql.SchemaUtils
|
||||
import org.jetbrains.exposed.sql.transactions.transaction
|
||||
import org.slf4j.LoggerFactory
|
||||
|
||||
private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java)
|
||||
|
||||
class ImageServer(
|
||||
private val cache: DiskLruCache,
|
||||
private val database: Database,
|
||||
private val statistics: AtomicReference<Statistics>,
|
||||
private val serverSettings: ServerSettings,
|
||||
private val remoteSettings: RemoteSettings,
|
||||
private val client: HttpHandler
|
||||
) {
|
||||
init {
|
||||
|
@ -93,7 +105,7 @@ class ImageServer(
|
|||
return@then Response(Status.FORBIDDEN)
|
||||
}
|
||||
|
||||
if (tokenized || serverSettings.forceTokens) {
|
||||
if (tokenized || remoteSettings.forceTokens) {
|
||||
val tokenArr = Base64.getUrlDecoder().decode(Path.of("token")(request))
|
||||
if (tokenArr.size < 24) {
|
||||
LOGGER.info { "Request for $sanitizedUri rejected for invalid token" }
|
||||
|
@ -103,7 +115,7 @@ class ImageServer(
|
|||
JACKSON.readValue<Token>(
|
||||
try {
|
||||
SODIUM.cryptoBoxOpenEasyAfterNm(
|
||||
tokenArr.sliceArray(24 until tokenArr.size), tokenArr.sliceArray(0 until 24), serverSettings.tokenKey
|
||||
tokenArr.sliceArray(24 until tokenArr.size), tokenArr.sliceArray(0 until 24), remoteSettings.tokenKey
|
||||
)
|
||||
} catch (_: SodiumException) {
|
||||
LOGGER.info { "Request for $sanitizedUri rejected for invalid token" }
|
||||
|
@ -209,7 +221,7 @@ class ImageServer(
|
|||
it.copy(cacheMisses = it.cacheMisses + 1)
|
||||
}
|
||||
|
||||
val mdResponse = client(Request(Method.GET, "${serverSettings.imageServer}$sanitizedUri"))
|
||||
val mdResponse = client(Request(Method.GET, "${remoteSettings.imageServer}$sanitizedUri"))
|
||||
|
||||
if (mdResponse.status != Status.OK) {
|
||||
LOGGER.trace { "Upstream query for $sanitizedUri errored with status ${mdResponse.status}" }
|
||||
|
@ -298,7 +310,6 @@ class ImageServer(
|
|||
|
||||
companion object {
|
||||
private val SODIUM = LazySodiumJava(SodiumJava())
|
||||
private val LOGGER = LoggerFactory.getLogger(ImageServer::class.java)
|
||||
private val JACKSON: ObjectMapper = jacksonObjectMapper()
|
||||
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)
|
||||
.registerModule(JavaTimeModule())
|
||||
|
@ -318,23 +329,76 @@ class ImageServer(
|
|||
}
|
||||
}
|
||||
|
||||
private fun getRc4(key: ByteArray): Cipher {
|
||||
val rc4 = Cipher.getInstance("RC4")
|
||||
rc4.init(Cipher.ENCRYPT_MODE, SecretKeySpec(key, "RC4"))
|
||||
return rc4
|
||||
}
|
||||
|
||||
private fun md5Bytes(stringToHash: String): ByteArray {
|
||||
val digest = MessageDigest.getInstance("MD5")
|
||||
return digest.digest(stringToHash.toByteArray())
|
||||
}
|
||||
|
||||
private fun printHexString(bytes: ByteArray): String {
|
||||
val sb = StringBuilder()
|
||||
for (b in bytes) {
|
||||
sb.append(String.format("%02x", b))
|
||||
}
|
||||
return sb.toString()
|
||||
}
|
||||
|
||||
private fun String.isImageMimetype() = this.toLowerCase().startsWith("image/")
|
||||
|
||||
fun getServer(cache: DiskLruCache, remoteSettings: RemoteSettings, serverSettings: ServerSettings, statistics: AtomicReference<Statistics>, isHandled: AtomicBoolean): Http4kServer {
|
||||
val database = Database.connect("jdbc:sqlite:cache/data.db", "org.sqlite.JDBC")
|
||||
val client = ApacheClient(responseBodyMode = BodyMode.Stream, client = HttpClients.custom()
|
||||
.disableConnectionState()
|
||||
.setDefaultRequestConfig(
|
||||
RequestConfig.custom()
|
||||
.setCookieSpec(CookieSpecs.IGNORE_COOKIES)
|
||||
.setConnectTimeout(3000)
|
||||
.setSocketTimeout(3000)
|
||||
.setConnectionRequestTimeout(3000)
|
||||
.build())
|
||||
.setMaxConnTotal(3000)
|
||||
.setMaxConnPerRoute(3000)
|
||||
.build())
|
||||
|
||||
val imageServer = ImageServer(cache, database, statistics, remoteSettings, client)
|
||||
|
||||
return timeRequest()
|
||||
.then(catchAllHideDetails())
|
||||
.then(ServerFilters.CatchLensFailure)
|
||||
.then(setHandled(isHandled))
|
||||
.then(addCommonHeaders())
|
||||
.then(
|
||||
routes(
|
||||
"/data/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(dataSaver = false),
|
||||
"/data-saver/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(dataSaver = true),
|
||||
"/{token}/data/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(
|
||||
dataSaver = false,
|
||||
tokenized = true
|
||||
),
|
||||
"/{token}/data-saver/{chapterHash}/{fileName}" bind Method.GET to imageServer.handler(
|
||||
dataSaver = true,
|
||||
tokenized = true
|
||||
)
|
||||
)
|
||||
)
|
||||
.asServer(Netty(remoteSettings.tls!!, serverSettings, statistics))
|
||||
}
|
||||
|
||||
fun setHandled(isHandled: AtomicBoolean): Filter {
|
||||
return Filter { next: HttpHandler ->
|
||||
{
|
||||
isHandled.set(true)
|
||||
next(it)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fun timeRequest(): Filter {
|
||||
return Filter { next: HttpHandler ->
|
||||
{ request: Request ->
|
||||
val cleanedUri = request.uri.path.let {
|
||||
if (it.startsWith("/data")) {
|
||||
it
|
||||
} else {
|
||||
it.replaceBefore("/data", "/{token}")
|
||||
}
|
||||
}
|
||||
|
||||
LOGGER.info { "Request for $cleanedUri received from ${request.source?.address}" }
|
||||
|
||||
val start = System.currentTimeMillis()
|
||||
val response = next(request)
|
||||
val latency = System.currentTimeMillis() - start
|
||||
|
||||
LOGGER.info { "Request for $cleanedUri completed (TTFB) in ${latency}ms" }
|
||||
|
||||
response.header("X-Time-Taken", latency.toString())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -22,9 +22,12 @@ package mdnet.base.server
|
|||
import com.goterl.lazycode.lazysodium.LazySodiumJava
|
||||
import com.goterl.lazycode.lazysodium.exceptions.SodiumException
|
||||
import com.goterl.lazycode.lazysodium.interfaces.Box
|
||||
import java.security.MessageDigest
|
||||
import javax.crypto.Cipher
|
||||
import javax.crypto.spec.SecretKeySpec
|
||||
|
||||
@Throws(SodiumException::class)
|
||||
fun LazySodiumJava.cryptoBoxOpenEasyAfterNm(cipherBytes: ByteArray, nonce: ByteArray, sharedKey: ByteArray): String {
|
||||
internal fun LazySodiumJava.cryptoBoxOpenEasyAfterNm(cipherBytes: ByteArray, nonce: ByteArray, sharedKey: ByteArray): String {
|
||||
if (!Box.Checker.checkNonce(nonce.size)) {
|
||||
throw SodiumException("Incorrect nonce length.")
|
||||
}
|
||||
|
@ -40,3 +43,22 @@ fun LazySodiumJava.cryptoBoxOpenEasyAfterNm(cipherBytes: ByteArray, nonce: ByteA
|
|||
}
|
||||
return str(message)
|
||||
}
|
||||
|
||||
internal fun getRc4(key: ByteArray): Cipher {
|
||||
val rc4 = Cipher.getInstance("RC4")
|
||||
rc4.init(Cipher.ENCRYPT_MODE, SecretKeySpec(key, "RC4"))
|
||||
return rc4
|
||||
}
|
||||
|
||||
internal fun md5Bytes(stringToHash: String): ByteArray {
|
||||
val digest = MessageDigest.getInstance("MD5")
|
||||
return digest.digest(stringToHash.toByteArray())
|
||||
}
|
||||
|
||||
internal fun printHexString(bytes: ByteArray): String {
|
||||
val sb = StringBuilder()
|
||||
for (b in bytes) {
|
||||
sb.append(String.format("%02x", b))
|
||||
}
|
||||
return sb.toString()
|
||||
}
|
||||
|
|
|
@ -18,6 +18,7 @@ along with this MangaDex@Home. If not, see <http://www.gnu.org/licenses/>.
|
|||
*/
|
||||
package mdnet.base.settings
|
||||
|
||||
import com.fasterxml.jackson.annotation.JsonUnwrapped
|
||||
import com.fasterxml.jackson.databind.PropertyNamingStrategy
|
||||
import com.fasterxml.jackson.databind.annotation.JsonNaming
|
||||
import dev.afanasev.sekret.Secret
|
||||
|
@ -25,6 +26,14 @@ import dev.afanasev.sekret.Secret
|
|||
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
|
||||
data class ClientSettings(
|
||||
val maxCacheSizeInMebibytes: Long = 20480,
|
||||
@JsonUnwrapped
|
||||
val serverSettings: ServerSettings = ServerSettings(),
|
||||
val webSettings: WebSettings? = null,
|
||||
val devSettings: DevSettings = DevSettings(isDev = false)
|
||||
)
|
||||
|
||||
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
|
||||
data class ServerSettings(
|
||||
val maxMebibytesPerHour: Long = 0,
|
||||
val maxKilobitsPerSecond: Long = 0,
|
||||
val clientHostname: String = "0.0.0.0",
|
||||
|
@ -32,9 +41,7 @@ data class ClientSettings(
|
|||
val clientExternalPort: Int = 0,
|
||||
@field:Secret val clientSecret: String = "PASTE-YOUR-SECRET-HERE",
|
||||
val threads: Int = 4,
|
||||
val gracefulShutdownWaitSeconds: Int = 60,
|
||||
val webSettings: WebSettings? = null,
|
||||
val devSettings: DevSettings? = null
|
||||
val gracefulShutdownWaitSeconds: Int = 60
|
||||
)
|
||||
|
||||
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
|
||||
|
|
|
@ -23,7 +23,7 @@ import com.fasterxml.jackson.databind.annotation.JsonNaming
|
|||
import dev.afanasev.sekret.Secret
|
||||
|
||||
@JsonNaming(PropertyNamingStrategy.SnakeCaseStrategy::class)
|
||||
data class ServerSettings(
|
||||
data class RemoteSettings(
|
||||
val imageServer: String,
|
||||
val latestBuild: Int,
|
||||
val url: String,
|
||||
|
@ -37,7 +37,7 @@ data class ServerSettings(
|
|||
if (this === other) return true
|
||||
if (javaClass != other?.javaClass) return false
|
||||
|
||||
other as ServerSettings
|
||||
other as RemoteSettings
|
||||
|
||||
if (imageServer != other.imageServer) return false
|
||||
if (latestBuild != other.latestBuild) return false
|
|
@ -37,5 +37,6 @@
|
|||
<appender-ref ref="ASYNC"/>
|
||||
</root>
|
||||
|
||||
<logger name="Exposed" level="ERROR"/
|
||||
<logger name="io.netty" level="INFO"/>
|
||||
</configuration>
|
||||
|
|
Loading…
Reference in a new issue