Skip to content

Commit

Permalink
actually address comments
Browse files Browse the repository at this point in the history
Signed-off-by: Won Jun Jang <wjang@uber.com>
  • Loading branch information
black-adder committed Feb 12, 2019
1 parent 87ceb72 commit efeaa92
Show file tree
Hide file tree
Showing 8 changed files with 105 additions and 105 deletions.
30 changes: 15 additions & 15 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,21 @@ import (

// Configuration describes the configuration properties needed to connect to a Cassandra cluster
type Configuration struct {
Servers []string `validate:"nonzero"`
Keyspace string `validate:"nonzero"`
LocalDC string `yaml:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Consistency string `yaml:"consistency"`
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
DependencySASIDisabled bool `yaml:"dependency_sasi_disabled"`
TLS TLS
Servers []string `validate:"nonzero"`
Keyspace string `validate:"nonzero"`
LocalDC string `yaml:"local_dc"`
ConnectionsPerHost int `validate:"min=1" yaml:"connections_per_host"`
Timeout time.Duration `validate:"min=500"`
ReconnectInterval time.Duration `validate:"min=500" yaml:"reconnect_interval"`
SocketKeepAlive time.Duration `validate:"min=0" yaml:"socket_keep_alive"`
MaxRetryAttempts int `validate:"min=0" yaml:"max_retry_attempt"`
ProtoVersion int `yaml:"proto_version"`
Consistency string `yaml:"consistency"`
Port int `yaml:"port"`
Authenticator Authenticator `yaml:"authenticator"`
DisableAutoDiscovery bool `yaml:"disable_auto_discovery"`
EnableDependenciesV2 bool `yaml:"enable_dependencies_v2"`
TLS TLS
}

// Authenticator holds the authentication properties needed to connect to a Cassandra cluster
Expand Down
58 changes: 29 additions & 29 deletions plugin/storage/cassandra/dependencystore/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,58 +26,58 @@ import (
casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics"
)

// IndexMode determines how the dependency data is indexed.
type IndexMode int
// Version determines which version of the dependencies table to use.
type Version int

// IsValid returns true if the IndexMode is a valid one.
func (i IndexMode) IsValid() bool {
// IsValid returns true if the Version is a valid one.
func (i Version) IsValid() bool {
return i < end
}

const (
// SASIEnabled is used when the dependency table is SASI indexed.
SASIEnabled IndexMode = iota
// V1 is used when the dependency table is SASI indexed.
V1 Version = iota

// SASIDisabled is used when the dependency table is NOT SASI indexed.
SASIDisabled
// V2 is used when the dependency table is NOT SASI indexed.
V2
end

depsInsertStmtSASI = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmt = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtSASI = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmt = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"
depsInsertStmtV1 = "INSERT INTO dependencies(ts, ts_index, dependencies) VALUES (?, ?, ?)"
depsInsertStmtV2 = "INSERT INTO dependencies_v2(ts, ts_bucket, dependencies) VALUES (?, ?, ?)"
depsSelectStmtV1 = "SELECT ts, dependencies FROM dependencies WHERE ts_index >= ? AND ts_index < ?"
depsSelectStmtV2 = "SELECT ts, dependencies FROM dependencies_v2 WHERE ts_bucket IN ? AND ts >= ? AND ts < ?"

// TODO: Make this customizable.
tsBucket = 24 * time.Hour
)

var (
errInvalidIndexMode = errors.New("invalid index mode")
errInvalidVersion = errors.New("invalid version")
)

// DependencyStore handles all queries and insertions to Cassandra dependencies
type DependencyStore struct {
session cassandra.Session
dependenciesTableMetrics *casMetrics.Table
logger *zap.Logger
indexMode IndexMode
version Version
}

// NewDependencyStore returns a DependencyStore
func NewDependencyStore(
session cassandra.Session,
metricsFactory metrics.Factory,
logger *zap.Logger,
indexMode IndexMode,
version Version,
) (*DependencyStore, error) {
if !indexMode.IsValid() {
return nil, errInvalidIndexMode
if !version.IsValid() {
return nil, errInvalidVersion
}
return &DependencyStore{
session: session,
dependenciesTableMetrics: casMetrics.NewTable(metricsFactory, "dependencies"),
logger: logger,
indexMode: indexMode,
version: version,
}, nil
}

Expand All @@ -90,18 +90,18 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
Child: d.Child,
CallCount: int64(d.CallCount),
}
if s.indexMode == SASIDisabled {
if s.version == V2 {
dep.Source = string(d.Source)
}
deps[i] = dep
}

var query cassandra.Query
switch s.indexMode {
case SASIDisabled:
query = s.session.Query(depsInsertStmt, ts, ts.Truncate(tsBucket), deps)
case SASIEnabled:
query = s.session.Query(depsInsertStmtSASI, ts, ts, deps)
switch s.version {
case V1:
query = s.session.Query(depsInsertStmtV1, ts, ts, deps)
case V2:
query = s.session.Query(depsInsertStmtV2, ts, ts.Truncate(tsBucket), deps)
}
return s.dependenciesTableMetrics.Exec(query, s.logger)
}
Expand All @@ -110,11 +110,11 @@ func (s *DependencyStore) WriteDependencies(ts time.Time, dependencies []model.D
func (s *DependencyStore) GetDependencies(endTs time.Time, lookback time.Duration) ([]model.DependencyLink, error) {
startTs := endTs.Add(-1 * lookback)
var query cassandra.Query
switch s.indexMode {
case SASIDisabled:
query = s.session.Query(depsSelectStmt, getBuckets(startTs, endTs), startTs, endTs)
case SASIEnabled:
query = s.session.Query(depsSelectStmtSASI, startTs, endTs)
switch s.version {
case V1:
query = s.session.Query(depsSelectStmtV1, startTs, endTs)
case V2:
query = s.session.Query(depsSelectStmtV2, getBuckets(startTs, endTs), startTs, endTs)
}
iter := query.Consistency(cassandra.One).Iter()

Expand Down
50 changes: 25 additions & 25 deletions plugin/storage/cassandra/dependencystore/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ type depStorageTest struct {
storage *DependencyStore
}

func withDepStore(indexMode IndexMode, fn func(s *depStorageTest)) {
func withDepStore(version Version, fn func(s *depStorageTest)) {
session := &mocks.Session{}
logger, logBuffer := testutils.NewLogger()
metricsFactory := metricstest.NewFactory(time.Second)
defer metricsFactory.Stop()
store, _ := NewDependencyStore(session, metricsFactory, logger, indexMode)
store, _ := NewDependencyStore(session, metricsFactory, logger, version)
s := &depStorageTest{
session: session,
logger: logger,
Expand All @@ -58,35 +58,35 @@ func withDepStore(indexMode IndexMode, fn func(s *depStorageTest)) {
var _ dependencystore.Reader = &DependencyStore{} // check API conformance
var _ dependencystore.Writer = &DependencyStore{} // check API conformance

func TestIndexModeIsValid(t *testing.T) {
assert.True(t, SASIEnabled.IsValid())
assert.True(t, SASIDisabled.IsValid())
func TestVersionIsValid(t *testing.T) {
assert.True(t, V1.IsValid())
assert.True(t, V2.IsValid())
assert.False(t, end.IsValid())
}

func TestInvalidIndexMode(t *testing.T) {
func TestInvalidVersion(t *testing.T) {
_, err := NewDependencyStore(&mocks.Session{}, metrics.NullFactory, zap.NewNop(), end)
assert.Error(t, err)
}

func TestDependencyStoreWrite(t *testing.T) {
testCases := []struct {
caption string
indexMode IndexMode
caption string
version Version
}{
{
caption: "SASI enabled",
indexMode: SASIEnabled,
caption: "V1",
version: V1,
},
{
caption: "SASI disabled",
indexMode: SASIDisabled,
caption: "V2",
version: V2,
},
}
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.caption, func(t *testing.T) {
withDepStore(testCase.indexMode, func(s *depStorageTest) {
withDepStore(testCase.version, func(s *depStorageTest) {
query := &mocks.Query{}
query.On("Exec").Return(nil)

Expand Down Expand Up @@ -116,7 +116,7 @@ func TestDependencyStoreWrite(t *testing.T) {
} else {
assert.Fail(t, "expecting first arg as time.Time", "received: %+v", args)
}
if testCase.indexMode == SASIDisabled {
if testCase.version == V2 {
if d, ok := args[1].(time.Time); ok {
assert.Equal(t, time.Date(2017, time.January, 24, 0, 0, 0, 0, time.UTC), d)
} else {
Expand All @@ -130,7 +130,7 @@ func TestDependencyStoreWrite(t *testing.T) {
}
}
if d, ok := args[2].([]Dependency); ok {
if testCase.indexMode == SASIDisabled {
if testCase.version == V2 {
assert.Equal(t, []Dependency{
{
Parent: "a",
Expand Down Expand Up @@ -162,39 +162,39 @@ func TestDependencyStoreGetDependencies(t *testing.T) {
queryError error
expectedError string
expectedLogs []string
indexMode IndexMode
version Version
}{
{
caption: "success SASI enabled",
indexMode: SASIEnabled,
caption: "success V1",
version: V1,
},
{
caption: "success SASI disabled",
indexMode: SASIDisabled,
caption: "success V2",
version: V2,
},
{
caption: "failure SASI enabled",
caption: "failure V1",
queryError: errors.New("query error"),
expectedError: "Error reading dependencies from storage: query error",
expectedLogs: []string{
"Failure to read Dependencies",
},
indexMode: SASIEnabled,
version: V1,
},
{
caption: "failure SASI disabled",
caption: "failure V2",
queryError: errors.New("query error"),
expectedError: "Error reading dependencies from storage: query error",
expectedLogs: []string{
"Failure to read Dependencies",
},
indexMode: SASIDisabled,
version: V2,
},
}
for _, tc := range testCases {
testCase := tc // capture loop var
t.Run(testCase.caption, func(t *testing.T) {
withDepStore(testCase.indexMode, func(s *depStorageTest) {
withDepStore(testCase.version, func(s *depStorageTest) {
scanMatcher := func() interface{} {
deps := [][]Dependency{
{
Expand Down
8 changes: 4 additions & 4 deletions plugin/storage/cassandra/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,11 @@ func (f *Factory) CreateSpanWriter() (spanstore.Writer, error) {

// CreateDependencyReader implements storage.Factory
func (f *Factory) CreateDependencyReader() (dependencystore.Reader, error) {
indexMode := cDepStore.SASIEnabled
if f.Options.GetPrimary().DependencySASIDisabled {
indexMode = cDepStore.SASIDisabled
version := cDepStore.V1
if f.Options.GetPrimary().EnableDependenciesV2 {
version = cDepStore.V2
}
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, indexMode)
return cDepStore.NewDependencyStore(f.primarySession, f.primaryMetricsFactory, f.logger, version)
}

// CreateArchiveSpanReader implements storage.ArchiveFactory
Expand Down
2 changes: 1 addition & 1 deletion plugin/storage/cassandra/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestCassandraFactory(t *testing.T) {
logger, logBuf := testutils.NewLogger()
f := NewFactory()
v, command := config.Viperize(f.AddFlags)
command.ParseFlags([]string{"--cassandra-archive.enabled=true", "--cassandra.sasi-disabled=true"})
command.ParseFlags([]string{"--cassandra-archive.enabled=true", "--cassandra.enable-dependencies-v2=true"})
f.InitFromViper(v)

// after InitFromViper, f.primaryConfig points to a real session builder that will fail in unit tests,
Expand Down
50 changes: 25 additions & 25 deletions plugin/storage/cassandra/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,27 +26,27 @@ import (

const (
// session settings
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixReconnectInterval = ".reconnect-interval"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixDC = ".local-dc"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"
suffixSASIDisabled = ".sasi-disabled"
suffixEnabled = ".enabled"
suffixConnPerHost = ".connections-per-host"
suffixMaxRetryAttempts = ".max-retry-attempts"
suffixTimeout = ".timeout"
suffixReconnectInterval = ".reconnect-interval"
suffixServers = ".servers"
suffixPort = ".port"
suffixKeyspace = ".keyspace"
suffixDC = ".local-dc"
suffixConsistency = ".consistency"
suffixProtoVer = ".proto-version"
suffixSocketKeepAlive = ".socket-keep-alive"
suffixUsername = ".username"
suffixPassword = ".password"
suffixTLS = ".tls"
suffixCert = ".tls.cert"
suffixKey = ".tls.key"
suffixCA = ".tls.ca"
suffixServerName = ".tls.server-name"
suffixVerifyHost = ".tls.verify-host"
suffixEnableDependenciesV2 = ".enable-dependencies-v2"

// common storage settings
suffixSpanStoreWriteCacheTTL = ".span-store-write-cache-ttl"
Expand Down Expand Up @@ -199,9 +199,9 @@ func addFlags(flagSet *flag.FlagSet, nsConfig *namespaceConfig) {
nsConfig.TLS.EnableHostVerification,
"Enable (or disable) host key verification")
flagSet.Bool(
nsConfig.namespace+suffixSASIDisabled,
nsConfig.DependencySASIDisabled,
"Disable (or enable) SASI indexes for the dependencies table. Only set this to true if you've migrated the dependencies table to NOT use SASI indexes")
nsConfig.namespace+suffixEnableDependenciesV2,
nsConfig.EnableDependenciesV2,
"Disable (or enable) the dependencies v2 table. Only set this to true if you've migrated the dependencies table to v2")
}

// InitFromViper initializes Options with properties from viper
Expand Down Expand Up @@ -236,7 +236,7 @@ func (cfg *namespaceConfig) initFromViper(v *viper.Viper) {
cfg.TLS.CaPath = v.GetString(cfg.namespace + suffixCA)
cfg.TLS.ServerName = v.GetString(cfg.namespace + suffixServerName)
cfg.TLS.EnableHostVerification = v.GetBool(cfg.namespace + suffixVerifyHost)
cfg.DependencySASIDisabled = v.GetBool(cfg.namespace + suffixSASIDisabled)
cfg.EnableDependenciesV2 = v.GetBool(cfg.namespace + suffixEnableDependenciesV2)
}

// GetPrimary returns primary configuration.
Expand Down
Loading

0 comments on commit efeaa92

Please sign in to comment.