Fixed notification DB read after DB was closed #519

Merged · 1 commit · Sep 20, 2024
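What this PR fixes: the old `GetNotifications` handler cancelled its per-stream context when the leader shut down, but never waited for the dispatch goroutine to actually exit, so an `lc.db.ReadNextNotifications` call already in flight (or racing the `ctx.Err()` check at the top of the loop) could land on a database that `close()` had meanwhile released. The fix moves the dispatch logic into a `notificationDispatcher` object, tracks every live dispatcher in a map on the leader controller, and makes `close()` block on each dispatcher's `closeCh` before tearing down the WAL and DB. Below is a minimal, self-contained sketch of that drain-before-close pattern; `owner`, `worker`, and every other name in it are illustrative stand-ins, not Oxia code.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

type worker struct {
	done chan struct{} // closed when the worker goroutine has fully exited
}

type owner struct {
	mu      sync.Mutex
	ctx     context.Context
	cancel  context.CancelFunc
	workers map[int64]*worker
	nextID  int64
}

func newOwner() *owner {
	ctx, cancel := context.WithCancel(context.Background())
	return &owner{ctx: ctx, cancel: cancel, workers: map[int64]*worker{}}
}

func (o *owner) startWorker() {
	w := &worker{done: make(chan struct{})}
	o.mu.Lock()
	o.nextID++
	id := o.nextID
	o.workers[id] = w
	o.mu.Unlock()

	go func() {
		<-o.ctx.Done() // stand-in for the notification dispatch loop
		close(w.done)  // signal exit before deregistering, mirroring the PR's ordering
		o.mu.Lock()
		delete(o.workers, id)
		o.mu.Unlock()
	}()
}

func (o *owner) Close() {
	o.cancel() // stop every worker
	o.mu.Lock()
	ws := make([]*worker, 0, len(o.workers))
	for _, w := range o.workers {
		ws = append(ws, w)
	}
	o.mu.Unlock()
	for _, w := range ws {
		<-w.done // block until the worker has really stopped touching shared state
	}
	fmt.Println("all dispatchers drained; now safe to close the DB")
}

func main() {
	o := newOwner()
	o.startWorker()
	o.startWorker()
	o.Close()
}
```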
124 changes: 14 additions & 110 deletions server/leader_controller.go
@@ -102,17 +102,20 @@ type leaderController struct {
headOffsetGauge metrics.Gauge
commitOffsetGauge metrics.Gauge
	followerAckOffsetGauges map[string]metrics.Gauge
+
+	notificationDispatchers map[int64]*notificationDispatcher
}

func NewLeaderController(config Config, namespace string, shardId int64, rpcClient ReplicationRpcProvider, walFactory wal.Factory, kvFactory kv.Factory) (LeaderController, error) {
labels := metrics.LabelsForShard(namespace, shardId)
lc := &leaderController{
-		status:           proto.ServingStatus_NOT_MEMBER,
-		namespace:        namespace,
-		shardId:          shardId,
-		quorumAckTracker: nil,
-		rpcClient:        rpcClient,
-		followers:        make(map[string]FollowerCursor),
+		status:                  proto.ServingStatus_NOT_MEMBER,
+		namespace:               namespace,
+		shardId:                 shardId,
+		quorumAckTracker:        nil,
+		rpcClient:               rpcClient,
+		followers:               make(map[string]FollowerCursor),
+		notificationDispatchers: make(map[int64]*notificationDispatcher),

writeLatencyHisto: metrics.NewLatencyHistogram("oxia_server_leader_write_latency",
"Latency for write operations in the leader", labels),
@@ -963,110 +966,7 @@ func (lc *leaderController) appendToWalStreamRequest(request *proto.WriteRequest
// ////

func (lc *leaderController) GetNotifications(req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error {
-	// Create a context for handling this stream
-	ctx, cancel := context.WithCancel(stream.Context())
-
-	go common.DoWithLabels(
-		ctx,
-		map[string]string{
-			"oxia":  "dispatch-notifications",
-			"shard": fmt.Sprintf("%d", lc.shardId),
-			"peer":  common.GetPeer(stream.Context()),
-		},
-		func() {
-			if err := lc.dispatchNotifications(ctx, req, stream); err != nil && !errors.Is(err, context.Canceled) {
-				lc.log.Warn(
-					"Failed to dispatch notifications",
-					slog.Any("error", err),
-					slog.String("peer", common.GetPeer(stream.Context())),
-				)
-				cancel()
-			}
-		},
-	)
-
-	select {
-	case <-lc.ctx.Done():
-		// Leader is getting closed
-		cancel()
-		return lc.ctx.Err()
-
-	case <-ctx.Done():
-		return ctx.Err()
-
-	case <-stream.Context().Done():
-		// The stream is getting closed
-		cancel()
-		return stream.Context().Err()
-	}
-}
-
-func (lc *leaderController) dispatchNotifications(ctx context.Context, req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error {
-	lc.log.Debug(
-		"Dispatch notifications",
-		slog.Any("start-offset-exclusive", req.StartOffsetExclusive),
-	)
-
-	var offsetInclusive int64
-	if req.StartOffsetExclusive != nil {
-		offsetInclusive = *req.StartOffsetExclusive + 1
-	} else {
-		lc.Lock()
-		qat := lc.quorumAckTracker
-		lc.Unlock()
-
-		if qat == nil {
-			return errors.New("leader is not yet ready")
-		}
-		commitOffset := qat.CommitOffset()
-
-		// The client is creating a new notification stream and wants to receive the notification from the next
-		// entry that will be written.
-		// In order to ensure the client will positioned on a given offset, we need to send a first "dummy"
-		// notification. The client will wait for this first notification before making the notification
-		// channel available to the application
-		lc.log.Debug(
-			"Sending first dummy notification",
-			slog.Int64("commit-offset", commitOffset),
-		)
-		if err := stream.Send(&proto.NotificationBatch{
-			Shard:         lc.shardId,
-			Offset:        commitOffset,
-			Timestamp:     0,
-			Notifications: nil,
-		}); err != nil {
-			return err
-		}
-
-		offsetInclusive = commitOffset + 1
-	}
-
-	return lc.iterateOverNotifications(ctx, stream, offsetInclusive)
-}
-
-func (lc *leaderController) iterateOverNotifications(ctx context.Context, stream proto.OxiaClient_GetNotificationsServer, startOffsetInclusive int64) error {
-	offsetInclusive := startOffsetInclusive
-	for ctx.Err() == nil {
-		notifications, err := lc.db.ReadNextNotifications(ctx, offsetInclusive)
-		if err != nil {
-			return err
-		}
-
-		lc.log.Debug(
-			"Got a new list of notification batches",
-			slog.Int("list-size", len(notifications)),
-		)
-
-		for _, n := range notifications {
-			if err := stream.Send(n); err != nil {
-				return err
-			}
-		}
-
-		offsetInclusive += int64(len(notifications))
-	}
-
-	return ctx.Err()
+	return startNotificationDispatcher(lc, req, stream)
}

func (lc *leaderController) isClosed() bool {
@@ -1098,6 +998,10 @@ func (lc *leaderController) close() error {

err = lc.sessionManager.Close()

+	for _, nd := range lc.notificationDispatchers {
+		nd.close()
+	}

if lc.wal != nil {
err = multierr.Append(err, lc.wal.Close())
lc.wal = nil
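With dispatchers tracked in the map, `close()` now drains every active notification stream before releasing the WAL and, further down in `close()` (outside the visible hunk), the database. Each `nd.close()` blocks until that dispatcher's goroutine has fully exited, so once this loop completes no goroutine can still be inside `lc.db.ReadNextNotifications`: the read-after-close race from the PR title is gone. This relies on `lc.ctx` already being cancelled earlier during shutdown, since that is what unblocks each dispatcher (see the notes on the new file below); that earlier part of `close()` is not shown in this hunk.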
175 changes: 175 additions & 0 deletions server/notifications_dispatcher.go
@@ -0,0 +1,175 @@
// Copyright 2023 StreamNative, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package server

import (
"context"
"fmt"
"log/slog"
"sync/atomic"

"github.com/pkg/errors"

"github.com/streamnative/oxia/common"
"github.com/streamnative/oxia/proto"
)

type notificationDispatcher struct {
lc *leaderController
id int64
req *proto.NotificationsRequest
stream proto.OxiaClient_GetNotificationsServer

ctx context.Context
cancel context.CancelFunc

closeCh chan any
log *slog.Logger
}

var notificationDispatcherIdGen atomic.Int64

func startNotificationDispatcher(lc *leaderController, req *proto.NotificationsRequest, stream proto.OxiaClient_GetNotificationsServer) error {
nd := &notificationDispatcher{
lc: lc,
id: notificationDispatcherIdGen.Add(1),
req: req,
stream: stream,
log: lc.log.With(slog.String("component", "notification-dispatcher")),
closeCh: make(chan any),
}

lc.Lock()
lc.notificationDispatchers[nd.id] = nd
lc.Unlock()

// Create a context for handling this stream
nd.ctx, nd.cancel = context.WithCancel(stream.Context())

go common.DoWithLabels(
nd.ctx,
map[string]string{
"oxia": "dispatch-notifications",
"shard": fmt.Sprintf("%d", lc.shardId),
"peer": common.GetPeer(stream.Context()),
},
func() {
if err := nd.dispatchNotifications(); err != nil && !errors.Is(err, context.Canceled) {
nd.log.Warn(
"Failed to dispatch notifications",
slog.Any("error", err),
slog.String("peer", common.GetPeer(stream.Context())),
)
nd.cancel()
}

close(nd.closeCh)

			// Remove this dispatcher from the leader controller's map
nd.lc.Lock()
delete(nd.lc.notificationDispatchers, nd.id)
nd.lc.Unlock()
},
)

select {
case <-lc.ctx.Done():
// Leader is getting closed
nd.cancel()
return lc.ctx.Err()

case <-nd.ctx.Done():
return nd.ctx.Err()

case <-stream.Context().Done():
// The stream is getting closed
nd.cancel()
return stream.Context().Err()
}
}
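Lifecycle notes on the function above: the dispatcher registers itself under the controller lock, and the goroutine's cleanup runs unconditionally, so `closeCh` is closed whether `dispatchNotifications` succeeded, failed, or was cancelled; `leaderController.close()` can therefore never block forever on `<-nd.closeCh`. Closing `closeCh` before re-taking the controller lock also matters: assuming `close()` runs with that lock held (as other `leaderController` methods suggest, though it is not shown here), a goroutine that tried to deregister first would deadlock against `close()` waiting on `closeCh`. Meanwhile the RPC handler itself parks in the three-way `select`, returning when the leader shuts down (`lc.ctx`), the dispatch goroutine cancels after an error (`nd.ctx`), or the client disconnects (`stream.Context()`).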

func (nd *notificationDispatcher) dispatchNotifications() error {
nd.log.Debug(
"Dispatch notifications",
slog.Any("start-offset-exclusive", nd.req.StartOffsetExclusive),
)

var offsetInclusive int64
if nd.req.StartOffsetExclusive != nil {
offsetInclusive = *nd.req.StartOffsetExclusive + 1
} else {
nd.lc.Lock()
qat := nd.lc.quorumAckTracker
nd.lc.Unlock()

if qat == nil {
return errors.New("leader is not yet ready")
}
commitOffset := qat.CommitOffset()

		// The client is creating a new notification stream and wants to receive notifications
		// starting from the next entry that will be written.
		// To ensure the client is positioned at a known offset, we first send a "dummy"
		// notification. The client waits for this first notification before making the
		// notification channel available to the application.
nd.log.Debug(
"Sending first dummy notification",
slog.Int64("commit-offset", commitOffset),
)
if err := nd.stream.Send(&proto.NotificationBatch{
Shard: nd.lc.shardId,
Offset: commitOffset,
Timestamp: 0,
Notifications: nil,
}); err != nil {
return err
}

offsetInclusive = commitOffset + 1
}

return nd.iterateOverNotifications(offsetInclusive)
}
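Why the dummy batch: when `StartOffsetExclusive` is absent, the dispatcher anchors the new stream at the current commit offset and tells the client where it stands before any real notifications flow. Concretely, if the commit offset is 41, the client receives an empty batch with `Offset` 41, and tailing then starts at `offsetInclusive` 42. The sketch below shows a hypothetical consumer of that contract; it is not Oxia's actual client API, and `notificationBatch` is a local stand-in for `proto.NotificationBatch`.

```go
package main

import "fmt"

// notificationBatch is a local stand-in for proto.NotificationBatch.
type notificationBatch struct {
	Offset        int64
	Notifications []string
}

// consume treats the first batch purely as a position marker;
// everything after it is real data.
func consume(batches <-chan notificationBatch) {
	first := <-batches // dummy: Offset = commit offset, Notifications = nil
	fmt.Printf("stream positioned after offset %d\n", first.Offset)
	for b := range batches {
		fmt.Printf("offset %d: %v\n", b.Offset, b.Notifications)
	}
}

func main() {
	ch := make(chan notificationBatch, 3)
	ch <- notificationBatch{Offset: 41} // dummy sent by the leader
	ch <- notificationBatch{Offset: 42, Notifications: []string{"a changed"}}
	ch <- notificationBatch{Offset: 43, Notifications: []string{"b deleted"}}
	close(ch)
	consume(ch)
}
```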

func (nd *notificationDispatcher) iterateOverNotifications(startOffsetInclusive int64) error {
lc := nd.lc
offsetInclusive := startOffsetInclusive
for nd.ctx.Err() == nil {
notifications, err := lc.db.ReadNextNotifications(nd.ctx, offsetInclusive)
if err != nil {
return err
}

nd.log.Debug(
"Got a new list of notification batches",
slog.Int("list-size", len(notifications)),
)

for _, n := range notifications {
if err := nd.stream.Send(n); err != nil {
return err
}
}

offsetInclusive += int64(len(notifications))
}

return nd.ctx.Err()
}
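The tailing loop above is also the call site the PR title refers to: `ReadNextNotifications` is the read that previously could hit an already-closed DB. Advancing by `len(notifications)` assumes the DB returns batches for consecutive offsets starting at the requested one, and the absence of any backoff suggests the call blocks until new notifications arrive rather than returning an empty slice when caught up; both presumably hold for Oxia's notification store, but they are inferences from this diff, not shown in it.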

func (nd *notificationDispatcher) close() {
	// Wait for the dispatcher goroutine to fully exit
<-nd.closeCh
}
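Note that `close()` here only waits; it does not cancel anything itself. Cancellation arrives from the handler's `select` in `startNotificationDispatcher`: when the leader shuts down, the `lc.ctx.Done()` case fires and calls `nd.cancel()`, which makes `iterateOverNotifications` return and the goroutine close `closeCh`. For `leaderController.close()` not to block on this wait, `lc.ctx` must therefore already be cancelled by the time it drains the dispatchers — presumably the case earlier in shutdown, though that ordering is outside the visible hunks.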