Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove SASI indices #1328

Merged
merged 12 commits into from
Feb 14, 2019
Merged
11 changes: 11 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,17 @@ Changes by Version

#### Backend Changes

- Remove cassandra SASI indices [#1328](https://github.com/jaegertracing/jaeger/pull/1328)

Migration Path:

1. Run `plugin/storage/cassandra/schema/migration/v001tov002part1.sh` which will copy dependencies into a csv, update the `dependency UDT`, create a new `dependencies_v2` table, and write dependencies from the csv into the `dependencies_v2` table.
2. Run the collector and query services with the cassandra flag `cassandra.enable-dependencies-v2=true` which will instruct jaeger to write and read to and from the new `dependencies_v2` table.
3. Update [spark job](https://github.com/jaegertracing/spark-dependencies) to write to the new `dependencies_v2` table.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should rather say "update Spark job to version N"

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll punt on this until I've actually made the change in spark.

4. Run `plugin/storage/cassandra/schema/migration/v001tov002part2.sh` which will DELETE the old dependency table and the SASI index.

Users who wish to continue to use the v1 table don't have to do anything as the cassandra flag `cassandra.enable-dependencies-v2` will default to false. Users may migrate on their own timeline however new features will be built solely on the `dependencies_v2` table. In the future, we will remove support for v1 completely.

##### Breaking Changes

##### New Features
Expand Down
23 changes: 20 additions & 3 deletions model/dependencies.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,26 @@

package model

// DependencyLinkSource is the source of data used to generate the dependencies.
type DependencyLinkSource string

const (
// JaegerDependencyLinkSource describes a dependency diagram that was generated from Jaeger traces.
JaegerDependencyLinkSource = DependencyLinkSource("jaeger")
)

// DependencyLink shows dependencies between services
type DependencyLink struct {
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Parent string `json:"parent"`
Child string `json:"child"`
CallCount uint64 `json:"callCount"`
Source DependencyLinkSource `json:"source"`
}

// ApplyDefaults applies defaults to the DependencyLink.
func (d DependencyLink) ApplyDefaults() DependencyLink {
if d.Source == "" {
d.Source = JaegerDependencyLinkSource
}
return d
}
30 changes: 30 additions & 0 deletions model/dependencies_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright (c) 2019 The Jaeger Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package model

import (
"testing"

"github.com/stretchr/testify/assert"
)

func TestDependencyLinkApplyDefaults(t *testing.T) {
dl := DependencyLink{}.ApplyDefaults()
assert.Equal(t, JaegerDependencyLinkSource, dl.Source)

networkSource := DependencyLinkSource("network")
dl = DependencyLink{Source: networkSource}.ApplyDefaults()
assert.Equal(t, networkSource, dl.Source)
}
1 change: 1 addition & 0 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type Configuration struct {
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"`
TLS TLS
}

Expand Down
5 changes: 5 additions & 0 deletions plugin/storage/cassandra/dependencystore/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ type Dependency struct {
Parent string `cql:"parent"`
Child string `cql:"child"`
CallCount int64 `cql:"call_count"` // always unsigned, but we cannot explicitly read uint64 from Cassandra
Source string `cql:"source"`
}

// MarshalUDT handles marshalling a Dependency.
Expand All @@ -36,6 +37,8 @@ func (d *Dependency) MarshalUDT(name string, info gocql.TypeInfo) ([]byte, error
return gocql.Marshal(info, d.Child)
case "call_count":
return gocql.Marshal(info, d.CallCount)
case "source":
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what happens if you run this code against the schema where UDT was not upgraded? Will gocql simply not invoke this function for "source"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes

return gocql.Marshal(info, d.Source)
default:
return nil, fmt.Errorf("unknown column for position: %q", name)
}
Expand All @@ -50,6 +53,8 @@ func (d *Dependency) UnmarshalUDT(name string, info gocql.TypeInfo, data []byte)
return gocql.Unmarshal(info, data, &d.Child)
case "call_count":
return gocql.Unmarshal(info, data, &d.CallCount)
case "source":
return gocql.Unmarshal(info, data, &d.Source)
default:
return fmt.Errorf("unknown column for position: %q", name)
}
Expand Down
10 changes: 6 additions & 4 deletions plugin/storage/cassandra/dependencystore/model_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,21 @@ import (

func TestDependencyUDT(t *testing.T) {
dependency := &Dependency{
Parent: "goo",
Child: "gle",
Parent: "bi",
Child: "ng",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

quit raising entropy

CallCount: 123,
Source: "jaeger",
}

testCase := testutils.UDTTestCase{
Obj: dependency,
New: func() gocql.UDTUnmarshaler { return &Dependency{} },
ObjName: "Dependency",
Fields: []testutils.UDTField{
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("goo"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("gle"), Err: false},
{Name: "parent", Type: gocql.TypeAscii, ValIn: []byte("bi"), Err: false},
{Name: "child", Type: gocql.TypeAscii, ValIn: []byte("ng"), Err: false},
{Name: "call_count", Type: gocql.TypeBigInt, ValIn: []byte{0, 0, 0, 0, 0, 0, 0, 123}, Err: false},
{Name: "source", Type: gocql.TypeAscii, ValIn: []byte("jaeger"), Err: false},
{Name: "wrong-field", Err: true},
},
}
Expand Down
72 changes: 64 additions & 8 deletions plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,29 +26,59 @@ import (
casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics"
)

// Version determines which version of the dependencies table to use.
type Version int

// IsValid returns true if the Version is a valid one.
func (i Version) IsValid() bool {
return i >= 0 && i < versionEnumEnd
}

const (
depsInsertStmt = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
// V1 is used when the dependency table is SASI indexed.
V1 Version = iota

// V2 is used when the dependency table is NOT SASI indexed.
V2
versionEnumEnd

depsInsertStmtV1 = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmtV2 = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtV1 = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmtV2 = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"

// TODO: Make this customizable.
tsBucket = 24 * time.Hour
)

var (
errInvalidVersion = errors.New("invalid version")
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
session cassandra.Session
dependenciesTableMetrics *casMetrics.Table
logger *zap.Logger
version Version
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
) *DependencyStore {
version Version,
) (*DependencyStore, error) {
if !version.IsValid() {
return nil, errInvalidVersion
}
return &DependencyStore{
session: session,
dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"),
logger: logger,
}
version: version,
}, nil
}

// WriteDependencies implements dependencystore.Writer#WriteDependencies.
Expand All @@ -59,27 +89,44 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
Parent: d.Parent,
Child: d.Child,
CallCount: int64(d.CallCount),
Source: string(d.Source),
}
}
query := s.session.Query(depsInsertStmt, ts, ts, deps)

