diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 6671203b187..2b739a71b3a 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -91,7 +91,7 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() assert.EqualError(t, err, "archive storage not configured") - f.archiveConfig = &mockSessionBuilder{} + f.archiveConfig = newMockSessionBuilder(session, nil) assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanReader() diff --git a/plugin/storage/cassandra/schema/migration/V002toV003.sh b/plugin/storage/cassandra/schema/migration/V002toV003.sh new file mode 100644 index 00000000000..9d53b06ca07 --- /dev/null +++ b/plugin/storage/cassandra/schema/migration/V002toV003.sh @@ -0,0 +1,100 @@ +#!/usr/bin/env bash + +# Create a new operation_names_v2 table and copy all data from the operation_names table +# Sample usage: KEYSPACE=jaeger_v1_test ./plugin/storage/cassandra/schema/migration/V002toV003.sh + +set -euo pipefail + +function usage { + >&2 echo "Error: $1" + >&2 echo "" + >&2 echo "Usage: KEYSPACE={keyspace} $0" + >&2 echo "" + >&2 echo "The following parameters can be set via environment:" + >&2 echo " KEYSPACE - keyspace" + >&2 echo "" + exit 1 +} + +confirm() { + read -r -p "${1:-Continue? [y/N]} " response + case "$response" in + [yY][eE][sS]|[yY]) + true + ;; + *) + exit 1 + ;; + esac +} + +if [[ -z ${KEYSPACE:-} ]]; then + usage "missing KEYSPACE parameter" +fi + +if [[ ${KEYSPACE} =~ [^a-zA-Z0-9_] ]]; then + usage "invalid characters in KEYSPACE=$KEYSPACE parameter, please use letters, digits or underscores" +fi + +keyspace=${KEYSPACE} +old_table=operation_names +new_table=operation_names_v2 +cqlsh_cmd=cqlsh + +row_count=$(${cqlsh_cmd} -e "select count(*) from $keyspace.$old_table;"|head -4|tail -1| tr -d ' ') + +echo "About to copy $row_count rows to new table..." + +confirm + +${cqlsh_cmd} -e "COPY $keyspace.$old_table (service_name, operation_name) to '$old_table.csv';" + +if [[ ! -f ${old_table}.csv ]]; then + echo "Could not find $old_table.csv. Backup from Cassandra was probably not successful" + exit 1 +fi + +csv_rows=$(wc -l ${old_table}.csv | tr -dc '0-9') + +if [[ ${row_count} -ne ${csv_rows} ]]; then + echo "Row count in file ($csv_rows) does not match row count in Cassandra ($row_count)" + exit 1 +fi + +echo "Generating data for new table..."
+while IFS="," read -r service_name operation_name; do + echo "$service_name,,$operation_name" +done < ${old_table}.csv > ${new_table}.csv + +ttl=$(${cqlsh_cmd} -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='$old_table';"|head -4|tail -1|tr -d ' ') + +echo "Creating new table $new_table with ttl: $ttl" + +${cqlsh_cmd} -e "CREATE TABLE IF NOT EXISTS $keyspace.$new_table ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = $ttl + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800;" + +echo "Importing data into new table: $keyspace.$new_table from $new_table.csv" + +# ensure empty strings are imported as empty strings rather than NULL +${cqlsh_cmd} -e "COPY $keyspace.$new_table (service_name, span_kind, operation_name) + FROM '$new_table.csv' + WITH NULL='NIL';" + +echo "Data from the old table was successfully imported to the new table!" + +echo "Before finishing, do you want to delete the old table: $keyspace.$old_table?" +confirm +${cqlsh_cmd} -e "DROP TABLE IF EXISTS $keyspace.$old_table;" \ No newline at end of file diff --git a/plugin/storage/cassandra/schema/v003.cql.tmpl b/plugin/storage/cassandra/schema/v003.cql.tmpl new file mode 100644 index 00000000000..b1b664d3dd0 --- /dev/null +++ b/plugin/storage/cassandra/schema/v003.cql.tmpl @@ -0,0 +1,204 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, + fields list<frozen<keyvalue>>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list<frozen<keyvalue>>, +); + +-- Notice we have span_hash. This exists only for Zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks different in CQLSH "describe table".
+-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, + duration bigint, + tags list<frozen<keyvalue>>, + logs list<frozen<log>>, + refs list<frozen<span_ref>>, + process frozen<process>, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time.
+CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list<frozen<dependency>>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; \ No newline at end of file diff --git
a/plugin/storage/cassandra/spanstore/operation_names.go b/plugin/storage/cassandra/spanstore/operation_names.go index de73cf4347e..71a8215c4ed 100644 --- a/plugin/storage/cassandra/spanstore/operation_names.go +++ b/plugin/storage/cassandra/spanstore/operation_names.go @@ -16,6 +16,7 @@ package spanstore import ( + "fmt" "time" "github.com/pkg/errors" @@ -25,18 +26,67 @@ import ( "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/cassandra" casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - insertOperationName = `INSERT INTO operation_names(service_name, operation_name) VALUES (?, ?)` - queryOperationNames = `SELECT operation_name FROM operation_names WHERE service_name = ?` + // latestVersion of the operation_names table + // increase the version whenever a table schema change requires a code change + latestVersion = schemaVersion("v2") + + // previousVersion of the operation_names table + // if the latest version does not work, fall back to using the previous version + previousVersion = schemaVersion("v1") + + // tableCheckStmt is the query used to check whether a table exists + tableCheckStmt = "SELECT * from %s limit 1" ) +type schemaVersion string + +type tableMeta struct { + tableName string + insertStmt string + queryByKindStmt string + queryStmt string + createWriteQuery func(query cassandra.Query, service, kind, opName string) cassandra.Query + getOperations func(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) +} + +func (t *tableMeta) materialize() { + t.insertStmt = fmt.Sprintf(t.insertStmt, t.tableName) + t.queryByKindStmt = fmt.Sprintf(t.queryByKindStmt, t.tableName) + t.queryStmt = fmt.Sprintf(t.queryStmt, t.tableName) +} + +var schemas = map[schemaVersion]*tableMeta{ + previousVersion: { + tableName: "operation_names", + insertStmt: "INSERT INTO %s(service_name, operation_name) VALUES (?, ?)", + queryByKindStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + queryStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + getOperations: getOperationsV1, + createWriteQuery: func(query cassandra.Query, service, kind, opName string) cassandra.Query { + return query.Bind(service, opName) + }, + }, + latestVersion: { + tableName: "operation_names_v2", + insertStmt: "INSERT INTO %s(service_name, span_kind, operation_name) VALUES (?, ?, ?)", + queryByKindStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ? AND span_kind = ?", + queryStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ?", + getOperations: getOperationsV2, + createWriteQuery: func(query cassandra.Query, service, kind, opName string) cassandra.Query { + return query.Bind(service, kind, opName) + }, + }, +} + // OperationNamesStorage stores known operation names by service.
type OperationNamesStorage struct { // CQL statements are public so that Cassandra2 storage can override them - InsertStmt string - QueryStmt string + schemaVersion schemaVersion + table *tableMeta session cassandra.Session writeCacheTTL time.Duration metrics *casMetrics.Table @@ -51,11 +101,19 @@ func NewOperationNamesStorage( metricsFactory metrics.Factory, logger *zap.Logger, ) *OperationNamesStorage { + + schemaVersion := latestVersion + if !tableExist(session, schemas[schemaVersion].tableName) { + schemaVersion = previousVersion + } + table := schemas[schemaVersion] + table.materialize() + return &OperationNamesStorage{ session: session, - InsertStmt: insertOperationName, - QueryStmt: queryOperationNames, - metrics: casMetrics.NewTable(metricsFactory, "operation_names"), + schemaVersion: schemaVersion, + table: table, + metrics: casMetrics.NewTable(metricsFactory, schemas[schemaVersion].tableName), writeCacheTTL: writeCacheTTL, logger: logger, operationNames: cache.NewLRUWithOptions( @@ -70,9 +128,11 @@ func NewOperationNamesStorage( // Write saves Operation and Service name tuples func (s *OperationNamesStorage) Write(serviceName string, operationName string) error { var err error - query := s.session.Query(s.InsertStmt) - if inCache := checkWriteCache(serviceName+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { - q := query.Bind(serviceName, operationName) + //TODO: take spanKind from args + spanKind := "" + + if inCache := checkWriteCache(serviceName+"|"+spanKind+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { + q := s.table.createWriteQuery(s.session.Query(s.table.insertStmt), serviceName, spanKind, operationName) err2 := s.metrics.Exec(q, s.logger) if err2 != nil { err = err2 @@ -83,16 +143,68 @@ func (s *OperationNamesStorage) Write(serviceName string, operationName string) // GetOperations returns all operations for a specific service traced by Jaeger func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) { - iter := s.session.Query(s.QueryStmt, service).Iter() + operations, err := s.table.getOperations(s, &spanstore.OperationQueryParameters{ + ServiceName: service, + }) + + if err != nil { + return nil, err + } + //TODO: return operations instead of list of string + operationNames := make([]string, len(operations)) + for idx, operation := range operations { + operationNames[idx] = operation.Name + } + return operationNames, err +} + +func tableExist(session cassandra.Session, tableName string) bool { + query := session.Query(fmt.Sprintf(tableCheckStmt, tableName)) + err := query.Exec() + return err == nil +} + +func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { + iter := s.session.Query(s.table.queryStmt, query.ServiceName).Iter() var operation string - var operations []string + var operations []*spanstore.Operation for iter.Scan(&operation) { - operations = append(operations, operation) + operations = append(operations, &spanstore.Operation{ + Name: operation, + }) } if err := iter.Close(); err != nil { err = errors.Wrap(err, "Error reading operation_names from storage") return nil, err } + + return operations, nil +} + +func getOperationsV2(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { + var casQuery cassandra.Query + if query.SpanKind == "" { + // Get operations for all spanKind + casQuery = s.session.Query(s.table.queryStmt, query.ServiceName) + } else { + // Get operations for given 
spanKind + casQuery = s.session.Query(s.table.queryByKindStmt, query.ServiceName, query.SpanKind) + } + iter := casQuery.Iter() + + var operationName string + var spanKind string + var operations []*spanstore.Operation + for iter.Scan(&spanKind, &operationName) { + operations = append(operations, &spanstore.Operation{ + Name: operationName, + SpanKind: spanKind, + }) + } + if err := iter.Close(); err != nil { + err = errors.Wrap(err, fmt.Sprintf("Error reading %s from storage", s.table.tableName)) + return nil, err + } return operations, nil } diff --git a/plugin/storage/cassandra/spanstore/operation_names_test.go b/plugin/storage/cassandra/spanstore/operation_names_test.go index ca8357542f8..91b0e812e2e 100644 --- a/plugin/storage/cassandra/spanstore/operation_names_test.go +++ b/plugin/storage/cassandra/spanstore/operation_names_test.go @@ -39,10 +39,22 @@ type operationNameStorageTest struct { storage *OperationNamesStorage } -func withOperationNamesStorage(writeCacheTTL time.Duration, fn func(s *operationNameStorageTest)) { +func withOperationNamesStorage(writeCacheTTL time.Duration, + schemaVersion schemaVersion, + fn func(s *operationNameStorageTest)) { + session := &mocks.Session{} logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) + query := &mocks.Query{} + session.On("Query", + fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) + if schemaVersion != latestVersion { + query.On("Exec").Return(errors.New("new table does not exist")) + } else { + query.On("Exec").Return(nil) + } + s := &operationNameStorageTest{ session: session, writeCacheTTL: writeCacheTTL, @@ -55,38 +67,55 @@ func withOperationNamesStorage(writeCacheTTL time.Duration, fn func(s *operation } func TestOperationNamesStorageWrite(t *testing.T) { - for _, ttl := range []time.Duration{0, time.Minute} { - writeCacheTTL := ttl // capture loop var - t.Run(fmt.Sprintf("writeCacheTTL=%v", writeCacheTTL), func(t *testing.T) { - withOperationNamesStorage(writeCacheTTL, func(s *operationNameStorageTest) { + for _, test := range []struct { + name string + ttl time.Duration + schemaVersion schemaVersion + }{ + {name: "test old schema with 0 ttl", ttl: 0, schemaVersion: previousVersion}, + {name: "test old schema with 1min ttl", ttl: time.Minute, schemaVersion: previousVersion}, + {name: "test new schema with 0 ttl", ttl: 0, schemaVersion: latestVersion}, + {name: "test new schema with 1min ttl", ttl: time.Minute, schemaVersion: latestVersion}, + } { + t.Run(test.name, func(t *testing.T) { + withOperationNamesStorage(test.ttl, test.schemaVersion, func(s *operationNameStorageTest) { var execError = errors.New("exec error") query := &mocks.Query{} query1 := &mocks.Query{} query2 := &mocks.Query{} - query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + + if test.schemaVersion == previousVersion { + query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + } else { + query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) + } + query1.On("Exec").Return(nil) query2.On("Exec").Return(execError) - query2.On("String").Return("select from operation_names") + query2.On("String").Return("select from " + schemas[test.schemaVersion].tableName) - var emptyArgs []interface{} 
- s.session.On("Query", mock.AnythingOfType("string"), emptyArgs).Return(query) + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) err := s.storage.Write("service-a", "Operation-b") assert.NoError(t, err) err = s.storage.Write("service-c", "operation-d") - assert.EqualError(t, err, "failed to Exec query 'select from operation_names': exec error") + assert.EqualError(t, err, + "failed to Exec query 'select from "+schemas[test.schemaVersion].tableName+"': exec error") assert.Equal(t, map[string]string{ "level": "error", "msg": "Failed to exec query", - "query": "select from operation_names", + "query": "select from " + schemas[test.schemaVersion].tableName, "error": "exec error", }, s.logBuffer.JSONLine(0)) counts, _ := s.metricsFactory.Snapshot() assert.Equal(t, map[string]int64{ - "attempts|table=operation_names": 2, "inserts|table=operation_names": 1, "errors|table=operation_names": 1, + "attempts|table=" + schemas[test.schemaVersion].tableName: 2, + "inserts|table=" + schemas[test.schemaVersion].tableName: 1, + "errors|table=" + schemas[test.schemaVersion].tableName: 1, }, counts, "after first two writes") // write again @@ -95,10 +124,10 @@ func TestOperationNamesStorageWrite(t *testing.T) { counts2, _ := s.metricsFactory.Snapshot() expCounts := counts - if writeCacheTTL == 0 { + if test.ttl == 0 { // without write cache, the second write must succeed - expCounts["attempts|table=operation_names"]++ - expCounts["inserts|table=operation_names"]++ + expCounts["attempts|table="+schemas[test.schemaVersion].tableName]++ + expCounts["inserts|table="+schemas[test.schemaVersion].tableName]++ } assert.Equal(t, expCounts, counts2) }) @@ -108,38 +137,49 @@ func TestOperationNamesStorageWrite(t *testing.T) { func TestOperationNamesStorageGetServices(t *testing.T) { var scanError = errors.New("scan error") - var writeCacheTTL time.Duration - var matched bool - matchOnce := mock.MatchedBy(func(v []interface{}) bool { - if matched { - return false - } - matched = true - return true - }) - matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) - for _, expErr := range []error{nil, scanError} { - withOperationNamesStorage(writeCacheTTL, func(s *operationNameStorageTest) { - iter := &mocks.Iterator{} - iter.On("Scan", matchOnce).Return(true) - iter.On("Scan", matchEverything).Return(false) // false to stop the loop - iter.On("Close").Return(expErr) - - query := &mocks.Query{} - query.On("Iter").Return(iter) - - s.session.On("Query", mock.AnythingOfType("string"), []interface{}{"service-a"}).Return(query) - - services, err := s.storage.GetOperations("service-a") - if expErr == nil { - assert.NoError(t, err) - // expect empty string because mock iter.Scan(&placeholder) does not write to `placeholder` - assert.Equal(t, []string{""}, services) - } else { - assert.EqualError(t, err, "Error reading operation_names from storage: "+expErr.Error()) - } - }) + for _, test := range []struct { + name string + schemaVersion schemaVersion + expErr error + }{ + {name: "test old schema without error", schemaVersion: previousVersion, expErr: nil}, + {name: "test old schema with scan error", schemaVersion: previousVersion, expErr: scanError}, + {name: "test new schema without error", schemaVersion: latestVersion, expErr: nil}, + {name: "test new schema with scan error", schemaVersion: latestVersion, expErr: scanError}, + } { + t.Run(test.name, func(t *testing.T) { + withOperationNamesStorage(0, test.schemaVersion, func(s *operationNameStorageTest) { + var matched bool 
+ matchOnce := mock.MatchedBy(func(v []interface{}) bool { + if matched { + return false + } + matched = true + return true + }) + matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) + + iter := &mocks.Iterator{} + iter.On("Scan", matchOnce).Return(true) + iter.On("Scan", matchEverything).Return(false) // false to stop the loop + iter.On("Close").Return(test.expErr) + query := &mocks.Query{} + query.On("Iter").Return(iter) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + services, err := s.storage.GetOperations("service-a") + if test.expErr == nil { + assert.NoError(t, err) + // expect empty string because mock iter.Scan(&placeholder) does not write to `placeholder` + assert.Equal(t, []string{""}, services) + } else { + assert.EqualError(t, err, + fmt.Sprintf("Error reading %s from storage: %s", + schemas[test.schemaVersion].tableName, + test.expErr.Error())) + } + }) + }) } - } diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 5f60eeff361..24e2af007cf 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -18,6 +18,7 @@ package spanstore import ( "context" "errors" + "fmt" "strings" "testing" "time" @@ -45,6 +46,9 @@ type spanReaderTest struct { func withSpanReader(fn func(r *spanReaderTest)) { session := &mocks.Session{} + query := &mocks.Query{} + session.On("Query", fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) + query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) r := &spanReaderTest{ diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go index e911d117bae..460125e42ac 100644 --- a/plugin/storage/cassandra/spanstore/writer_test.go +++ b/plugin/storage/cassandra/spanstore/writer_test.go @@ -17,11 +17,13 @@ package spanstore import ( "errors" + "fmt" "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" @@ -42,6 +44,9 @@ type spanWriterTest struct { func withSpanWriter(writeCacheTTL time.Duration, fn func(w *spanWriterTest), options ...Option, ) { session := &mocks.Session{} + query := &mocks.Query{} + session.On("Query", fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) + query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) w := &spanWriterTest{