From 31ada18c6f280d5005a8e9de41281cf89f8229f1 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Tue, 19 Nov 2019 15:03:51 -0500 Subject: [PATCH 1/4] Create new cassandra table "operation_names_v2" with "spanKind" column for operation name index - add migration script - read from the latest table if available, otherwise fail back to previous table Signed-off-by: Jun Guo --- plugin/storage/cassandra/factory_test.go | 2 +- .../cassandra/schema/migration/V002toV003.sh | 100 +++++++++ plugin/storage/cassandra/schema/v003.cql.tmpl | 204 ++++++++++++++++++ .../cassandra/spanstore/operation_names.go | 149 +++++++++++-- .../spanstore/operation_names_test.go | 104 ++++++--- .../cassandra/spanstore/reader_test.go | 4 + .../cassandra/spanstore/writer_test.go | 5 + 7 files changed, 513 insertions(+), 55 deletions(-) create mode 100644 plugin/storage/cassandra/schema/migration/V002toV003.sh create mode 100644 plugin/storage/cassandra/schema/v003.cql.tmpl diff --git a/plugin/storage/cassandra/factory_test.go b/plugin/storage/cassandra/factory_test.go index 6671203b187..2b739a71b3a 100644 --- a/plugin/storage/cassandra/factory_test.go +++ b/plugin/storage/cassandra/factory_test.go @@ -91,7 +91,7 @@ func TestCassandraFactory(t *testing.T) { _, err = f.CreateArchiveSpanWriter() assert.EqualError(t, err, "archive storage not configured") - f.archiveConfig = &mockSessionBuilder{} + f.archiveConfig = newMockSessionBuilder(session, nil) assert.NoError(t, f.Initialize(metrics.NullFactory, zap.NewNop())) _, err = f.CreateArchiveSpanReader() diff --git a/plugin/storage/cassandra/schema/migration/V002toV003.sh b/plugin/storage/cassandra/schema/migration/V002toV003.sh new file mode 100644 index 00000000000..9d53b06ca07 --- /dev/null +++ b/plugin/storage/cassandra/schema/migration/V002toV003.sh @@ -0,0 +1,100 @@ +#!/usr/bin/env bash + +# Create a new operation_names_v2 table and copy all data from operation_names table +# Sample usage: KEYSPACE=jaeger_v1_test TIMEOUT=1000 ./plugin/storage/cassandra/schema/migration/v002tov003.sh + +set -euo pipefail + +function usage { + >&2 echo "Error: $1" + >&2 echo "" + >&2 echo "Usage: KEYSPACE={keyspace} TTL={ttl} $0" + >&2 echo "" + >&2 echo "The following parameters can be set via environment:" + >&2 echo " KEYSPACE - keyspace" + >&2 echo "" + exit 1 +} + +confirm() { + read -r -p "${1:-Continue? [y/N]} " response + case "$response" in + [yY][eE][sS]|[yY]) + true + ;; + *) + exit 1 + ;; + esac +} + +if [[ ${KEYSPACE} == "" ]]; then + usage "missing KEYSPACE parameter" +fi + +if [[ ${KEYSPACE} =~ [^a-zA-Z0-9_] ]]; then + usage "invalid characters in KEYSPACE=$KEYSPACE parameter, please use letters, digits or underscores" +fi + +keyspace=${KEYSPACE} +old_table=operation_names +new_table=operation_names_v2 +cqlsh_cmd=cqlsh + +row_count=$(${cqlsh_cmd} -e "select count(*) from $keyspace.$old_table;"|head -4|tail -1| tr -d ' ') + +echo "About to copy $row_count rows to new table..." + +confirm + +${cqlsh_cmd} -e "COPY $keyspace.$old_table (service_name, operation_name) to '$old_table.csv';" + +if [[ ! -f ${old_table}.csv ]]; then + echo "Could not find $old_table.csv. Backup from cassandra was probably not successful" + exit 1 +fi + +csv_rows=$(wc -l ${old_table}.csv | tr -dc '0-9') + +if [[ ${row_count} -ne ${csv_rows} ]]; then + echo "Number of rows: $csv_rows in file is not equal to number of rows: $row_count in cassandra" + exit 1 +fi + +echo "Generating data for new table..." +while IFS="," read service_name operation_name; do + echo "$service_name,,$operation_name" +done < ${old_table}.csv > ${new_table}.csv + +ttl=$(${cqlsh_cmd} -e "select default_time_to_live from system_schema.tables WHERE keyspace_name='$keyspace' AND table_name='$old_table';"|head -4|tail -1|tr -d ' ') + +echo "Creating new table $new_table with ttl: $ttl" + +${cqlsh_cmd} -e "CREATE TABLE IF NOT EXISTS $keyspace.$new_table ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = $ttl + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800;" + +echo "Import data to new table: $keyspace.$new_table from $new_table.csv" + +# empty string will be inserted as empty string instead of null +${cqlsh_cmd} -e "COPY $keyspace.$new_table (service_name, span_kind, operation_name) + FROM '$new_table.csv' + WITH NULL='NIL';" + +echo "Data from old table are successfully imported to new table!" + +echo "Before finish, do you want to delete old table: $keyspace.$old_table?" +confirm +${cqlsh_cmd} -e "DROP TABLE IF EXISTS $keyspace.$old_table;" \ No newline at end of file diff --git a/plugin/storage/cassandra/schema/v003.cql.tmpl b/plugin/storage/cassandra/schema/v003.cql.tmpl new file mode 100644 index 00000000000..b1b664d3dd0 --- /dev/null +++ b/plugin/storage/cassandra/schema/v003.cql.tmpl @@ -0,0 +1,204 @@ +-- +-- Creates Cassandra keyspace with tables for traces and dependencies. +-- +-- Required parameters: +-- +-- keyspace +-- name of the keyspace +-- replication +-- replication strategy for the keyspace, such as +-- for prod environments +-- {'class': 'NetworkTopologyStrategy', '$datacenter': '${replication_factor}' } +-- for test environments +-- {'class': 'SimpleStrategy', 'replication_factor': '1'} +-- trace_ttl +-- default time to live for trace data, in seconds +-- dependencies_ttl +-- default time to live for dependencies data, in seconds (0 for no TTL) +-- +-- Non-configurable settings: +-- gc_grace_seconds is non-zero, see: http://www.uberobert.com/cassandra_gc_grace_disables_hinted_handoff/ +-- For TTL of 2 days, compaction window is 1 hour, rule of thumb here: http://thelastpickle.com/blog/2016/12/08/TWCS-part1.html + +CREATE KEYSPACE IF NOT EXISTS ${keyspace} WITH replication = ${replication}; + +CREATE TYPE IF NOT EXISTS ${keyspace}.keyvalue ( + key text, + value_type text, + value_string text, + value_bool boolean, + value_long bigint, + value_double double, + value_binary blob, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.log ( + ts bigint, + fields list>, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.span_ref ( + ref_type text, + trace_id blob, + span_id bigint, +); + +CREATE TYPE IF NOT EXISTS ${keyspace}.process ( + service_name text, + tags list>, +); + +-- Notice we have span_hash. This exists only for zipkin backwards compat. Zipkin allows spans with the same ID. +-- Note: Cassandra re-orders non-PK columns alphabetically, so the table looks differently in CQLSH "describe table". +-- start_time is bigint instead of timestamp as we require microsecond precision +CREATE TABLE IF NOT EXISTS ${keyspace}.traces ( + trace_id blob, + span_id bigint, + span_hash bigint, + parent_id bigint, + operation_name text, + flags int, + start_time bigint, + duration bigint, + tags list>, + logs list>, + refs list>, + process frozen, + PRIMARY KEY (trace_id, span_id, span_hash) +) + WITH compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_names ( + service_name text, + PRIMARY KEY (service_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.operation_names_v2 ( + service_name text, + span_kind text, + operation_name text, + PRIMARY KEY ((service_name), span_kind, operation_name) +) + WITH compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- index of trace IDs by service + operation names, sorted by span start_time. +CREATE TABLE IF NOT EXISTS ${keyspace}.service_operation_index ( + service_name text, + operation_name text, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.service_name_index ( + service_name text, + bucket int, + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, bucket), start_time) +) WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TABLE IF NOT EXISTS ${keyspace}.duration_index ( + service_name text, // service name + operation_name text, // operation name, or blank for queries without span name + bucket timestamp, // time bucket, - the start_time of the given span rounded to an hour + duration bigint, // span duration, in microseconds + start_time bigint, + trace_id blob, + PRIMARY KEY ((service_name, operation_name, bucket), duration, start_time, trace_id) +) WITH CLUSTERING ORDER BY (duration DESC, start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +-- a bucketing strategy may have to be added for tag queries +-- we can make this table even better by adding a timestamp to it +CREATE TABLE IF NOT EXISTS ${keyspace}.tag_index ( + service_name text, + tag_key text, + tag_value text, + start_time bigint, + trace_id blob, + span_id bigint, + PRIMARY KEY ((service_name, tag_key, tag_value), start_time, trace_id, span_id) +) + WITH CLUSTERING ORDER BY (start_time DESC) + AND compaction = { + 'compaction_window_size': '1', + 'compaction_window_unit': 'HOURS', + 'class': 'org.apache.cassandra.db.compaction.TimeWindowCompactionStrategy' + } + AND dclocal_read_repair_chance = 0.0 + AND default_time_to_live = ${trace_ttl} + AND speculative_retry = 'NONE' + AND gc_grace_seconds = 10800; -- 3 hours of downtime acceptable on nodes + +CREATE TYPE IF NOT EXISTS ${keyspace}.dependency ( + parent text, + child text, + call_count bigint, + source text, +); + +-- compaction strategy is intentionally different as compared to other tables due to the size of dependencies data +CREATE TABLE IF NOT EXISTS ${keyspace}.dependencies_v2 ( + ts_bucket timestamp, + ts timestamp, + dependencies list>, + PRIMARY KEY (ts_bucket, ts) +) WITH CLUSTERING ORDER BY (ts DESC) + AND compaction = { + 'min_threshold': '4', + 'max_threshold': '32', + 'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy' + } + AND default_time_to_live = ${dependencies_ttl}; \ No newline at end of file diff --git a/plugin/storage/cassandra/spanstore/operation_names.go b/plugin/storage/cassandra/spanstore/operation_names.go index de73cf4347e..8a2f993a693 100644 --- a/plugin/storage/cassandra/spanstore/operation_names.go +++ b/plugin/storage/cassandra/spanstore/operation_names.go @@ -16,6 +16,7 @@ package spanstore import ( + "fmt" "time" "github.com/pkg/errors" @@ -25,23 +26,51 @@ import ( "github.com/jaegertracing/jaeger/pkg/cache" "github.com/jaegertracing/jaeger/pkg/cassandra" casMetrics "github.com/jaegertracing/jaeger/pkg/cassandra/metrics" + "github.com/jaegertracing/jaeger/storage/spanstore" ) const ( - insertOperationName = `INSERT INTO operation_names(service_name, operation_name) VALUES (?, ?)` - queryOperationNames = `SELECT operation_name FROM operation_names WHERE service_name = ?` + // LatestVersion latest version of operation_names table schema, increase the version if your table schema changes require code change + LatestVersion = 1 + // TableQueryStmt the query statement used to check if a table exists or not + TableQueryStmt = "SELECT * from %s limit 1" ) +type schemaMeta struct { + TableName string + InsertStmt string + QueryByKindStmt string + QueryStmt string +} + +var schemas = []schemaMeta{ + { + TableName: "operation_names", + InsertStmt: "INSERT INTO %s(service_name, operation_name) VALUES (?, ?)", + QueryByKindStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + QueryStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + }, + { + TableName: "operation_names_v2", + InsertStmt: "INSERT INTO %s(service_name, span_kind, operation_name) VALUES (?, ?, ?)", + QueryByKindStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ? AND span_kind = ?", + QueryStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ?", + }, +} + // OperationNamesStorage stores known operation names by service. type OperationNamesStorage struct { // CQL statements are public so that Cassandra2 storage can override them - InsertStmt string - QueryStmt string - session cassandra.Session - writeCacheTTL time.Duration - metrics *casMetrics.Table - operationNames cache.Cache - logger *zap.Logger + SchemaVersion int + TableName string + InsertStmt string + QueryStmt string + QueryByKindStmt string + session cassandra.Session + writeCacheTTL time.Duration + metrics *casMetrics.Table + operationNames cache.Cache + logger *zap.Logger } // NewOperationNamesStorage returns a new OperationNamesStorage @@ -51,13 +80,23 @@ func NewOperationNamesStorage( metricsFactory metrics.Factory, logger *zap.Logger, ) *OperationNamesStorage { + + schemaVersion := LatestVersion + + if !tableExist(session, schemas[schemaVersion].TableName) { + schemaVersion = schemaVersion - 1 + } + return &OperationNamesStorage{ - session: session, - InsertStmt: insertOperationName, - QueryStmt: queryOperationNames, - metrics: casMetrics.NewTable(metricsFactory, "operation_names"), - writeCacheTTL: writeCacheTTL, - logger: logger, + session: session, + TableName: schemas[schemaVersion].TableName, + SchemaVersion: schemaVersion, + InsertStmt: fmt.Sprintf(schemas[schemaVersion].InsertStmt, schemas[schemaVersion].TableName), + QueryByKindStmt: fmt.Sprintf(schemas[schemaVersion].QueryByKindStmt, schemas[schemaVersion].TableName), + QueryStmt: fmt.Sprintf(schemas[schemaVersion].QueryStmt, schemas[schemaVersion].TableName), + metrics: casMetrics.NewTable(metricsFactory, schemas[schemaVersion].TableName), + writeCacheTTL: writeCacheTTL, + logger: logger, operationNames: cache.NewLRUWithOptions( 100000, &cache.Options{ @@ -67,12 +106,27 @@ func NewOperationNamesStorage( } } +func tableExist(session cassandra.Session, tableName string) bool { + query := session.Query(fmt.Sprintf(TableQueryStmt, tableName)) + err := query.Exec() + return err == nil +} + // Write saves Operation and Service name tuples func (s *OperationNamesStorage) Write(serviceName string, operationName string) error { var err error + //TODO: take spanKind from args + spanKind := "" query := s.session.Query(s.InsertStmt) - if inCache := checkWriteCache(serviceName+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { - q := query.Bind(serviceName, operationName) + if inCache := checkWriteCache(serviceName+"|"+spanKind+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { + var q cassandra.Query + switch s.SchemaVersion { + case 1: + q = query.Bind(serviceName, spanKind, operationName) + case 0: + q = query.Bind(serviceName, operationName) + } + err2 := s.metrics.Exec(q, s.logger) if err2 != nil { err = err2 @@ -83,16 +137,73 @@ func (s *OperationNamesStorage) Write(serviceName string, operationName string) // GetOperations returns all operations for a specific service traced by Jaeger func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) { + var operations []*spanstore.Operation + var err error + + switch s.SchemaVersion { + case 1: + operations, err = getOperationsV1(s, &spanstore.OperationQueryParameters{ + ServiceName: service, + }) + case 0: + operations, err = getOperationsV0(s, service) + } + + if err != nil { + return nil, err + } + operationNames := make([]string, len(operations)) + for idx, operation := range operations { + operationNames[idx] = operation.Name + } + return operationNames, err +} + +func getOperationsV0(s *OperationNamesStorage, service string) ([]*spanstore.Operation, error) { iter := s.session.Query(s.QueryStmt, service).Iter() var operation string - var operations []string + var operationNames []string for iter.Scan(&operation) { - operations = append(operations, operation) + operationNames = append(operationNames, operation) } if err := iter.Close(); err != nil { err = errors.Wrap(err, "Error reading operation_names from storage") return nil, err } + + operations := make([]*spanstore.Operation, len(operationNames)) + for idx, name := range operationNames { + operations[idx] = &spanstore.Operation{ + Name: name, + } + } + return operations, nil +} + +func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { + var casQuery cassandra.Query + if query.SpanKind == "" { + // Get operations for all spanKind + casQuery = s.session.Query(s.QueryStmt, query.ServiceName) + } else { + // Get operations for given spanKind + casQuery = s.session.Query(s.QueryByKindStmt, query.ServiceName, query.SpanKind) + } + iter := casQuery.Iter() + + var operationName string + var spanKind string + var operations []*spanstore.Operation + for iter.Scan(&spanKind, &operationName) { + operations = append(operations, &spanstore.Operation{ + Name: operationName, + SpanKind: spanKind, + }) + } + if err := iter.Close(); err != nil { + err = errors.Wrap(err, fmt.Sprintf("Error reading %s from storage", s.TableName)) + return nil, err + } return operations, nil } diff --git a/plugin/storage/cassandra/spanstore/operation_names_test.go b/plugin/storage/cassandra/spanstore/operation_names_test.go index ca8357542f8..86c257cf789 100644 --- a/plugin/storage/cassandra/spanstore/operation_names_test.go +++ b/plugin/storage/cassandra/spanstore/operation_names_test.go @@ -30,6 +30,12 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" ) +type test struct { + ttl time.Duration + schemaVersion int + expErr error +} + type operationNameStorageTest struct { session *mocks.Session writeCacheTTL time.Duration @@ -39,10 +45,18 @@ type operationNameStorageTest struct { storage *OperationNamesStorage } -func withOperationNamesStorage(writeCacheTTL time.Duration, fn func(s *operationNameStorageTest)) { +func withOperationNamesStorage(writeCacheTTL time.Duration, schemaVersion int, fn func(s *operationNameStorageTest)) { session := &mocks.Session{} logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) + query := &mocks.Query{} + session.On("Query", fmt.Sprintf(TableQueryStmt, schemas[LatestVersion].TableName), mock.Anything).Return(query) + if schemaVersion != LatestVersion { + query.On("Exec").Return(errors.New("new table does not exist")) + } else { + query.On("Exec").Return(nil) + } + s := &operationNameStorageTest{ session: session, writeCacheTTL: writeCacheTTL, @@ -55,38 +69,49 @@ func withOperationNamesStorage(writeCacheTTL time.Duration, fn func(s *operation } func TestOperationNamesStorageWrite(t *testing.T) { - for _, ttl := range []time.Duration{0, time.Minute} { - writeCacheTTL := ttl // capture loop var - t.Run(fmt.Sprintf("writeCacheTTL=%v", writeCacheTTL), func(t *testing.T) { - withOperationNamesStorage(writeCacheTTL, func(s *operationNameStorageTest) { + for _, test := range []test{ + {0, 0, nil}, + {time.Minute, 0, nil}, + {0, 1, nil}, + {time.Minute, 1, nil}, + } { + writeCacheTTL := test.ttl // capture loop var + t.Run(fmt.Sprintf("test %#v", test), func(t *testing.T) { + withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { var execError = errors.New("exec error") query := &mocks.Query{} query1 := &mocks.Query{} query2 := &mocks.Query{} - query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + + if test.schemaVersion == 0 { + query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + } else { + query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) + } + query1.On("Exec").Return(nil) query2.On("Exec").Return(execError) - query2.On("String").Return("select from operation_names") + query2.On("String").Return("select from " + schemas[test.schemaVersion].TableName) - var emptyArgs []interface{} - s.session.On("Query", mock.AnythingOfType("string"), emptyArgs).Return(query) + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) err := s.storage.Write("service-a", "Operation-b") assert.NoError(t, err) err = s.storage.Write("service-c", "operation-d") - assert.EqualError(t, err, "failed to Exec query 'select from operation_names': exec error") + assert.EqualError(t, err, "failed to Exec query 'select from "+schemas[test.schemaVersion].TableName+"': exec error") assert.Equal(t, map[string]string{ "level": "error", "msg": "Failed to exec query", - "query": "select from operation_names", + "query": "select from " + schemas[test.schemaVersion].TableName, "error": "exec error", }, s.logBuffer.JSONLine(0)) counts, _ := s.metricsFactory.Snapshot() assert.Equal(t, map[string]int64{ - "attempts|table=operation_names": 2, "inserts|table=operation_names": 1, "errors|table=operation_names": 1, + "attempts|table=" + schemas[test.schemaVersion].TableName: 2, "inserts|table=" + schemas[test.schemaVersion].TableName: 1, "errors|table=" + schemas[test.schemaVersion].TableName: 1, }, counts, "after first two writes") // write again @@ -97,8 +122,8 @@ func TestOperationNamesStorageWrite(t *testing.T) { expCounts := counts if writeCacheTTL == 0 { // without write cache, the second write must succeed - expCounts["attempts|table=operation_names"]++ - expCounts["inserts|table=operation_names"]++ + expCounts["attempts|table="+schemas[test.schemaVersion].TableName]++ + expCounts["inserts|table="+schemas[test.schemaVersion].TableName]++ } assert.Equal(t, expCounts, counts2) }) @@ -118,28 +143,37 @@ func TestOperationNamesStorageGetServices(t *testing.T) { return true }) matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) - for _, expErr := range []error{nil, scanError} { - withOperationNamesStorage(writeCacheTTL, func(s *operationNameStorageTest) { - iter := &mocks.Iterator{} - iter.On("Scan", matchOnce).Return(true) - iter.On("Scan", matchEverything).Return(false) // false to stop the loop - iter.On("Close").Return(expErr) + for _, test := range []test{ + {0, 0, nil}, + {0, 0, scanError}, + {0, 1, nil}, + {0, 1, scanError}, + } { + t.Run(fmt.Sprintf("test %#v", test), func(t *testing.T) { + withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { + iter := &mocks.Iterator{} + iter.On("Scan", matchOnce).Return(true) + iter.On("Scan", matchEverything).Return(false) // false to stop the loop + iter.On("Close").Return(test.expErr) - query := &mocks.Query{} - query.On("Iter").Return(iter) - - s.session.On("Query", mock.AnythingOfType("string"), []interface{}{"service-a"}).Return(query) - - services, err := s.storage.GetOperations("service-a") - if expErr == nil { - assert.NoError(t, err) - // expect empty string because mock iter.Scan(&placeholder) does not write to `placeholder` - assert.Equal(t, []string{""}, services) - } else { - assert.EqualError(t, err, "Error reading operation_names from storage: "+expErr.Error()) - } + query := &mocks.Query{} + query.On("Iter").Return(iter) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + services, err := s.storage.GetOperations("service-a") + if test.expErr == nil { + assert.NoError(t, err) + if test.schemaVersion == 0 { + // expect one empty operation result because mock iter.Scan(&placeholder) does not write to `placeholder` + assert.Equal(t, []string{""}, services) + } else { + assert.Equal(t, []string{}, services) + } + } else { + assert.EqualError(t, err, fmt.Sprintf("Error reading %s from storage: %s", schemas[test.schemaVersion].TableName, test.expErr.Error())) + } + }) }) - } } diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 5f60eeff361..95596731ca8 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -18,6 +18,7 @@ package spanstore import ( "context" "errors" + "fmt" "strings" "testing" "time" @@ -45,6 +46,9 @@ type spanReaderTest struct { func withSpanReader(fn func(r *spanReaderTest)) { session := &mocks.Session{} + query := &mocks.Query{} + session.On("Query", fmt.Sprintf(TableQueryStmt, schemas[LatestVersion].TableName), mock.Anything).Return(query) + query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) r := &spanReaderTest{ diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go index e911d117bae..4d5ce7706c8 100644 --- a/plugin/storage/cassandra/spanstore/writer_test.go +++ b/plugin/storage/cassandra/spanstore/writer_test.go @@ -17,11 +17,13 @@ package spanstore import ( "errors" + "fmt" "strings" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/uber/jaeger-lib/metrics/metricstest" "go.uber.org/zap" @@ -42,6 +44,9 @@ type spanWriterTest struct { func withSpanWriter(writeCacheTTL time.Duration, fn func(w *spanWriterTest), options ...Option, ) { session := &mocks.Session{} + query := &mocks.Query{} + session.On("Query", fmt.Sprintf(TableQueryStmt, schemas[LatestVersion].TableName), mock.Anything).Return(query) + query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) w := &spanWriterTest{ From b9f7c2e0b47134286b0fa5a193edee982bf5c832 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Wed, 20 Nov 2019 16:39:10 -0500 Subject: [PATCH 2/4] Refactor schema versions Signed-off-by: Jun Guo --- .../cassandra/spanstore/operation_names.go | 152 +++++++------- .../spanstore/operation_names_test.go | 185 +++++++++--------- .../cassandra/spanstore/reader_test.go | 2 +- .../cassandra/spanstore/writer_test.go | 2 +- 4 files changed, 171 insertions(+), 170 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/operation_names.go b/plugin/storage/cassandra/spanstore/operation_names.go index 8a2f993a693..5900e85fb4a 100644 --- a/plugin/storage/cassandra/spanstore/operation_names.go +++ b/plugin/storage/cassandra/spanstore/operation_names.go @@ -30,47 +30,66 @@ import ( ) const ( - // LatestVersion latest version of operation_names table schema, increase the version if your table schema changes require code change - LatestVersion = 1 - // TableQueryStmt the query statement used to check if a table exists or not - TableQueryStmt = "SELECT * from %s limit 1" + // latestVersion of operation_names table + // increase the version if your table schema changes require code change + latestVersion = "v2" + + // previous version of operation_names table + // if latest version does not work, will fail back to use previous version + previousVersion = "v1" + + // tableCheckStmt the query statement used to check if a table exists or not + tableCheckStmt = "SELECT * from %s limit 1" ) -type schemaMeta struct { - TableName string - InsertStmt string - QueryByKindStmt string - QueryStmt string +type tableMeta struct { + tableName string + insertStmt string + queryByKindStmt string + queryStmt string + createWriteQuery func(query cassandra.Query, service, kind, opName string) cassandra.Query + getOperations func(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) +} + +func (t *tableMeta) materialize() { + t.insertStmt = fmt.Sprintf(t.insertStmt, t.tableName) + t.queryByKindStmt = fmt.Sprintf(t.queryByKindStmt, t.tableName) + t.queryStmt = fmt.Sprintf(t.queryStmt, t.tableName) } -var schemas = []schemaMeta{ - { - TableName: "operation_names", - InsertStmt: "INSERT INTO %s(service_name, operation_name) VALUES (?, ?)", - QueryByKindStmt: "SELECT operation_name FROM %s WHERE service_name = ?", - QueryStmt: "SELECT operation_name FROM %s WHERE service_name = ?", +var schemas = map[string]*tableMeta{ + "v1": { + tableName: "operation_names", + insertStmt: "INSERT INTO %s(service_name, operation_name) VALUES (?, ?)", + queryByKindStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + queryStmt: "SELECT operation_name FROM %s WHERE service_name = ?", + getOperations: getOperationsV1, + createWriteQuery: func(query cassandra.Query, service, kind, opName string) cassandra.Query { + return query.Bind(service, opName) + }, }, - { - TableName: "operation_names_v2", - InsertStmt: "INSERT INTO %s(service_name, span_kind, operation_name) VALUES (?, ?, ?)", - QueryByKindStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ? AND span_kind = ?", - QueryStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ?", + "v2": { + tableName: "operation_names_v2", + insertStmt: "INSERT INTO %s(service_name, span_kind, operation_name) VALUES (?, ?, ?)", + queryByKindStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ? AND span_kind = ?", + queryStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ?", + getOperations: getOperationsV2, + createWriteQuery: func(query cassandra.Query, service, kind, opName string) cassandra.Query { + return query.Bind(service, kind, opName) + }, }, } // OperationNamesStorage stores known operation names by service. type OperationNamesStorage struct { // CQL statements are public so that Cassandra2 storage can override them - SchemaVersion int - TableName string - InsertStmt string - QueryStmt string - QueryByKindStmt string - session cassandra.Session - writeCacheTTL time.Duration - metrics *casMetrics.Table - operationNames cache.Cache - logger *zap.Logger + schemaVersion string + table *tableMeta + session cassandra.Session + writeCacheTTL time.Duration + metrics *casMetrics.Table + operationNames cache.Cache + logger *zap.Logger } // NewOperationNamesStorage returns a new OperationNamesStorage @@ -81,22 +100,20 @@ func NewOperationNamesStorage( logger *zap.Logger, ) *OperationNamesStorage { - schemaVersion := LatestVersion - - if !tableExist(session, schemas[schemaVersion].TableName) { - schemaVersion = schemaVersion - 1 + schemaVersion := latestVersion + if !tableExist(session, schemas[schemaVersion].tableName) { + schemaVersion = previousVersion } + table := schemas[schemaVersion] + table.materialize() return &OperationNamesStorage{ - session: session, - TableName: schemas[schemaVersion].TableName, - SchemaVersion: schemaVersion, - InsertStmt: fmt.Sprintf(schemas[schemaVersion].InsertStmt, schemas[schemaVersion].TableName), - QueryByKindStmt: fmt.Sprintf(schemas[schemaVersion].QueryByKindStmt, schemas[schemaVersion].TableName), - QueryStmt: fmt.Sprintf(schemas[schemaVersion].QueryStmt, schemas[schemaVersion].TableName), - metrics: casMetrics.NewTable(metricsFactory, schemas[schemaVersion].TableName), - writeCacheTTL: writeCacheTTL, - logger: logger, + session: session, + schemaVersion: schemaVersion, + table: table, + metrics: casMetrics.NewTable(metricsFactory, schemas[schemaVersion].tableName), + writeCacheTTL: writeCacheTTL, + logger: logger, operationNames: cache.NewLRUWithOptions( 100000, &cache.Options{ @@ -106,27 +123,14 @@ func NewOperationNamesStorage( } } -func tableExist(session cassandra.Session, tableName string) bool { - query := session.Query(fmt.Sprintf(TableQueryStmt, tableName)) - err := query.Exec() - return err == nil -} - // Write saves Operation and Service name tuples func (s *OperationNamesStorage) Write(serviceName string, operationName string) error { var err error //TODO: take spanKind from args spanKind := "" - query := s.session.Query(s.InsertStmt) - if inCache := checkWriteCache(serviceName+"|"+spanKind+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { - var q cassandra.Query - switch s.SchemaVersion { - case 1: - q = query.Bind(serviceName, spanKind, operationName) - case 0: - q = query.Bind(serviceName, operationName) - } + if inCache := checkWriteCache(serviceName+"|"+spanKind+"|"+operationName, s.operationNames, s.writeCacheTTL); !inCache { + q := s.table.createWriteQuery(s.session.Query(s.table.insertStmt), serviceName, spanKind, operationName) err2 := s.metrics.Exec(q, s.logger) if err2 != nil { err = err2 @@ -137,17 +141,9 @@ func (s *OperationNamesStorage) Write(serviceName string, operationName string) // GetOperations returns all operations for a specific service traced by Jaeger func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) { - var operations []*spanstore.Operation - var err error - - switch s.SchemaVersion { - case 1: - operations, err = getOperationsV1(s, &spanstore.OperationQueryParameters{ - ServiceName: service, - }) - case 0: - operations, err = getOperationsV0(s, service) - } + operations, err := s.table.getOperations(s, &spanstore.OperationQueryParameters{ + ServiceName: service, + }) if err != nil { return nil, err @@ -159,8 +155,14 @@ func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) return operationNames, err } -func getOperationsV0(s *OperationNamesStorage, service string) ([]*spanstore.Operation, error) { - iter := s.session.Query(s.QueryStmt, service).Iter() +func tableExist(session cassandra.Session, tableName string) bool { + query := session.Query(fmt.Sprintf(tableCheckStmt, tableName)) + err := query.Exec() + return err == nil +} + +func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { + iter := s.session.Query(s.table.queryStmt, query.ServiceName).Iter() var operation string var operationNames []string @@ -181,14 +183,14 @@ func getOperationsV0(s *OperationNamesStorage, service string) ([]*spanstore.Ope return operations, nil } -func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { +func getOperationsV2(s *OperationNamesStorage, query *spanstore.OperationQueryParameters) ([]*spanstore.Operation, error) { var casQuery cassandra.Query if query.SpanKind == "" { // Get operations for all spanKind - casQuery = s.session.Query(s.QueryStmt, query.ServiceName) + casQuery = s.session.Query(s.table.queryStmt, query.ServiceName) } else { // Get operations for given spanKind - casQuery = s.session.Query(s.QueryByKindStmt, query.ServiceName, query.SpanKind) + casQuery = s.session.Query(s.table.queryByKindStmt, query.ServiceName, query.SpanKind) } iter := casQuery.Iter() @@ -202,7 +204,7 @@ func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryPa }) } if err := iter.Close(); err != nil { - err = errors.Wrap(err, fmt.Sprintf("Error reading %s from storage", s.TableName)) + err = errors.Wrap(err, fmt.Sprintf("Error reading %s from storage", s.table.tableName)) return nil, err } return operations, nil diff --git a/plugin/storage/cassandra/spanstore/operation_names_test.go b/plugin/storage/cassandra/spanstore/operation_names_test.go index 86c257cf789..e348b4a0a56 100644 --- a/plugin/storage/cassandra/spanstore/operation_names_test.go +++ b/plugin/storage/cassandra/spanstore/operation_names_test.go @@ -30,12 +30,6 @@ import ( "github.com/jaegertracing/jaeger/pkg/testutils" ) -type test struct { - ttl time.Duration - schemaVersion int - expErr error -} - type operationNameStorageTest struct { session *mocks.Session writeCacheTTL time.Duration @@ -45,13 +39,13 @@ type operationNameStorageTest struct { storage *OperationNamesStorage } -func withOperationNamesStorage(writeCacheTTL time.Duration, schemaVersion int, fn func(s *operationNameStorageTest)) { +func withOperationNamesStorage(writeCacheTTL time.Duration, schemaVersion string, fn func(s *operationNameStorageTest)) { session := &mocks.Session{} logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) query := &mocks.Query{} - session.On("Query", fmt.Sprintf(TableQueryStmt, schemas[LatestVersion].TableName), mock.Anything).Return(query) - if schemaVersion != LatestVersion { + session.On("Query", fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) + if schemaVersion != latestVersion { query.On("Exec").Return(errors.New("new table does not exist")) } else { query.On("Exec").Return(nil) @@ -69,71 +63,73 @@ func withOperationNamesStorage(writeCacheTTL time.Duration, schemaVersion int, f } func TestOperationNamesStorageWrite(t *testing.T) { - for _, test := range []test{ - {0, 0, nil}, - {time.Minute, 0, nil}, - {0, 1, nil}, - {time.Minute, 1, nil}, + for _, test := range []struct { + name string + ttl time.Duration + schemaVersion string + }{ + {name: "test old schema with 0 ttl", ttl: 0, schemaVersion: previousVersion}, + {name: "test old schema with 1min ttl", ttl: time.Minute, schemaVersion: previousVersion}, + {name: "test new schema with 0 ttl", ttl: 0, schemaVersion: latestVersion}, + {name: "test new schema with 1min ttl", ttl: time.Minute, schemaVersion: latestVersion}, } { + fmt.Printf(test.name) writeCacheTTL := test.ttl // capture loop var - t.Run(fmt.Sprintf("test %#v", test), func(t *testing.T) { - withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { - var execError = errors.New("exec error") - query := &mocks.Query{} - query1 := &mocks.Query{} - query2 := &mocks.Query{} - - if test.schemaVersion == 0 { - query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) - } else { - query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) - } - - query1.On("Exec").Return(nil) - query2.On("Exec").Return(execError) - query2.On("String").Return("select from " + schemas[test.schemaVersion].TableName) - - s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) - - err := s.storage.Write("service-a", "Operation-b") - assert.NoError(t, err) - - err = s.storage.Write("service-c", "operation-d") - assert.EqualError(t, err, "failed to Exec query 'select from "+schemas[test.schemaVersion].TableName+"': exec error") - assert.Equal(t, map[string]string{ - "level": "error", - "msg": "Failed to exec query", - "query": "select from " + schemas[test.schemaVersion].TableName, - "error": "exec error", - }, s.logBuffer.JSONLine(0)) - - counts, _ := s.metricsFactory.Snapshot() - assert.Equal(t, map[string]int64{ - "attempts|table=" + schemas[test.schemaVersion].TableName: 2, "inserts|table=" + schemas[test.schemaVersion].TableName: 1, "errors|table=" + schemas[test.schemaVersion].TableName: 1, - }, counts, "after first two writes") - - // write again - err = s.storage.Write("service-a", "Operation-b") - assert.NoError(t, err) - - counts2, _ := s.metricsFactory.Snapshot() - expCounts := counts - if writeCacheTTL == 0 { - // without write cache, the second write must succeed - expCounts["attempts|table="+schemas[test.schemaVersion].TableName]++ - expCounts["inserts|table="+schemas[test.schemaVersion].TableName]++ - } - assert.Equal(t, expCounts, counts2) - }) + withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { + var execError = errors.New("exec error") + query := &mocks.Query{} + query1 := &mocks.Query{} + query2 := &mocks.Query{} + + if test.schemaVersion == previousVersion { + query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + } else { + query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) + } + + query1.On("Exec").Return(nil) + query2.On("Exec").Return(execError) + query2.On("String").Return("select from " + schemas[test.schemaVersion].tableName) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + + err := s.storage.Write("service-a", "Operation-b") + assert.NoError(t, err) + + err = s.storage.Write("service-c", "operation-d") + assert.EqualError(t, err, "failed to Exec query 'select from "+schemas[test.schemaVersion].tableName+"': exec error") + assert.Equal(t, map[string]string{ + "level": "error", + "msg": "Failed to exec query", + "query": "select from " + schemas[test.schemaVersion].tableName, + "error": "exec error", + }, s.logBuffer.JSONLine(0)) + + counts, _ := s.metricsFactory.Snapshot() + assert.Equal(t, map[string]int64{ + "attempts|table=" + schemas[test.schemaVersion].tableName: 2, "inserts|table=" + schemas[test.schemaVersion].tableName: 1, "errors|table=" + schemas[test.schemaVersion].tableName: 1, + }, counts, "after first two writes") + + // write again + err = s.storage.Write("service-a", "Operation-b") + assert.NoError(t, err) + + counts2, _ := s.metricsFactory.Snapshot() + expCounts := counts + if writeCacheTTL == 0 { + // without write cache, the second write must succeed + expCounts["attempts|table="+schemas[test.schemaVersion].tableName]++ + expCounts["inserts|table="+schemas[test.schemaVersion].tableName]++ + } + assert.Equal(t, expCounts, counts2) }) } } func TestOperationNamesStorageGetServices(t *testing.T) { var scanError = errors.New("scan error") - var writeCacheTTL time.Duration var matched bool matchOnce := mock.MatchedBy(func(v []interface{}) bool { if matched { @@ -143,36 +139,39 @@ func TestOperationNamesStorageGetServices(t *testing.T) { return true }) matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) - for _, test := range []test{ - {0, 0, nil}, - {0, 0, scanError}, - {0, 1, nil}, - {0, 1, scanError}, + for _, test := range []struct { + name string + schemaVersion string + expErr error + }{ + {name: "test old schema without error", schemaVersion: previousVersion, expErr: nil}, + {name: "test old schema with scan error", schemaVersion: previousVersion, expErr: scanError}, + {name: "test new schema without error", schemaVersion: latestVersion, expErr: nil}, + {name: "test new schema with scan error", schemaVersion: latestVersion, expErr: scanError}, } { - t.Run(fmt.Sprintf("test %#v", test), func(t *testing.T) { - withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { - iter := &mocks.Iterator{} - iter.On("Scan", matchOnce).Return(true) - iter.On("Scan", matchEverything).Return(false) // false to stop the loop - iter.On("Close").Return(test.expErr) - - query := &mocks.Query{} - query.On("Iter").Return(iter) - - s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) - services, err := s.storage.GetOperations("service-a") - if test.expErr == nil { - assert.NoError(t, err) - if test.schemaVersion == 0 { - // expect one empty operation result because mock iter.Scan(&placeholder) does not write to `placeholder` - assert.Equal(t, []string{""}, services) - } else { - assert.Equal(t, []string{}, services) - } + fmt.Printf(test.name) + withOperationNamesStorage(0, test.schemaVersion, func(s *operationNameStorageTest) { + iter := &mocks.Iterator{} + iter.On("Scan", matchOnce).Return(true) + iter.On("Scan", matchEverything).Return(false) // false to stop the loop + iter.On("Close").Return(test.expErr) + + query := &mocks.Query{} + query.On("Iter").Return(iter) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + services, err := s.storage.GetOperations("service-a") + if test.expErr == nil { + assert.NoError(t, err) + if test.schemaVersion == previousVersion { + // expect one empty operation result because mock iter.Scan(&placeholder) does not write to `placeholder` + assert.Equal(t, []string{""}, services) } else { - assert.EqualError(t, err, fmt.Sprintf("Error reading %s from storage: %s", schemas[test.schemaVersion].TableName, test.expErr.Error())) + assert.Equal(t, []string{}, services) } - }) + } else { + assert.EqualError(t, err, fmt.Sprintf("Error reading %s from storage: %s", schemas[test.schemaVersion].tableName, test.expErr.Error())) + } }) } diff --git a/plugin/storage/cassandra/spanstore/reader_test.go b/plugin/storage/cassandra/spanstore/reader_test.go index 95596731ca8..24e2af007cf 100644 --- a/plugin/storage/cassandra/spanstore/reader_test.go +++ b/plugin/storage/cassandra/spanstore/reader_test.go @@ -47,7 +47,7 @@ type spanReaderTest struct { func withSpanReader(fn func(r *spanReaderTest)) { session := &mocks.Session{} query := &mocks.Query{} - session.On("Query", fmt.Sprintf(TableQueryStmt, schemas[LatestVersion].TableName), mock.Anything).Return(query) + session.On("Query", fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) diff --git a/plugin/storage/cassandra/spanstore/writer_test.go b/plugin/storage/cassandra/spanstore/writer_test.go index 4d5ce7706c8..460125e42ac 100644 --- a/plugin/storage/cassandra/spanstore/writer_test.go +++ b/plugin/storage/cassandra/spanstore/writer_test.go @@ -45,7 +45,7 @@ func withSpanWriter(writeCacheTTL time.Duration, fn func(w *spanWriterTest), opt ) { session := &mocks.Session{} query := &mocks.Query{} - session.On("Query", fmt.Sprintf(TableQueryStmt, schemas[LatestVersion].TableName), mock.Anything).Return(query) + session.On("Query", fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) query.On("Exec").Return(nil) logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) From 6539d277f88611210373c3dfa468d9fe9b9ae223 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Thu, 21 Nov 2019 19:11:48 -0500 Subject: [PATCH 3/4] Using typed constant for schemaVersion Signed-off-by: Jun Guo --- .../cassandra/spanstore/operation_names.go | 27 ++- .../spanstore/operation_names_test.go | 176 +++++++++--------- 2 files changed, 105 insertions(+), 98 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/operation_names.go b/plugin/storage/cassandra/spanstore/operation_names.go index 5900e85fb4a..71a8215c4ed 100644 --- a/plugin/storage/cassandra/spanstore/operation_names.go +++ b/plugin/storage/cassandra/spanstore/operation_names.go @@ -32,16 +32,18 @@ import ( const ( // latestVersion of operation_names table // increase the version if your table schema changes require code change - latestVersion = "v2" + latestVersion = schemaVersion("v2") // previous version of operation_names table // if latest version does not work, will fail back to use previous version - previousVersion = "v1" + previousVersion = schemaVersion("v1") // tableCheckStmt the query statement used to check if a table exists or not tableCheckStmt = "SELECT * from %s limit 1" ) +type schemaVersion string + type tableMeta struct { tableName string insertStmt string @@ -57,8 +59,8 @@ func (t *tableMeta) materialize() { t.queryStmt = fmt.Sprintf(t.queryStmt, t.tableName) } -var schemas = map[string]*tableMeta{ - "v1": { +var schemas = map[schemaVersion]*tableMeta{ + previousVersion: { tableName: "operation_names", insertStmt: "INSERT INTO %s(service_name, operation_name) VALUES (?, ?)", queryByKindStmt: "SELECT operation_name FROM %s WHERE service_name = ?", @@ -68,7 +70,7 @@ var schemas = map[string]*tableMeta{ return query.Bind(service, opName) }, }, - "v2": { + latestVersion: { tableName: "operation_names_v2", insertStmt: "INSERT INTO %s(service_name, span_kind, operation_name) VALUES (?, ?, ?)", queryByKindStmt: "SELECT span_kind, operation_name FROM %s WHERE service_name = ? AND span_kind = ?", @@ -83,7 +85,7 @@ var schemas = map[string]*tableMeta{ // OperationNamesStorage stores known operation names by service. type OperationNamesStorage struct { // CQL statements are public so that Cassandra2 storage can override them - schemaVersion string + schemaVersion schemaVersion table *tableMeta session cassandra.Session writeCacheTTL time.Duration @@ -148,6 +150,7 @@ func (s *OperationNamesStorage) GetOperations(service string) ([]string, error) if err != nil { return nil, err } + //TODO: return operations instead of list of string operationNames := make([]string, len(operations)) for idx, operation := range operations { operationNames[idx] = operation.Name @@ -165,21 +168,17 @@ func getOperationsV1(s *OperationNamesStorage, query *spanstore.OperationQueryPa iter := s.session.Query(s.table.queryStmt, query.ServiceName).Iter() var operation string - var operationNames []string + var operations []*spanstore.Operation for iter.Scan(&operation) { - operationNames = append(operationNames, operation) + operations = append(operations, &spanstore.Operation{ + Name: operation, + }) } if err := iter.Close(); err != nil { err = errors.Wrap(err, "Error reading operation_names from storage") return nil, err } - operations := make([]*spanstore.Operation, len(operationNames)) - for idx, name := range operationNames { - operations[idx] = &spanstore.Operation{ - Name: name, - } - } return operations, nil } diff --git a/plugin/storage/cassandra/spanstore/operation_names_test.go b/plugin/storage/cassandra/spanstore/operation_names_test.go index e348b4a0a56..920831f4586 100644 --- a/plugin/storage/cassandra/spanstore/operation_names_test.go +++ b/plugin/storage/cassandra/spanstore/operation_names_test.go @@ -39,12 +39,16 @@ type operationNameStorageTest struct { storage *OperationNamesStorage } -func withOperationNamesStorage(writeCacheTTL time.Duration, schemaVersion string, fn func(s *operationNameStorageTest)) { +func withOperationNamesStorage(writeCacheTTL time.Duration, + schemaVersion schemaVersion, + fn func(s *operationNameStorageTest)) { + session := &mocks.Session{} logger, logBuffer := testutils.NewLogger() metricsFactory := metricstest.NewFactory(0) query := &mocks.Query{} - session.On("Query", fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) + session.On("Query", + fmt.Sprintf(tableCheckStmt, schemas[latestVersion].tableName), mock.Anything).Return(query) if schemaVersion != latestVersion { query.On("Exec").Return(errors.New("new table does not exist")) } else { @@ -66,7 +70,7 @@ func TestOperationNamesStorageWrite(t *testing.T) { for _, test := range []struct { name string ttl time.Duration - schemaVersion string + schemaVersion schemaVersion }{ {name: "test old schema with 0 ttl", ttl: 0, schemaVersion: previousVersion}, {name: "test old schema with 1min ttl", ttl: time.Minute, schemaVersion: previousVersion}, @@ -74,74 +78,69 @@ func TestOperationNamesStorageWrite(t *testing.T) { {name: "test new schema with 1min ttl", ttl: time.Minute, schemaVersion: latestVersion}, } { fmt.Printf(test.name) - writeCacheTTL := test.ttl // capture loop var - withOperationNamesStorage(writeCacheTTL, test.schemaVersion, func(s *operationNameStorageTest) { - var execError = errors.New("exec error") - query := &mocks.Query{} - query1 := &mocks.Query{} - query2 := &mocks.Query{} - - if test.schemaVersion == previousVersion { - query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) - } else { - query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) - query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) - } - - query1.On("Exec").Return(nil) - query2.On("Exec").Return(execError) - query2.On("String").Return("select from " + schemas[test.schemaVersion].tableName) - - s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) - - err := s.storage.Write("service-a", "Operation-b") - assert.NoError(t, err) - - err = s.storage.Write("service-c", "operation-d") - assert.EqualError(t, err, "failed to Exec query 'select from "+schemas[test.schemaVersion].tableName+"': exec error") - assert.Equal(t, map[string]string{ - "level": "error", - "msg": "Failed to exec query", - "query": "select from " + schemas[test.schemaVersion].tableName, - "error": "exec error", - }, s.logBuffer.JSONLine(0)) - - counts, _ := s.metricsFactory.Snapshot() - assert.Equal(t, map[string]int64{ - "attempts|table=" + schemas[test.schemaVersion].tableName: 2, "inserts|table=" + schemas[test.schemaVersion].tableName: 1, "errors|table=" + schemas[test.schemaVersion].tableName: 1, - }, counts, "after first two writes") - - // write again - err = s.storage.Write("service-a", "Operation-b") - assert.NoError(t, err) - - counts2, _ := s.metricsFactory.Snapshot() - expCounts := counts - if writeCacheTTL == 0 { - // without write cache, the second write must succeed - expCounts["attempts|table="+schemas[test.schemaVersion].tableName]++ - expCounts["inserts|table="+schemas[test.schemaVersion].tableName]++ - } - assert.Equal(t, expCounts, counts2) + t.Run(fmt.Sprintf("%s", test.name), func(t *testing.T) { + withOperationNamesStorage(test.ttl, test.schemaVersion, func(s *operationNameStorageTest) { + var execError = errors.New("exec error") + query := &mocks.Query{} + query1 := &mocks.Query{} + query2 := &mocks.Query{} + + if test.schemaVersion == previousVersion { + query.On("Bind", []interface{}{"service-a", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "operation-d"}).Return(query2) + } else { + query.On("Bind", []interface{}{"service-a", "", "Operation-b"}).Return(query1) + query.On("Bind", []interface{}{"service-c", "", "operation-d"}).Return(query2) + } + + query1.On("Exec").Return(nil) + query2.On("Exec").Return(execError) + query2.On("String").Return("select from " + schemas[test.schemaVersion].tableName) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + + err := s.storage.Write("service-a", "Operation-b") + assert.NoError(t, err) + + err = s.storage.Write("service-c", "operation-d") + assert.EqualError(t, err, + "failed to Exec query 'select from "+schemas[test.schemaVersion].tableName+"': exec error") + assert.Equal(t, map[string]string{ + "level": "error", + "msg": "Failed to exec query", + "query": "select from " + schemas[test.schemaVersion].tableName, + "error": "exec error", + }, s.logBuffer.JSONLine(0)) + + counts, _ := s.metricsFactory.Snapshot() + assert.Equal(t, map[string]int64{ + "attempts|table=" + schemas[test.schemaVersion].tableName: 2, + "inserts|table=" + schemas[test.schemaVersion].tableName: 1, + "errors|table=" + schemas[test.schemaVersion].tableName: 1, + }, counts, "after first two writes") + + // write again + err = s.storage.Write("service-a", "Operation-b") + assert.NoError(t, err) + + counts2, _ := s.metricsFactory.Snapshot() + expCounts := counts + if test.ttl == 0 { + // without write cache, the second write must succeed + expCounts["attempts|table="+schemas[test.schemaVersion].tableName]++ + expCounts["inserts|table="+schemas[test.schemaVersion].tableName]++ + } + assert.Equal(t, expCounts, counts2) + }) }) } } func TestOperationNamesStorageGetServices(t *testing.T) { var scanError = errors.New("scan error") - var matched bool - matchOnce := mock.MatchedBy(func(v []interface{}) bool { - if matched { - return false - } - matched = true - return true - }) - matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) for _, test := range []struct { name string - schemaVersion string + schemaVersion schemaVersion expErr error }{ {name: "test old schema without error", schemaVersion: previousVersion, expErr: nil}, @@ -149,30 +148,39 @@ func TestOperationNamesStorageGetServices(t *testing.T) { {name: "test new schema without error", schemaVersion: latestVersion, expErr: nil}, {name: "test new schema with scan error", schemaVersion: latestVersion, expErr: scanError}, } { - fmt.Printf(test.name) - withOperationNamesStorage(0, test.schemaVersion, func(s *operationNameStorageTest) { - iter := &mocks.Iterator{} - iter.On("Scan", matchOnce).Return(true) - iter.On("Scan", matchEverything).Return(false) // false to stop the loop - iter.On("Close").Return(test.expErr) - - query := &mocks.Query{} - query.On("Iter").Return(iter) - - s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) - services, err := s.storage.GetOperations("service-a") - if test.expErr == nil { - assert.NoError(t, err) - if test.schemaVersion == previousVersion { - // expect one empty operation result because mock iter.Scan(&placeholder) does not write to `placeholder` + t.Run(fmt.Sprintf("%s", test.name), func(t *testing.T) { + withOperationNamesStorage(0, test.schemaVersion, func(s *operationNameStorageTest) { + var matched bool + matchOnce := mock.MatchedBy(func(v []interface{}) bool { + if matched { + return false + } + matched = true + return true + }) + matchEverything := mock.MatchedBy(func(v []interface{}) bool { return true }) + + iter := &mocks.Iterator{} + iter.On("Scan", matchOnce).Return(true) + iter.On("Scan", matchEverything).Return(false) // false to stop the loop + iter.On("Close").Return(test.expErr) + + query := &mocks.Query{} + query.On("Iter").Return(iter) + + s.session.On("Query", mock.AnythingOfType("string"), mock.Anything).Return(query) + services, err := s.storage.GetOperations("service-a") + if test.expErr == nil { + assert.NoError(t, err) + // expect empty string because mock iter.Scan(&placeholder) does not write to `placeholder` assert.Equal(t, []string{""}, services) } else { - assert.Equal(t, []string{}, services) + assert.EqualError(t, err, + fmt.Sprintf("Error reading %s from storage: %s", + schemas[test.schemaVersion].tableName, + test.expErr.Error())) } - } else { - assert.EqualError(t, err, fmt.Sprintf("Error reading %s from storage: %s", schemas[test.schemaVersion].tableName, test.expErr.Error())) - } + }) }) } - } From 7e1880667e8d7bbb8711803fb7424fe1335c96b2 Mon Sep 17 00:00:00 2001 From: Jun Guo Date: Fri, 22 Nov 2019 14:11:14 -0500 Subject: [PATCH 4/4] Fix travis build error Signed-off-by: Jun Guo --- plugin/storage/cassandra/spanstore/operation_names_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/plugin/storage/cassandra/spanstore/operation_names_test.go b/plugin/storage/cassandra/spanstore/operation_names_test.go index 920831f4586..91b0e812e2e 100644 --- a/plugin/storage/cassandra/spanstore/operation_names_test.go +++ b/plugin/storage/cassandra/spanstore/operation_names_test.go @@ -77,8 +77,7 @@ func TestOperationNamesStorageWrite(t *testing.T) { {name: "test new schema with 0 ttl", ttl: 0, schemaVersion: latestVersion}, {name: "test new schema with 1min ttl", ttl: time.Minute, schemaVersion: latestVersion}, } { - fmt.Printf(test.name) - t.Run(fmt.Sprintf("%s", test.name), func(t *testing.T) { + t.Run(test.name, func(t *testing.T) { withOperationNamesStorage(test.ttl, test.schemaVersion, func(s *operationNameStorageTest) { var execError = errors.New("exec error") query := &mocks.Query{} @@ -148,7 +147,7 @@ func TestOperationNamesStorageGetServices(t *testing.T) { {name: "test new schema without error", schemaVersion: latestVersion, expErr: nil}, {name: "test new schema with scan error", schemaVersion: latestVersion, expErr: scanError}, } { - t.Run(fmt.Sprintf("%s", test.name), func(t *testing.T) { + t.Run(test.name, func(t *testing.T) { withOperationNamesStorage(0, test.schemaVersion, func(s *operationNameStorageTest) { var matched bool matchOnce := mock.MatchedBy(func(v []interface{}) bool {