-
Notifications
You must be signed in to change notification settings - Fork 340
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
refactor(dp-server): simplify by removing dataplane_callbacks #12890
base: master
Are you sure you want to change the base?
Conversation
Part of kumahq#12881 Signed-off-by: Charly Molter <charly.molter@konghq.com>
Reviewer Checklist🔍 Each of these sections need to be checked by the reviewer of the PR 🔍:
|
Dataplane callbacks was an abstraction causing more complexity then anything. We now have a watchdog_callbacks which start the lifecycle and the watchdog We carefully ensure (and test) that there always a single watchdog running at once even when DP reconnects. part of kumahq#12881 Signed-off-by: Charly Molter <charly.molter@konghq.com>
23c3094
to
de971dc
Compare
Signed-off-by: Charly Molter <charly.molter@konghq.com>
Signed-off-by: Charly Molter <charly.molter@konghq.com>
Signed-off-by: Charly Molter <charly.molter@konghq.com>
Signed-off-by: Charly Molter <charly.molter@konghq.com>
} | ||
t.activeStreams[streamID].proxyInfo = pInfo | ||
ctx := t.activeStreams[streamID].ctx | ||
pInfo.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Actually the fact that the long running cleanup runs with this lock means that 2 connections to the same DP may lead to holding the global lock for longer.
// Watchdog should be run only once for given DP regardless of the number of streams. | ||
// For ADS there is only one stream for DP. | ||
// | ||
// We keep |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
something is missing in this comment
defer t.Unlock() | ||
dataplaneSyncLog.V(1).Info("stream is open", "streamID", streamID) | ||
if _, found := t.activeStreams[streamID]; found { | ||
return errors.Errorf("streamID %d is already tracked, we should never reopen a stream", streamID) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it safe to return error in this case? Docs says OnStreamClosed
will be called:
// OnStreamOpen is called once an xDS stream is opened with a stream ID and the type URL (or "" for ADS).
// Returning an error will end processing and close the stream. OnStreamClosed will still be called.
OnStreamOpen(context.Context, int64, string) error
so we're probably risking closing this stream no?
pInfo.Unlock() | ||
return // We are not the last stream, we don't care | ||
} | ||
if sInfo.proxyInfo.cancelFunc != nil { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since you released the t.Unlock()
you can't use sInfo
as it could be accessed for example from the OnStreamRequest
:
t.activeStreams[streamID].proxyInfo = pInfo
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Here sInfo
is a copy to the value in the map, right? So this statement should still work even the proxyInfo was re-assigned in the map from the method OnStreamRequest
.
l := dataplaneSyncLog.WithValues("dpKey", dpKey, "streamID", streamID) | ||
t.Lock() | ||
if t.activeStreams[streamID].proxyInfo != nil { | ||
return nil // We fast return if we already know that this streamID is tracking a specific dataplane |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
missing t.Unlock()
above this line
defer close(pInfo.done) | ||
l.V(1).Info("watchdog started") | ||
if t.dpWatchdogFactory != nil { | ||
t.dpWatchdogFactory.New(dpKey, pInfo.meta.Load).Start(pInfo.ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm trying to understand the purpose of atomic.Pointer
here. Isn't it the same as
go func(metaPtr *core_xds.DataplaneMetadata) {
defer close(pInfo.done)
l.V(1).Info("watchdog started")
if t.dpWatchdogFactory != nil {
t.dpWatchdogFactory.New(dpKey, metaPtr).Start(pInfo.ctx)
}
}(metadata)
assuming of course t.dpWatchdogFactory.New
accepts *DataplaneMetadata
instead of func() *DataplaneMetadata
. In this case, pointer will be copied as value so you don't need to access pInfo
field anymore
Switched to draft. Removing this would be good but things have changed a little so this PR needs to be reworked |
} | ||
|
||
func (t *dataplaneSyncCallbacks) OnStreamClosed(streamID core_xds.StreamID) { | ||
t.Lock() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
So many lock/unlock invocations in this method. I'm thinking how can we improve it to make it cleaner.
I like this change. It simplifies things so much. |
Motivation
Dataplane callbacks was an abstraction causing more complexity then
anything.
Implementation information
We now have a watchdog_callbacks which start the lifecycle and the
watchdog
We carefully ensure (and test) that there always a single watchdog running at once
even when DP reconnects.
Supporting documentation
part of #12881