Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(session): expired session write request will delete other session shadow key #534

Merged
merged 6 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,10 @@ package main
import (
"context"
"fmt"
"github.com/streamnative/oxia/cmd/wal"
"os"

"github.com/streamnative/oxia/cmd/wal"

"github.com/spf13/cobra"
"go.uber.org/automaxprocs/maxprocs"

Expand Down
1 change: 1 addition & 0 deletions cmd/wal/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package wal

import (
"github.com/spf13/cobra"

"github.com/streamnative/oxia/cmd/wal/common"
"github.com/streamnative/oxia/cmd/wal/truncate"
)
Expand Down
6 changes: 4 additions & 2 deletions cmd/wal/truncate/cmd.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
package truncate

import (
"log/slog"
"math"

"github.com/spf13/cobra"

"github.com/streamnative/oxia/cmd/wal/common"
"github.com/streamnative/oxia/server/wal"
"log/slog"
"math"
)

type truncateOptions struct {
Expand Down
76 changes: 76 additions & 0 deletions server/leader_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1289,3 +1289,79 @@ func TestLeaderController_NotificationsDisabled(t *testing.T) {
assert.NoError(t, kvFactory.Close())
assert.NoError(t, walFactory.Close())
}

func TestLeaderController_DuplicateNewTerm_WithSession(t *testing.T) {
var shard int64 = 2

kvFactory, err := kv.NewPebbleKVFactory(testKVOptions)
assert.NoError(t, err)
walFactory := newTestWalFactory(t)

lc, err := NewLeaderController(Config{}, common.DefaultNamespace, shard, newMockRpcClient(), walFactory, kvFactory)
assert.NoError(t, err)

_, err = lc.NewTerm(&proto.NewTermRequest{Shard: shard, Term: 1})
assert.NoError(t, err)

_, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{
Shard: shard,
Term: 1,
ReplicationFactor: 1,
FollowerMaps: nil,
})
assert.NoError(t, err)

csResult, err := lc.CreateSession(&proto.CreateSessionRequest{
Shard: shard,
SessionTimeoutMs: 5_000,
ClientIdentity: "my-identity",
})
assert.NoError(t, err)

sessionId := csResult.SessionId
invalidSessionId := int64(5)

key := "/namespace/sn/system/0xe0000000_0xf0000000"

// Write entry
_, err = lc.Write(context.Background(), &proto.WriteRequest{
Shard: &shard,
Puts: []*proto.PutRequest{{
Key: key,
Value: []byte("value-a"),
SessionId: &invalidSessionId,
}},
})
assert.NoError(t, err)

// Start a new term on the same leader
_, err = lc.NewTerm(&proto.NewTermRequest{Shard: shard, Term: 2})
assert.NoError(t, err)

_, err = lc.BecomeLeader(context.Background(), &proto.BecomeLeaderRequest{
Shard: shard,
Term: 2,
ReplicationFactor: 1,
FollowerMaps: nil,
})
assert.NoError(t, err)

_, err = lc.CloseSession(&proto.CloseSessionRequest{
Shard: shard,
SessionId: sessionId,
})
assert.NoError(t, err)

// Read entry
r := <-lc.Read(context.Background(), &proto.ReadRequest{
Shard: &shard,
Gets: []*proto.GetRequest{{Key: key}},
})

assert.NoError(t, r.Err)
assert.Equal(t, proto.Status_KEY_NOT_FOUND, r.Response.Status)

assert.NoError(t, lc.Close())
assert.NoError(t, kvFactory.Close())
assert.NoError(t, walFactory.Close())
}
29 changes: 17 additions & 12 deletions server/session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,7 +309,7 @@ type updateCallback struct{}

var SessionUpdateOperationCallback kv.UpdateOperationCallback = &updateCallback{}

func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.PutRequest, _ *proto.StorageEntry) (proto.Status, error) {
func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.PutRequest, existingEntry *proto.StorageEntry) (proto.Status, error) {
var _, closer, err = batch.Get(SessionKey(SessionId(*request.SessionId)))
if err != nil {
if errors.Is(err, kv.ErrKeyNotFound) {
Expand All @@ -320,6 +320,10 @@ func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.Pu
if err = closer.Close(); err != nil {
return proto.Status_SESSION_DOES_NOT_EXIST, err
}
// delete existing session shadow
if status, err := deleteShadow(batch, request.Key, existingEntry); err != nil {
return status, err
}
// Create the session shadow entry
err = batch.Put(ShadowKey(SessionId(*request.SessionId), request.Key), []byte{})
if err != nil {
Expand All @@ -330,26 +334,27 @@ func (*updateCallback) OnPutWithinSession(batch kv.WriteBatch, request *proto.Pu
}

func (c *updateCallback) OnPut(batch kv.WriteBatch, request *proto.PutRequest, existingEntry *proto.StorageEntry) (proto.Status, error) {
if existingEntry != nil && existingEntry.SessionId != nil {
// We are overwriting an ephemeral value, let's delete its shadow
switch {
// override by normal operation
case request.SessionId == nil:
if status, err := deleteShadow(batch, request.Key, existingEntry); err != nil {
return status, err
}
}

sessionId := request.SessionId
if sessionId != nil {
// We are adding an ephemeral value, let's check if the session exists
// override by session operation
case request.SessionId != nil:
return c.OnPutWithinSession(batch, request, existingEntry)
}
return proto.Status_OK, nil
}

func deleteShadow(batch kv.WriteBatch, key string, existingEntry *proto.StorageEntry) (proto.Status, error) {
existingSessionId := SessionId(*existingEntry.SessionId)
err := batch.Delete(ShadowKey(existingSessionId, key))
if err != nil && !errors.Is(err, kv.ErrKeyNotFound) {
return proto.Status_SESSION_DOES_NOT_EXIST, err
// We are overwriting an ephemeral value, let's delete its shadow
if existingEntry != nil && existingEntry.SessionId != nil {
existingSessionId := SessionId(*existingEntry.SessionId)
err := batch.Delete(ShadowKey(existingSessionId, key))
if err != nil && !errors.Is(err, kv.ErrKeyNotFound) {
return proto.Status_SESSION_DOES_NOT_EXIST, err
}
}
return proto.Status_OK, nil
}
Expand Down
76 changes: 69 additions & 7 deletions server/session_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {

writeBatch = mockWriteBatch{
"a/b/c": []byte{},
SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{},
ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{},
}

se := &proto.StorageEntry{
Expand All @@ -160,9 +160,9 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {

writeBatch = mockWriteBatch{
"a/b/c": []byte{},
SessionKey(SessionId(sessionId-1)) + "a%2Fb%2Fc": []byte{},
SessionKey(SessionId(sessionId - 1)): []byte{},
SessionKey(SessionId(sessionId)): []byte{},
ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{},
SessionKey(SessionId(sessionId - 1)): []byte{},
SessionKey(SessionId(sessionId)): []byte{},
}

status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se)
Expand All @@ -178,6 +178,35 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status)

// session (sessionID -1) entry
tmpSessionId := sessionId - 1
se = &proto.StorageEntry{
Value: []byte("value"),
VersionId: 0,
CreationTimestamp: 0,
ModificationTimestamp: 0,
SessionId: &tmpSessionId,
}
// sessionID has expired
writeBatch = mockWriteBatch{
"a/b/c": []byte{}, // real data
ShadowKey(SessionId(sessionId-1), "a/b/c"): []byte{}, // shadow key
SessionKey(SessionId(sessionId - 1)): []byte{}, // session
}
// try to use current session override the (sessionID -1)
sessionPutRequest = &proto.PutRequest{
Key: "a/b/c",
Value: []byte("b"),
SessionId: &sessionId,
}

status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se)
assert.NoError(t, err)
assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status)
_, closer, err := writeBatch.Get(ShadowKey(SessionId(sessionId-1), "a/b/c"))
assert.NoError(t, err)
closer.Close()

expectedErr := errors.New("error coming from the DB on read")
writeBatch = mockWriteBatch{
SessionKey(SessionId(sessionId)): expectedErr,
Expand All @@ -191,14 +220,14 @@ func TestSessionUpdateOperationCallback_OnPut(t *testing.T) {
status, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, nil)
assert.NoError(t, err)
assert.Equal(t, proto.Status_OK, status)
sessionKey := SessionKey(SessionId(sessionId)) + "/a%2Fb%2Fc"
_, found := writeBatch[sessionKey]
sessionShadowKey := ShadowKey(SessionId(sessionId), "a/b/c")
_, found := writeBatch[sessionShadowKey]
assert.True(t, found)

expectedErr = errors.New("error coming from the DB on write")
writeBatch = mockWriteBatch{
SessionKey(SessionId(sessionId)): []byte{},
sessionKey: expectedErr,
sessionShadowKey: expectedErr,
}
_, err = SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, nil)
assert.ErrorIs(t, err, expectedErr)
Expand Down Expand Up @@ -578,3 +607,36 @@ func reopenLeaderController(t *testing.T, kvFactory kv.Factory, walFactory wal.F

return lc.(*leaderController)
}

func TestSession_PutWithExpiredSession(t *testing.T) {
var oldSessionId int64 = 100
var newSessionId int64 = 101

se := &proto.StorageEntry{
Value: []byte("value"),
VersionId: 0,
CreationTimestamp: 0,
ModificationTimestamp: 0,
SessionId: &oldSessionId,
}
// sessionID has expired
writeBatch := mockWriteBatch{
"a/b/c": []byte{}, // real data
ShadowKey(SessionId(oldSessionId), "a/b/c"): []byte{}, // shadow key
SessionKey(SessionId(oldSessionId)): []byte{}, // session
}
// try to use current session override the (sessionID -1)
sessionPutRequest := &proto.PutRequest{
Key: "a/b/c",
Value: []byte("b"),
SessionId: &newSessionId,
}

status, err := SessionUpdateOperationCallback.OnPut(writeBatch, sessionPutRequest, se)
assert.NoError(t, err)
assert.Equal(t, proto.Status_SESSION_DOES_NOT_EXIST, status)

_, closer, err := writeBatch.Get(ShadowKey(SessionId(oldSessionId), "a/b/c"))
assert.NoError(t, err)
assert.NoError(t, closer.Close())
}
Loading