-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Watch SrvKeyspace instead of polling in VTGate. #1433
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -48,7 +48,7 @@ type ResilientSrvTopoServer struct { | |
|
||
// mutex protects the cache map itself, not the individual | ||
// values in the cache. | ||
mutex sync.Mutex | ||
mutex sync.RWMutex | ||
srvKeyspaceNamesCache map[string]*srvKeyspaceNamesEntry | ||
srvKeyspaceCache map[string]*srvKeyspaceEntry | ||
srvShardCache map[string]*srvShardEntry | ||
|
@@ -108,7 +108,7 @@ type srvKeyspaceEntry struct { | |
keyspace string | ||
|
||
// the mutex protects any access to this structure (read or write) | ||
mutex sync.Mutex | ||
mutex sync.RWMutex | ||
|
||
insertionTime time.Time | ||
value *topodatapb.SrvKeyspace | ||
|
@@ -257,14 +257,19 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspaceNames(ctx context.Context, c | |
return result, err | ||
} | ||
|
||
// GetSrvKeyspace returns SrvKeyspace object for the given cell and keyspace. | ||
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { | ||
server.counts.Add(queryCategory, 1) | ||
|
||
func (server *ResilientSrvTopoServer) getSrvKeyspaceEntry(cell, keyspace string) *srvKeyspaceEntry { | ||
// find the entry in the cache, add it if not there | ||
key := cell + "." + keyspace | ||
server.mutex.Lock() | ||
server.mutex.RLock() | ||
entry, ok := server.srvKeyspaceCache[key] | ||
if ok { | ||
server.mutex.RUnlock() | ||
return entry | ||
} | ||
server.mutex.RUnlock() | ||
|
||
server.mutex.Lock() | ||
entry, ok = server.srvKeyspaceCache[key] | ||
if !ok { | ||
entry = &srvKeyspaceEntry{ | ||
cell: cell, | ||
|
@@ -273,40 +278,88 @@ func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, | |
server.srvKeyspaceCache[key] = entry | ||
} | ||
server.mutex.Unlock() | ||
return entry | ||
} | ||
|
||
// GetSrvKeyspace returns SrvKeyspace object for the given cell and keyspace. | ||
func (server *ResilientSrvTopoServer) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { | ||
server.counts.Add(queryCategory, 1) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think we're interested in QPS on this any more, it's just querying an inside cached value... I would remove that counter. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
|
||
entry := server.getSrvKeyspaceEntry(cell, keyspace) | ||
|
||
// If the entry exists, return it | ||
entry.mutex.RLock() | ||
if !entry.insertionTime.IsZero() { | ||
v, e := entry.value, entry.lastError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. lastError should be nil here in all cases, just don't fetch it and return nil? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
entry.mutex.RUnlock() | ||
return v, e | ||
} | ||
entry.mutex.RUnlock() | ||
|
||
// Lock the entry, and do everything holding the lock. This | ||
// means two concurrent requests will only issue one | ||
// underlying query. | ||
entry.mutex.Lock() | ||
defer entry.mutex.Unlock() | ||
|
||
// If the entry is fresh enough, return it | ||
if time.Now().Sub(entry.insertionTime) < server.cacheTTL { | ||
// If the entry exists, return it | ||
if !entry.insertionTime.IsZero() { | ||
return entry.value, entry.lastError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. return entry.value, nil here There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
// not in cache or too old, get the real value | ||
// not in cache, get the real value | ||
newCtx, cancel := context.WithTimeout(context.Background(), *srvTopoTimeout) | ||
defer cancel() | ||
|
||
result, err := server.topoServer.GetSrvKeyspace(newCtx, cell, keyspace) | ||
// start watching | ||
notifications, _, err := server.topoServer.WatchSrvKeyspace(newCtx, cell, keyspace) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I am not sure this is right: won't the watch expire when newCtx expires? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. By the way, looking at it now, maybe the WatchSrvKeyspace API is wrong: instead of the stopWatching channel that is returned, WatchSrvKeyspace should just close the notifications channel and return if the context is cancelled. That way the caller can just cancel the context to stop the watch, and that triggers the notification channel to be closed. So the go routine that waist on notifications can just range on the channel. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified so the Watch call uses an empty context without timeout. |
||
if err != nil { | ||
// set error if there is no cached value | ||
if entry.insertionTime.IsZero() { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is already tested earlier (line 306), no need to test again here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Change to test entry.value == nil. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See below, we don't get any error if topo server is down, at this level. So that's a mood point. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. |
||
server.counts.Add(errorCategory, 1) | ||
log.Errorf("GetSrvKeyspace(%v, %v, %v) failed: %v (no cached value, caching and returning error)", newCtx, cell, keyspace, err) | ||
} else { | ||
server.counts.Add(cachedCategory, 1) | ||
log.Warningf("GetSrvKeyspace(%v, %v, %v) failed: %v (returning cached value: %v %v)", newCtx, cell, keyspace, err, entry.value, entry.lastError) | ||
return entry.value, entry.lastError | ||
entry.lastError = err | ||
entry.lastErrorCtx = newCtx | ||
} | ||
// return cached value if any | ||
log.Errorf("WatchSrvKeyspace failed for %v/%v: %v, returning cached value: %+v, %v", cell, keyspace, err, entry.value, entry.lastError) | ||
return entry.value, entry.lastError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. there is no entry here, we know this, so it's not returning the cached value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There could be. If the first watch call succeeded and the topo server goes down later, we cannot connect to the topo server but we have cached value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The watch call never returns, unless cancelled. That's how the API is documented, and how it's implemented. If it gets a topo service error, it retries and tries to reconnect and get the value. So there is no way to get in that situation. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Got it. |
||
} | ||
sk, ok := <-notifications | ||
if !ok { | ||
// set error if there is no cached value | ||
if entry.insertionTime.IsZero() { | ||
entry.lastError = fmt.Errorf("failed to receive from channel: %v %v", sk, ok) | ||
entry.lastErrorCtx = newCtx | ||
} | ||
// return cached value if any | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. fix comment. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done (removed as it returns error clearly). |
||
log.Errorf("WatchSrvKeyspace first result failed for %v/%v: %v %v, returning cached value: %+v, %v", cell, keyspace, sk, ok, entry.value, entry.lastError) | ||
return entry.value, entry.lastError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. we need to stop the watch here, for clean-up purposes. Close the stopChannel for now, cancel the context later. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
} | ||
|
||
// save the value we got and the current time in the cache | ||
// cache the first notification | ||
entry.insertionTime = time.Now() | ||
entry.value = result | ||
entry.lastError = err | ||
entry.value = sk | ||
entry.lastError = nil | ||
entry.lastErrorCtx = newCtx | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should be nil There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
return result, err | ||
|
||
go func() { | ||
for { | ||
sk, ok := <-notifications | ||
entry.mutex.Lock() | ||
if !ok { | ||
log.Errorf("failed to receive from channel: %v %v", sk, ok) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. the 'for' loop can be a range on 'notifications', and this code moves to outside of the loop. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modify to range over the channel. |
||
// reset entry so it retries watching in next call | ||
entry.insertionTime = time.Time{} | ||
entry.mutex.Unlock() | ||
break | ||
} | ||
entry.insertionTime = time.Now() | ||
entry.value = sk | ||
entry.lastError = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this is useless, right? nobody else can change it, and it's set to nil already? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
entry.mutex.Unlock() | ||
} | ||
}() | ||
|
||
return entry.value, entry.lastError | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. return entry.value, nil There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. |
||
} | ||
|
||
// GetSrvShard returns SrvShard object for the given cell, keyspace, and shard. | ||
|
@@ -705,15 +758,15 @@ func (server *ResilientSrvTopoServer) CacheStatus() *ResilientSrvTopoServerCache | |
} | ||
|
||
for _, entry := range server.srvKeyspaceCache { | ||
entry.mutex.Lock() | ||
entry.mutex.RLock() | ||
result.SrvKeyspaces = append(result.SrvKeyspaces, &SrvKeyspaceCacheStatus{ | ||
Cell: entry.cell, | ||
Keyspace: entry.keyspace, | ||
Value: entry.value, | ||
LastError: entry.lastError, | ||
LastErrorCtx: entry.lastErrorCtx, | ||
}) | ||
entry.mutex.Unlock() | ||
entry.mutex.RUnlock() | ||
} | ||
|
||
for _, entry := range server.srvShardCache { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -193,20 +193,35 @@ func TestFilterUnhealthy(t *testing.T) { | |
// returns errors for everything, except the one keyspace. | ||
type fakeTopo struct { | ||
faketopo.FakeTopo | ||
keyspace string | ||
callCount int | ||
keyspace string | ||
callCount int | ||
notifications chan *topodatapb.SrvKeyspace | ||
stopWatching chan struct{} | ||
} | ||
|
||
func (ft *fakeTopo) GetSrvKeyspaceNames(ctx context.Context, cell string) ([]string, error) { | ||
return []string{ft.keyspace}, nil | ||
} | ||
|
||
func (ft *fakeTopo) GetSrvKeyspace(ctx context.Context, cell, keyspace string) (*topodatapb.SrvKeyspace, error) { | ||
func (ft *fakeTopo) WatchSrvKeyspace(ctx context.Context, cell, keyspace string) (<-chan *topodatapb.SrvKeyspace, chan<- struct{}, error) { | ||
ft.callCount++ | ||
if keyspace == ft.keyspace { | ||
return &topodatapb.SrvKeyspace{}, nil | ||
ft.notifications = make(chan *topodatapb.SrvKeyspace, 10) | ||
ft.stopWatching = make(chan struct{}) | ||
ft.notifications <- &topodatapb.SrvKeyspace{} | ||
return ft.notifications, ft.stopWatching, nil | ||
} | ||
return nil, fmt.Errorf("Unknown keyspace") | ||
return nil, nil, fmt.Errorf("Unknown keyspace") | ||
} | ||
|
||
func (ft *fakeTopo) GetSrvShard(ctx context.Context, cell, keyspace, shard string) (*topodatapb.SrvShard, error) { | ||
ft.callCount++ | ||
if keyspace != ft.keyspace { | ||
return nil, fmt.Errorf("Unknown keyspace") | ||
} | ||
return &topodatapb.SrvShard{ | ||
Name: shard, | ||
}, nil | ||
} | ||
|
||
func (ft *fakeTopo) GetEndPoints(ctx context.Context, cell, keyspace, shard string, tabletType topodatapb.TabletType) (*topodatapb.EndPoints, int64, error) { | ||
|
@@ -298,21 +313,41 @@ func TestCacheWithErrors(t *testing.T) { | |
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestCacheWithErrors") | ||
|
||
// ask for the known keyspace, that populates the cache | ||
_, err := rsts.GetSrvKeyspace(context.Background(), "", "test_ks") | ||
_, err := rsts.GetSrvShard(context.Background(), "", "test_ks", "shard_0") | ||
if err != nil { | ||
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err) | ||
t.Fatalf("GetSrvShard got unexpected error: %v", err) | ||
} | ||
|
||
// now make the topo server fail, and ask again, should get cached | ||
// value, not even ask underlying guy | ||
ft.keyspace = "another_test_ks" | ||
_, err = rsts.GetSrvKeyspace(context.Background(), "", "test_ks") | ||
_, err = rsts.GetSrvShard(context.Background(), "", "test_ks", "shard_0") | ||
if err != nil { | ||
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err) | ||
t.Fatalf("GetSrvShard got unexpected error: %v", err) | ||
} | ||
|
||
// now reduce TTL to nothing, so we won't use cache, and ask again | ||
rsts.cacheTTL = 0 | ||
_, err = rsts.GetSrvShard(context.Background(), "", "test_ks", "shard_0") | ||
if err != nil { | ||
t.Fatalf("GetSrvShard got unexpected error: %v", err) | ||
} | ||
} | ||
|
||
// TestSrvKeyspaceCacheWithErrors will test we properly return cached errors for GetSrvKeyspace. | ||
func TestSrvKeyspaceCacheWithErrors(t *testing.T) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We need a test that writes a new value of SrvKeyspace on fakeTopo.notifications, and makes sure subsequent GetSrvKeyspace get the new value. That is the core unit test we need. Watch out for flaky unit tests as notification is asynchronous, probably need a loop with a timeout until GetSrvKeyspace gets the value. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done |
||
ft := &fakeTopo{keyspace: "test_ks"} | ||
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestSrvKeyspaceCacheWithErrors") | ||
|
||
// ask for the known keyspace, that populates the cache | ||
_, err := rsts.GetSrvKeyspace(context.Background(), "", "test_ks") | ||
if err != nil { | ||
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err) | ||
} | ||
|
||
// now make the topo server fail, and ask again, should get cached | ||
// value, not even ask underlying guy | ||
close(ft.notifications) | ||
_, err = rsts.GetSrvKeyspace(context.Background(), "", "test_ks") | ||
if err != nil { | ||
t.Fatalf("GetSrvKeyspace got unexpected error: %v", err) | ||
|
@@ -325,28 +360,52 @@ func TestCachedErrors(t *testing.T) { | |
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestCachedErrors") | ||
|
||
// ask for an unknown keyspace, should get an error | ||
_, err := rsts.GetSrvKeyspace(context.Background(), "", "unknown_ks") | ||
_, err := rsts.GetSrvShard(context.Background(), "", "unknown_ks", "shard_0") | ||
if err == nil { | ||
t.Fatalf("First GetSrvKeyspace didn't return an error") | ||
t.Fatalf("First GetSrvShard didn't return an error") | ||
} | ||
if ft.callCount != 1 { | ||
t.Fatalf("GetSrvKeyspace didn't get called 1 but %v times", ft.callCount) | ||
t.Fatalf("GetSrvShard didn't get called 1 but %v times", ft.callCount) | ||
} | ||
|
||
// ask again, should get an error and use cache | ||
_, err = rsts.GetSrvKeyspace(context.Background(), "", "unknown_ks") | ||
_, err = rsts.GetSrvShard(context.Background(), "", "unknown_ks", "shard_0") | ||
if err == nil { | ||
t.Fatalf("Second GetSrvKeyspace didn't return an error") | ||
t.Fatalf("Second GetSrvShard didn't return an error") | ||
} | ||
if ft.callCount != 1 { | ||
t.Fatalf("GetSrvKeyspace was called again: %v times", ft.callCount) | ||
t.Fatalf("GetSrvShard was called again: %v times", ft.callCount) | ||
} | ||
|
||
// ask again after expired cache, should get an error | ||
rsts.cacheTTL = 0 | ||
_, err = rsts.GetSrvShard(context.Background(), "", "unknown_ks", "shard_0") | ||
if err == nil { | ||
t.Fatalf("Third GetSrvShard didn't return an error") | ||
} | ||
if ft.callCount != 2 { | ||
t.Fatalf("GetSrvShard was not called again: %v times", ft.callCount) | ||
} | ||
} | ||
|
||
// TestSrvKeyspaceCachedErrors will test we properly return cached errors for SrvKeyspace. | ||
func TestSrvKeyspaceCachedErrors(t *testing.T) { | ||
ft := &fakeTopo{keyspace: "test_ks"} | ||
rsts := NewResilientSrvTopoServer(topo.Server{Impl: ft}, "TestSrvKeyspaceCachedErrors") | ||
|
||
// ask for an unknown keyspace, should get an error | ||
_, err := rsts.GetSrvKeyspace(context.Background(), "", "unknown_ks") | ||
if err == nil { | ||
t.Fatalf("First GetSrvKeyspace didn't return an error") | ||
} | ||
if ft.callCount != 1 { | ||
t.Fatalf("GetSrvKeyspace didn't get called 1 but %v times", ft.callCount) | ||
} | ||
|
||
// ask again, should get an error and use cache | ||
_, err = rsts.GetSrvKeyspace(context.Background(), "", "unknown_ks") | ||
if err == nil { | ||
t.Fatalf("Third GetSrvKeyspace didn't return an error") | ||
t.Fatalf("Second GetSrvKeyspace didn't return an error") | ||
} | ||
if ft.callCount != 2 { | ||
t.Fatalf("GetSrvKeyspace was not called again: %v times", ft.callCount) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this any more, just having 'value != nil' is the test we should be doing to see if it's set?
Unless we display the age of the value in the status page... Then we can populate insertionTime, but I would still test value for nil instead of insertionTime IsZero...
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Test for value==nil only is not sufficient.
If the watch call succeeds first, it has a valid value. Later if the topo server goes down, we need a way to reconnect and return cached value before the reconnection succeeds.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
insertionTime is removed.