Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed May 12, 2024
1 parent caedca7 commit 240e132
Show file tree
Hide file tree
Showing 7 changed files with 327 additions and 143 deletions.
20 changes: 10 additions & 10 deletions channel_medium.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ import (
// can provide significant benefits in terms of overall system efficiency and flexibility.
type ChannelMediumOptions struct {
// KeepLatestPublication enables keeping latest publication which was broadcasted to channel subscribers on
// this Node in the channel medium layer in the case when channel history is not used. This is helpful for
// supporting deltas in at most once scenario. Note, for publications with Offset or when previous Publication
// comes from the broker level KeepLatestPublication won't be used.
// this Node in the channel medium layer. This is helpful for supporting deltas in at most once scenario.
KeepLatestPublication bool

// EnablePositionSync when true delegates connection position checks to the channel medium. In that case
Expand All @@ -45,7 +43,7 @@ type ChannelMediumOptions struct {
// to clients when publication contains the entire state. If zero, all publications will be sent to clients
// without delay logic involved on channel medium level. BroadcastDelay option requires (!) EnableQueue to be
// enabled, as we can not afford delays during broadcast from the PUB/SUB layer. BroadcastDelay must not be
// used in channels with positioning/recovery on.
// used in channels with positioning/recovery on since it skips publications.
broadcastDelay time.Duration
}

Expand Down Expand Up @@ -73,7 +71,7 @@ type channelMedium struct {
}

type node interface {
handlePublication(ch string, pub *Publication, sp StreamPosition, prevPub *Publication) error
handlePublication(ch string, sp StreamPosition, pub, prevPub *Publication, memPrevPub *Publication) error
streamTop(ch string, historyMetaTTL time.Duration) (StreamPosition, error)
}

Expand Down Expand Up @@ -149,19 +147,21 @@ func (c *channelMedium) broadcast(qp queuedPub) {
pubToBroadcast = &Publication{Offset: math.MaxUint64}
spToBroadcast.Offset = math.MaxUint64
}

prevPub := qp.prevPub
useInMemoryLatestPub := c.options.KeepLatestPublication && prevPub == nil && !qp.isInsufficientState && qp.pub.Offset == 0
if useInMemoryLatestPub {
prevPub = c.latestPublication
var localPrevPub *Publication
useLocalLatestPub := c.options.KeepLatestPublication && !qp.isInsufficientState
if useLocalLatestPub {
localPrevPub = c.latestPublication
}
if c.options.broadcastDelay > 0 && !c.options.KeepLatestPublication {
prevPub = nil
}
if qp.isInsufficientState {
prevPub = nil
}
_ = c.node.handlePublication(c.channel, pubToBroadcast, spToBroadcast, prevPub)
if useInMemoryLatestPub {
_ = c.node.handlePublication(c.channel, spToBroadcast, pubToBroadcast, prevPub, localPrevPub)
if useLocalLatestPub {
c.latestPublication = qp.pub
}
}
Expand Down
10 changes: 5 additions & 5 deletions channel_medium_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ func setupChannelMedium(t testing.TB, options ChannelMediumOptions, node node) *
}

type mockNode struct {
handlePublicationFunc func(channel string, pub *Publication, sp StreamPosition, prevPub *Publication) error
handlePublicationFunc func(channel string, sp StreamPosition, pub, prevPub, localPrevPub *Publication) error
streamTopFunc func(ch string, historyMetaTTL time.Duration) (StreamPosition, error)
}

func (m *mockNode) handlePublication(channel string, pub *Publication, sp StreamPosition, prevPub *Publication) error {
func (m *mockNode) handlePublication(channel string, sp StreamPosition, pub, prevPub, localPrevPub *Publication) error {
if m.handlePublicationFunc != nil {
return m.handlePublicationFunc(channel, pub, sp, prevPub)
return m.handlePublicationFunc(channel, sp, pub, prevPub, localPrevPub)
}
return nil
}
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestChannelMediumHandlePublication(t *testing.T) {
doneCh := make(chan struct{})

cache := setupChannelMedium(t, options, &mockNode{
handlePublicationFunc: func(channel string, pub *Publication, sp StreamPosition, prevPublication *Publication) error {
handlePublicationFunc: func(channel string, sp StreamPosition, pub, prevPub, localPrevPub *Publication) error {
close(doneCh)
return nil
},
Expand All @@ -95,7 +95,7 @@ func TestChannelMediumInsufficientState(t *testing.T) {
}
doneCh := make(chan struct{})
medium := setupChannelMedium(t, options, &mockNode{
handlePublicationFunc: func(channel string, pub *Publication, sp StreamPosition, prevPublication *Publication) error {
handlePublicationFunc: func(channel string, sp StreamPosition, pub, prevPub, localPrevPub *Publication) error {
require.Equal(t, uint64(math.MaxUint64), pub.Offset)
require.Equal(t, uint64(math.MaxUint64), sp.Offset)
close(doneCh)
Expand Down
45 changes: 20 additions & 25 deletions client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2759,13 +2759,13 @@ func (c *Client) subscribeCmd(req *protocol.SubscribeRequest, reply SubscribeRep
c.pubSubSync.StartBuffering(channel)
}

sub := subInfo{client: c, deltaType: ""}
sub := subInfo{client: c, deltaType: deltaTypeNone}
if req.Delta != "" {
dt := DeltaType(req.Delta)
if slices.Contains(reply.Options.AllowedDeltaTypes, dt) {
res.Delta = true
sub.deltaType = dt
}
sub.deltaType = dt
}
err := c.node.addSubscription(channel, sub)
if err != nil {
Expand Down Expand Up @@ -3118,22 +3118,17 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
// This is a special pub to trigger insufficient state. Noop in non-positioning case.
return nil
}
// Centrifuge does not support deltas in this case for now. Without positioning there could be a
// message loss (because Broker's PUB/SUB is at most once), which we can't detect here, so prevPub
// coming from the broker level can be different from what client connection actually received.
// It seems possible to implement delta for this path using channel medium layer – but need to
// distinguish broker and in-memory deltas and use in-memory one here.
//if data.delta && deltaAllowed {
// return c.transportEnqueue(data.deltaData, ch, protocol.FrameTypePushPublication)
//}
//if !deltaAllowed {
// c.mu.Lock()
// if chCtx, chCtxOK := c.channels[ch]; chCtxOK {
// chCtx.flags |= flagDeltaAllowed
// c.channels[ch] = chCtx
// }
// c.mu.Unlock()
//}
if prep.deltaSub {
if deltaAllowed {
return c.transportEnqueue(prep.localDeltaData, ch, protocol.FrameTypePushPublication)
}
c.mu.Lock()
if chCtx, chCtxOK := c.channels[ch]; chCtxOK {
chCtx.flags |= flagDeltaAllowed
c.channels[ch] = chCtx
}
c.mu.Unlock()
}
return c.transportEnqueue(prep.fullData, ch, protocol.FrameTypePushPublication)
}
serverSide := channelHasFlag(channelContext.flags, flagServerSide)
Expand Down Expand Up @@ -3161,10 +3156,10 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
if hasFlag(c.transport.DisabledPushFlags(), PushFlagPublication) {
return nil
}
if prep.delta && deltaAllowed {
return c.transportEnqueue(prep.deltaData, ch, protocol.FrameTypePushPublication)
}
if !deltaAllowed {
if prep.deltaSub {
if deltaAllowed {
return c.transportEnqueue(prep.brokerDeltaData, ch, protocol.FrameTypePushPublication)
}
c.mu.Lock()
if chCtx, chCtxOK := c.channels[ch]; chCtxOK {
chCtx.flags |= flagDeltaAllowed
Expand All @@ -3176,7 +3171,7 @@ func (c *Client) writePublicationUpdatePosition(ch string, pub *protocol.Publica
}

func (c *Client) writePublicationNoDelta(ch string, pub *protocol.Publication, data []byte, sp StreamPosition) error {
return c.writePublication(ch, pub, preparedData{fullData: data, deltaData: nil, delta: false}, sp)
return c.writePublication(ch, pub, preparedData{fullData: data, brokerDeltaData: nil, localDeltaData: nil, deltaSub: false}, sp)
}

func (c *Client) writePublication(ch string, pub *protocol.Publication, prep preparedData, sp StreamPosition) error {
Expand All @@ -3188,7 +3183,7 @@ func (c *Client) writePublication(ch string, pub *protocol.Publication, prep pre
return nil
}

if prep.delta {
if prep.deltaSub {
// For this path (no Offset) delta may come from channel medium layer, so that we can use it
// here if allowed for the connection.
c.mu.RLock()
Expand All @@ -3201,7 +3196,7 @@ func (c *Client) writePublication(ch string, pub *protocol.Publication, prep pre
c.mu.RUnlock()

if deltaAllowed {
return c.transportEnqueue(prep.deltaData, ch, protocol.FrameTypePushPublication)
return c.transportEnqueue(prep.localDeltaData, ch, protocol.FrameTypePushPublication)
} else {
c.mu.Lock()
if chCtx, chCtxOK := c.channels[ch]; chCtxOK {
Expand Down
4 changes: 2 additions & 2 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -716,9 +716,9 @@ func testUnexpectedOffsetEpochProtocolV2(t *testing.T, offset uint64, epoch stri
}, &protocol.Command{}, time.Now(), rwWrapper.rw)
require.NoError(t, err)

err = node.handlePublication("test", &Publication{
err = node.handlePublication("test", StreamPosition{offset, epoch}, &Publication{
Offset: offset,
}, StreamPosition{offset, epoch}, nil)
}, nil, nil)
require.NoError(t, err)

select {
Expand Down
Loading

0 comments on commit 240e132

Please sign in to comment.