Skip to content

Commit

Permalink
Ensure probes are correctly tidied up in the SQS control router.
Browse files Browse the repository at this point in the history
  • Loading branch information
tomwilkie committed Mar 9, 2016
1 parent f7cbe9c commit 52b4bc4
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 48 deletions.
139 changes: 95 additions & 44 deletions app/multitenant/sqs_control_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ type sqsControlRouter struct {
queueURL *string
userIDer UserIDer

mtx sync.Mutex
cond *sync.Cond
responses map[string]xfer.Response
mtx sync.Mutex
cond *sync.Cond
responses map[string]xfer.Response
probeWorkers map[int64]*probeWorker
}

type sqsRequestMessage struct {
Expand All @@ -54,9 +55,10 @@ func NewSQSControlRouter(url, region string, creds *credentials.Credentials, use
WithEndpoint(url).
WithRegion(region).
WithCredentials(creds))),
queueURL: nil,
userIDer: userIDer,
responses: map[string]xfer.Response{},
queueURL: nil,
userIDer: userIDer,
responses: map[string]xfer.Response{},
probeWorkers: map[int64]*probeWorker{},
}
result.cond = sync.NewCond(&result.mtx)
go result.loop()
Expand Down Expand Up @@ -123,13 +125,13 @@ func (cr *sqsControlRouter) loop() {
continue
}
cr.handleResponses(res)
if err := cr.deleteMessages(res.Messages); err != nil {
if err := cr.deleteMessages(cr.queueURL, res.Messages); err != nil {
log.Errorf("Error deleting message from %s: %v", *cr.queueURL, err)
}
}
}

