Skip to content

Commit

Permalink
Support 'multi()' from JedisPooled (UnifiedJedis) (#3361)
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 authored Apr 9, 2023
1 parent d45b31a commit 14cd22f
Show file tree
Hide file tree
Showing 8 changed files with 122 additions and 4 deletions.
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisCluster.java
Original file line number Diff line number Diff line change
Expand Up @@ -209,4 +209,13 @@ public Connection getConnectionFromSlot(int slot) {
public ClusterPipeline pipelined() {
return new ClusterPipeline((ClusterConnectionProvider) provider);
}

/**
* @return nothing
* @throws UnsupportedOperationException
*/
@Override
public Transaction multi() {
throw new UnsupportedOperationException();
}
}
9 changes: 9 additions & 0 deletions src/main/java/redis/clients/jedis/JedisSharding.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,13 @@ public JedisSharding(ShardedConnectionProvider provider, Pattern tagPattern) {
public ShardedPipeline pipelined() {
return new ShardedPipeline((ShardedConnectionProvider) provider);
}

/**
* @return nothing
* @throws UnsupportedOperationException
*/
@Override
public Transaction multi() {
throw new UnsupportedOperationException();
}
}
15 changes: 15 additions & 0 deletions src/main/java/redis/clients/jedis/ReliableTransaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public class ReliableTransaction extends TransactionBase {
* Creates a new transaction.
*
* A MULTI command will be executed. WATCH/UNWATCH/MULTI commands must not be called with this object.
* @param connection connection
*/
public ReliableTransaction(Connection connection) {
super(connection);
Expand All @@ -33,6 +34,20 @@ public ReliableTransaction(Connection connection, boolean doMulti) {
super(connection, doMulti);
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public ReliableTransaction(Connection connection, boolean doMulti, boolean closeConnection) {
super(connection, doMulti, closeConnection);
}

@Override
protected final void processMultiResponse() {
String status = connection.getStatusCodeReply();
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/redis/clients/jedis/Transaction.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ public Transaction(Jedis jedis) {
*
* A MULTI command will be added to be sent to server. WATCH/UNWATCH/MULTI commands must not be
* called with this object.
* @param connection connection
*/
public Transaction(Connection connection) {
super(connection);
Expand All @@ -41,6 +42,21 @@ public Transaction(Connection connection, boolean doMulti) {
this.jedis = null;
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public Transaction(Connection connection, boolean doMulti, boolean closeConnection) {
super(connection, doMulti, closeConnection);
this.jedis = null;
}

@Override
protected final void processMultiResponse() {
// do nothing
Expand Down
25 changes: 24 additions & 1 deletion src/main/java/redis/clients/jedis/TransactionBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ public abstract class TransactionBase extends Queable implements PipelineCommand
PipelineBinaryCommands, RedisModulePipelineCommands, Closeable {

protected final Connection connection;
private final boolean closeConnection;
private final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects;

Expand All @@ -52,6 +53,7 @@ public abstract class TransactionBase extends Queable implements PipelineCommand
*
* A MULTI command will be added to be sent to server. WATCH/UNWATCH/MULTI commands must not be
* called with this object.
* @param connection connection
*/
public TransactionBase(Connection connection) {
this(connection, true);
Expand All @@ -67,7 +69,22 @@ public TransactionBase(Connection connection) {
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
*/
public TransactionBase(Connection connection, boolean doMulti) {
this(connection, doMulti, false);
}

/**
* Creates a new transaction.
*
* A user wanting to WATCH/UNWATCH keys followed by a call to MULTI ({@link #multi()}) it should
* be {@code doMulti=false}.
*
* @param connection connection
* @param doMulti {@code false} should be set to enable manual WATCH, UNWATCH and MULTI
* @param closeConnection should the 'connection' be closed when 'close()' is called?
*/
public TransactionBase(Connection connection, boolean doMulti, boolean closeConnection) {
this.connection = connection;
this.closeConnection = closeConnection;
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this.connection);
if (doMulti) multi();
Expand Down Expand Up @@ -112,7 +129,13 @@ protected final <T> Response<T> appendCommand(CommandObject<T> commandObject) {

@Override
public final void close() {
clear();
try {
clear();
} finally {
if (closeConnection) {
connection.close();
}
}
}

public final void clear() {
Expand Down
10 changes: 8 additions & 2 deletions src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -4649,8 +4649,14 @@ public Object pipelined() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
}
Connection connection = provider.getConnection();
return new Pipeline(connection, true);
return new Pipeline(provider.getConnection(), true);
}

public Transaction multi() {
if (provider == null) {
throw new IllegalStateException("It is not allowed to create Pipeline from this " + getClass());
}
return new Transaction(provider.getConnection(), true, true);
}

public Object sendCommand(ProtocolCommand cmd) {
Expand Down
7 changes: 7 additions & 0 deletions src/test/java/redis/clients/jedis/ClusterPipeliningTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -1044,4 +1044,11 @@ public void simple() { // TODO: move into 'redis.clients.jedis.commands.unified.
}
}
}

@Test
public void transaction() {
try (JedisCluster cluster = new JedisCluster(nodes, DEFAULT_CLIENT_CONFIG)) {
assertThrows(UnsupportedOperationException.class, () -> cluster.multi());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,14 @@
import redis.clients.jedis.JedisPooled;
import redis.clients.jedis.Pipeline;
import redis.clients.jedis.Response;
import redis.clients.jedis.Transaction;

import redis.clients.jedis.commands.unified.UnifiedJedisCommandsTestBase;

public class PooledPipelinedTest extends UnifiedJedisCommandsTestBase {
public class PooledPipeliningTest extends UnifiedJedisCommandsTestBase {

protected Pipeline pipeline;
protected Transaction transaction;

@BeforeClass
public static void prepare() throws InterruptedException {
Expand All @@ -35,11 +37,13 @@ public static void cleanUp() {
public void setUp() {
PooledCommandsTestHelper.clearData();
pipeline = ((JedisPooled) jedis).pipelined();
transaction = jedis.multi();
}

@After
public void tearDown() {
pipeline.close();
transaction.close();
}

@Test
Expand Down Expand Up @@ -71,4 +75,33 @@ public void simple() {
assertEquals(expected.get(i), responses.get(i).get());
}
}

@Test
public void transaction() {
final int count = 10;
int totalCount = 0;
for (int i = 0; i < count; i++) {
jedis.set("foo" + i, "bar" + i);
}
totalCount += count;
for (int i = 0; i < count; i++) {
jedis.rpush("foobar" + i, "foo" + i, "bar" + i);
}
totalCount += count;

List<Object> expected = new ArrayList<>(totalCount);
for (int i = 0; i < count; i++) {
transaction.get("foo" + i);
expected.add("bar" + i);
}
for (int i = 0; i < count; i++) {
transaction.lrange("foobar" + i, 0, -1);
expected.add(Arrays.asList("foo" + i, "bar" + i));
}

List<Object> responses = transaction.exec();
for (int i = 0; i < totalCount; i++) {
assertEquals(expected.get(i), responses.get(i));
}
}
}

0 comments on commit 14cd22f

Please sign in to comment.