var query cassandra.Query
switch s.version {
case V1:
query = s.session.Query(depsInsertStmtV1, ts, ts, deps)
case V2:
query = s.session.Query(depsInsertStmtV2, ts, ts.Truncate(tsBucket), deps)
}
return s.dependenciesTableMetrics.Exec(query, s.logger)
}

// GetDependencies returns all interservice dependencies
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
query := s.session.Query(depsSelectStmt, endTs.Add(-1*lookback), endTs)
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.version {
case V1:
query = s.session.Query(depsSelectStmtV1, startTs, endTs)
case V2:
query = s.session.Query(depsSelectStmtV2, getBuckets(startTs, endTs), startTs, endTs)
}
iter := query.Consistency(cassandra.One).Iter()

var mDependency []model.DependencyLink
var dependencies []Dependency
var ts time.Time
for iter.Scan(&ts, &dependencies) {
for _, dependency := range dependencies {
mDependency = append(mDependency, model.DependencyLink{
dl := model.DependencyLink{
Parent: dependency.Parent,
Child: dependency.Child,
CallCount: uint64(dependency.CallCount),
})
Source: model.DependencyLinkSource(dependency.Source),
}.ApplyDefaults()
mDependency = append(mDependency, dl)
}
}

Expand All @@ -89,3 +136,12 @@ func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duratio
}
return mDependency, nil
}

func getBuckets(startTs time.Time, endTs time.Time) []time.Time {
// TODO: Preallocate the array using some maths and maybe use a pool? This endpoint probably isn't used enough to warrant this.
var tsBuckets []time.Time
for ts := startTs.Truncate(tsBucket); ts.Before(endTs); ts = ts.Add(tsBucket) {
tsBuckets = append(tsBuckets, ts)
}
return tsBuckets
}
Loading