From 27905b63bd471a710560d4277199228e2a0ebfb3 Mon Sep 17 00:00:00 2001 From: Mikko Kortelainen Date: Mon, 11 Mar 2024 10:37:12 +0200 Subject: [PATCH] Readme refactor (#97) * update readme for major version update --- README.adoc | 218 ++++++++++++- .../teragrep/rlp_03/delegate/RelpEvent.java | 2 +- .../rlp_03/delegate/RelpEventClose.java | 2 +- .../rlp_03/delegate/RelpEventOpen.java | 2 +- .../rlp_03/readme/ReadmeDeferredTest.java | 292 ++++++++++++++++++ .../teragrep/rlp_03/readme/ReadmeTest.java | 210 +++++++++++++ 6 files changed, 708 insertions(+), 18 deletions(-) create mode 100644 src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java create mode 100644 src/test/java/com/teragrep/rlp_03/readme/ReadmeTest.java diff --git a/README.adoc b/README.adoc index 2571c42f..eccb9aa8 100644 --- a/README.adoc +++ b/README.adoc @@ -8,9 +8,11 @@ AGPLv3 with link:https://github.com/teragrep/rlp_03/blob/master/LICENSE#L665-L67 == Features Current -- RELP Server +- Fast (~100 000 transactions in a second per thread) +- Secure (TLS) -== Setting dependencies +== Usage +=== Setting dependencies [source, xml] ---- @@ -22,40 +24,226 @@ Current com.teragrep rlp_03 - 1.0.0 + ${see_latest_at_github} ---- -== Example +NOTE: See https://github.com/teragrep/rlp_01[rlp_01] for relp client + +NOTE: See https://github.com/teragrep/rlo_06[rlo_06] for syslog decoding + +=== Examples + +Dependencies for examples + +[source, xml] +---- + + + + + + com.teragrep + rlp_03 + ${see_latest_at_github} + + + + com.teragrep + rlp_01 + ${see_latest_at_github} + + + +---- + +Server with shared handler for all connections [source, java] ---- +package com.teragrep.rlp_03.readme; + +import com.teragrep.rlp_01.RelpBatch; +import com.teragrep.rlp_01.RelpConnection; +import com.teragrep.rlp_03.FrameContext; import com.teragrep.rlp_03.Server; -import com.teragrep.rlp_03.SyslogFrameProcessor; +import com.teragrep.rlp_03.ServerFactory; +import com.teragrep.rlp_03.config.Config; +import com.teragrep.rlp_03.delegate.*; +import org.junit.jupiter.api.Test; import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; import java.util.function.Consumer; +import java.util.function.Supplier; -public class Main { - public static void main(String[] args) throws IOException, InterruptedException { - final Consumer cbFunction; +/** + * For use cases in the README.adoc + */ +public class ReadmeTest { + @Test + public void testServerSetup() { + int listenPort = 10601; + int threads = 1; // processing threads shared across the connections + Config config = new Config(listenPort, threads); - cbFunction = (message) -> { - System.out.println(new String(message)); + /* + * System.out.println is used to print the frame payload + */ + Consumer syslogConsumer = new Consumer() { + // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections + @Override + public synchronized void accept(FrameContext frameContext) { + System.out.println(frameContext.relpFrame().payload().toString()); + } }; - int port = 1601; - Server server = new Server(port, new SyslogFrameProcessor(cbFunction)); - server.start(); - while (true) { - Thread.sleep(1000L); + /* + * DefaultFrameDelegate accepts Consumer for processing syslog frames + */ + DefaultFrameDelegate frameDelegate = new DefaultFrameDelegate(syslogConsumer); + + /* + * Same instance of the frameDelegate is shared with every connection + */ + Supplier frameDelegateSupplier = new Supplier() { + @Override + public FrameDelegate get() { + System.out.println("Providing frameDelegate for a connection"); + return frameDelegate; + } + }; + + /* + * ServerFactory is used to create server instances + */ + ServerFactory serverFactory = new ServerFactory(config, frameDelegateSupplier); + + Server server; + try { + server = serverFactory.create(); + } + catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } + + /* + * One may use server.run(); or create the server into a new thread + */ + Thread serverThread = new Thread(server); + + /* + * Run the server + */ + serverThread.start(); + + /* + * Wait for startup, server is available for connections once it finished setup + */ + try { + server.startup.waitForCompletion(); + System.out.println("server started at port <" + listenPort + ">"); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + + /* + * Send Hello, World! via rlp_01 + */ + new ExampleRelpClient(listenPort).send("Hello, World!"); + + /* + * Stop server + */ + server.stop(); + + /* + * Wait for stop to complete + */ + try { + serverThread.join(); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + System.out.println("server stopped at port <" + listenPort + ">"); + + /* + * Close the frameDelegate + */ + try { + frameDelegate.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * ExampleRelpClient using rlp_01 for demonstration + */ + private class ExampleRelpClient { + private final int port; + ExampleRelpClient(int port) { + this.port = port; + } + + public void send(String record) { + RelpConnection relpConnection = new RelpConnection(); + try { + relpConnection.connect("localhost", port); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + + RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(record.getBytes(StandardCharsets.UTF_8)); + + while (!relpBatch.verifyTransactionAll()) { + relpBatch.retryAllFailed(); + try { + relpConnection.commit(relpBatch); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + } + try { + relpConnection.disconnect(); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + finally { + relpConnection.tearDown(); + } } } } ---- +If a separate handler is required for each connection which doesn’t need to be a thread-safe, create a new FrameDelegate in the Supplier + +[source, java] +---- + Supplier frameDelegateSupplier = () -> { + System.out.println("Providing frameDelegate for a connection"); + return new DefaultFrameDelegate(frameContext -> System.out.println(frameContext.relpFrame().payload().toString())); + }; +---- + +If a deferred handler is required for command processing, pass custom RelpEvent implementation to DefaultFrameDelegate via the Map constructor. See ReadmeDeferredTest.java for an example. + == Contributing // Change the repository name in the issues link to match with your project's name diff --git a/src/main/java/com/teragrep/rlp_03/delegate/RelpEvent.java b/src/main/java/com/teragrep/rlp_03/delegate/RelpEvent.java index e3fb9a0d..b875c3e0 100644 --- a/src/main/java/com/teragrep/rlp_03/delegate/RelpEvent.java +++ b/src/main/java/com/teragrep/rlp_03/delegate/RelpEvent.java @@ -52,7 +52,7 @@ import java.nio.charset.StandardCharsets; import java.util.function.Consumer; -abstract class RelpEvent implements Consumer, AutoCloseable { +public abstract class RelpEvent implements Consumer, AutoCloseable { protected RelpFrameTX createResponse( RelpFrame rxFrame, String command, diff --git a/src/main/java/com/teragrep/rlp_03/delegate/RelpEventClose.java b/src/main/java/com/teragrep/rlp_03/delegate/RelpEventClose.java index 253e3f0e..b3f76676 100644 --- a/src/main/java/com/teragrep/rlp_03/delegate/RelpEventClose.java +++ b/src/main/java/com/teragrep/rlp_03/delegate/RelpEventClose.java @@ -54,7 +54,7 @@ import java.util.ArrayList; import java.util.List; -class RelpEventClose extends RelpEvent { +public class RelpEventClose extends RelpEvent { private static final Logger LOGGER = LoggerFactory.getLogger(RelpEventClose.class); diff --git a/src/main/java/com/teragrep/rlp_03/delegate/RelpEventOpen.java b/src/main/java/com/teragrep/rlp_03/delegate/RelpEventOpen.java index 0c613746..da28fed2 100644 --- a/src/main/java/com/teragrep/rlp_03/delegate/RelpEventOpen.java +++ b/src/main/java/com/teragrep/rlp_03/delegate/RelpEventOpen.java @@ -52,7 +52,7 @@ import java.util.ArrayList; import java.util.List; -class RelpEventOpen extends RelpEvent { +public class RelpEventOpen extends RelpEvent { private static final String responseData = "200 OK\nrelp_version=0\n" + "relp_software=RLP-01,1.0.1,https://teragrep.com\n" diff --git a/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java b/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java new file mode 100644 index 00000000..6226d045 --- /dev/null +++ b/src/test/java/com/teragrep/rlp_03/readme/ReadmeDeferredTest.java @@ -0,0 +1,292 @@ +/* + * Java Reliable Event Logging Protocol Library Server Implementation RLP-03 + * Copyright (C) 2021,2024 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ +package com.teragrep.rlp_03.readme; + +import com.teragrep.rlp_01.RelpBatch; +import com.teragrep.rlp_01.RelpCommand; +import com.teragrep.rlp_01.RelpConnection; +import com.teragrep.rlp_01.RelpFrameTX; +import com.teragrep.rlp_03.FrameContext; +import com.teragrep.rlp_03.Server; +import com.teragrep.rlp_03.ServerFactory; +import com.teragrep.rlp_03.config.Config; +import com.teragrep.rlp_03.context.frame.RelpFrame; +import com.teragrep.rlp_03.delegate.*; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; + +/** + * For use cases in the README.adoc + */ +public class ReadmeDeferredTest { + @Test + public void testDeferredFrameDelegate() { + int listenPort = 10602; + int threads = 1; // processing threads shared across the connections + Config config = new Config(listenPort, threads); + + /* + * DefaultFrameDelegate accepts Map for processing of the commands + */ + + Map relpCommandConsumerMap = new HashMap<>(); + /* + * Add default commands, open and close, they are mandatory + */ + relpCommandConsumerMap.put(RelpCommand.OPEN, new RelpEventOpen()); + relpCommandConsumerMap.put(RelpCommand.CLOSE, new RelpEventClose()); + + /* + * Queue for deferring the processing of the frames + */ + BlockingQueue frameContexts = new ArrayBlockingQueue<>(1024); + RelpEvent syslogRelpEvent = new RelpEvent() { + @Override + public void accept(FrameContext frameContext) { + frameContexts.add(frameContext); + } + + @Override + public void close() { + frameContexts.clear(); + } + }; + + relpCommandConsumerMap.put(RelpCommand.SYSLOG, syslogRelpEvent); + + /* + * Register the commands to the DefaultFrameDelegate + */ + FrameDelegate frameDelegate = new DefaultFrameDelegate(relpCommandConsumerMap); + + /* + * Same instance of the frameDelegate is shared with every connection, BlockingQueues are thread-safe + */ + Supplier frameDelegateSupplier = () -> { + System.out.println("Providing frameDelegate for a connection"); + return frameDelegate; + }; + + /* + * ServerFactory is used to create server instances + */ + ServerFactory serverFactory = new ServerFactory(config, frameDelegateSupplier); + + Server server; + try { + server = serverFactory.create(); + } + catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } + + /* + * One may use server.run(); or create the server into a new thread + */ + Thread serverThread = new Thread(server); + + /* + * Run the server + */ + serverThread.start(); + + /* + * Wait for startup, server is available for connections once it finished setup + */ + try { + server.startup.waitForCompletion(); + System.out.println("server started at port <" + listenPort + ">"); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + + /* + * Start deferred processing, otherwise our client will wait forever for a response + */ + DeferredSyslog deferredSyslog = new DeferredSyslog(frameContexts); + Thread deferredProcessingThread = new Thread(deferredSyslog); + deferredProcessingThread.start(); + + /* + * Send Hello, World! via rlp_01 + */ + new ExampleRelpClient(listenPort).send("Hello, Deferred World!"); + + /* + * Stop server + */ + server.stop(); + + /* + * Wait for stop to complete + */ + try { + serverThread.join(); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + System.out.println("server stopped at port <" + listenPort + ">"); + + + /* + * Close the frameDelegate + */ + try { + frameDelegate.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + + /* + * Stop the deferred processing thread + */ + deferredSyslog.run.set(false); + try { + deferredProcessingThread.join(); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + } + + + /** + * ExampleRelpClient using rlp_01 for demonstration + */ + private class ExampleRelpClient { + private final int port; + ExampleRelpClient(int port) { + this.port = port; + } + + public void send(String record) { + RelpConnection relpConnection = new RelpConnection(); + try { + relpConnection.connect("localhost", port); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + + RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(record.getBytes(StandardCharsets.UTF_8)); + + while (!relpBatch.verifyTransactionAll()) { + relpBatch.retryAllFailed(); + try { + relpConnection.commit(relpBatch); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + } + try { + relpConnection.disconnect(); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + finally { + relpConnection.tearDown(); + } + } + } + + private class DeferredSyslog implements Runnable { + + private final BlockingQueue frameContexts; + + public final AtomicBoolean run; + DeferredSyslog(BlockingQueue frameContexts) { + this.frameContexts = frameContexts; + + this.run = new AtomicBoolean(true); + } + + @Override + public void run() { + while (run.get()) { + try { + // this will read at least one + FrameContext frameContext = frameContexts.poll(1, TimeUnit.SECONDS); + + if (frameContext == null) { + // no frame yet + continue; + } + + // try-with-resources so frame is closed and freed, + try (RelpFrame relpFrame = frameContext.relpFrame()) { + System.out.println(this.getClass().getSimpleName() + " payload <[" + relpFrame.payload().toString() + "]>"); + + // create a response for the frame + RelpFrameTX frameResponse = new RelpFrameTX("rsp", "200 OK".getBytes(StandardCharsets.UTF_8)); + + // set transaction number + frameResponse.setTransactionNumber(relpFrame.txn().toInt()); + + // WARNING: failing to respond causes transaction aware clients to wait + frameContext.connectionContext().relpWrite().accept(Collections.singletonList(frameResponse)); + } + } catch (Exception interruptedException) { + // ignored + } + } + + } + } +} diff --git a/src/test/java/com/teragrep/rlp_03/readme/ReadmeTest.java b/src/test/java/com/teragrep/rlp_03/readme/ReadmeTest.java new file mode 100644 index 00000000..3a5da16f --- /dev/null +++ b/src/test/java/com/teragrep/rlp_03/readme/ReadmeTest.java @@ -0,0 +1,210 @@ +/* + * Java Reliable Event Logging Protocol Library Server Implementation RLP-03 + * Copyright (C) 2021,2024 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ +package com.teragrep.rlp_03.readme; + +import com.teragrep.rlp_01.RelpBatch; +import com.teragrep.rlp_01.RelpConnection; +import com.teragrep.rlp_03.FrameContext; +import com.teragrep.rlp_03.Server; +import com.teragrep.rlp_03.ServerFactory; +import com.teragrep.rlp_03.config.Config; +import com.teragrep.rlp_03.delegate.*; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.io.UncheckedIOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.*; +import java.util.function.Consumer; +import java.util.function.Supplier; + +/** + * For use cases in the README.adoc + */ +public class ReadmeTest { + @Test + public void testServerSetup() { + int listenPort = 10601; + int threads = 1; // processing threads shared across the connections + Config config = new Config(listenPort, threads); + + /* + * System.out.println is used to print the frame payload + */ + Consumer syslogConsumer = new Consumer() { + // NOTE: synchronized because frameDelegateSupplier returns this instance for all the parallel connections + @Override + public synchronized void accept(FrameContext frameContext) { + System.out.println(frameContext.relpFrame().payload().toString()); + } + }; + + /* + * DefaultFrameDelegate accepts Consumer for processing syslog frames + */ + DefaultFrameDelegate frameDelegate = new DefaultFrameDelegate(syslogConsumer); + + /* + * Same instance of the frameDelegate is shared with every connection + */ + Supplier frameDelegateSupplier = new Supplier() { + @Override + public FrameDelegate get() { + System.out.println("Providing frameDelegate for a connection"); + return frameDelegate; + } + }; + + /* + * ServerFactory is used to create server instances + */ + ServerFactory serverFactory = new ServerFactory(config, frameDelegateSupplier); + + Server server; + try { + server = serverFactory.create(); + } + catch (IOException ioException) { + throw new UncheckedIOException(ioException); + } + + /* + * One may use server.run(); or create the server into a new thread + */ + Thread serverThread = new Thread(server); + + /* + * Run the server + */ + serverThread.start(); + + /* + * Wait for startup, server is available for connections once it finished setup + */ + try { + server.startup.waitForCompletion(); + System.out.println("server started at port <" + listenPort + ">"); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + + /* + * Send Hello, World! via rlp_01 + */ + new ExampleRelpClient(listenPort).send("Hello, World!"); + + /* + * Stop server + */ + server.stop(); + + /* + * Wait for stop to complete + */ + try { + serverThread.join(); + } + catch (InterruptedException interruptedException) { + throw new RuntimeException(interruptedException); + } + System.out.println("server stopped at port <" + listenPort + ">"); + + /* + * Close the frameDelegate + */ + try { + frameDelegate.close(); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + + /** + * ExampleRelpClient using rlp_01 for demonstration + */ + private class ExampleRelpClient { + private final int port; + ExampleRelpClient(int port) { + this.port = port; + } + + public void send(String record) { + RelpConnection relpConnection = new RelpConnection(); + try { + relpConnection.connect("localhost", port); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + + RelpBatch relpBatch = new RelpBatch(); + relpBatch.insert(record.getBytes(StandardCharsets.UTF_8)); + + while (!relpBatch.verifyTransactionAll()) { + relpBatch.retryAllFailed(); + try { + relpConnection.commit(relpBatch); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + } + try { + relpConnection.disconnect(); + } + catch (IOException | TimeoutException exception) { + throw new RuntimeException(exception); + } + finally { + relpConnection.tearDown(); + } + } + } +}