From 52b4bc414facb46efcf3e9a2c1152aeb0bf7d544 Mon Sep 17 00:00:00 2001
From: Tom Wilkie
Date: Wed, 9 Mar 2016 20:00:59 +0000
Subject: [PATCH] Ensure probes are correctly tidied up in the sqs control
 router.

---
 app/multitenant/sqs_control_router.go | 139 ++++++++++++++++++--------
 experimental/multitenant/run.sh       |   9 +-
 2 files changed, 100 insertions(+), 48 deletions(-)

diff --git a/app/multitenant/sqs_control_router.go b/app/multitenant/sqs_control_router.go
index fa3492c940..354f4c33d5 100644
--- a/app/multitenant/sqs_control_router.go
+++ b/app/multitenant/sqs_control_router.go
@@ -31,9 +31,10 @@ type sqsControlRouter struct {
 	queueURL *string
 	userIDer UserIDer
 
-	mtx       sync.Mutex
-	cond      *sync.Cond
-	responses map[string]xfer.Response
+	mtx          sync.Mutex
+	cond         *sync.Cond
+	responses    map[string]xfer.Response
+	probeWorkers map[int64]*probeWorker
 }
 
 type sqsRequestMessage struct {
@@ -54,9 +55,10 @@ func NewSQSControlRouter(url, region string, creds *credentials.Credentials, use
 			WithEndpoint(url).
 			WithRegion(region).
 			WithCredentials(creds))),
-		queueURL:  nil,
-		userIDer:  userIDer,
-		responses: map[string]xfer.Response{},
+		queueURL:     nil,
+		userIDer:     userIDer,
+		responses:    map[string]xfer.Response{},
+		probeWorkers: map[int64]*probeWorker{},
 	}
 	result.cond = sync.NewCond(&result.mtx)
 	go result.loop()
@@ -123,13 +125,13 @@ func (cr *sqsControlRouter) loop() {
 			continue
 		}
 		cr.handleResponses(res)
-		if err := cr.deleteMessages(res.Messages); err != nil {
+		if err := cr.deleteMessages(cr.queueURL, res.Messages); err != nil {
 			log.Errorf("Error deleting message from %s: %v", *cr.queueURL, err)
 		}
 	}
 }
 
-func (cr *sqsControlRouter) deleteMessages(messages []*sqs.Message) error {
+func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Message) error {
 	entries := []*sqs.DeleteMessageBatchRequestEntry{}
 	for _, message := range messages {
 		entries = append(entries, &sqs.DeleteMessageBatchRequestEntry{
@@ -138,7 +140,7 @@ func (cr *sqsControlRouter) deleteMessages(messages []*sqs.Message) error {
 		})
 	}
 	_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
-		QueueUrl: cr.queueURL,
+		QueueUrl: queueURL,
 		Entries:  entries,
 	})
 	return err
@@ -236,45 +238,94 @@ func (cr *sqsControlRouter) Register(ctx context.Context, probeID string, handle
 	if err != nil {
 		return 0, err
 	}
-	go func() {
-		for {
-			res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
-				QueueUrl:        queueURL,
-				WaitTimeSeconds: longPollTime,
-			})
-			if err != nil {
-				log.Errorf("[Probe %s] Error recieving message: %v", probeID, err)
-				continue
-			}
-			if len(res.Messages) == 0 {
-				continue
-			}
-			// TODO we need to parallelise the handling of requests
-			for _, message := range res.Messages {
-				var sqsRequest sqsRequestMessage
-				if err := json.NewDecoder(bytes.NewBufferString(*message.Body)).Decode(&sqsRequest); err != nil {
-					log.Errorf("[Probe %s] Error decoding message from: %v", probeID, err)
-					continue
-				}
-
-				if err := cr.sendMessage(&sqsRequest.ResponseQueueURL, sqsResponseMessage{
-					ID:       sqsRequest.ID,
-					Response: handler(sqsRequest.Request),
-				}); err != nil {
-					log.Errorf("[Probe %s] Error sending response: %v", probeID, err)
-				}
-			}
+	pwID := rand.Int63()
+	pw := &probeWorker{
+		sqsControlRouter: cr,
+		queueURL:         queueURL,
+		handler:          handler,
+		quit:             make(chan struct{}),
+	}
+	pw.done.Add(1)
+	go pw.loop()
 
-			if err := cr.deleteMessages(res.Messages); err != nil {
-				log.Errorf("Error deleting message from %s: %v", *cr.queueURL, err)
-			}
-		}
-	}()
-	return 0, nil
+	cr.mtx.Lock()
+	defer cr.mtx.Unlock()
+	cr.probeWorkers[pwID] = pw
+
+	return pwID, nil
 }
 
 func (cr *sqsControlRouter) Deregister(_ context.Context, probeID string, id int64) error {
-	// TODO stop the goroutine launched earlier
+	cr.mtx.Lock()
+	pw, ok := cr.probeWorkers[id]
+	cr.mtx.Unlock()
+	if !ok {
+		return fmt.Errorf("Probe %d never connected!", id)
+	}
+
+	pw.stop()
+
+	cr.mtx.Lock()
+	delete(cr.probeWorkers, id)
+	cr.mtx.Unlock()
 
 	return nil
 }
+
+type probeWorker struct {
+	*sqsControlRouter
+
+	queueURL *string
+	handler  xfer.ControlHandlerFunc
+	quit     chan struct{}
+	done     sync.WaitGroup
+}
+
+func (pw probeWorker) stop() {
+	close(pw.quit)
+	pw.done.Wait()
+}
+
+func (pw probeWorker) loop() {
+	for {
+		// have we been stopped?
+		select {
+		case <-pw.quit:
+			pw.done.Done()
+			return
+		default:
+		}
+
+		res, err := pw.service.ReceiveMessage(&sqs.ReceiveMessageInput{
+			QueueUrl:        pw.queueURL,
+			WaitTimeSeconds: longPollTime,
+		})
+		if err != nil {
+			log.Errorf("Error recieving message: %v", err)
+			continue
+		}
+		if len(res.Messages) == 0 {
+			continue
+		}
+
+		// TODO do we need to parallelise the handling of requests?
+		for _, message := range res.Messages {
+			var sqsRequest sqsRequestMessage
+			if err := json.NewDecoder(bytes.NewBufferString(*message.Body)).Decode(&sqsRequest); err != nil {
+				log.Errorf("Error decoding message from: %v", err)
+				continue
+			}
+
+			if err := pw.sendMessage(&sqsRequest.ResponseQueueURL, sqsResponseMessage{
+				ID:       sqsRequest.ID,
+				Response: pw.handler(sqsRequest.Request),
+			}); err != nil {
+				log.Errorf("Error sending response: %v", err)
+			}
+		}
+
+		if err := pw.deleteMessages(pw.queueURL, res.Messages); err != nil {
+			log.Errorf("Error deleting message from %s: %v", *pw.queueURL, err)
+		}
+	}
+}
diff --git a/experimental/multitenant/run.sh b/experimental/multitenant/run.sh
index b96908451c..d37c191ca6 100755
--- a/experimental/multitenant/run.sh
+++ b/experimental/multitenant/run.sh
@@ -44,18 +44,19 @@ start_container 1 progrium/consul consul -p 8400:8400 -p 8500:8500 -p 8600:53/ud
 # These are the micro services
 common_args="--no-probe --app.weave.addr= --app.http.address=:80"
 aws_args="--app.aws.region=us-east-1 --app.aws.id=abc --app.aws.secret=123 --app.aws.token=xyz"
-start_container 1 weaveworks/scope collection -- ${common_args} --app.collector=dynamodb \
+start_container 2 weaveworks/scope collection -- ${common_args} --app.collector=dynamodb \
 	${aws_args} \
 	--app.aws.dynamodb=http://dynamodb.weave.local:8000 \
 	--app.aws.create.tables=true
-start_container 1 weaveworks/scope query -- ${common_args} --app.collector=dynamodb \
+start_container 2 weaveworks/scope query -- ${common_args} --app.collector=dynamodb \
 	${aws_args} \
 	--app.aws.dynamodb=http://dynamodb.weave.local:8000
-start_container 1 weaveworks/scope controls -- ${common_args} --app.control.router=sqs \
+start_container 2 weaveworks/scope controls -- ${common_args} --app.control.router=sqs \
 	${aws_args} \
 	--app.aws.sqs=http://sqs.weave.local:9324
 start_container 1 weaveworks/scope pipes -- ${common_args} --app.pipe.router=consul \
-	--app.consul.addr=consul.weave.local:8500 --app.consul.inf=ethwe
+	--app.consul.addr=consul.weave.local:8500 --app.consul.inf=ethwe \
+	--app.consul.prefix=pipes/
 
 # And we bring it all together with a reverse proxy
 start_container 1 weaveworks/scope-frontend frontend --add-host=dns.weave.local:$(weave docker-bridge-ip) --publish=4040:80
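
The heart of the fix is the probeWorker shutdown pattern: a quit channel tells the polling goroutine to stop, and a sync.WaitGroup lets Deregister block until the loop has actually exited. Below is a minimal, standalone sketch of that same pattern; the worker, newWorker and main names are illustrative only and do not appear in the patch.

package main

import (
	"fmt"
	"sync"
	"time"
)

// worker mirrors the probeWorker pattern from the patch: a quit channel
// signals shutdown and a WaitGroup lets the caller wait for the loop to exit.
type worker struct {
	quit chan struct{}
	done sync.WaitGroup
}

func newWorker() *worker {
	w := &worker{quit: make(chan struct{})}
	w.done.Add(1) // matched by done.Done() when loop returns
	go w.loop()
	return w
}

func (w *worker) loop() {
	defer w.done.Done()
	for {
		// have we been stopped?
		select {
		case <-w.quit:
			return
		default:
		}
		// poll for work here; the real code calls ReceiveMessage with a long poll
		time.Sleep(100 * time.Millisecond)
	}
}

// stop closes the quit channel and blocks until loop has returned,
// which is what makes Deregister a clean, synchronous tear-down.
func (w *worker) stop() {
	close(w.quit)
	w.done.Wait()
}

func main() {
	w := newWorker()
	time.Sleep(300 * time.Millisecond)
	w.stop()
	fmt.Println("worker stopped")
}

One consequence visible in the patch itself: the quit check runs once per receive round, so stop() may wait up to one long poll (WaitTimeSeconds: longPollTime) before the loop notices the closed channel.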