Make Statistics immutable
This commit is contained in:
parent
f153a11717
commit
505c917e56
|
@ -2,7 +2,9 @@ package mdnet.base;
|
|||
|
||||
import com.google.gson.Gson;
|
||||
import com.google.gson.GsonBuilder;
|
||||
import com.google.gson.reflect.TypeToken;
|
||||
import mdnet.base.settings.ClientSettings;
|
||||
import mdnet.base.settings.WebSettings;
|
||||
import mdnet.base.web.ApplicationKt;
|
||||
import mdnet.base.web.WebUiKt;
|
||||
import mdnet.cache.DiskLruCache;
|
||||
|
@ -12,13 +14,15 @@ import org.slf4j.Logger;
|
|||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.*;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public class MangaDexClient {
|
||||
private final static Gson GSON = new GsonBuilder().setPrettyPrinting().create();
|
||||
private final static Logger LOGGER = LoggerFactory.getLogger(MangaDexClient.class);
|
||||
|
||||
// This lock protects the Http4kServer from concurrent restart attempts
|
||||
|
@ -27,13 +31,16 @@ public class MangaDexClient {
|
|||
private final ServerHandler serverHandler;
|
||||
private final ClientSettings clientSettings;
|
||||
private final AtomicReference<Statistics> statistics;
|
||||
private ServerSettings serverSettings;
|
||||
|
||||
// if this is null, then the server has shutdown
|
||||
private Http4kServer engine;
|
||||
private ServerSettings serverSettings;
|
||||
private Http4kServer engine; // if this is null, then the server has shutdown
|
||||
private Http4kServer webUi;
|
||||
private DiskLruCache cache;
|
||||
|
||||
// these variables are for runLoop();
|
||||
private int counter = 0;
|
||||
private long lastBytesSent = 0;
|
||||
|
||||
public MangaDexClient(ClientSettings clientSettings) {
|
||||
this.clientSettings = clientSettings;
|
||||
this.serverHandler = new ServerHandler(clientSettings);
|
||||
|
@ -42,14 +49,23 @@ public class MangaDexClient {
|
|||
try {
|
||||
cache = DiskLruCache.open(new File("cache"), 3, 3,
|
||||
clientSettings.getMaxCacheSizeMib() * 1024 * 1024 /* MiB to bytes */);
|
||||
|
||||
DiskLruCache.Snapshot snapshot = cache.get("statistics");
|
||||
if (snapshot != null) {
|
||||
String json = snapshot.getString(0);
|
||||
snapshot.close();
|
||||
statistics.set(GSON.fromJson(json, new TypeToken<ArrayList<Statistics>>() {
|
||||
}.getType()));
|
||||
} else {
|
||||
statistics.set(new Statistics());
|
||||
}
|
||||
lastBytesSent = statistics.get().getBytesSent();
|
||||
} catch (IOException e) {
|
||||
MangaDexClient.dieWithError(e);
|
||||
}
|
||||
}
|
||||
|
||||
// This function also does most of the program initialization.
|
||||
public void runLoop() {
|
||||
statistics.set(new Statistics(0));
|
||||
loginAndStartServer();
|
||||
if (serverSettings.getLatestBuild() > Constants.CLIENT_BUILD) {
|
||||
if (LOGGER.isWarnEnabled()) {
|
||||
|
@ -58,26 +74,19 @@ public class MangaDexClient {
|
|||
}
|
||||
}
|
||||
|
||||
if (clientSettings.getWebSettings() != null) {
|
||||
webUi = WebUiKt.getUiServer(clientSettings.getWebSettings(), statistics);
|
||||
webUi.start();
|
||||
}
|
||||
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
LOGGER.info("MDNet initialization completed successfully. Starting normal operation.");
|
||||
}
|
||||
|
||||
webUi = WebUiKt.getUiServer(clientSettings.getWebSettings(), statistics);
|
||||
webUi.start();
|
||||
|
||||
// we don't really care about the Atomic part here
|
||||
AtomicInteger counter = new AtomicInteger();
|
||||
// ping keep-alive every 45 seconds
|
||||
executorService.scheduleAtFixedRate(() -> {
|
||||
int num = counter.get();
|
||||
if (num == 80) {
|
||||
counter.set(0);
|
||||
|
||||
// if server is stopped due to egress limits, restart it
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
LOGGER.info("Hourly update: refreshing statistics");
|
||||
}
|
||||
statistics.set(new Statistics(statistics.get().getSequenceNumber() + 1));
|
||||
if (counter == 80) {
|
||||
counter = 0;
|
||||
lastBytesSent = statistics.get().getBytesSent();
|
||||
|
||||
if (engine == null) {
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
|
@ -87,7 +96,7 @@ public class MangaDexClient {
|
|||
loginAndStartServer();
|
||||
}
|
||||
} else {
|
||||
counter.set(num + 1);
|
||||
counter++;
|
||||
}
|
||||
|
||||
// if the server is offline then don't try and refresh certs
|
||||
|
@ -95,8 +104,9 @@ public class MangaDexClient {
|
|||
return;
|
||||
}
|
||||
|
||||
if (clientSettings.getMaxBandwidthMibPerHour() != 0 && clientSettings.getMaxBandwidthMibPerHour() * 1024
|
||||
* 1024 /* MiB to bytes */ < statistics.get().getBytesSent().get()) {
|
||||
long currentBytesSent = statistics.get().getBytesSent() - lastBytesSent;
|
||||
if (clientSettings.getMaxBandwidthMibPerHour() != 0
|
||||
&& clientSettings.getMaxBandwidthMibPerHour() * 1024 * 1024 /* MiB to bytes */ < currentBytesSent) {
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
LOGGER.info("Shutting down server as hourly bandwidth limit reached");
|
||||
}
|
||||
|
@ -170,6 +180,12 @@ public class MangaDexClient {
|
|||
|
||||
logoutAndStopServer();
|
||||
}
|
||||
webUi.close();
|
||||
try {
|
||||
cache.close();
|
||||
} catch (IOException e) {
|
||||
LOGGER.error("Cache failed to close", e);
|
||||
}
|
||||
}
|
||||
|
||||
public static void main(String[] args) {
|
||||
|
@ -184,47 +200,28 @@ public class MangaDexClient {
|
|||
MangaDexClient.dieWithError("Expected one argument: path to config file, or nothing");
|
||||
}
|
||||
|
||||
Gson gson = new GsonBuilder().setPrettyPrinting().create();
|
||||
ClientSettings settings;
|
||||
|
||||
try {
|
||||
settings = gson.fromJson(new FileReader(file), ClientSettings.class);
|
||||
settings = GSON.fromJson(new FileReader(file), ClientSettings.class);
|
||||
} catch (FileNotFoundException ignored) {
|
||||
settings = new ClientSettings();
|
||||
LOGGER.warn("Settings file {} not found, generating file", file);
|
||||
try (FileWriter writer = new FileWriter(file)) {
|
||||
writer.write(gson.toJson(settings));
|
||||
writer.write(GSON.toJson(settings));
|
||||
} catch (IOException e) {
|
||||
MangaDexClient.dieWithError(e);
|
||||
}
|
||||
}
|
||||
|
||||
if (!ClientSettings.isSecretValid(settings.getClientSecret()))
|
||||
MangaDexClient.dieWithError("Config Error: API Secret is invalid, must be 52 alphanumeric characters");
|
||||
|
||||
if (settings.getClientPort() == 0) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid port number");
|
||||
}
|
||||
|
||||
if (settings.getMaxCacheSizeMib() < 1024) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid max cache size, must be >= 1024 MiB (1GiB)");
|
||||
}
|
||||
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
LOGGER.info("Client settings loaded: {}", settings);
|
||||
}
|
||||
|
||||
MangaDexClient client = new MangaDexClient(settings);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(client::shutdown));
|
||||
client.runLoop();
|
||||
validateSettings(settings);
|
||||
|
||||
if (settings.getWebSettings() != null) {
|
||||
// java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
|
||||
// System.setOut(new java.io.PrintStream(out));
|
||||
WebSettings webSettings = settings.getWebSettings();
|
||||
|
||||
// TODO: system.out redirect
|
||||
ClientSettings finalSettings = settings;
|
||||
new Thread(() -> {
|
||||
WebConsole webConsole = new WebConsole(finalSettings.getWebSettings().getUiWebsocketPort()) {
|
||||
WebConsole webConsole = new WebConsole(webSettings.getUiWebsocketPort()) {
|
||||
@Override
|
||||
protected void parseMessage(String message) {
|
||||
System.out.println(message);
|
||||
|
@ -235,6 +232,14 @@ public class MangaDexClient {
|
|||
// TODO: webConsole.sendMessage(t,m) whenever system.out is written to
|
||||
}).start();
|
||||
}
|
||||
|
||||
if (LOGGER.isInfoEnabled()) {
|
||||
LOGGER.info("Client settings loaded: {}", settings);
|
||||
}
|
||||
|
||||
MangaDexClient client = new MangaDexClient(settings);
|
||||
Runtime.getRuntime().addShutdownHook(new Thread(client::shutdown));
|
||||
client.runLoop();
|
||||
}
|
||||
|
||||
public static void dieWithError(Throwable e) {
|
||||
|
@ -246,8 +251,48 @@ public class MangaDexClient {
|
|||
|
||||
public static void dieWithError(String error) {
|
||||
if (LOGGER.isErrorEnabled()) {
|
||||
LOGGER.error("Critical Error: " + error);
|
||||
LOGGER.error("Critical Error: {}", error);
|
||||
}
|
||||
System.exit(1);
|
||||
}
|
||||
|
||||
public static void validateSettings(ClientSettings settings) {
|
||||
if (!isSecretValid(settings.getClientSecret()))
|
||||
MangaDexClient.dieWithError("Config Error: API Secret is invalid, must be 52 alphanumeric characters");
|
||||
|
||||
if (settings.getClientPort() == 0) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid port number");
|
||||
}
|
||||
|
||||
if (settings.getMaxCacheSizeMib() < 1024) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid max cache size, must be >= 1024 MiB (1GiB)");
|
||||
}
|
||||
|
||||
if (settings.getThreads() < 4) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid number of threads, must be >= 8");
|
||||
}
|
||||
|
||||
if (settings.getMaxBandwidthMibPerHour() < 0) {
|
||||
MangaDexClient.dieWithError("Config Error: Max bandwidth must be >= 0");
|
||||
}
|
||||
|
||||
if (settings.getMaxBurstRateKibPerSecond() < 0) {
|
||||
MangaDexClient.dieWithError("Config Error: Max burst rate must be >= 0");
|
||||
}
|
||||
|
||||
if (settings.getWebSettings() != null) {
|
||||
if (settings.getWebSettings().getUiPort() == 0) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid UI port number");
|
||||
}
|
||||
|
||||
if (settings.getWebSettings().getUiWebsocketPort() == 0) {
|
||||
MangaDexClient.dieWithError("Config Error: Invalid websocket port number");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isSecretValid(String clientSecret) {
|
||||
final int CLIENT_KEY_LENGTH = 52;
|
||||
return Pattern.matches("^[a-zA-Z0-9]{" + CLIENT_KEY_LENGTH + "}$", clientSecret);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,58 +0,0 @@
|
|||
package mdnet.base;
|
||||
|
||||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
public class Statistics {
|
||||
@SerializedName("requests_served")
|
||||
private final AtomicInteger requestsServed;
|
||||
@SerializedName("cache_hits")
|
||||
private final AtomicInteger cacheHits;
|
||||
@SerializedName("cache_misses")
|
||||
private final AtomicInteger cacheMisses;
|
||||
@SerializedName("bytes_sent")
|
||||
private final AtomicLong bytesSent;
|
||||
@SerializedName("sequence_number")
|
||||
private final int sequenceNumber;
|
||||
|
||||
public Statistics(int sequenceNumber) {
|
||||
requestsServed = new AtomicInteger();
|
||||
cacheHits = new AtomicInteger();
|
||||
cacheMisses = new AtomicInteger();
|
||||
bytesSent = new AtomicLong();
|
||||
this.sequenceNumber = sequenceNumber;
|
||||
}
|
||||
|
||||
public AtomicInteger getRequestsServed() {
|
||||
return requestsServed;
|
||||
}
|
||||
|
||||
public AtomicInteger getCacheHits() {
|
||||
return cacheHits;
|
||||
}
|
||||
|
||||
public AtomicInteger getCacheMisses() {
|
||||
return cacheMisses;
|
||||
}
|
||||
|
||||
public AtomicLong getBytesSent() {
|
||||
return bytesSent;
|
||||
}
|
||||
|
||||
public int getSequenceNumber() {
|
||||
return sequenceNumber;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "Statistics{" +
|
||||
"requestsServed=" + requestsServed +
|
||||
", cacheHits=" + cacheHits +
|
||||
", cacheMisses=" + cacheMisses +
|
||||
", bytesSent=" + bytesSent +
|
||||
", sequenceNumber=" + sequenceNumber +
|
||||
'}';
|
||||
}
|
||||
}
|
|
@ -3,7 +3,6 @@ package mdnet.base.settings;
|
|||
import com.google.gson.annotations.SerializedName;
|
||||
|
||||
import java.util.Objects;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
public final class ClientSettings {
|
||||
@SerializedName("max_cache_size_mib")
|
||||
|
@ -75,9 +74,4 @@ public final class ClientSettings {
|
|||
+ maxBandwidthMibPerHour + ", maxBurstRateKibPerSecond=" + maxBurstRateKibPerSecond + ", clientPort="
|
||||
+ clientPort + ", clientSecret='" + "<hidden>" + '\'' + ", threads=" + getThreads() + '}';
|
||||
}
|
||||
|
||||
public static boolean isSecretValid(String clientSecret) {
|
||||
final int CLIENT_KEY_LENGTH = 52;
|
||||
return Pattern.matches("^[a-zA-Z0-9]{" + CLIENT_KEY_LENGTH + "}$", clientSecret);
|
||||
}
|
||||
}
|
||||
|
|
2
src/main/java/mdnet/cache/DiskLruCache.java
vendored
2
src/main/java/mdnet/cache/DiskLruCache.java
vendored
|
@ -411,7 +411,7 @@ public final class DiskLruCache implements Closeable {
|
|||
return getImpl(key);
|
||||
}
|
||||
|
||||
public synchronized Snapshot getImpl(String key) throws IOException {
|
||||
private synchronized Snapshot getImpl(String key) throws IOException {
|
||||
checkNotClosed();
|
||||
Entry entry = lruEntries.get(key);
|
||||
if (entry == null) {
|
||||
|
|
|
@ -37,26 +37,26 @@ import javax.net.ssl.SSLException
|
|||
|
||||
private val LOGGER = LoggerFactory.getLogger("Application")
|
||||
|
||||
class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val stats: AtomicReference<Statistics>) : ServerConfig {
|
||||
private val threadsToAllocate = clientSettings.getThreads()
|
||||
|
||||
class Netty(private val tls: ServerSettings.TlsCert, private val clientSettings: ClientSettings, private val statistics: AtomicReference<Statistics>) : ServerConfig {
|
||||
override fun toServer(httpHandler: HttpHandler): Http4kServer = object : Http4kServer {
|
||||
private val masterGroup = NioEventLoopGroup(threadsToAllocate)
|
||||
private val workerGroup = NioEventLoopGroup(threadsToAllocate)
|
||||
private val masterGroup = NioEventLoopGroup(clientSettings.threads)
|
||||
private val workerGroup = NioEventLoopGroup(clientSettings.threads)
|
||||
private lateinit var closeFuture: ChannelFuture
|
||||
private lateinit var address: InetSocketAddress
|
||||
|
||||
private val burstLimiter = object : GlobalTrafficShapingHandler(
|
||||
workerGroup, 1024 * clientSettings.maxBurstRateKibPerSecond, 0, 50) {
|
||||
override fun doAccounting(counter: TrafficCounter) {
|
||||
stats.get().bytesSent.getAndAdd(counter.cumulativeWrittenBytes())
|
||||
statistics.getAndUpdate {
|
||||
it.copy(bytesSent = it.bytesSent + counter.cumulativeWrittenBytes())
|
||||
}
|
||||
counter.resetCumulativeTime()
|
||||
}
|
||||
}
|
||||
|
||||
override fun start(): Http4kServer = apply {
|
||||
if (LOGGER.isInfoEnabled) {
|
||||
LOGGER.info("Starting webserver with {} threads", threadsToAllocate)
|
||||
LOGGER.info("Starting webserver with {} threads", clientSettings.threads)
|
||||
}
|
||||
|
||||
val (mainCert, chainCert) = getX509Certs(tls.certificate)
|
||||
|
|
11
src/main/kotlin/mdnet/base/Statistics.kt
Normal file
11
src/main/kotlin/mdnet/base/Statistics.kt
Normal file
|
@ -0,0 +1,11 @@
|
|||
package mdnet.base
|
||||
|
||||
import com.google.gson.annotations.SerializedName
|
||||
|
||||
data class Statistics(
|
||||
@field:SerializedName("requests_served") val requestsServed: Int = 0,
|
||||
@field:SerializedName("cache_hits") val cacheHits: Int = 0,
|
||||
@field:SerializedName("cache_misses") val cacheMisses: Int = 0,
|
||||
@field:SerializedName("browser_cached") val browserCached: Int = 0,
|
||||
@field:SerializedName("bytes_sent") val bytesSent: Long = 0
|
||||
)
|
|
@ -31,8 +31,6 @@ import java.io.BufferedInputStream
|
|||
import java.io.BufferedOutputStream
|
||||
import java.io.InputStream
|
||||
import java.security.MessageDigest
|
||||
import java.time.format.DateTimeFormatter
|
||||
import java.util.*
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
import javax.crypto.Cipher
|
||||
|
@ -59,7 +57,6 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
|
|||
.build())
|
||||
.setMaxConnTotal(THREADS_TO_ALLOCATE)
|
||||
.setMaxConnPerRoute(THREADS_TO_ALLOCATE)
|
||||
// Have it at the maximum open sockets a user can have in most modern OSes. No reason to limit this, just limit it at the Netty side.
|
||||
.build())
|
||||
|
||||
val app = { dataSaver: Boolean ->
|
||||
|
@ -83,8 +80,9 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
|
|||
md5Bytes("$chapterHash.$fileName")
|
||||
}
|
||||
val cacheId = printHexString(rc4Bytes)
|
||||
|
||||
statistics.get().requestsServed.incrementAndGet()
|
||||
statistics.getAndUpdate {
|
||||
it.copy(requestsServed = it.requestsServed + 1)
|
||||
}
|
||||
|
||||
// Netty doesn't do Content-Length or Content-Type, so we have the pleasure of doing that ourselves
|
||||
fun respondWithImage(input: InputStream, length: String?, type: String, lastModified: String?): Response =
|
||||
|
@ -113,10 +111,12 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
|
|||
|
||||
val snapshot = cache.get(cacheId)
|
||||
if (snapshot != null) {
|
||||
statistics.get().cacheHits.incrementAndGet()
|
||||
|
||||
// our files never change, so it's safe to use the browser cache
|
||||
if (request.header("If-Modified-Since") != null) {
|
||||
statistics.getAndUpdate {
|
||||
it.copy(browserCached = it.browserCached + 1)
|
||||
}
|
||||
|
||||
if (LOGGER.isInfoEnabled) {
|
||||
LOGGER.info("Request for $sanitizedUri cached by browser")
|
||||
}
|
||||
|
@ -127,6 +127,10 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
|
|||
Response(Status.NOT_MODIFIED)
|
||||
.header("Last-Modified", lastModified)
|
||||
} else {
|
||||
statistics.getAndUpdate {
|
||||
it.copy(cacheHits = it.cacheHits + 1)
|
||||
}
|
||||
|
||||
if (LOGGER.isInfoEnabled) {
|
||||
LOGGER.info("Request for $sanitizedUri hit cache")
|
||||
}
|
||||
|
@ -137,7 +141,10 @@ fun getServer(cache: DiskLruCache, serverSettings: ServerSettings, clientSetting
|
|||
)
|
||||
}
|
||||
} else {
|
||||
statistics.get().cacheMisses.incrementAndGet()
|
||||
statistics.getAndUpdate {
|
||||
it.copy(cacheMisses = it.cacheMisses + 1)
|
||||
}
|
||||
|
||||
if (LOGGER.isInfoEnabled) {
|
||||
LOGGER.info("Request for $sanitizedUri missed cache")
|
||||
}
|
||||
|
@ -225,9 +232,6 @@ private fun getRc4(key: ByteArray): Cipher {
|
|||
return rc4
|
||||
}
|
||||
|
||||
private val HTTP_TIME_FORMATTER = DateTimeFormatter.ofPattern("EEE, dd MMM yyyy HH:mm:ss O", Locale.ENGLISH)
|
||||
|
||||
|
||||
private fun md5Bytes(stringToHash: String): ByteArray {
|
||||
val digest = MessageDigest.getInstance("MD5")
|
||||
return digest.digest(stringToHash.toByteArray())
|
||||
|
|
Loading…
Reference in a new issue