From ceb47028a39bd0cefcb5e9f016060f3bdc5de810 Mon Sep 17 00:00:00 2001 From: pcdv Date: Fri, 15 Mar 2024 15:40:47 +0100 Subject: [PATCH] Issue #503: SessionProxy.sendSequenceReset() not called The test also tries to reproduce an inversion of SendSequenceReset and ResendRequest messages, but it is not reproduced (it happens only when a SessionProxy causes the ResendRequest to be sent asynchronously). --- .../system_tests/RaceResendResetTest.java | 172 ++++++++++++++++++ .../real_logic/artio/util/DebugFIXClient.java | 79 ++++++++ .../co/real_logic/artio/util/DebugServer.java | 131 +++++++++++++ .../artio/util/FixMessageTweak.java | 41 +++++ 4 files changed, 423 insertions(+) create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java create mode 100644 artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java new file mode 100644 index 0000000000..1c6cebd260 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/system_tests/RaceResendResetTest.java @@ -0,0 +1,172 @@ +package uk.co.real_logic.artio.system_tests; + +import org.agrona.ErrorHandler; +import org.agrona.concurrent.EpochNanoClock; +import org.junit.Test; +import uk.co.real_logic.artio.dictionary.generation.Exceptions; +import uk.co.real_logic.artio.engine.EngineConfiguration; +import uk.co.real_logic.artio.engine.FixEngine; +import uk.co.real_logic.artio.fields.EpochFractionFormat; +import uk.co.real_logic.artio.library.LibraryConfiguration; +import uk.co.real_logic.artio.protocol.GatewayPublication; +import uk.co.real_logic.artio.session.DirectSessionProxy; +import uk.co.real_logic.artio.session.SessionCustomisationStrategy; +import uk.co.real_logic.artio.session.SessionIdStrategy; +import uk.co.real_logic.artio.session.SessionProxy; +import uk.co.real_logic.artio.util.DebugFIXClient; +import uk.co.real_logic.artio.util.DebugServer; + +import java.io.IOException; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static uk.co.real_logic.artio.TestFixtures.launchMediaDriver; +import static uk.co.real_logic.artio.messages.InitialAcceptedSessionOwner.SOLE_LIBRARY; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.ACCEPTOR_ID; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.INITIATOR_ID; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.acceptingLibraryConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.connect; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingConfig; +import static uk.co.real_logic.artio.system_tests.SystemTestUtil.initiatingLibraryConfig; + +/** + * Try reproducing race between sent ResendRequest and ResetSequence message when both + * parties request a resend. Also checks that SessionProxy is invoked when a ResetSequence + * message must be sent. + */ +public class RaceResendResetTest extends AbstractGatewayToGatewaySystemTest { + + private boolean sendResendRequestCalled; + private boolean sendSequenceResetCalled; + private boolean useProxy; + + private void launch() { + mediaDriver = launchMediaDriver(); + launchAccepting(); + launchInitiating(); + testSystem = new TestSystem(acceptingLibrary, initiatingLibrary); + } + + private void launchInitiating() { + final EngineConfiguration initiatingConfig = + initiatingConfig(libraryAeronPort, nanoClock) + .deleteLogFileDirOnStart(true) + .initialAcceptedSessionOwner(SOLE_LIBRARY); + initiatingEngine = FixEngine.launch(initiatingConfig); + LibraryConfiguration lib = initiatingLibraryConfig(libraryAeronPort, initiatingHandler, nanoClock); + if (useProxy) + lib.sessionProxyFactory(this::sessionProxyFactory); + initiatingLibrary = connect(lib + ); + } + + private void launchAccepting() { + final EngineConfiguration acceptingConfig = acceptingConfig(port, ACCEPTOR_ID, INITIATOR_ID, nanoClock) + .deleteLogFileDirOnStart(true) + .initialAcceptedSessionOwner(SOLE_LIBRARY); + acceptingEngine = FixEngine.launch(acceptingConfig); + + final LibraryConfiguration acceptingLibraryConfig = acceptingLibraryConfig(acceptingHandler, nanoClock); + acceptingLibrary = connect(acceptingLibraryConfig); + } + + /** + * Sanity check that we can connect Artio to a debug server with canned messages. + */ + @Test + public void testDebugServer() throws IOException { + DebugServer srv = new DebugServer(port); + srv.setWaitForData(true); + srv.addFIXResponse("8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=1|52=20240315-10:52:24.098|98=0|108=10|141=N|35002=0|35003=0|10=024|"); + srv.start(); + + mediaDriver = launchMediaDriver(); + launchInitiating(); + testSystem = new TestSystem(initiatingLibrary); + connectAndAcquire(); + } + + private SessionProxy sessionProxyFactory( + final int sessionBufferSize, + final GatewayPublication gatewayPublication, + final SessionIdStrategy sessionIdStrategy, + final SessionCustomisationStrategy customisationStrategy, + final EpochNanoClock clock, + final long connectionId, + final int libraryId, + final ErrorHandler errorHandler, + final EpochFractionFormat epochFractionPrecision) { + return new DirectSessionProxy(sessionBufferSize, gatewayPublication, sessionIdStrategy, customisationStrategy, + clock, connectionId, libraryId, errorHandler, epochFractionPrecision) { + @Override + public long sendResendRequest(int msgSeqNo, int beginSeqNo, int endSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed) { + sendResendRequestCalled = true; +// try { +// Thread.sleep(10); +// } catch (InterruptedException ignored) { +// } + return super.sendResendRequest(msgSeqNo, beginSeqNo, endSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + } + + @Override + public long sendSequenceReset(int msgSeqNo, int newSeqNo, int sequenceIndex, int lastMsgSeqNumProcessed) { + sendSequenceResetCalled = true; + return super.sendSequenceReset(msgSeqNo, newSeqNo, sequenceIndex, lastMsgSeqNumProcessed); + } + }; + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldNotInvertResendAndReset() throws Exception { + useProxy = false; + reconnectTest(); + } + + @Test(timeout = TEST_TIMEOUT_IN_MS) + public void shouldCallProxySendSequenceReset() throws Exception { + useProxy = true; + reconnectTest(); + } + + private void reconnectTest() throws Exception { + launch(); + + connectAndAcquire(); + + messagesCanBeExchanged(); + + disconnectSessions(); + Exceptions.closeAll(this::closeAcceptingEngine); + + assertEquals(3, acceptingSession.lastReceivedMsgSeqNum()); + assertEquals(3, initiatingSession.lastReceivedMsgSeqNum()); + + DebugServer srv = new DebugServer(port); + srv.setWaitForData(true); + srv.addFIXResponse("8=FIX.4.4|9=94|35=A|49=acceptor|56=initiator|34=5|52=***|98=0|108=10|141=N|35002=0|35003=0|10=024|"); + srv.addFIXResponse("8=FIX.4.4|9=94|35=2|49=acceptor|56=initiator|34=6|52=***|7=4|16=0|10=024|"); + srv.start(); + + connectPersistentSessions(4, 4, false); + + DebugFIXClient exchange = new DebugFIXClient(srv.popClient(5000)); + exchange.popAndAssert("35=A 34=4"); + exchange.popAndAssert("35=2 34=5 7=4 16=0"); + exchange.popAndAssert("35=4 34=4 36=6"); + + exchange.close(); + srv.stop(); + Exceptions.closeAll(this::closeInitiatingEngine, mediaDriver); + + if (useProxy) { + assertTrue("SessionProxy.sendResendRequest() not called", sendResendRequestCalled); + assertTrue("SessionProxy.sendSequenceReset() not called", sendSequenceResetCalled); + } + } + + private void connectAndAcquire() { + connectSessions(); + acceptingSession = acceptingHandler.lastSession(); + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java new file mode 100644 index 0000000000..2188383877 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugFIXClient.java @@ -0,0 +1,79 @@ +package uk.co.real_logic.artio.util; + +import org.junit.Assert; + +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; +import java.util.Scanner; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * Helper to pop FIX messages received on a socket. + * + * @see DebugServer + */ +public class DebugFIXClient { + private final DebugServer.HasIOStream io; + private final Thread thread; + + private final BlockingQueue> messages = new LinkedBlockingQueue<>(); + private volatile boolean disposed; + private String prefix = " <<< "; + + public DebugFIXClient(DebugServer.HasIOStream io) { + this.io = Objects.requireNonNull(io); + thread = new Thread(this::run, "DebugFIXClient"); + thread.start(); + } + + public void close() throws Exception { + disposed = true; + io.in.close(); + io.in.close(); + thread.interrupt(); + thread.join(); + } + + private void run() { + StringBuilder s = new StringBuilder(128); + while (!disposed) { + Scanner scanner = new Scanner(io.in).useDelimiter("\u0001"); + Map msg = new HashMap<>(); + while (scanner.hasNext()) { + String fld = scanner.next(); + s.append(fld).append('|'); + int eq = fld.indexOf('='); + String tag = fld.substring(0, eq); + msg.put(tag, fld.substring(eq + 1)); + if (tag.equals("10")) { + messages.add(msg); + msg = new HashMap<>(); + System.out.println(prefix + s); + s.setLength(0); + } + } + } + } + + public Map popMessage() throws InterruptedException { + return messages.poll(5, TimeUnit.SECONDS); + } + + public void popAndAssert(String tagValues) throws InterruptedException { + Map map = popMessage(); + for (String rule : tagValues.split(" ")) { + String tag = rule.substring(0, rule.indexOf('=')); + if (map == null) + throw new AssertionError("No message received"); + String value = map.get(tag); + Assert.assertEquals(rule, tag + "=" + value); + } + } + + public void setPrefix(String prefix) { + this.prefix = prefix; + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java new file mode 100644 index 0000000000..168103b6e2 --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/DebugServer.java @@ -0,0 +1,131 @@ +package uk.co.real_logic.artio.util; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.ServerSocket; +import java.net.Socket; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * A server that accepts TCP connections and is able to reply automatically with canned + * data. It can be used to simulate a FIX server in order to quickly sent specific messages. + */ +public class DebugServer { + + private final int port; + private final Queue connectResponses; + private final BlockingQueue clients; + private final ServerSocket serverSocket; + + /** + * If true, wait until some data is received before sending prepared messages. + */ + private boolean waitForData; + + /** + * Creates a debug server listening on specified port. + */ + public DebugServer(int port) throws IOException { + this.port = port; + this.connectResponses = new ConcurrentLinkedQueue<>(); + this.clients = new LinkedBlockingQueue<>(); + this.serverSocket = new ServerSocket(port); + } + + /** + * Adds a message that must be directly sent to connecting clients. Messages + * are sent in the same order they were added. + */ + public void addConnectResponse(byte[] message) { + connectResponses.add(message); + } + + /** + * Warning: causes problems because SendingTime and checksum needs to be regenerated + * and they are not. + */ + public void addFIXResponse(String msg) { + addConnectResponse(FixMessageTweak.recycle(msg)); + } + + /** + * Starts the debug server, accepting incoming connections and sending + * prepared data. + */ + public void start() throws IOException { + new Thread("DebugServer-" + port) { + @Override + public void run() { + try { + while (!serverSocket.isClosed()) { + Socket s = serverSocket.accept(); + System.out.println("Connection accepted from " + s.getInetAddress()); + try { + BufferedInputStream in = new BufferedInputStream(s.getInputStream()); + BufferedOutputStream out = new BufferedOutputStream(s.getOutputStream()); + + if (!connectResponses.isEmpty() && waitForData) { + in.mark(0); + in.read(); + in.reset(); + } + + HasIOStream client = new HasIOStream(in, out); + sendResponses(client.out); + clients.add(client); + } catch (IOException e) { + e.printStackTrace(); + } + } + } catch (IOException e) { + if (!serverSocket.isClosed()) + e.printStackTrace(); + } + } + }.start(); + } + + public void stop() throws IOException { + serverSocket.close(); + } + + /** + * Sends prepared data to the client. + */ + private void sendResponses(OutputStream outputStream) throws IOException { + for (byte[] response : connectResponses) { + outputStream.write(response); + outputStream.flush(); + } + } + + public HasIOStream popClient(long timeoutMs) throws InterruptedException { + return clients.poll(timeoutMs, TimeUnit.MILLISECONDS); + } + + public int getPort() { + return port; + } + + public void setWaitForData(boolean waitForData) { + this.waitForData = waitForData; + } + + public static class HasIOStream { + + public final InputStream in; + public final OutputStream out; + + public HasIOStream(InputStream in, OutputStream out) { + this.in = in; + this.out = out; + } + } +} diff --git a/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java new file mode 100644 index 0000000000..470ea2540b --- /dev/null +++ b/artio-system-tests/src/test/java/uk/co/real_logic/artio/util/FixMessageTweak.java @@ -0,0 +1,41 @@ +package uk.co.real_logic.artio.util; + +import java.nio.charset.StandardCharsets; +import java.time.LocalDateTime; +import java.time.ZoneOffset; +import java.time.format.DateTimeFormatter; + +/** + * Easily generate valid FIX messages from captured data. + */ +public class FixMessageTweak { + + /** + * Replaces sending time with current time and recompute BodyLength / Checksum. + */ + public static byte[] recycle(String msg) { + msg = msg.replace('|', '\001'); + + // Replace Sending Time (52) with current time + String time = LocalDateTime.now(ZoneOffset.UTC).format(DateTimeFormatter.ofPattern("yyyyMMdd-HH:mm:ss.SSS")); + msg = msg.replaceAll("52=[^\u0001]*\u0001", "52=" + time + '\001'); + + // recompute body length + int body = msg.indexOf('\001', 10) + 1; + int trailer = msg.indexOf("10="); + msg = msg.replaceAll("9=\\d+", "9=" + (trailer - body)); + + // recompute checksum + msg = msg.replaceAll("10=[0-9]{3}", "10=" + computeChecksum(msg)); + + return msg.getBytes(StandardCharsets.UTF_8); + } + + private static String computeChecksum(String fixMessage) { + int checksum = 0; + for (int i = fixMessage.indexOf("10=") - 1; i >= 0; i--) { + checksum += fixMessage.charAt(i); + } + return String.format("%03d", checksum % 256); + } +}