Skip to content
This repository was archived by the owner on Jul 28, 2021. It is now read-only.

[Patch v1.0.7] #6

Merged
merged 17 commits into from
May 24, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>pw.mihou</groupId>
<artifactId>RoseDB</artifactId>
<version>1.0.5</version>
<version>1.0.7</version>
<build>
<plugins>
<plugin>
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/pw/mihou/rosedb/RoseDB.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
import pw.mihou.rosedb.io.Scheduler;
import pw.mihou.rosedb.utility.Terminal;
import pw.mihou.rosedb.utility.UpdateChecker;

import java.io.File;
import java.net.URISyntaxException;
import java.nio.file.Files;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -53,6 +53,16 @@ public static void main(String[] args) throws URISyntaxException {
JSONObject config = new JSONObject(FileHandler.read("config.json").join());

if (config.isNull("configVersion") || !config.getString("configVersion").equals(UpdateChecker.CONFIG_VERSION)) {

if(config.isNull("configVersion") || !config.isNull("configVersion") && Double.parseDouble(config.getString("configVersion")) < 1.2){
FileHandler.setDirectory(config.isNull("directory") ?
new File(RoseDB.class.getProtectionDomain().getCodeSource().getLocation().toURI().getPath())
.getParentFile().getPath() + File.separator + "Database" + File.separator : config.getString("directory"));
Terminal.log(Levels.INFO, "Your configuration was detected to be from a version < 1.2 which meant the data files are utilizing an older format.");
Terminal.log(Levels.INFO, "We will perform a short migration to the newer file format real quick... please do not close!");
FileHandler.migrateAll().join();
}

Terminal.log(Levels.INFO, "We have noticed that your config.json is outdated, we are currently going to perform a short configuration update.");
Terminal.log(Levels.INFO, "Don't worry, there isn't anything you need to do on your side!");
FileHandler.writeToFile("config.json", updateConfig(config).toString()).join();
Expand Down Expand Up @@ -100,6 +110,8 @@ public static void main(String[] args) throws URISyntaxException {
// We are moving original setting of root level here, we want errors to be logged for startup.
Terminal.setLoggingLevel(rootLevel(Optional.ofNullable(config.getString("loggingLevel"))
.orElse("INFO")));

FileHandler.setDirectory(directory);
RoseServer.run(port);

} else {
Expand Down
38 changes: 33 additions & 5 deletions src/main/java/pw/mihou/rosedb/connections/RoseServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,40 @@
import java.io.File;
import java.io.IOException;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

public class RoseServer {

private static final Map<String, WsContext> context = new ConcurrentHashMap<>();
private static final Map<String, RoseDatabase> database = new ConcurrentHashMap<>();
private static final Scanner scanner = new Scanner(System.in).useDelimiter("\n");

public static void reply(WsContext context, String response, int kode) {
context.send(new JSONObject().put("response", response).put("kode", kode).toString());
if(context != null) {
context.send(new JSONObject().put("response", response).put("kode", kode).toString());
} else {
Terminal.log(Levels.INFO, new JSONObject().put("response", response).put("kode", kode).toString());
}
}

public static void reply(WsContext context, JSONObject response, String unique, int kode) {
context.send(response.put("kode", kode).put("replyTo", unique).toString());
if(context != null) {
context.send(response.put("kode", kode).put("replyTo", unique).toString());
} else {
Terminal.log(Levels.INFO, response.put("kode", kode).put("replyTo", unique).toString());
}
}

public static void reply(WsContext context, String response, String unique, int kode) {
context.send(new JSONObject().put("response", response).put("kode", kode)
.put("replyTo", unique).toString());
if(context != null) {
context.send(new JSONObject().put("response", response).put("kode", kode)
.put("replyTo", unique).toString());
} else {
Terminal.log(Levels.INFO, new JSONObject().put("response", response).put("kode", kode)
.put("replyTo", unique).toString());
}
}

public static RoseDatabase getDatabase(String db) {
Expand Down Expand Up @@ -117,7 +132,7 @@ public static void run(int port) {
}).start(port);

Runtime.getRuntime().addShutdownHook(new Thread(app::stop));
Runtime.getRuntime().addShutdownHook(new Thread(FileHandler::executeFinalRuntime));
Runtime.getRuntime().addShutdownHook(new Thread(FileHandler::write));
Runtime.getRuntime().addShutdownHook(new Thread(Scheduler::shutdown));

Terminal.log(Levels.DEBUG, "All events and handlers are now ready.");
Expand All @@ -140,6 +155,19 @@ public static void run(int port) {

Terminal.log(Levels.INFO, "RoseDB is now running on port: " + port);
startHeartbeat();

while(scanner.hasNextLine()){
String request = scanner.nextLine();
try {
RoseListenerManager.execute(new JSONObject(request), null);
} catch (JSONException e) {
reply(null, "The request was considered as invalid: " + request, -1);
Terminal.log(Levels.DEBUG, "Received invalid JSON request: " + request + " from terminal.");
} catch (MessageTooLargeException e){
reply(null, "The request was canceled by force: " + e.getMessage(), -1);
Terminal.log(Levels.ERROR, "Received message that was too large from terminal.");
}
}
}

}
157 changes: 111 additions & 46 deletions src/main/java/pw/mihou/rosedb/io/FileHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,80 +9,111 @@
import pw.mihou.rosedb.manager.RoseDatabase;
import pw.mihou.rosedb.utility.Terminal;

import java.io.BufferedReader;
import java.io.File;
import java.io.FilenameFilter;
import java.io.IOException;
import java.nio.charset.Charset;
import java.io.*;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Arrays;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

public class FileHandler {

private static final ConcurrentLinkedQueue<RoseRequest> queue = new ConcurrentLinkedQueue<>();
private static final AtomicBoolean threadFull = new AtomicBoolean(false);
private static final FilenameFilter filter = (dir, name) -> name.endsWith(".rose");
private static String directory;

public static void setDirectory(String dir) {
directory = dir;
}

public static CompletableFuture<Void> writeToFile(String path, String value) {
return write(path, value, false);
}

public static CompletableFuture<Void> writeGzip(String path, String value) {
return write(path, value, true);
}

public static CompletableFuture<String> read(String path) {
return read(path, false);
}

private static CompletableFuture<String> read(String path, boolean gzip) {
return CompletableFuture.supplyAsync(() -> {
try (BufferedReader reader = Files.newBufferedReader(Paths.get(path))) {
return reader.readLine();
} catch (final IOException e) {
try (BufferedReader r = (!gzip ? Files.newBufferedReader(Paths.get(path)) :
new BufferedReader(new InputStreamReader(new GZIPInputStream(new FileInputStream(new File(path)), 65536))))) {
return r.readLine();
} catch (IOException e) {
Terminal.log(Levels.ERROR, e.getMessage());
}
return "";
}, Scheduler.getExecutorService());
}

public static void write(String database, String collection, String identifier, String json) {
// This way, it actually queues the requests.
queue.add(new RoseRequest(database, collection, identifier, json));
public static CompletableFuture<String> readGzip(String path) {
return read(path, true);
}

// We are using == 1 since we are already adding one to the queue.
// The reason why we are doing the stream as well is because we don't want to delay the other requests.
if (queue.stream().filter(roseRequest -> (roseRequest.identifier.equalsIgnoreCase(identifier) && roseRequest.collection.equalsIgnoreCase(collection) && roseRequest.database.equalsIgnoreCase(database))).count() == 1) {
write();
}
public static CompletableFuture<Void> compress(String path) {
return read(path, false).thenAccept(s -> writeGzip(path, s));
}

public static boolean delete(String database, String collection, String identifier) {
queue.stream().filter(roseRequest -> filter(roseRequest, database, collection, identifier)).forEachOrdered(queue::remove);
return delete(format(database, collection, identifier));
}

private static boolean delete(String path) {
try {
queue.stream().filter(roseRequest -> roseRequest.identifier.equalsIgnoreCase(identifier)).forEachOrdered(queue::remove);
return Files.deleteIfExists(Paths.get(new StringBuilder(RoseDB.directory).append(File.separator).append(database).append(File.separator).append(collection)
.append(File.separator).append(identifier).append(".rose").toString()));
return Files.deleteIfExists(Paths.get(path));
} catch (IOException exception) {
Terminal.log(Levels.ERROR, "An exception occurred while trying to delete: " + exception.getMessage());
return false;
}
return false;
}

public static void executeFinalRuntime(){
if(!queue.isEmpty()){
Terminal.log(Levels.DEBUG, "Executing final thread to finish remaining " + queue.size() + " write requests.");
public static String format(String database, String collection, String identifier) {
return new StringBuilder(directory).append(File.separator).append(database).append(File.separator).append(collection)
.append(File.separator).append(identifier).append(".rose").toString();
}

public static String format(String database, String collection) {
return new StringBuilder(directory).append(File.separator).append(database).append(File.separator).append(collection).toString();
}

public static String format(String database) {
return new StringBuilder(directory).append(File.separator).append(database).toString();
}

private static boolean filter(RoseRequest roseRequest, String database, String collection, String identifier) {
return (roseRequest.identifier.equalsIgnoreCase(identifier)
&& roseRequest.collection.equalsIgnoreCase(collection)
&& roseRequest.database.equalsIgnoreCase(database));
}

public static void write(String database, String collection, String identifier, String json) {
queue.add(new RoseRequest(database, collection, identifier, json));

if (queue.stream().filter(roseRequest -> filter(roseRequest, database, collection, identifier)).count() == 1) {
write();
}
}

private static void write() {
if(!threadFull.get()) {
public static void write() {
if (!threadFull.get()) {
Scheduler.getExecutorService().submit(() -> {
if (!queue.isEmpty()) {
threadFull.set(true);
// We will be polling here instead.

RoseRequest request = queue.poll();
String location = new StringBuilder(RoseDB.directory).append(File.separator).append(request.database).append(File.separator).append(request.collection)
.append(File.separator).append(request.identifier).append(".rose").toString();
writeGzip(format(request.database, request.collection, request.identifier), request.json).join();

if (!new File(location).exists()) {
writeToFile(location, request.json).join();
} else {
read(location).thenAccept(s -> writeToFile(location, request.json)).join();
}
threadFull.set(false);
if (!queue.isEmpty()) {
write();
Expand All @@ -92,45 +123,51 @@ private static void write() {
}
}

public static CompletableFuture<Void> writeToFile(String path, String value) {
private static CompletableFuture<Void> write(String path, String value, boolean gzip) {
return CompletableFuture.runAsync(() -> {
try {
Files.writeString(Paths.get(path), value, Charset.defaultCharset());
if (!gzip) {
Files.writeString(Paths.get(path), value, StandardCharsets.UTF_8);
} else {
try (BufferedWriter writer = new BufferedWriter(new OutputStreamWriter(new GZIPOutputStream(new FileOutputStream(new File(path)), 65536)))) {
writer.write(value);
}
}
} catch (IOException e) {
Terminal.log(Levels.ERROR, e.getMessage());
}
}, Scheduler.getExecutorService());
});
}

public static CompletableFuture<RoseCollections> readCollection(String database, String collection) {
return CompletableFuture.supplyAsync(() -> {
String location = new StringBuilder(RoseDB.directory).append(File.separator).append(database).append(File.separator).append(collection).toString();
String location = format(database, collection);

if (!new File(location).exists()) {
boolean mkdirs = new File(location).mkdirs();
if(!mkdirs){
if (!mkdirs) {
Terminal.setLoggingLevel(Levels.ERROR);
Terminal.log(Levels.ERROR, "Failed to create folders for " + location + ", possibly we do not have permission to write.");
return new RoseCollections(collection, database);
}
}


File[] contents = new File(location).listFiles(filter);
RoseCollections collections = new RoseCollections(collection, database);

if (contents != null) {
Arrays.stream(contents).forEach(file -> collections.cache(FilenameUtils.getBaseName(file.getName()), read(file.getPath()).join()));
Arrays.stream(contents).forEach(file -> collections.cache(FilenameUtils.getBaseName(file.getName()), readGzip(file.getPath()).join()));
}
return collections;
});
}

public static CompletableFuture<RoseDatabase> readDatabase(String database) {
return CompletableFuture.supplyAsync(() -> {
String location = new StringBuilder(RoseDB.directory).append(File.separator).append(database).toString();
String location = format(database);
if (!new File(location).exists()) {
boolean mkdirs = new File(location).mkdirs();
if(!mkdirs){
if (!mkdirs) {
Terminal.setLoggingLevel(Levels.ERROR);
Terminal.log(Levels.ERROR, "Failed to create folders for " + location + ", possibly we do not have permission to write.");
return new RoseDatabase(database);
Expand All @@ -151,15 +188,14 @@ public static CompletableFuture<RoseDatabase> readDatabase(String database) {
}

public static Optional<String> readData(String database, String collection, String identifier) {
String location = new StringBuilder(RoseDB.directory).append(File.separator).append(database).append(File.separator).append(collection)
.append(File.separator).append(identifier).append(".rose").toString();
String location = format(database, collection, identifier);
if (!new File(location).exists())
return Optional.empty();

return Optional.of(read(location).join());
return Optional.of(readGzip(location).join());
}

public static CompletableFuture<Void> preloadAll(){
public static CompletableFuture<Void> preloadAll() {
return CompletableFuture.runAsync(() -> {
File[] contents = new File(RoseDB.directory).listFiles();

Expand All @@ -170,4 +206,33 @@ public static CompletableFuture<Void> preloadAll(){
});
}

public static CompletableFuture<Void> migrateAll() {
return CompletableFuture.runAsync(() -> {
File[] contents = new File(directory).listFiles();

if (contents != null) {
Arrays.stream(contents).filter(File::isDirectory).forEachOrdered(file -> {
File[] c = new File(format(FilenameUtils.getBaseName(file.getName()))).listFiles();

if (c != null) {
Arrays.stream(c).filter(File::isDirectory)
.forEachOrdered(d -> migrateCollection(FilenameUtils.getBaseName(file.getName()),
FilenameUtils.getBaseName(d.getName())));
}
});
}
});
}

public static CompletableFuture<Void> migrateCollection(String database, String collection) {
return CompletableFuture.runAsync(() -> {
Terminal.log(Levels.INFO, "Attempting to migrate " + collection + " from " + database + " to newer format.");
File[] contents = new File(format(database, collection)).listFiles(filter);

if (contents != null) {
Arrays.stream(contents).forEach(file -> compress(file.getPath()));
}
});
}

}
4 changes: 2 additions & 2 deletions src/main/java/pw/mihou/rosedb/listeners/UpdateListener.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ public void execute(JSONObject request, WsContext context, String unique) {
JSONArray value = request.getJSONArray("value");

for (int i = 0; i < key.length(); i++) {
object.put(key.getString(i), value.getString(i));
object.put(key.getString(i), value.get(i));
}
} else {
object.put(request.getString("key"), request.getString("value"));
object.put(request.getString("key"), request.get("value"));
}

collections.add(request.getString("identifier"), object.toString());
Expand Down
Loading