Skip to content

Commit

Permalink
OpsForGeo producing "READONLY You can't write against a read only rep…
Browse files Browse the repository at this point in the history
…lica" on READS... only if master & replica configured redis#1813

Divert pure read intentions of georadius and georadiusbymember commands (variants that do not use STORE/STOREDIST) to GEORADIUS_RO/GEORADIUSBYMEMBER_RO
This will unify the behaviour between Cluster and Redis Standalone/Replica arrangements

Relates to  issues redis#1481 redis#2568 redis#2871

Closes redis#1813
  • Loading branch information
ggivo committed Oct 30, 2024
1 parent d56af50 commit cfd82d5
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 84 deletions.
10 changes: 4 additions & 6 deletions src/main/java/io/lettuce/core/AbstractRedisAsyncCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,6 @@
import java.util.Set;

import static io.lettuce.core.protocol.CommandType.EXEC;
import static io.lettuce.core.protocol.CommandType.GEORADIUS;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO;
import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO;

Expand Down Expand Up @@ -1140,13 +1138,13 @@ public RedisFuture<List<GeoCoordinates>> geopos(K key, V... members) {

@Override
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name()));
return georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
GeoArgs.Unit unit, GeoArgs geoArgs) {
return dispatch(commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs));
return georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
Expand All @@ -1166,13 +1164,13 @@ protected RedisFuture<List<GeoWithin<V>>> georadius_ro(K key, double longitude,

@Override
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name()));
return georadiusbymember_ro(key, member, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return dispatch(commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs));
return georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
Expand Down
22 changes: 9 additions & 13 deletions src/main/java/io/lettuce/core/AbstractRedisReactiveCommands.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,8 +66,6 @@
import java.util.function.Supplier;

import static io.lettuce.core.protocol.CommandType.EXEC;
import static io.lettuce.core.protocol.CommandType.GEORADIUS;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER;
import static io.lettuce.core.protocol.CommandType.GEORADIUSBYMEMBER_RO;
import static io.lettuce.core.protocol.CommandType.GEORADIUS_RO;

Expand Down Expand Up @@ -1203,14 +1201,14 @@ public Flux<Value<GeoCoordinates>> geopos(K key, V... members) {
}

