diff --git a/pkg/cache/v3/simple.go b/pkg/cache/v3/simple.go index 702d449eb2..5fe36c5b9e 100644 --- a/pkg/cache/v3/simple.go +++ b/pkg/cache/v3/simple.go @@ -184,7 +184,11 @@ func NewSnapshotCacheWithHeartbeating(ctx context.Context, ads bool, hash NodeHa } func (cache *snapshotCache) sendHeartbeats(ctx context.Context, node string) { - snapshot := cache.snapshots[node] + snapshot, ok := cache.snapshots[node] + if !ok { + return + } + if info, ok := cache.status[node]; ok { info.mu.Lock() for id, watch := range info.watches { diff --git a/pkg/cache/v3/simple_test.go b/pkg/cache/v3/simple_test.go index 1105b683c1..d5e1f92711 100644 --- a/pkg/cache/v3/simple_test.go +++ b/pkg/cache/v3/simple_test.go @@ -604,3 +604,34 @@ func TestSnapshotSingleResourceFetch(t *testing.T) { protocmp.Transform()), ) } + +func TestAvertPanicForWatchOnNonExistentSnapshot(t *testing.T) { + ctx := context.Background() + c := cache.NewSnapshotCacheWithHeartbeating(ctx, false, cache.IDHash{}, nil, time.Millisecond) + + // Create watch. + req := &cache.Request{ + Node: &core.Node{Id: "test"}, + ResourceNames: []string{"rtds"}, + TypeUrl: rsrc.RuntimeType, + } + ss := stream.NewStreamState(false, map[string]string{"cluster": "abcdef"}) + responder := make(chan cache.Response) + c.CreateWatch(req, ss, responder) + + go func() { + // Wait for at least one heartbeat to occur, then set snapshot. + time.Sleep(time.Millisecond * 5) + srs := &singleResourceSnapshot{ + version: "version-one", + typeurl: rsrc.RuntimeType, + name: "one-second", + resource: durationpb.New(time.Second), + } + if err := c.SetSnapshot(ctx, "test", srs); err != nil { + t.Errorf("unexpected error setting snapshot %v", err) + } + }() + + <-responder +}