Skip to content

Commit

Permalink
Fix GEO store commands not removing dst key when result set is empty (#…
Browse files Browse the repository at this point in the history
…1755)

If dst exists, when using the store variant, we need to
delete the dst key when the result set is empty, like
we are overwriting the dst key with an empty result set.

This change covers the following commands and scenarios:
- GEOSEARCHSTORE FROMMEMBER against non-existing src key.
- GEOSEARCHSTORE FROMLONLAT against non-existing src key.
- GEOSEARCHSTORE FROMLONLAT the search result set is empty.
- GEORADIUS STORE Against non-existing src key.
- GEORADIUS STORE search result set is empty.
- GEORADIUSBYMEMBER STORE against non-existing src key.
- GEORADIUSBYMEMBER STORE search result set is empty.

While writing the test cases, we still have a issue, FROMMEMBER
against non-existing src key member:
```
127.0.0.1:6666> geoadd src 10 10 Shenzhen
(integer) 1
127.0.0.1:6666> GEOSEARCHSTORE dst src FROMMEMBER Shenzhen_2 BYBOX 88 88 m
(integer) 0

127.0.0.1:6379> GEOADD src 10 10 Shenzhen
(integer) 1
127.0.0.1:6379> GEOSEARCHSTORE dst src FROMMEMBER Shenzhen_2 BYBOX 88 88 m
(error) ERR could not decode requested zset member
```

Redis will return a specific error if the member does not exist,
but Kvrocks currently handles it as if the src key does not exist,
this will be addressed in a future PR since it require more works.
  • Loading branch information
enjoy-binbin authored Sep 12, 2023
1 parent 11a0140 commit 4aa95fa
Show file tree
Hide file tree
Showing 2 changed files with 65 additions and 0 deletions.
15 changes: 15 additions & 0 deletions src/types/redis_geo.cc
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,11 @@ rocksdb::Status Geo::SearchStore(const Slice &user_key, GeoShape geo_shape, Orig
if (point_type == kMember) {
GeoPoint geo_point;
auto s = Get(user_key, member, &geo_point);
// store key is not empty, try to remove it before returning.
if (!s.ok() && s.IsNotFound() && !store_key.empty()) {
auto del_s = ZSet::Del(store_key);
if (!del_s.ok()) return del_s;
}
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

geo_shape.xy[0] = geo_point.longitude;
Expand All @@ -124,6 +129,11 @@ rocksdb::Status Geo::SearchStore(const Slice &user_key, GeoShape geo_shape, Orig
std::string ns_key = AppendNamespacePrefix(user_key);
ZSetMetadata metadata(false);
rocksdb::Status s = ZSet::GetMetadata(ns_key, &metadata);
// store key is not empty, try to remove it before returning.
if (!s.ok() && s.IsNotFound() && !store_key.empty()) {
auto del_s = ZSet::Del(store_key);
if (!del_s.ok()) return del_s;
}
if (!s.ok()) return s.IsNotFound() ? rocksdb::Status::OK() : s;

// Get neighbor geohash boxes for radius search
Expand All @@ -134,6 +144,11 @@ rocksdb::Status Geo::SearchStore(const Slice &user_key, GeoShape geo_shape, Orig

// if no matching results, give empty reply
if (geo_points->empty()) {
// store key is not empty, try to remove it before returning.
if (!store_key.empty()) {
auto del_s = ZSet::Del(store_key);
if (!del_s.ok()) return del_s;
}
return rocksdb::Status::OK();
}

Expand Down
50 changes: 50 additions & 0 deletions tests/gocase/unit/geo/geo_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,21 @@ func TestGeo(t *testing.T) {
require.EqualValues(t, []interface{}([]interface{}{}), rdb.Do(ctx, "GEORADIUSBYMEMBER_RO", "points", "member", "1", "km").Val())
})

t.Run("GEORADIUSBYMEMBER store: remove the dst key when there is no result set", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "src", "dst").Err())

// Against non-existing src key.
require.NoError(t, rdb.Do(ctx, "GEOADD", "dst", "10", "10", "Shenzhen").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEORADIUS", "src", 15, 37, 88, "m", "store", "dst").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())

// The search result set is empty.
require.NoError(t, rdb.Do(ctx, "GEOADD", "src", "10", "10", "Shenzhen").Err())
require.NoError(t, rdb.Do(ctx, "GEOADD", "dst", "20", "20", "Guangzhou").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEORADIUS", "src", 15, 37, 88, "m", "store", "dst").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())
})

t.Run("GEORADIUSBYMEMBER simple (sorted)", func(t *testing.T) {
require.EqualValues(t, []redis.GeoLocation([]redis.GeoLocation{{Name: "wtc one", Longitude: 0, Latitude: 0, Dist: 0, GeoHash: 0}, {Name: "union square", Longitude: 0, Latitude: 0, Dist: 0, GeoHash: 0}, {Name: "central park n/q/r", Longitude: 0, Latitude: 0, Dist: 0, GeoHash: 0}, {Name: "4545", Longitude: 0, Latitude: 0, Dist: 0, GeoHash: 0}, {Name: "lic market", Longitude: 0, Latitude: 0, Dist: 0, GeoHash: 0}}), rdb.GeoRadiusByMember(ctx, "nyc", "wtc one", &redis.GeoRadiusQuery{Radius: 7, Unit: "km"}).Val())
})
Expand Down Expand Up @@ -229,6 +244,26 @@ func TestGeo(t *testing.T) {
require.EqualValues(t, 0, rdb.Do(ctx, "GEOSEARCHSTORE", "dst", "src", "FROMMEMBER", "Shenzhen", "BYBOX", 88, 88, "m").Val())
})

t.Run("GEOSEARCHSTORE remove the dst key when there is no result set", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "del", "src", "dst").Err())

