Skip to content

Commit

Permalink
WIP: Remove SASI indexes
Browse files Browse the repository at this point in the history
Signed-off-by: Prithvi Raj <p.r@uber.com>
  • Loading branch information
vprithvi committed May 2, 2018
1 parent c7891a4 commit 5e58764
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 17 deletions.
21 changes: 17 additions & 4 deletions plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
package dependencystore

import (
"fmt"
"strings"
"time"

"github.com/pkg/errors"
Expand All @@ -27,8 +29,10 @@ import (
)

const (
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsInsertStmt = "INSERT INTO dependencies(ts, date_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectFmt = "SELECT ts, dependencies FROM dependencies WHERE date_bucket IN (%s) AND ts >= ? AND ts < ?"
dateFmt = "20060102"
day = 24 * time.Hour
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
Expand Down Expand Up @@ -64,13 +68,14 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
CallCount: int64(d.CallCount),
}
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)
query := s.session.Query(depsInsertStmt, ts, ts.Format(dateFmt), deps)
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
query := s.session.Query(getDepSelectString(startTs, endTs), startTs, endTs)
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
Expand All @@ -93,6 +98,14 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
return mDependency, nil
}

func getDepSelectString(startTs time.Time, endTs time.Time) string {
var dateBuckets []string
for ts := startTs.Truncate(day); ts.Before(endTs); ts = ts.Add(day) {
dateBuckets = append(dateBuckets, ts.Format(dateFmt))
}
return fmt.Sprintf(depsSelectFmt, strings.Join(dateBuckets, ","))
}

func (s *DependencyStore) timeIntervalToPoints(endTs time.Time, lookback time.Duration) []time.Time {
startTs := endTs.Add(-lookback)
var days []time.Time
Expand Down
6 changes: 3 additions & 3 deletions plugin/storage/cassandra/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ func TestDependencyStoreWrite(t *testing.T) {
} else {
assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args)
}
if d, ok := args[1].(time.Time); ok {
assert.Equal(t, ts, d)
if d, ok := args[1].(string); ok {
assert.Equal(t, ts.Format(dateFmt), d)
} else {
assert.Fail(t, "expecting second arg as time.Time", "received: %+v", args)
assert.Fail(t, "expecting second arg as string", "received: %+v", args)
}
if d, ok := args[2].([]Dependency); ok {
assert.Equal(t, []Dependency{
Expand Down
15 changes: 5 additions & 10 deletions plugin/storage/cassandra/schema/v001.cql.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -188,20 +188,15 @@ CREATE TYPE IF NOT EXISTS ${keyspace}.dependency (
);

-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data
-- note we have to write ts twice (once as ts_index). This is because we cannot make a SASI index on the primary key
CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies (
ts timestamp,
ts_index timestamp,
ts timestamp,
date_bucket bigint,
dependencies list<frozen<dependency>>,
PRIMARY KEY (ts)
)
WITH compaction = {
PRIMARY KEY (date_bucket, ts)
) WITH CLUSTERING ORDER BY (ts DESC)
AND compaction = {
'min_threshold': '4',
'max_threshold': '32',
'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy'
}
AND default_time_to_live = ${dependencies_ttl};

CREATE CUSTOM INDEX IF NOT EXISTS ON ${keyspace}.dependencies (ts_index)
USING 'org.apache.cassandra.index.sasi.SASIIndex'
WITH OPTIONS = {'mode': 'SPARSE'};

0 comments on commit 5e58764

Please sign in to comment.