diff --git a/server/follower_controller.go b/server/follower_controller.go index 4e2578ba..6fcbcdfa 100644 --- a/server/follower_controller.go +++ b/server/follower_controller.go @@ -337,6 +337,7 @@ func (fc *followerController) Truncate(req *proto.TruncateRequest) (*proto.Trunc return nil, errors.Wrapf(err, "failed to truncate wal. truncate-offset: %d - wal-last-offset: %d", req.HeadEntryId.Offset, fc.wal.LastOffset()) } + fc.lastAppendedOffset = headOffset return &proto.TruncateResponse{ HeadEntryId: &proto.EntryId{ diff --git a/server/follower_controller_test.go b/server/follower_controller_test.go index 4d640a29..8f776fa1 100644 --- a/server/follower_controller_test.go +++ b/server/follower_controller_test.go @@ -87,8 +87,7 @@ func TestFollower(t *testing.T) { wg := common.NewWaitGroup(1) go func() { - err := fc.Replicate(stream) - assert.ErrorIs(t, err, context.Canceled) + _ = fc.Replicate(stream) wg.Done() }() @@ -111,6 +110,56 @@ func TestFollower(t *testing.T) { assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) assert.EqualValues(t, 1, fc.Term()) + // close follower + assert.NoError(t, fc.Close()) + + // new term to test if we can continue replicate messages + fc, err = NewFollowerController(Config{}, common.DefaultNamespace, shardId, walFactory, kvFactory) + assert.NoError(t, err) + assert.Equal(t, proto.ServingStatus_NOT_MEMBER, fc.Status()) + _, err = fc.NewTerm(&proto.NewTermRequest{Term: 2}) + assert.NoError(t, err) + assert.Equal(t, proto.ServingStatus_FENCED, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + truncateResp, err = fc.Truncate(&proto.TruncateRequest{ + Term: 2, + HeadEntryId: &proto.EntryId{ + Term: 1, + Offset: 0, + }, + }) + assert.NoError(t, err) + assert.EqualValues(t, 2, truncateResp.HeadEntryId.Term) + + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + stream = newMockServerReplicateStream() + wg = common.NewWaitGroup(1) + go func() { + err := fc.Replicate(stream) + assert.ErrorIs(t, err, context.Canceled) + wg.Done() + }() + stream.AddRequest(createAddRequest(t, 2, 0, map[string]string{"a": "0", "b": "1"}, wal.InvalidOffset)) + // Wait for response + response = stream.GetResponse() + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 0, response.Offset) + // Write next entry + stream.AddRequest(createAddRequest(t, 2, 1, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset)) + + // Wait for response + response = stream.GetResponse() + assert.EqualValues(t, 1, response.Offset) + + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + + stream.AddRequest(createAddRequest(t, 2, 2, map[string]string{"a": "4", "b": "5"}, wal.InvalidOffset)) + response = stream.GetResponse() + assert.EqualValues(t, 2, response.Offset) + assert.Equal(t, proto.ServingStatus_FOLLOWER, fc.Status()) + assert.EqualValues(t, 2, fc.Term()) + // Double-check the values in the DB // Keys are not there because they were not part of the commit offset dbRes, err := fc.(*followerController).db.Get(&proto.GetRequest{