-
Notifications
You must be signed in to change notification settings - Fork 473
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
feat(command): Implement the new command POLLUPDATES for polling upda…
…tes by sequence (#2472) As proposed in issue #2469, we would like to add a new command for polling updates from Kvrocks. The main purpose is to allow to implement features like CDC(Change Stream Capture) without running an agent alongside Kvrocks instances to make it easier to operate. The following is the command format: ```shell POLLUPDATES <Sequence Number> [MAX <N>] [STRICT] [FORMAT <RAW>] ``` - `Sequence Number` represents the start sequence of the polling operation and it’s a required argument. - `MAX` represents the maximum number of items that can be retrieved, it’s an optional argument and uses `16` as the default value if it’s missing - `STRICT` is set means the update’s sequence MUST be exactly equal to the sequence number, it’s an optional argument. `GetUpdatesSince` will return the first available sequence if the sequence number is non-existent, so we allow users to specify if required to match the input sequence number. The output contains the following sections: - last_sequence - updates - next_sequence For example, we assume the DB's latest sequence is 100 and we send the command: `POLLUPDATES 90 MAX 3 FORMAT RAW`, it will return the following response: ```shell "latest_sequence" 100 "updates" {batch-0} {batch-1} {batch-2} "next_sequence" 93 ``` This will close #2469 Co-authored-by: Twice <twice@apache.org>
- Loading branch information
1 parent
7f812c7
commit 275ab32
Showing
7 changed files
with
269 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,169 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, | ||
* software distributed under the License is distributed on an | ||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||
* KIND, either express or implied. See the License for the | ||
* specific language governing permissions and limitations | ||
* under the License. | ||
*/ | ||
|
||
package server | ||
|
||
import ( | ||
"context" | ||
"encoding/hex" | ||
"fmt" | ||
"strconv" | ||
"testing" | ||
|
||
"github.com/apache/kvrocks/tests/gocase/util" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
type PollUpdatesResult struct { | ||
LatestSeq int64 | ||
NextSeq int64 | ||
Updates []string | ||
} | ||
|
||
func sliceToPollUpdatesResult(t *testing.T, slice []interface{}) *PollUpdatesResult { | ||
itemCount := 6 | ||
require.Len(t, slice, itemCount) | ||
var latestSeq, nextSeq int64 | ||
|
||
updates := make([]string, 0) | ||
for i := 0; i < itemCount; i += 2 { | ||
key := slice[i].(string) | ||
switch key { | ||
case "latest_sequence": | ||
latestSeq = slice[i+1].(int64) | ||
case "next_sequence": | ||
nextSeq = slice[i+1].(int64) | ||
case "updates": | ||
fields := slice[i+1].([]interface{}) | ||
for _, field := range fields { | ||
str, ok := field.(string) | ||
require.True(t, ok) | ||
updates = append(updates, str) | ||
} | ||
default: | ||
require.Fail(t, fmt.Sprintf("unknown key: %s", key)) | ||
} | ||
} | ||
|
||
return &PollUpdatesResult{ | ||
LatestSeq: latestSeq, | ||
NextSeq: nextSeq, | ||
Updates: updates, | ||
} | ||
} | ||
|
||
func TestPollUpdates_Basic(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
srv0 := util.StartServer(t, map[string]string{}) | ||
defer srv0.Close() | ||
rdb0 := srv0.NewClient() | ||
defer func() { require.NoError(t, rdb0.Close()) }() | ||
|
||
srv1 := util.StartServer(t, map[string]string{}) | ||
defer srv1.Close() | ||
rdb1 := srv1.NewClient() | ||
defer func() { require.NoError(t, rdb1.Close()) }() | ||
|
||
t.Run("Make sure the command POLLUPDATES works well", func(t *testing.T) { | ||
for i := 0; i < 10; i++ { | ||
rdb0.Set(ctx, fmt.Sprintf("key-%d", i), i, 0) | ||
} | ||
|
||
updates := make([]string, 0) | ||
slice, err := rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", 6).Slice() | ||
require.NoError(t, err) | ||
pollUpdates := sliceToPollUpdatesResult(t, slice) | ||
require.EqualValues(t, 10, pollUpdates.LatestSeq) | ||
require.EqualValues(t, 6, pollUpdates.NextSeq) | ||
require.Len(t, pollUpdates.Updates, 6) | ||
updates = append(updates, pollUpdates.Updates...) | ||
|
||
slice, err = rdb0.Do(ctx, "POLLUPDATES", pollUpdates.NextSeq, "MAX", 6).Slice() | ||
require.NoError(t, err) | ||
pollUpdates = sliceToPollUpdatesResult(t, slice) | ||
require.EqualValues(t, 10, pollUpdates.LatestSeq) | ||
require.EqualValues(t, 10, pollUpdates.NextSeq) | ||
require.Len(t, pollUpdates.Updates, 4) | ||
updates = append(updates, pollUpdates.Updates...) | ||
|
||
for i := 0; i < 10; i++ { | ||
batch, err := hex.DecodeString(updates[i]) | ||
require.NoError(t, err) | ||
applied, err := rdb1.Do(ctx, "APPLYBATCH", batch).Bool() | ||
require.NoError(t, err) | ||
require.True(t, applied) | ||
require.EqualValues(t, strconv.Itoa(i), rdb1.Get(ctx, fmt.Sprintf("key-%d", i)).Val()) | ||
} | ||
}) | ||
|
||
t.Run("Runs POLLUPDATES with invalid arguments", func(t *testing.T) { | ||
require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", -1).Err(), | ||
"ERR out of numeric range") | ||
require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", 1001).Err(), | ||
"ERR out of numeric range") | ||
require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 0, "FORMAT", "COMMAND").Err(), | ||
"ERR invalid FORMAT option, only support RAW") | ||
require.ErrorContains(t, rdb0.Do(ctx, "POLLUPDATES", 12, "FORMAT", "RAW").Err(), | ||
"ERR next sequence is out of range") | ||
require.Error(t, rdb0.Do(ctx, "POLLUPDATES", 1, "FORMAT", "EXTRA").Err()) | ||
}) | ||
} | ||
|
||
func TestPollUpdates_WithStrict(t *testing.T) { | ||
ctx := context.Background() | ||
|
||
srv0 := util.StartServer(t, map[string]string{}) | ||
defer srv0.Close() | ||
rdb0 := srv0.NewClient() | ||
defer func() { require.NoError(t, rdb0.Close()) }() | ||
|
||
srv1 := util.StartServer(t, map[string]string{}) | ||
defer srv1.Close() | ||
rdb1 := srv1.NewClient() | ||
defer func() { require.NoError(t, rdb1.Close()) }() | ||
|
||
// The latest sequence is 2 after running the HSET command, 1 for the metadata and 1 for the field | ||
require.NoError(t, rdb0.HSet(ctx, "h0", "f0", "v0").Err()) | ||
// The latest sequence is 3 after running the SET command | ||
require.NoError(t, rdb0.Set(ctx, "k0", "v0", 0).Err()) | ||
|
||
// PollUpdates with strict mode should return an error if the sequence number is mismatched | ||
err := rdb0.Do(ctx, "POLLUPDATES", 1, "MAX", 1, "STRICT").Err() | ||
require.ErrorContains(t, err, "ERR mismatched sequence number") | ||
|
||
// Works well if the sequence number is mismatched but not in strict mode | ||
require.NoError(t, rdb0.Do(ctx, "POLLUPDATES", 1, "MAX", 1).Err()) | ||
|
||
slice, err := rdb0.Do(ctx, "POLLUPDATES", 0, "MAX", 10, "STRICT").Slice() | ||
require.NoError(t, err) | ||
pollUpdates := sliceToPollUpdatesResult(t, slice) | ||
require.EqualValues(t, 3, pollUpdates.LatestSeq) | ||
require.EqualValues(t, 3, pollUpdates.NextSeq) | ||
require.Len(t, pollUpdates.Updates, 2) | ||
|
||
for _, update := range pollUpdates.Updates { | ||
batch, err := hex.DecodeString(update) | ||
require.NoError(t, err) | ||
require.NoError(t, rdb1.Do(ctx, "APPLYBATCH", batch).Err()) | ||
} | ||
|
||
require.Equal(t, "v0", rdb1.Get(ctx, "k0").Val()) | ||
require.Equal(t, "v0", rdb1.HGet(ctx, "h0", "f0").Val()) | ||
} |