Skip to content

Commit

Permalink
Merge pull request #1554 from weaveworks/parallelism
Browse files Browse the repository at this point in the history
Fetch non-cached reports in parallel
  • Loading branch information
jml committed Jun 8, 2016

Unverified

This user has not yet uploaded their public signing key.
2 parents 2267a79 + 48fc985 commit 38af5e0
Showing 1 changed file with 48 additions and 24 deletions.
72 changes: 48 additions & 24 deletions app/multitenant/dynamo_collector.go
Original file line number Diff line number Diff line change
@@ -240,37 +240,61 @@ func (c *dynamoDBCollector) getCached(reportKeys []string) ([]report.Report, []s
return foundReports, missingReports
}

// Fetch multiple reports in parallel from S3.
func (c *dynamoDBCollector) getNonCached(reportKeys []string) ([]report.Report, error) {
reports := []report.Report{}
type result struct {
key string
report *report.Report
err error
}

ch := make(chan result, len(reportKeys))

for _, reportKey := range reportKeys {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Key: aws.String(reportKey),
})
return err
})
if err != nil {
return nil, err
}
reader, err := gzip.NewReader(resp.Body)
if err != nil {
log.Errorf("Error gunzipping report: %v", err)
continue
}
rep := report.MakeReport()
if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil {
log.Errorf("Failed to decode report: %v", err)
continue
go func(reportKey string) {
r := result{key: reportKey}
r.report, r.err = c.getNonCachedReport(reportKey)
ch <- r
}(reportKey)
}

reports := []report.Report{}
for range reportKeys {
r := <-ch
if r.err != nil {
return nil, r.err
}
reports = append(reports, rep)
c.cache.Set(reportKey, rep)
reports = append(reports, *r.report)
c.cache.Set(r.key, *r.report)
}
return reports, nil
}

// Fetch a single report from S3.
func (c *dynamoDBCollector) getNonCachedReport(reportKey string) (*report.Report, error) {
var resp *s3.GetObjectOutput
err := timeRequest("Get", s3RequestDuration, func() error {
var err error
resp, err = c.s3.GetObject(&s3.GetObjectInput{
Bucket: aws.String(c.bucketName),
Key: aws.String(reportKey),
})
return err
})
if err != nil {
return nil, err
}
reader, err := gzip.NewReader(resp.Body)
if err != nil {
return nil, err
}
rep := report.MakeReport()
if err := codec.NewDecoder(reader, &codec.MsgpackHandle{}).Decode(&rep); err != nil {
return nil, err
}
return &rep, nil
}

func (c *dynamoDBCollector) getReports(userid string, row int64, start, end time.Time) ([]report.Report, error) {
rowKey := fmt.Sprintf("%s-%s", userid, strconv.FormatInt(row, 10))
reportKeys, err := c.getReportKeys(rowKey, start, end)

0 comments on commit 38af5e0

Please sign in to comment.