From ee914103542c7b3fe46743a283b081417592a7d4 Mon Sep 17 00:00:00 2001 From: Akihiro Suda Date: Wed, 20 Apr 2016 07:32:34 +0000 Subject: [PATCH] endpoint/rest: fix entity registration Fix flaky test #135 --- earthquake/endpoint/endpoint_test.go | 8 -------- earthquake/endpoint/rest/queue/restqueue.go | 2 +- earthquake/endpoint/rest/restendpoint.go | 15 +++++++++++++-- earthquake/endpoint/rest/restendpoint_test.go | 6 ------ 4 files changed, 14 insertions(+), 17 deletions(-) diff --git a/earthquake/endpoint/endpoint_test.go b/earthquake/endpoint/endpoint_test.go index 558dd51..9b53b4b 100644 --- a/earthquake/endpoint/endpoint_test.go +++ b/earthquake/endpoint/endpoint_test.go @@ -18,7 +18,6 @@ package endpoint import ( "flag" "fmt" - log "github.com/cihub/seelog" "github.com/osrg/earthquake/earthquake/endpoint/rest" "github.com/osrg/earthquake/earthquake/inspector/transceiver" "github.com/osrg/earthquake/earthquake/signal" @@ -31,7 +30,6 @@ import ( "os" "sync" "testing" - "time" ) var ( @@ -82,12 +80,6 @@ func TestMain(m *testing.M) { } restTransceivers[i].Start() } - // we need to wait here. - // otherwise the test can hang due to an error from restendpoint.go: - // "Ignored action for unknown entity %s. You sent the action before registration done?" - // FIXME: there should be some notification - log.Debugf("FIXME: sleeping for 10 seconds, but we should not sleep here..") - time.Sleep(10 * time.Second) os.Exit(m.Run()) } diff --git a/earthquake/endpoint/rest/queue/restqueue.go b/earthquake/endpoint/rest/queue/restqueue.go index 1b54b8d..0ca73d2 100644 --- a/earthquake/endpoint/rest/queue/restqueue.go +++ b/earthquake/endpoint/rest/queue/restqueue.go @@ -139,7 +139,7 @@ func RegisterNewQueue(entityID string) (*ActionQueue, error) { defer queuesLock.Unlock() old, oldOk := queues[entityID] if oldOk { - return nil, fmt.Errorf("entity exists %s(%#v)", entityID, old) + return old, fmt.Errorf("entity exists %s(%#v)", entityID, old) } queue := ActionQueue{ EntityID: entityID, diff --git a/earthquake/endpoint/rest/restendpoint.go b/earthquake/endpoint/rest/restendpoint.go index d19da0e..3b0b4fd 100644 --- a/earthquake/endpoint/rest/restendpoint.go +++ b/earthquake/endpoint/rest/restendpoint.go @@ -50,8 +50,10 @@ func queueFromHttpRequest(r *http.Request) (*ActionQueue, error) { entityID := vars["entity_id"] queue := GetQueue(entityID) if queue == nil { + // Note that another routine can register the entity queue, err = RegisterNewQueue(entityID) - if err != nil { + // "already registered" err is not an issue here + if err != nil && queue == nil { return nil, err } } @@ -79,6 +81,14 @@ func eventsOnPost(w http.ResponseWriter, r *http.Request) { restutil.WriteError(w, err) return } + // register entity if it is not registered yet. + // FIXME: rename the function + _, err = queueFromHttpRequest(r) + if err != nil { + restutil.WriteError(w, err) + return + } + // send event to orchestrator main go func() { orchestratorEventCh <- event @@ -140,7 +150,8 @@ func actionPropagatorRoutine() { queue := GetQueue(action.EntityID()) if queue == nil { log.Errorf("Ignored action for unknown entity %s."+ - "You sent the action before registration done?", action.EntityID()) + "Orchestrator sent an action before registration is done?", action.EntityID()) + log.Errorf("Action: %#v", action) continue } queue.Put(action) diff --git a/earthquake/endpoint/rest/restendpoint_test.go b/earthquake/endpoint/rest/restendpoint_test.go index 429663c..2061243 100644 --- a/earthquake/endpoint/rest/restendpoint_test.go +++ b/earthquake/endpoint/rest/restendpoint_test.go @@ -22,7 +22,6 @@ import ( "os" "sync" "testing" - "time" "github.com/osrg/earthquake/earthquake/inspector/transceiver" "github.com/osrg/earthquake/earthquake/signal" @@ -76,11 +75,6 @@ func TestMain(m *testing.M) { transceivers[i].Start() } - // we need to wait here. - // otherwise the test can hang due to an error from restendpoint.go: - // "Ignored action for unknown entity %s. You sent the action before registration done?" - // FIXME: there should be some notification - time.Sleep(10 * time.Second) os.Exit(m.Run()) }