Skip to content

Commit

Permalink
feat(logstream): remove deprecated LLSN field from ReplicateRequest
Browse files Browse the repository at this point in the history
This removes the deprecated LLSN field from the ReplicateRequest struct and
updates the relevant code and tests accordingly. The LLSN field was previously
used to send local log sequence numbers to backup replicas but is no longer
needed.
  • Loading branch information
ijsong committed Feb 21, 2025
1 parent 4637fa6 commit c2663cc
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 229 deletions.
10 changes: 0 additions & 10 deletions internal/storagenode/logstream/replicate_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@ import (

"go.uber.org/zap"

"github.com/kakao/varlog/internal/batchlet"
"github.com/kakao/varlog/pkg/rpc"
"github.com/kakao/varlog/pkg/types"
"github.com/kakao/varlog/pkg/util/runner"
"github.com/kakao/varlog/pkg/verrors"
"github.com/kakao/varlog/proto/snpb"
Expand Down Expand Up @@ -107,12 +105,9 @@ func (rc *replicateClient) sendLoop(ctx context.Context) {
defer func() {
_ = rc.streamClient.CloseSend()
}()
// NOTE: To reuse the request struct, we need to initialize the field LLSN.
maxBatchletLength := batchlet.LengthClasses[len(batchlet.LengthClasses)-1]
req := &snpb.ReplicateRequest{
TopicID: rc.lse.tpid,
LogStreamID: rc.lse.lsid,
LLSN: make([]types.LLSN, maxBatchletLength),
}
streamCtx := rc.streamClient.Context()
for {
Expand All @@ -135,11 +130,6 @@ func (rc *replicateClient) sendLoop(ctx context.Context) {
func (rc *replicateClient) sendLoopInternal(_ context.Context, rt *replicateTask, req *snpb.ReplicateRequest) error {
// Remove maxAppendSubBatchSize, since rt already has batched data.
startTime := time.Now()
// TODO(jun): Since (snpb.ReplicateRequest).LLSN is deprecated, it will disappear soon.
// NOTE: We need to copy the LLSN array, since the array is reused.
req.LLSN = req.LLSN[0:len(rt.llsnList)]
copy(req.LLSN, rt.llsnList)
// req.LLSN = rt.llsnList
req.Data = rt.dataList
if len(rt.llsnList) > 0 {
req.BeginLLSN = rt.llsnList[0]
Expand Down
2 changes: 1 addition & 1 deletion internal/storagenode/logstream/replicate_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ func TestReplicateClient(t *testing.T) {
assert.NoError(t, rr.err)
assert.Equal(t, lse.tpid, rr.req.TopicID)
assert.Equal(t, lse.lsid, rr.req.LogStreamID)
assert.Equal(t, []types.LLSN{1}, rr.req.LLSN)
assert.Equal(t, types.LLSN(1), rr.req.BeginLLSN)
assert.Len(t, rr.req.Data, 1)
assert.Empty(t, rr.req.Data[0])
}
5 changes: 0 additions & 5 deletions internal/storagenode/replication_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,11 +173,6 @@ func (rs *replicationServer) replicate(ctx context.Context, requestC <-chan *rep

lse.Metrics().ReplicateServerOperations.Add(1)

// NOTE: In older versions, BeginLLSN is not present. Therefore, we
// need to set BeginLLSN to the first value in the LLSN field.
if rst.req.BeginLLSN.Invalid() && len(rst.req.LLSN) > 0 {
rst.req.BeginLLSN = rst.req.LLSN[0]
}
err = lse.Replicate(ctx, rst.req.BeginLLSN, rst.req.Data)
if err != nil {
rst.release()
Expand Down
Loading

0 comments on commit c2663cc

Please sign in to comment.