From 2fc170329ae0754862b66faca73972b3f0027b86 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 25 Sep 2024 10:14:06 +0800 Subject: [PATCH 1/5] fix(session): override shadowKey by expired session --- server/session_manager.go | 29 +++++++++++++++++------------ server/session_manager_test.go | 12 ++++++++++++ 2 files changed, 29 insertions(+), 12 deletions(-) diff --git a/server/session_manager.go b/server/session_manager.go index 91cc581f..4407368a 100644 --- a/server/session_manager.go +++ b/server/session_manager.go @@ -307,7 +307,7 @@ type updateCallback struct{} var SessionUpdateOperationCallback kv.UpdateOperationCallback = &updateCallback{} -func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.PutRequest, _ *proto.StorageEntry) (proto.Status, error) { +func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.PutRequest, existingEntry *proto.StorageEntry) (proto.Status, error) { var _, closer, err = batch.Get(SessionKey(SessionId(*request.SessionId))) if err != nil { if errors.Is(err, kv.ErrKeyNotFound) { @@ -318,6 +318,10 @@ func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.Pu if err = closer.Close(); err != nil { return proto.Status_SESSION_DOES_NOT_EXIST, err } + // delete existing session shadow + if status, err := deleteShadow(batch, request.Key, existingEntry); err != nil { + return status, err + } // Create the session shadow entry err = batch.Put(ShadowKey(SessionId(*request.SessionId), request.Key), []byte{}) if err != nil { @@ -328,26 +332,27 @@ func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.Pu } func (c *updateCallback) OnPut(batch kv.WriteBatch, request *proto.PutRequest, existingEntry *proto.StorageEntry) (proto.Status, error) { - if existingEntry != nil && existingEntry.SessionId != nil { - // We are overwriting an ephemeral value, let's delete its shadow + switch { + // override by normal operation + case request.SessionId == nil: if status, err := deleteShadow(batch, request.Key, existingEntry); err != nil { return status, err } - } - - sessionId := request.SessionId - if sessionId != nil { - // We are adding an ephemeral value, let's check if the session exists + // override by session operation + case request.SessionId != nil: return c.OnPutWithinSession(batch, request, existingEntry) } return proto.Status_OK, nil } func deleteShadow(batch kv.WriteBatch, key string, existingEntry *proto.StorageEntry) (proto.Status, error) { - existingSessionId := SessionId(*existingEntry.SessionId) - err := batch.Delete(ShadowKey(existingSessionId, key)) - if err != nil && !errors.Is(err, kv.ErrKeyNotFound) { - return proto.Status_SESSION_DOES_NOT_EXIST, err + // We are overwriting an ephemeral value, let's delete its shadow + if existingEntry != nil && existingEntry.SessionId != nil { + existingSessionId := SessionId(*existingEntry.SessionId) + err := batch.Delete(ShadowKey(existingSessionId, key)) + if err != nil && !errors.Is(err, kv.ErrKeyNotFound) { + return proto.Status_SESSION_DOES_NOT_EXIST, err + } } return proto.Status_OK, nil } diff --git a/server/session_manager_test.go b/server/session_manager_test.go index 741162f6..5046ed1c 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -174,6 +174,18 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { assert.NoError(t, err) assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status) + writeBatch = mockWriteBatch{ + "a/b/c": []byte{}, + SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{}, + SessionKey(SessionId(sessionId - 1)): []byte{}, + } + status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se) + assert.NoError(t, err) + assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status) + _, closer, err := writeBatch.Get(SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc") + assert.NoError(t, err) + closer.Close() + expectedErr := errors.New("error coming from the DB on read") writeBatch = mockWriteBatch{ SessionKey(SessionId(sessionId)): expectedErr, From b5c4ebca88111fadd0bab7fd9655aedf6804c807 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 25 Sep 2024 10:42:23 +0800 Subject: [PATCH 2/5] fix --- server/session_manager_test.go | 43 ++++++++++++++++++++++++++-------- 1 file changed, 33 insertions(+), 10 deletions(-) diff --git a/server/session_manager_test.go b/server/session_manager_test.go index 5046ed1c..69c25c88 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -18,6 +18,7 @@ import ( "context" "errors" "io" + "k8s.io/utils/pointer" "testing" "time" @@ -137,7 +138,7 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { writeBatch = mockWriteBatch{ "a/b/c": []byte{}, - SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{}, + ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, } se := &proto.StorageEntry{ @@ -156,9 +157,9 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { writeBatch = mockWriteBatch{ "a/b/c": []byte{}, - SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{}, - SessionKey(SessionId(sessionId - 1)): []byte{}, - SessionKey(SessionId(sessionId)): []byte{}, + ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, + SessionKey(SessionId(sessionId - 1)): []byte{}, + SessionKey(SessionId(sessionId)): []byte{}, } status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se) @@ -176,13 +177,35 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { writeBatch = mockWriteBatch{ "a/b/c": []byte{}, - SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{}, - SessionKey(SessionId(sessionId - 1)): []byte{}, + ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, + SessionKey(SessionId(sessionId - 1)): []byte{}, } + + // session (sessionID -1) entry + se = &proto.StorageEntry{ + Value: []byte("value"), + VersionId: 0, + CreationTimestamp: 0, + ModificationTimestamp: 0, + SessionId: pointer.Int64(sessionId - 1), + } + // sessionID has expired + writeBatch = mockWriteBatch{ + "a/b/c": []byte{}, // real data + ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, // shadow key + SessionKey(SessionId(sessionId - 1)): []byte{}, // session + } + // try to use current session override the (sessionID -1) + sessionPutRequest = &proto.PutRequest{ + Key: "a/b/c", + Value: []byte("b"), + SessionId: &sessionId, + } + status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se) assert.NoError(t, err) assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status) - _, closer, err := writeBatch.Get(SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc") + _, closer, err := writeBatch.Get(ShadowKey(SessionId(sessionId-1), "a/b/c")) assert.NoError(t, err) closer.Close() @@ -199,14 +222,14 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, nil) assert.NoError(t, err) assert.Equal(t, proto.Status_OK, status) - sessionKey := SessionKey(SessionId(sessionId)) + "/a%2Fb%2Fc" - _, found := writeBatch[sessionKey] + sessionShadowKey := ShadowKey(SessionId(sessionId), "a/b/c") + _, found := writeBatch[sessionShadowKey] assert.True(t, found) expectedErr = errors.New("error coming from the DB on write") writeBatch = mockWriteBatch{ SessionKey(SessionId(sessionId)): []byte{}, - sessionKey: expectedErr, + sessionShadowKey: expectedErr, } _, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, nil) assert.ErrorIs(t, err, expectedErr) From 9e0f25eac54ebec2c4a5b1996c2db2f8dffc3357 Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 25 Sep 2024 10:59:13 +0800 Subject: [PATCH 3/5] fix unpexted import --- server/session_manager_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/server/session_manager_test.go b/server/session_manager_test.go index 69c25c88..972e9fa8 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -18,7 +18,6 @@ import ( "context" "errors" "io" - "k8s.io/utils/pointer" "testing" "time" @@ -182,12 +181,13 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { } // session (sessionID -1) entry + tmpSessionId := sessionId - 1 se = &proto.StorageEntry{ Value: []byte("value"), VersionId: 0, CreationTimestamp: 0, ModificationTimestamp: 0, - SessionId: pointer.Int64(sessionId - 1), + SessionId: &tmpSessionId, } // sessionID has expired writeBatch = mockWriteBatch{ From 60cdff018f0367c17521f9b6104d2ac715fb524f Mon Sep 17 00:00:00 2001 From: Qiang Zhao Date: Wed, 25 Sep 2024 11:08:14 +0800 Subject: [PATCH 4/5] remove useless assignment --- server/session_manager_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/server/session_manager_test.go b/server/session_manager_test.go index 972e9fa8..64cdee1d 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -174,12 +174,6 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) { assert.NoError(t, err) assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status) - writeBatch = mockWriteBatch{ - "a/b/c": []byte{}, - ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, - SessionKey(SessionId(sessionId - 1)): []byte{}, - } - // session (sessionID -1) entry tmpSessionId := sessionId - 1 se = &proto.StorageEntry{ From 841dd4f5f55302fb7cf7fdd06ac4b1a556b7622c Mon Sep 17 00:00:00 2001 From: Matteo Merli Date: Wed, 25 Sep 2024 10:37:36 -0700 Subject: [PATCH 5/5] Added more explicit tests --- cmd/main.go | 3 +- cmd/wal/cmd.go | 1 + cmd/wal/truncate/cmd.go | 6 ++- server/leader_controller_test.go | 76 ++++++++++++++++++++++++++++++++ server/session_manager_test.go | 33 ++++++++++++++ 5 files changed, 116 insertions(+), 3 deletions(-) diff --git a/cmd/main.go b/cmd/main.go index 539e8288..53fbb8e2 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -17,9 +17,10 @@ package main import ( "context" "fmt" - "github.com/streamnative/oxia/cmd/wal" "os" + "github.com/streamnative/oxia/cmd/wal" + "github.com/spf13/cobra" "go.uber.org/automaxprocs/maxprocs" diff --git a/cmd/wal/cmd.go b/cmd/wal/cmd.go index f4342696..20523490 100644 --- a/cmd/wal/cmd.go +++ b/cmd/wal/cmd.go @@ -16,6 +16,7 @@ package wal import ( "github.com/spf13/cobra" + "github.com/streamnative/oxia/cmd/wal/common" "github.com/streamnative/oxia/cmd/wal/truncate" ) diff --git a/cmd/wal/truncate/cmd.go b/cmd/wal/truncate/cmd.go index b1e5b46e..60e10907 100644 --- a/cmd/wal/truncate/cmd.go +++ b/cmd/wal/truncate/cmd.go @@ -15,11 +15,13 @@ package truncate import ( + "log/slog" + "math" + "github.com/spf13/cobra" + "github.com/streamnative/oxia/cmd/wal/common" "github.com/streamnative/oxia/server/wal" - "log/slog" - "math" ) type truncateOptions struct { diff --git a/server/leader_controller_test.go b/server/leader_controller_test.go index 33ee94fb..8c559ced 100644 --- a/server/leader_controller_test.go +++ b/server/leader_controller_test.go @@ -1289,3 +1289,79 @@ func TestLeaderController_NotificationsDisabled(t *testing.T) { assert.NoError(t, kvFactory.Close()) assert.NoError(t, walFactory.Close()) } + +func TestLeaderController_DuplicateNewTerm_WithSession(t *testing.T) { + var shard int64 = 2 + + kvFactory, err := kv.NewPebbleKVFactory(testKVOptions) + assert.NoError(t, err) + walFactory := newTestWalFactory(t) + + lc, err := NewLeaderController(Config{}, common.DefaultNamespace, shard, newMockRpcClient(), walFactory, kvFactory) + assert.NoError(t, err) + + _, err = lc.NewTerm(&proto.NewTermRequest{Shard: shard, Term: 1}) + assert.NoError(t, err) + + _, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{ + Shard: shard, + Term: 1, + ReplicationFactor: 1, + FollowerMaps: nil, + }) + assert.NoError(t, err) + + csResult, err := lc.CreateSession(&proto.CreateSessionRequest{ + Shard: shard, + SessionTimeoutMs: 5_000, + ClientIdentity: "my-identity", + }) + assert.NoError(t, err) + + sessionId := csResult.SessionId + invalidSessionId := int64(5) + + key := "/namespace/sn/system/0xe0000000_0xf0000000" + + // Write entry + _, err = lc.Write(context.Background(), &proto.WriteRequest{ + Shard: &shard, + Puts: []*proto.PutRequest{{ + Key: key, + Value: []byte("value-a"), + SessionId: &invalidSessionId, + }}, + }) + assert.NoError(t, err) + + // Start a new term on the same leader + _, err = lc.NewTerm(&proto.NewTermRequest{Shard: shard, Term: 2}) + assert.NoError(t, err) + + _, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{ + Shard: shard, + Term: 2, + ReplicationFactor: 1, + FollowerMaps: nil, + }) + assert.NoError(t, err) + + _, err = lc.CloseSession(&proto.CloseSessionRequest{ + Shard: shard, + SessionId: sessionId, + }) + assert.NoError(t, err) + + // Read entry + r := <-lc.Read(context.Background(), &proto.ReadRequest{ + Shard: &shard, + Gets: []*proto.GetRequest{{Key: key}}, + }) + + assert.NoError(t, r.Err) + assert.Equal(t, proto.Status_KEY_NOT_FOUND, r.Response.Status) + + assert.NoError(t, lc.Close()) + assert.NoError(t, kvFactory.Close()) + assert.NoError(t, walFactory.Close()) +} diff --git a/server/session_manager_test.go b/server/session_manager_test.go index 71f0fd72..df24fbf3 100644 --- a/server/session_manager_test.go +++ b/server/session_manager_test.go @@ -607,3 +607,36 @@ func reopenLeaderController(t *testing.T, kvFactory kv.Factory, walFactory wal.F return lc.(*leaderController) } + +func TestSession_PutWithExpiredSession(t *testing.T) { + var oldSessionId int64 = 100 + var newSessionId int64 = 101 + + se := &proto.StorageEntry{ + Value: []byte("value"), + VersionId: 0, + CreationTimestamp: 0, + ModificationTimestamp: 0, + SessionId: &oldSessionId, + } + // sessionID has expired + writeBatch := mockWriteBatch{ + "a/b/c": []byte{}, // real data + ShadowKey(SessionId(oldSessionId), "a/b/c"): []byte{}, // shadow key + SessionKey(SessionId(oldSessionId)): []byte{}, // session + } + // try to use current session override the (sessionID -1) + sessionPutRequest := &proto.PutRequest{ + Key: "a/b/c", + Value: []byte("b"), + SessionId: &newSessionId, + } + + status, err := SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se) + assert.NoError(t, err) + assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status) + + _, closer, err := writeBatch.Get(ShadowKey(SessionId(oldSessionId), "a/b/c")) + assert.NoError(t, err) + assert.NoError(t, closer.Close()) +}