func (cr *sqsControlRouter) deleteMessages(messages []*sqs.Message) error {
func (cr *sqsControlRouter) deleteMessages(queueURL *string, messages []*sqs.Message) error {
entries := []*sqs.DeleteMessageBatchRequestEntry{}
for _, message := range messages {
entries = append(entries, &sqs.DeleteMessageBatchRequestEntry{
Expand All @@ -138,7 +140,7 @@ func (cr *sqsControlRouter) deleteMessages(messages []*sqs.Message) error {
})
}
_, err := cr.service.DeleteMessageBatch(&sqs.DeleteMessageBatchInput{
QueueUrl: cr.queueURL,
QueueUrl: queueURL,
Entries: entries,
})
return err
Expand Down Expand Up @@ -236,45 +238,94 @@ func (cr *sqsControlRouter) Register(ctx context.Context, probeID string, handle
if err != nil {
return 0, err
}
go func() {
for {
res, err := cr.service.ReceiveMessage(&sqs.ReceiveMessageInput{
QueueUrl: queueURL,
WaitTimeSeconds: longPollTime,
})
if err != nil {
log.Errorf("[Probe %s] Error recieving message: %v", probeID, err)
continue
}
if len(res.Messages) == 0 {
continue
}

// TODO we need to parallelise the handling of requests
for _, message := range res.Messages {
var sqsRequest sqsRequestMessage
if err := json.NewDecoder(bytes.NewBufferString(*message.Body)).Decode(&sqsRequest); err != nil {
log.Errorf("[Probe %s] Error decoding message from: %v", probeID, err)
continue
}

if err := cr.sendMessage(&sqsRequest.ResponseQueueURL, sqsResponseMessage{
ID: sqsRequest.ID,
Response: handler(sqsRequest.Request),
}); err != nil {
log.Errorf("[Probe %s] Error sending response: %v", probeID, err)
}
}
pwID := rand.Int63()
pw := &probeWorker{
sqsControlRouter: cr,
queueURL: queueURL,
handler: handler,
quit: make(chan struct{}),
}
pw.done.Add(1)
go pw.loop()

if err := cr.deleteMessages(res.Messages); err != nil {
log.Errorf("Error deleting message from %s: %v", *cr.queueURL, err)
}
}
}()
return 0, nil
cr.mtx.Lock()
defer cr.mtx.Unlock()
cr.probeWorkers[pwID] = pw

return pwID, nil
}

// Deregister stops the probe worker that Register returned under id and
// removes it from the router's bookkeeping.  It returns an error if no
// worker was ever registered under that id.
func (cr *sqsControlRouter) Deregister(_ context.Context, probeID string, id int64) error {
	cr.mtx.Lock()
	pw, ok := cr.probeWorkers[id]
	cr.mtx.Unlock()
	if !ok {
		// Error strings are lowercase and unpunctuated per Go convention.
		return fmt.Errorf("probe %d never connected", id)
	}

	// Stop outside the lock: stop() blocks until the worker's loop observes
	// the quit signal, which can take up to one long-poll interval.
	pw.stop()

	cr.mtx.Lock()
	delete(cr.probeWorkers, id)
	cr.mtx.Unlock()
	return nil
}

// probeWorker long-polls a per-probe SQS queue for control requests,
// invokes the registered handler, and sends responses back.  One worker is
// created per Register call and torn down by Deregister via stop().
type probeWorker struct {
	*sqsControlRouter // embedded for access to the SQS service and helpers

	queueURL *string                 // request queue this worker polls
	handler  xfer.ControlHandlerFunc // invoked for each decoded request
	quit     chan struct{}           // closed by stop() to ask loop() to exit
	done     sync.WaitGroup          // loop() calls Done on exit; stop() waits on it
}

// stop signals the worker's loop to exit and blocks until it has done so.
//
// The receiver must be a pointer: probeWorker contains a sync.WaitGroup,
// which must not be copied after first use (go vet copylocks).  With a
// value receiver, Done() in loop() would decrement a different WaitGroup
// than the one Wait() blocks on here, deadlocking stop().
func (pw *probeWorker) stop() {
	close(pw.quit)
	pw.done.Wait()
}

// loop long-polls the probe's request queue until stop() is called.  Each
// received message is decoded, handed to the control handler, and the
// handler's response is sent to the response queue named in the request;
// processed messages are then deleted from the request queue.
//
// The receiver must be a pointer: probeWorker contains a sync.WaitGroup,
// which must not be copied after first use (go vet copylocks) — with a
// value receiver this copy's Done() would never unblock stop()'s Wait().
func (pw *probeWorker) loop() {
	// Pairs with the Add(1) in Register; deferred so every exit path
	// decrements exactly once.
	defer pw.done.Done()

	for {
		// Have we been asked to stop?
		select {
		case <-pw.quit:
			return
		default:
		}

		res, err := pw.service.ReceiveMessage(&sqs.ReceiveMessageInput{
			QueueUrl:        pw.queueURL,
			WaitTimeSeconds: longPollTime,
		})
		if err != nil {
			log.Errorf("Error receiving message: %v", err)
			continue
		}
		if len(res.Messages) == 0 {
			continue
		}

		// TODO do we need to parallelise the handling of requests?
		for _, message := range res.Messages {
			var sqsRequest sqsRequestMessage
			if err := json.NewDecoder(bytes.NewBufferString(*message.Body)).Decode(&sqsRequest); err != nil {
				log.Errorf("Error decoding message: %v", err)
				continue
			}

			if err := pw.sendMessage(&sqsRequest.ResponseQueueURL, sqsResponseMessage{
				ID:       sqsRequest.ID,
				Response: pw.handler(sqsRequest.Request),
			}); err != nil {
				log.Errorf("Error sending response: %v", err)
			}
		}

		if err := pw.deleteMessages(pw.queueURL, res.Messages); err != nil {
			log.Errorf("Error deleting message from %s: %v", *pw.queueURL, err)
		}
	}
}
9 changes: 5 additions & 4 deletions experimental/multitenant/run.sh
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,19 @@ start_container 1 progrium/consul consul -p 8400:8400 -p 8500:8500 -p 8600:53/ud
# These are the micro services
common_args="--no-probe --app.weave.addr= --app.http.address=:80"
aws_args="--app.aws.region=us-east-1 --app.aws.id=abc --app.aws.secret=123 --app.aws.token=xyz"
start_container 1 weaveworks/scope collection -- ${common_args} --app.collector=dynamodb \
start_container 2 weaveworks/scope collection -- ${common_args} --app.collector=dynamodb \
${aws_args} \
--app.aws.dynamodb=http://dynamodb.weave.local:8000 \
--app.aws.create.tables=true
start_container 1 weaveworks/scope query -- ${common_args} --app.collector=dynamodb \
start_container 2 weaveworks/scope query -- ${common_args} --app.collector=dynamodb \
${aws_args} \
--app.aws.dynamodb=http://dynamodb.weave.local:8000
start_container 1 weaveworks/scope controls -- ${common_args} --app.control.router=sqs \
start_container 2 weaveworks/scope controls -- ${common_args} --app.control.router=sqs \
${aws_args} \
--app.aws.sqs=http://sqs.weave.local:9324
start_container 1 weaveworks/scope pipes -- ${common_args} --app.pipe.router=consul \
--app.consul.addr=consul.weave.local:8500 --app.consul.inf=ethwe
--app.consul.addr=consul.weave.local:8500 --app.consul.inf=ethwe \
--app.consul.prefix=pipes/

# And we bring it all together with a reverse proxy
start_container 1 weaveworks/scope-frontend frontend --add-host=dns.weave.local:$(weave docker-bridge-ip) --publish=4040:80

0 comments on commit 52b4bc4

Please sign in to comment.