Skip to content

Commit

Permalink
Fix: GraphCommandObjects in multi node
Browse files Browse the repository at this point in the history
  • Loading branch information
sazzad16 committed Apr 9, 2023
1 parent d45b31a commit bc20996
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ public MultiNodePipelineBase(CommandObjects commandObjects) {
*/
protected final void prepareGraphCommands(ConnectionProvider connectionProvider) {
this.graphCommandObjects = new GraphCommandObjects(connectionProvider);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

protected abstract HostAndPort getNodeKey(CommandArguments args);
Expand Down
15 changes: 14 additions & 1 deletion src/main/java/redis/clients/jedis/UnifiedJedis.java
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class UnifiedJedis implements JedisCommands, JedisBinaryCommands,
protected final ConnectionProvider provider;
protected final CommandExecutor executor;
private final CommandObjects commandObjects;
private final GraphCommandObjects graphCommandObjects = new GraphCommandObjects(this);
private final GraphCommandObjects graphCommandObjects;
private JedisBroadcastAndRoundRobinConfig broadcastAndRoundRobinConfig = null;

public UnifiedJedis() {
Expand Down Expand Up @@ -87,6 +87,8 @@ public UnifiedJedis(ConnectionProvider provider) {
this.provider = provider;
this.executor = new DefaultCommandExecutor(provider);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

/**
Expand All @@ -109,6 +111,7 @@ public UnifiedJedis(Connection connection) {
this.provider = null;
this.executor = new SimpleCommandExecutor(connection);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
}

public UnifiedJedis(Set<HostAndPort> jedisClusterNodes, JedisClientConfig clientConfig, int maxAttempts) {
Expand All @@ -129,24 +132,32 @@ public UnifiedJedis(ClusterConnectionProvider provider, int maxAttempts, Duratio
this.provider = provider;
this.executor = new ClusterCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration);
this.commandObjects = new ClusterCommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

public UnifiedJedis(ShardedConnectionProvider provider) {
this.provider = provider;
this.executor = new DefaultCommandExecutor(provider);
this.commandObjects = new ShardedCommandObjects(provider.getHashingAlgo());
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

public UnifiedJedis(ShardedConnectionProvider provider, Pattern tagPattern) {
this.provider = provider;
this.executor = new DefaultCommandExecutor(provider);
this.commandObjects = new ShardedCommandObjects(provider.getHashingAlgo(), tagPattern);
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

public UnifiedJedis(ConnectionProvider provider, int maxAttempts, Duration maxTotalRetriesDuration) {
this.provider = provider;
this.executor = new RetryableCommandExecutor(provider, maxAttempts, maxTotalRetriesDuration);
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

/**
Expand All @@ -159,6 +170,8 @@ public UnifiedJedis(CommandExecutor executor) {
this.provider = null;
this.executor = executor;
this.commandObjects = new CommandObjects();
this.graphCommandObjects = new GraphCommandObjects(this);
this.graphCommandObjects.setBaseCommandArgumentsCreator((comm) -> this.commandObjects.commandArguments(comm));
}

@Override
Expand Down
24 changes: 17 additions & 7 deletions src/main/java/redis/clients/jedis/graph/GraphCommandObjects.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,13 @@
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

import redis.clients.jedis.Builder;
import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.CommandObject;
import redis.clients.jedis.Connection;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.graph.GraphProtocol.GraphCommand;
import redis.clients.jedis.providers.ConnectionProvider;

Expand All @@ -22,6 +24,8 @@ public class GraphCommandObjects {
private final RedisGraphCommands graph;
private final Connection connection;
private final ConnectionProvider provider;
private Function<ProtocolCommand, CommandArguments> commArgs = (comm) -> new CommandArguments(comm);

private final ConcurrentHashMap<String, Builder<ResultSet>> builders = new ConcurrentHashMap<>();

public GraphCommandObjects(RedisGraphCommands graphCommands) {
Expand All @@ -31,24 +35,28 @@ public GraphCommandObjects(RedisGraphCommands graphCommands) {
}

public GraphCommandObjects(Connection connection) {
this.graph = null;
this.connection = connection;
this.provider = null;
this.graph = null;
}

public GraphCommandObjects(ConnectionProvider provider) {
this.provider = provider;
this.connection = null;
this.graph = null;
this.connection = null;
this.provider = provider;
}

public void setBaseCommandArgumentsCreator(Function<ProtocolCommand, CommandArguments> commArgs) {
this.commArgs = commArgs;
}

// RedisGraph commands
public final CommandObject<ResultSet> graphQuery(String name, String query) {
return new CommandObject<>(new CommandArguments(GraphCommand.QUERY).key(name).add(query).add(__COMPACT), getBuilder(name));
return new CommandObject<>(commArgs.apply(GraphCommand.QUERY).key(name).add(query).add(__COMPACT), getBuilder(name));
}

public final CommandObject<ResultSet> graphReadonlyQuery(String name, String query) {
return new CommandObject<>(new CommandArguments(GraphCommand.RO_QUERY).key(name).add(query).add(__COMPACT), getBuilder(name));
return new CommandObject<>(commArgs.apply(GraphCommand.RO_QUERY).key(name).add(query).add(__COMPACT), getBuilder(name));
}

public final CommandObject<ResultSet> graphQuery(String name, String query, long timeout) {
Expand Down Expand Up @@ -76,11 +84,13 @@ public final CommandObject<ResultSet> graphReadonlyQuery(String name, String que
}

private CommandObject<ResultSet> graphQuery(String name, GraphQueryParams params) {
return new CommandObject<>(params.getArguments(name), getBuilder(name));
return new CommandObject<>(
commArgs.apply(!params.isReadonly() ? GraphCommand.QUERY : GraphCommand.RO_QUERY)
.key(name).addParams(params), getBuilder(name));
}

public final CommandObject<String> graphDelete(String name) {
return new CommandObject<>(new CommandArguments(GraphCommand.DELETE).key(name), STRING);
return new CommandObject<>(commArgs.apply(GraphCommand.DELETE).key(name), STRING);
}
// RedisGraph commands

Expand Down
8 changes: 8 additions & 0 deletions src/main/java/redis/clients/jedis/graph/GraphQueryParams.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@

import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

import redis.clients.jedis.CommandArguments;
import redis.clients.jedis.commands.ProtocolCommand;
import redis.clients.jedis.exceptions.JedisException;
import redis.clients.jedis.graph.GraphProtocol.GraphCommand;
import redis.clients.jedis.graph.GraphProtocol.GraphKeyword;
Expand Down Expand Up @@ -83,6 +86,11 @@ public void addParams(CommandArguments args) {
}
}

public boolean isReadonly() {
return readonly;
}

@Deprecated
public CommandArguments getArguments(String graphName) {
return new CommandArguments(!readonly ? GraphCommand.QUERY : GraphCommand.RO_QUERY)
.key(graphName).addParams(this);
Expand Down

0 comments on commit bc20996

Please sign in to comment.