diff --git a/src/main/java/io/lettuce/core/output/ReplayOutput.java b/src/main/java/io/lettuce/core/output/ReplayOutput.java index 4dafced0f1..f8407723d4 100644 --- a/src/main/java/io/lettuce/core/output/ReplayOutput.java +++ b/src/main/java/io/lettuce/core/output/ReplayOutput.java @@ -33,6 +33,11 @@ public void set(long integer) { output.add(new Integer(integer)); } + @Override + public void set(double number) { + output.add(new Double(number)); + } + @Override public void setError(ByteBuffer error) { error.mark(); @@ -130,6 +135,21 @@ protected void replay(CommandOutput target) { } + static class Double extends Signal { + + final double message; + + Double(double message) { + this.message = message; + } + + @Override + protected void replay(CommandOutput target) { + target.set(message); + } + + } + public static class ErrorBytes extends BulkStringSupport { ErrorBytes(ByteBuffer message) { diff --git a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java index 9936ba4283..3472660281 100644 --- a/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java +++ b/src/test/java/io/lettuce/core/pubsub/PubSubCommandIntegrationTests.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Matcher; @@ -543,6 +544,15 @@ void resubscribeShardChannelsOnReconnect() throws Exception { assertThat(messages.take()).isEqualTo(shardMessage); } + @Test + void interleaveCommands() throws ExecutionException, InterruptedException { + + // relay Double and Long + assertThat(pubsub.zadd("myzset", 1.0, "one").get()).isEqualTo(1L); + assertThat(pubsub.zadd("myzset", 2.0, "two").get()).isEqualTo(1L); + assertThat(pubsub.zpopmin("myzset", 1).get().get(0).getValue()).isEqualTo("one"); + } + @Test void adapter() throws Exception { final BlockingQueue localCounts = LettuceFactories.newBlockingQueue();