// FROMMEMBER against non-existing src key.
require.NoError(t, rdb.Do(ctx, "geoadd", "dst", "10", "10", "Shenzhen").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEOSEARCHSTORE", "dst", "src", "FROMMEMBER", "Shenzhen", "BYBOX", 88, 88, "m").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())

// FROMLONLAT against non-existing src key.
require.NoError(t, rdb.Do(ctx, "geoadd", "dst", "10", "10", "Shenzhen").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEOSEARCHSTORE", "dst", "src", "FROMLONLAT", 15, 37, "BYBOX", 88, 88, "m").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())

// FROMLONLAT the search result set is empty.
require.NoError(t, rdb.Do(ctx, "geoadd", "src", "10", "10", "Shenzhen").Err())
require.NoError(t, rdb.Do(ctx, "geoadd", "dst", "20", "20", "Guangzhou").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEOSEARCHSTORE", "dst", "src", "FROMLONLAT", 15, 37, "BYBOX", 88, 88, "m").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())
})

t.Run("GEOSEARCHSTORE with BYRADIUS", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "points").Err())
require.NoError(t, rdb.GeoAdd(ctx, "points",
Expand Down Expand Up @@ -312,6 +347,21 @@ func TestGeo(t *testing.T) {
require.ErrorContains(t, rdb.Do(ctx, "georadius", "points", 13.361389, 38.115556, 50, "km", "store").Err(), "syntax")
})

t.Run("GEORADIUS STORE option: remove the dst key when there is no result set", func(t *testing.T) {
require.NoError(t, rdb.Do(ctx, "DEL", "src", "dst").Err())

// Against non-existing src key.
require.NoError(t, rdb.Do(ctx, "GEOADD", "dst", "10", "10", "Shenzhen").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEORADIUS", "src", 15, 37, 88, "m", "store", "dst").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())

// The search result set is empty.
require.NoError(t, rdb.Do(ctx, "GEOADD", "src", "10", "10", "Shenzhen").Err())
require.NoError(t, rdb.Do(ctx, "GEOADD", "dst", "20", "20", "Guangzhou").Err())
require.EqualValues(t, 0, rdb.Do(ctx, "GEORADIUS", "src", 15, 37, 88, "m", "store", "dst").Val())
require.EqualValues(t, 0, rdb.Exists(ctx, "dst").Val())
})

t.Run("GEORADIUS missing key", func(t *testing.T) {
require.NoError(t, rdb.Del(ctx, "points").Err())
require.EqualValues(t, []redis.GeoLocation([]redis.GeoLocation{}), rdb.GeoRadius(ctx, "points", 13.361389, 38.115556, &redis.GeoRadiusQuery{Radius: 50, Unit: "km"}).Val())
Expand Down

0 comments on commit 4aa95fa

Please sign in to comment.