@Override
public Flux<V> georadius(K key, double longitude, double latitude, double distance, Unit unit) {
return createDissolvingFlux(() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name()));
public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, Unit unit, GeoArgs geoArgs) {
return createDissolvingFlux(
() -> commandBuilder.georadius(GEORADIUS, key, longitude, latitude, distance, unit.name(), geoArgs));
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
Expand All @@ -1231,15 +1229,13 @@ protected Flux<GeoWithin<V>> georadius_ro(K key, double longitude, double latitu
}

@Override
public Flux<V> georadiusbymember(K key, V member, double distance, Unit unit) {
return createDissolvingFlux(
() -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name()));
public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return georadiusbymember_ro(key, member, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, Unit unit, GeoArgs geoArgs) {
return createDissolvingFlux(
() -> commandBuilder.georadiusbymember(GEORADIUSBYMEMBER, key, member, distance, unit.name(), geoArgs));
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
return georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -249,28 +249,6 @@ public RedisFuture<String> flushdb(FlushMode flushMode) {
.firstOfAsync(executeOnUpstream(kvRedisClusterAsyncCommands -> kvRedisClusterAsyncCommands.flushdb(flushMode)));
}

@Override
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return super.georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
GeoArgs.Unit unit, GeoArgs geoArgs) {
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return super.georadiusbymember_ro(key, member, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
public RedisFuture<List<K>> keys(K pattern) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -235,27 +235,6 @@ public Mono<String> flushdb(FlushMode flushMode) {
return Flux.merge(publishers.values()).last();
}

@Override
public Flux<V> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return super.georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
public Flux<V> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return super.georadiusbymember_ro(key, member, distance, unit);
}

@Override
public Flux<GeoWithin<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit, GeoArgs geoArgs) {
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
public Flux<K> keys(K pattern) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,28 +65,6 @@ public RedisClusterPubSubAsyncCommandsImpl(StatefulRedisPubSubConnection<K, V> c
super(connection, codec);
}

@Override
public RedisFuture<Set<V>> georadius(K key, double longitude, double latitude, double distance, GeoArgs.Unit unit) {
return super.georadius_ro(key, longitude, latitude, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadius(K key, double longitude, double latitude, double distance,
GeoArgs.Unit unit, GeoArgs geoArgs) {
return super.georadius_ro(key, longitude, latitude, distance, unit, geoArgs);
}

@Override
public RedisFuture<Set<V>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit) {
return super.georadiusbymember_ro(key, member, distance, unit);
}

@Override
public RedisFuture<List<GeoWithin<V>>> georadiusbymember(K key, V member, double distance, GeoArgs.Unit unit,
GeoArgs geoArgs) {
return super.georadiusbymember_ro(key, member, distance, unit, geoArgs);
}

@Override
public StatefulRedisClusterPubSubConnectionImpl<K, V> getStatefulConnection() {
return (StatefulRedisClusterPubSubConnectionImpl<K, V>) super.getStatefulConnection();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package io.lettuce.core.commands;

import io.lettuce.core.*;
import io.lettuce.core.api.sync.RedisCommands;
import io.lettuce.core.codec.StringCodec;
import io.lettuce.core.masterreplica.MasterReplica;
import io.lettuce.core.masterreplica.StatefulRedisMasterReplicaConnection;
import io.lettuce.core.models.role.RedisInstance;
import io.lettuce.core.models.role.RoleParser;
import io.lettuce.test.LettuceExtension;
import io.lettuce.test.condition.EnabledOnCommand;
import io.lettuce.test.settings.TestSettings;
import org.junit.jupiter.api.*;
import org.junit.jupiter.api.extension.ExtendWith;

import java.util.Arrays;
import java.util.List;
import java.util.Set;

import static io.lettuce.TestTags.INTEGRATION_TEST;
import static org.assertj.core.api.Assertions.*;
import static org.junit.jupiter.api.Assumptions.assumeTrue;

/**
* @author Mark Paluch
*/
@Tag(INTEGRATION_TEST)
@ExtendWith(LettuceExtension.class)
@EnabledOnCommand("GEOADD")
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class GeoMasterReplicaIntegrationTests extends AbstractRedisClientTest {

private StatefulRedisMasterReplicaConnection<String, String> masterReplica;

private RedisCommands<String, String> upstream;

private RedisCommands<String, String> connection1;

private RedisCommands<String, String> connection2;

@BeforeEach
void before() {

RedisURI node1 = RedisURI.Builder.redis(host, TestSettings.port(3)).withDatabase(2).build();
RedisURI node2 = RedisURI.Builder.redis(host, TestSettings.port(4)).withDatabase(2).build();

connection1 = client.connect(node1).sync();
connection2 = client.connect(node2).sync();

RedisInstance node1Instance = RoleParser.parse(this.connection1.role());
RedisInstance node2Instance = RoleParser.parse(this.connection2.role());

if (node1Instance.getRole().isUpstream() && node2Instance.getRole().isReplica()) {
upstream = connection1;
} else if (node2Instance.getRole().isUpstream() && node1Instance.getRole().isReplica()) {
upstream = connection2;
} else {
assumeTrue(false,
String.format("Cannot run the test because I don't have a distinct master and replica but %s and %s",
node1Instance, node2Instance));
}

masterReplica = MasterReplica.connect(client, StringCodec.UTF8, Arrays.asList(node1, node2));
masterReplica.setReadFrom(ReadFrom.REPLICA);
}

@AfterEach
void after() {

if (connection1 != null) {
connection1.getStatefulConnection().close();
}

if (connection2 != null) {
connection2.getStatefulConnection().close();
}

if (masterReplica != null) {
masterReplica.close();
}
}

@BeforeEach
void setUp() {
this.redis.flushall();
}

@Test
void georadiusReadFromReplica() {

prepareGeo(upstream);

upstream.waitForReplication(1, 1000);

Set<String> georadius = masterReplica.sync().georadius(key, 8.6582861, 49.5285695, 1, GeoArgs.Unit.km);
assertThat(georadius).hasSize(1).contains("Weinheim");
}

@Test
void georadiusbymemberReadFromReplica() {

prepareGeo(upstream);
upstream.waitForReplication(1, 100);

Set<String> empty = masterReplica.sync().georadiusbymember(key, "Bahn", 1, GeoArgs.Unit.km);
assertThat(empty).hasSize(1).contains("Bahn");
}

protected void prepareGeo(RedisCommands<String, String> redis) {
redis.geoadd(key, 8.6638775, 49.5282537, "Weinheim");
redis.geoadd(key, 8.3796281, 48.9978127, "EFS9", 8.665351, 49.553302, "Bahn");
}

private static double getY(List<GeoWithin<String>> georadius, int i) {
return georadius.get(i).getCoordinates().getY().doubleValue();
}

private static double getX(List<GeoWithin<String>> georadius, int i) {
return georadius.get(i).getCoordinates().getX().doubleValue();
}

}

0 comments on commit cfd82d5

Please sign in to comment.