From e53bf0f77af0257b9c62197ecf99e7317f4f6dcf Mon Sep 17 00:00:00 2001 From: Laurentiu Badea Date: Thu, 29 Oct 2020 09:33:34 -0700 Subject: [PATCH] Add Accounts to http cache events (#1553) --- stored_requests/events/api/api_test.go | 5 +- stored_requests/events/events.go | 4 + stored_requests/events/events_test.go | 13 +- stored_requests/events/http/http.go | 17 +- stored_requests/events/http/http_test.go | 305 +++++++++++------------ 5 files changed, 174 insertions(+), 170 deletions(-) diff --git a/stored_requests/events/api/api_test.go b/stored_requests/events/api/api_test.go index d2db4557573..cd3af77bd83 100644 --- a/stored_requests/events/api/api_test.go +++ b/stored_requests/events/api/api_test.go @@ -16,8 +16,9 @@ import ( func TestGoodRequests(t *testing.T) { cache := stored_requests.Cache{ - Requests: memory.NewCache(256*1024, -1, "Requests"), - Imps: memory.NewCache(256*1024, -1, "Imps"), + Requests: memory.NewCache(256*1024, -1, "Request"), + Imps: memory.NewCache(256*1024, -1, "Imp"), + Accounts: memory.NewCache(256*1024, -1, "Account"), } id := "1" config := fmt.Sprintf(`{"id": "%s"}`, id) diff --git a/stored_requests/events/events.go b/stored_requests/events/events.go index ba08f13d65b..5b89943572f 100644 --- a/stored_requests/events/events.go +++ b/stored_requests/events/events.go @@ -11,12 +11,14 @@ import ( type Save struct { Requests map[string]json.RawMessage `json:"requests"` Imps map[string]json.RawMessage `json:"imps"` + Accounts map[string]json.RawMessage `json:"accounts"` } // Invalidation represents a bulk invalidation type Invalidation struct { Requests []string `json:"requests"` Imps []string `json:"imps"` + Accounts []string `json:"accounts"` } // EventProducer will produce cache update and invalidation events on its channels @@ -63,12 +65,14 @@ func (e *EventListener) Listen(cache stored_requests.Cache, events EventProducer case save := <-events.Saves(): cache.Requests.Save(context.Background(), save.Requests) cache.Imps.Save(context.Background(), save.Imps) + cache.Accounts.Save(context.Background(), save.Accounts) if e.onSave != nil { e.onSave() } case invalidation := <-events.Invalidations(): cache.Requests.Invalidate(context.Background(), invalidation.Requests) cache.Imps.Invalidate(context.Background(), invalidation.Imps) + cache.Accounts.Invalidate(context.Background(), invalidation.Accounts) if e.onInvalidate != nil { e.onInvalidate() } diff --git a/stored_requests/events/events_test.go b/stored_requests/events/events_test.go index 74263d14b93..f3483705e86 100644 --- a/stored_requests/events/events_test.go +++ b/stored_requests/events/events_test.go @@ -19,6 +19,7 @@ func TestListen(t *testing.T) { cache := stored_requests.Cache{ Requests: memory.NewCache(256*1024, -1, "Requests"), Imps: memory.NewCache(256*1024, -1, "Imps"), + Accounts: memory.NewCache(256*1024, -1, "Account"), } // create channels to synchronize @@ -39,15 +40,18 @@ func TestListen(t *testing.T) { save := Save{ Requests: data, Imps: data, + Accounts: data, } cache.Requests.Save(context.Background(), save.Requests) - cache.Requests.Save(context.Background(), save.Imps) + cache.Imps.Save(context.Background(), save.Imps) + cache.Accounts.Save(context.Background(), save.Accounts) config = fmt.Sprintf(`{"id": "%s", "updated": true}`, id) data = map[string]json.RawMessage{id: json.RawMessage(config)} save = Save{ Requests: data, Imps: data, + Accounts: data, } ep.saves <- save @@ -55,13 +59,15 @@ func TestListen(t *testing.T) { requestData := cache.Requests.Get(context.Background(), idSlice) impData := cache.Imps.Get(context.Background(), idSlice) - if !reflect.DeepEqual(requestData, data) || !reflect.DeepEqual(impData, data) { + accountData := cache.Accounts.Get(context.Background(), idSlice) + if !reflect.DeepEqual(requestData, data) || !reflect.DeepEqual(impData, data) || !reflect.DeepEqual(accountData, data) { t.Error("Update failed") } invalidation := Invalidation{ Requests: idSlice, Imps: idSlice, + Accounts: idSlice, } ep.invalidations <- invalidation @@ -69,7 +75,8 @@ func TestListen(t *testing.T) { requestData = cache.Requests.Get(context.Background(), idSlice) impData = cache.Imps.Get(context.Background(), idSlice) - if len(requestData) > 0 || len(impData) > 0 { + accountData = cache.Accounts.Get(context.Background(), idSlice) + if len(requestData) > 0 || len(impData) > 0 || len(accountData) > 0 { t.Error("Invalidate failed") } } diff --git a/stored_requests/events/http/http.go b/stored_requests/events/http/http.go index a6a129eed42..4615183f693 100644 --- a/stored_requests/events/http/http.go +++ b/stored_requests/events/http/http.go @@ -42,6 +42,13 @@ import ( // "imp2": { ... stored data for imp2 ... }, // } // } +// or +// { +// "accounts": { +// "acc1": { ... config data for acc1 ... }, +// "acc2": { ... config data for acc2 ... }, +// }, +// } // // To signal deletions, the endpoint may return { "deleted": true } // in place of the Stored Data if the "last-modified" param existed. @@ -82,10 +89,11 @@ func (e *HTTPEvents) fetchAll() { defer cancel() resp, err := ctxhttp.Get(ctx, e.client, e.Endpoint) if respObj, ok := e.parse(e.Endpoint, resp, err); ok && - (len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0) { + (len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 || len(respObj.Accounts) > 0) { e.saves <- events.Save{ Requests: respObj.StoredRequests, Imps: respObj.StoredImps, + Accounts: respObj.Accounts, } } } @@ -125,14 +133,16 @@ func (e *HTTPEvents) refresh(ticker <-chan time.Time) { invalidations := events.Invalidation{ Requests: extractInvalidations(respObj.StoredRequests), Imps: extractInvalidations(respObj.StoredImps), + Accounts: extractInvalidations(respObj.Accounts), } - if len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 { + if len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 || len(respObj.Accounts) > 0 { e.saves <- events.Save{ Requests: respObj.StoredRequests, Imps: respObj.StoredImps, + Accounts: respObj.Accounts, } } - if len(invalidations.Requests) > 0 || len(invalidations.Imps) > 0 { + if len(invalidations.Requests) > 0 || len(invalidations.Imps) > 0 || len(invalidations.Accounts) > 0 { e.invalidations <- invalidations } e.lastUpdate = thisTimeInUTC @@ -193,4 +203,5 @@ func (e *HTTPEvents) Invalidations() <-chan events.Invalidation { type responseContract struct { StoredRequests map[string]json.RawMessage `json:"requests"` StoredImps map[string]json.RawMessage `json:"imps"` + Accounts map[string]json.RawMessage `json:"accounts"` } diff --git a/stored_requests/events/http/http_test.go b/stored_requests/events/http/http_test.go index fdba84cd6fe..2a1aa5d8dfc 100644 --- a/stored_requests/events/http/http_test.go +++ b/stored_requests/events/http/http_test.go @@ -1,145 +1,161 @@ package http import ( - "bytes" "context" "encoding/json" + "fmt" httpCore "net/http" "net/http/httptest" "testing" "time" -) - -func TestStartupReqsOnly(t *testing.T) { - server := httptest.NewServer(&mockResponseHandler{ - statusCode: httpCore.StatusOK, - response: `{"requests":{"request1":{"value":1}, "request2":{"value":2}}}`, - }) - defer server.Close() - - ev := NewHTTPEvents(server.Client(), server.URL, nil, -1) - theSave := <-ev.Saves() - - assertLen(t, theSave.Requests, 2) - assertHasValue(t, theSave.Requests, "request1", `{"value":1}`) - assertHasValue(t, theSave.Requests, "request2", `{"value":2}`) - - assertLen(t, theSave.Imps, 0) -} - -func TestStartupImpsOnly(t *testing.T) { - server := httptest.NewServer(&mockResponseHandler{ - statusCode: httpCore.StatusOK, - response: `{"imps":{"imp1":{"value":1}}}`, - }) - defer server.Close() - - ev := NewHTTPEvents(server.Client(), server.URL, nil, -1) - theSave := <-ev.Saves() - - assertLen(t, theSave.Requests, 0) - - assertLen(t, theSave.Imps, 1) - assertHasValue(t, theSave.Imps, "imp1", `{"value":1}`) -} - -func TestStartupBothTypes(t *testing.T) { - server := httptest.NewServer(&mockResponseHandler{ - statusCode: httpCore.StatusOK, - response: `{"requests":{"request1":{"value":1}, "request2":{"value":2}},"imps":{"imp1":{"value":1}}}`, - }) - defer server.Close() - - ev := NewHTTPEvents(server.Client(), server.URL, nil, -1) - theSave := <-ev.Saves() - - assertLen(t, theSave.Requests, 2) - assertHasValue(t, theSave.Requests, "request1", `{"value":1}`) - assertHasValue(t, theSave.Requests, "request2", `{"value":2}`) - - assertLen(t, theSave.Imps, 1) - assertHasValue(t, theSave.Imps, "imp1", `{"value":1}`) -} - -func TestUpdates(t *testing.T) { - handler := &mockResponseHandler{ - statusCode: httpCore.StatusOK, - response: `{"requests":{"request1":{"value":1}, "request2":{"value":2}},"imps":{"imp1":{"value":3},"imp2":{"value":4}}}`, - } - server := httptest.NewServer(handler) - defer server.Close() - - ev := NewHTTPEvents(server.Client(), server.URL, nil, -1) - - handler.response = `{"requests":{"request1":{"value":5}, "request2":{"deleted":true}},"imps":{"imp1":{"deleted":true},"imp2":{"value":6}}}` - timeChan := make(chan time.Time, 1) - timeChan <- time.Now() - go ev.refresh(timeChan) - firstSave := <-ev.Saves() - secondSave := <-ev.Saves() - inv := <-ev.Invalidations() - - assertLen(t, firstSave.Requests, 2) - assertHasValue(t, firstSave.Requests, "request1", `{"value":1}`) - assertHasValue(t, firstSave.Requests, "request2", `{"value":2}`) - assertLen(t, firstSave.Imps, 2) - assertHasValue(t, firstSave.Imps, "imp1", `{"value":3}`) - assertHasValue(t, firstSave.Imps, "imp2", `{"value":4}`) - - assertLen(t, secondSave.Requests, 1) - assertHasValue(t, secondSave.Requests, "request1", `{"value":5}`) - assertLen(t, secondSave.Imps, 1) - assertHasValue(t, secondSave.Imps, "imp2", `{"value":6}`) - - assertArrLen(t, inv.Requests, 1) - assertArrContains(t, inv.Requests, "request2") - assertArrLen(t, inv.Imps, 1) - assertArrContains(t, inv.Imps, "imp1") -} -func TestErrorResponse(t *testing.T) { - handler := &mockResponseHandler{ - statusCode: httpCore.StatusInternalServerError, - response: "Something horrible happened.", - } - server := httptest.NewServer(handler) - defer server.Close() + "github.com/stretchr/testify/assert" +) - ev := NewHTTPEvents(server.Client(), server.URL, nil, -1) - if len(ev.Saves()) != 0 { - t.Errorf("No saves should be emitted if the HTTP call fails. Got %d", len(ev.Saves())) - } +func ctxProducer() (context.Context, func()) { + return context.WithTimeout(context.Background(), -1) } -func TestExpiredContext(t *testing.T) { - handler := &mockResponseHandler{ - statusCode: httpCore.StatusInternalServerError, - response: "Something horrible happened.", - } - server := httptest.NewServer(handler) - defer server.Close() - - ctxProducer := func() (context.Context, func()) { - return context.WithTimeout(context.Background(), -1) +func TestStartup(t *testing.T) { + type testStep struct { + statusCode int + response string + timeout bool + saves string + invalidations string } - - ev := NewHTTPEvents(server.Client(), server.URL, ctxProducer, -1) - if len(ev.Saves()) != 0 { - t.Errorf("No saves should be emitted if the HTTP call is cancelled. Got %d", len(ev.Saves())) - } -} - -func TestMalformedResponse(t *testing.T) { - handler := &mockResponseHandler{ - statusCode: httpCore.StatusOK, - response: "This isn't JSON.", + testCases := []struct { + description string + tests []testStep + }{ + { + description: "Load requests at startup", + tests: []testStep{ + { + statusCode: httpCore.StatusOK, + response: `{"requests": {"request1": {"value":1}, "request2": {"value":2}}}`, + saves: `{"requests": {"request1": {"value":1}, "request2": {"value":2}}, "imps": null, "accounts": null}`, + }, + }, + }, + { + description: "Load imps at startup", + tests: []testStep{ + { + statusCode: httpCore.StatusOK, + response: `{"imps": {"imp1": {"value":1}}}`, + saves: `{"imps": {"imp1": {"value":1}}, "requests": null, "accounts": null}`, + }, + }, + }, + { + description: "Load requests and imps then update", + tests: []testStep{ + { + statusCode: httpCore.StatusOK, + response: `{"requests": {"request1": {"value":1}, "request2": {"value":2}}, "imps": {"imp1": {"value":3}, "imp2": {"value":4}}}`, + saves: `{"requests": {"request1": {"value":1}, "request2": {"value":2}}, "imps": {"imp1": {"value":3}, "imp2": {"value":4}}, "accounts":null}`, + }, + { + statusCode: httpCore.StatusOK, + response: `{"requests": {"request1": {"value":5}, "request2": {"deleted":true}}, "imps": {"imp1": {"deleted":true}, "imp2": {"value":6}}}`, + saves: `{"requests": {"request1": {"value":5}}, "imps": {"imp2": {"value":6}}, "accounts":null}`, + invalidations: `{"requests": ["request2"], "imps": ["imp1"], "accounts": []}`, + }, + }, + }, + { + description: "Load accounts then update", + tests: []testStep{ + { + statusCode: httpCore.StatusOK, + response: `{"accounts":{"account1":{"value":1}, "account2":{"value":2}}}`, + saves: `{"accounts":{"account1":{"value":1}, "account2":{"value":2}}, "imps": null, "requests": null}`, + }, + { + statusCode: httpCore.StatusOK, + response: `{"accounts":{"account1":{"value":5}, "account2":{"deleted": true}}}`, + saves: `{"accounts":{"account1":{"value":5}}, "imps": null, "requests": null}`, + invalidations: `{"accounts":["account2"], "requests": [], "imps": []}`, + }, + }, + }, + { + description: "Load nothing at startup", + tests: []testStep{ + { + statusCode: httpCore.StatusOK, + response: `{}`, + }, + }, + }, + { + description: "Malformed response at startup", + tests: []testStep{ + { + statusCode: httpCore.StatusOK, + response: `{some bad json`, + }, + }, + }, + { + description: "Server error at startup", + tests: []testStep{ + { + statusCode: httpCore.StatusInternalServerError, + response: ``, + }, + }, + }, + { + description: "HTTP timeout error at startup", + tests: []testStep{ + { + timeout: true, + }, + }, + }, } - server := httptest.NewServer(handler) - defer server.Close() - - ev := NewHTTPEvents(server.Client(), server.URL, nil, -1) - if len(ev.Saves()) != 0 { - t.Errorf("No updates should be emitted if the HTTP call fails. Got %d", len(ev.Saves())) + for _, tests := range testCases { + t.Run(tests.description, func(t *testing.T) { + handler := &mockResponseHandler{} + server := httptest.NewServer(handler) + defer server.Close() + + var ev *HTTPEvents + + for i, test := range tests.tests { + handler.statusCode = test.statusCode + handler.response = test.response + if i == 0 { // NewHTTPEvents() calls the API immediately + if test.timeout { + ev = NewHTTPEvents(server.Client(), server.URL, ctxProducer, -1) // force timeout + } else { + ev = NewHTTPEvents(server.Client(), server.URL, nil, -1) + } + } else { // Second test triggers API call by initiating a 1s refresh loop + timeChan := make(chan time.Time, 1) + timeChan <- time.Now() + go ev.refresh(timeChan) + } + t.Run(fmt.Sprintf("Step %d", i+1), func(t *testing.T) { + // Check expected Saves + if len(test.saves) > 0 { + saves, err := json.Marshal(<-ev.Saves()) + assert.NoError(t, err, `Failed to marshal event.Save object: %v`, err) + assert.JSONEq(t, test.saves, string(saves)) + } + assert.Empty(t, ev.Saves(), "Unexpected additional messages in save channel") + // Check expected Invalidations + if len(test.invalidations) > 0 { + invalidations, err := json.Marshal(<-ev.Invalidations()) + assert.NoError(t, err, `Failed to marshal event.Invalidation object: %v`, err) + assert.JSONEq(t, test.invalidations, string(invalidations)) + } + assert.Empty(t, ev.Invalidations(), "Unexpected additional messages in invalidations channel") + }) + } + }) } } @@ -152,38 +168,3 @@ func (m *mockResponseHandler) ServeHTTP(rw httpCore.ResponseWriter, r *httpCore. rw.WriteHeader(m.statusCode) rw.Write([]byte(m.response)) } - -func assertLen(t *testing.T, m map[string]json.RawMessage, length int) { - t.Helper() - if len(m) != length { - t.Errorf("Expected map with %d elements, but got %v", length, m) - } -} - -func assertArrLen(t *testing.T, list []string, length int) { - t.Helper() - if len(list) != length { - t.Errorf("Expected list with %d elements, but got %v", length, list) - } -} - -func assertArrContains(t *testing.T, haystack []string, needle string) { - t.Helper() - for _, elm := range haystack { - if elm == needle { - return - } - } - t.Errorf("expected element %s to be in list %v", needle, haystack) -} - -func assertHasValue(t *testing.T, m map[string]json.RawMessage, key string, val string) { - t.Helper() - if mapVal, ok := m[key]; ok { - if !bytes.Equal(mapVal, []byte(val)) { - t.Errorf("expected map[%s] to be %s, but got %s", key, val, string(mapVal)) - } - } else { - t.Errorf("map missing expected key: %s", key) - } -}