Skip to content

Commit

Permalink
Add Accounts to http cache events (#1553)
Browse files Browse the repository at this point in the history
  • Loading branch information
laurb9 authored Oct 29, 2020
1 parent 5a718e7 commit e53bf0f
Show file tree
Hide file tree
Showing 5 changed files with 174 additions and 170 deletions.
5 changes: 3 additions & 2 deletions stored_requests/events/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ import (

func TestGoodRequests(t *testing.T) {
cache := stored_requests.Cache{
Requests: memory.NewCache(256*1024, -1, "Requests"),
Imps: memory.NewCache(256*1024, -1, "Imps"),
Requests: memory.NewCache(256*1024, -1, "Request"),
Imps: memory.NewCache(256*1024, -1, "Imp"),
Accounts: memory.NewCache(256*1024, -1, "Account"),
}
id := "1"
config := fmt.Sprintf(`{"id": "%s"}`, id)
Expand Down
4 changes: 4 additions & 0 deletions stored_requests/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,14 @@ import (
type Save struct {
Requests map[string]json.RawMessage `json:"requests"`
Imps map[string]json.RawMessage `json:"imps"`
Accounts map[string]json.RawMessage `json:"accounts"`
}

// Invalidation represents a bulk invalidation
type Invalidation struct {
Requests []string `json:"requests"`
Imps []string `json:"imps"`
Accounts []string `json:"accounts"`
}

// EventProducer will produce cache update and invalidation events on its channels
Expand Down Expand Up @@ -63,12 +65,14 @@ func (e *EventListener) Listen(cache stored_requests.Cache, events EventProducer
case save := <-events.Saves():
cache.Requests.Save(context.Background(), save.Requests)
cache.Imps.Save(context.Background(), save.Imps)
cache.Accounts.Save(context.Background(), save.Accounts)
if e.onSave != nil {
e.onSave()
}
case invalidation := <-events.Invalidations():
cache.Requests.Invalidate(context.Background(), invalidation.Requests)
cache.Imps.Invalidate(context.Background(), invalidation.Imps)
cache.Accounts.Invalidate(context.Background(), invalidation.Accounts)
if e.onInvalidate != nil {
e.onInvalidate()
}
Expand Down
13 changes: 10 additions & 3 deletions stored_requests/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ func TestListen(t *testing.T) {
cache := stored_requests.Cache{
Requests: memory.NewCache(256*1024, -1, "Requests"),
Imps: memory.NewCache(256*1024, -1, "Imps"),
Accounts: memory.NewCache(256*1024, -1, "Account"),
}

// create channels to synchronize
Expand All @@ -39,37 +40,43 @@ func TestListen(t *testing.T) {
save := Save{
Requests: data,
Imps: data,
Accounts: data,
}
cache.Requests.Save(context.Background(), save.Requests)
cache.Requests.Save(context.Background(), save.Imps)
cache.Imps.Save(context.Background(), save.Imps)
cache.Accounts.Save(context.Background(), save.Accounts)

config = fmt.Sprintf(`{"id": "%s", "updated": true}`, id)
data = map[string]json.RawMessage{id: json.RawMessage(config)}
save = Save{
Requests: data,
Imps: data,
Accounts: data,
}

ep.saves <- save
<-saveOccurred

requestData := cache.Requests.Get(context.Background(), idSlice)
impData := cache.Imps.Get(context.Background(), idSlice)
if !reflect.DeepEqual(requestData, data) || !reflect.DeepEqual(impData, data) {
accountData := cache.Accounts.Get(context.Background(), idSlice)
if !reflect.DeepEqual(requestData, data) || !reflect.DeepEqual(impData, data) || !reflect.DeepEqual(accountData, data) {
t.Error("Update failed")
}

invalidation := Invalidation{
Requests: idSlice,
Imps: idSlice,
Accounts: idSlice,
}

ep.invalidations <- invalidation
<-invalidateOccurred

requestData = cache.Requests.Get(context.Background(), idSlice)
impData = cache.Imps.Get(context.Background(), idSlice)
if len(requestData) > 0 || len(impData) > 0 {
accountData = cache.Accounts.Get(context.Background(), idSlice)
if len(requestData) > 0 || len(impData) > 0 || len(accountData) > 0 {
t.Error("Invalidate failed")
}
}
Expand Down
17 changes: 14 additions & 3 deletions stored_requests/events/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,13 @@ import (
// "imp2": { ... stored data for imp2 ... },
// }
// }
// or
// {
// "accounts": {
// "acc1": { ... config data for acc1 ... },
// "acc2": { ... config data for acc2 ... },
// },
// }
//
// To signal deletions, the endpoint may return { "deleted": true }
// in place of the Stored Data if the "last-modified" param existed.
Expand Down Expand Up @@ -82,10 +89,11 @@ func (e *HTTPEvents) fetchAll() {
defer cancel()
resp, err := ctxhttp.Get(ctx, e.client, e.Endpoint)
if respObj, ok := e.parse(e.Endpoint, resp, err); ok &&
(len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0) {
(len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 || len(respObj.Accounts) > 0) {
e.saves <- events.Save{
Requests: respObj.StoredRequests,
Imps: respObj.StoredImps,
Accounts: respObj.Accounts,
}
}
}
Expand Down Expand Up @@ -125,14 +133,16 @@ func (e *HTTPEvents) refresh(ticker <-chan time.Time) {
invalidations := events.Invalidation{
Requests: extractInvalidations(respObj.StoredRequests),
Imps: extractInvalidations(respObj.StoredImps),
Accounts: extractInvalidations(respObj.Accounts),
}
if len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 {
if len(respObj.StoredRequests) > 0 || len(respObj.StoredImps) > 0 || len(respObj.Accounts) > 0 {
e.saves <- events.Save{
Requests: respObj.StoredRequests,
Imps: respObj.StoredImps,
Accounts: respObj.Accounts,
}
}
if len(invalidations.Requests) > 0 || len(invalidations.Imps) > 0 {
if len(invalidations.Requests) > 0 || len(invalidations.Imps) > 0 || len(invalidations.Accounts) > 0 {
e.invalidations <- invalidations
}
e.lastUpdate = thisTimeInUTC
Expand Down Expand Up @@ -193,4 +203,5 @@ func (e *HTTPEvents) Invalidations() <-chan events.Invalidation {
type responseContract struct {
StoredRequests map[string]json.RawMessage `json:"requests"`
StoredImps map[string]json.RawMessage `json:"imps"`
Accounts map[string]json.RawMessage `json:"accounts"`
}
Loading

0 comments on commit e53bf0f

Please sign in to comment.