Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create Cassandra db schema on session initialization #5922

Merged
Merged
Changes from 41 commits
Commits
Show all changes
57 commits
Select commit Hold shift + click to select a range
5041f31
Embeded template file in binary and added query construction and exec…
akstron Oct 28, 2024
30db170
Removed unnecessary SchemaConfig struct
akstron Oct 28, 2024
ce0c375
Added new schema configs in default config generator
akstron Oct 28, 2024
810ab1c
Revert Keyspace removal
akstron Oct 28, 2024
0d6383f
Bug fix while creating queries
akstron Oct 29, 2024
207945f
Improving test
akstron Oct 29, 2024
985f65b
Created new struct for derived params
akstron Oct 29, 2024
1c30503
Remove fields from yaml file
akstron Oct 29, 2024
e4ab709
Added integration test
akstron Nov 19, 2024
c329bba
Rebase fixes
akstron Nov 19, 2024
e3c6045
Minor changes in integration script
akstron Nov 19, 2024
492e15e
removed test
akstron Nov 19, 2024
44c39dc
Updated fields with time.Duration type and added validators and tests
akstron Nov 20, 2024
dfc0c43
minor changes in script
akstron Nov 20, 2024
cb8ae19
Addressed comments
akstron Nov 20, 2024
c3d0fbd
Addressed comments
akstron Nov 21, 2024
728a139
Update pkg/cassandra/config/schema.go
akstron Nov 21, 2024
1b6683d
Update pkg/cassandra/config/config.go
akstron Nov 21, 2024
ce11cc1
Addressed comments
akstron Nov 21, 2024
de1c563
Removed unused CasVersion
akstron Nov 21, 2024
edabe22
Addressed validation comments
akstron Nov 22, 2024
d0e1976
Created helper function for session created and updated tests
akstron Nov 26, 2024
d8479b5
Added schema unit tests
akstron Nov 26, 2024
02b6159
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
73d276a
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
57349a8
Update pkg/cassandra/config/config.go
akstron Nov 26, 2024
84b52e1
Fixed build
akstron Nov 26, 2024
9c2f05b
formatting fixes
akstron Nov 26, 2024
2c8de88
test fix
akstron Nov 27, 2024
eeb1951
Added test in workflow
akstron Nov 28, 2024
601365d
fmt fixes
akstron Nov 28, 2024
4aeaad7
create schema bug fix
akstron Nov 28, 2024
0167cb6
exclude v1 run with skip-apply-schema as true
akstron Nov 28, 2024
07b99da
Added schemaCreator and comments in workflow
akstron Nov 28, 2024
04ec76f
ci changes
akstron Nov 28, 2024
7246494
made template params private
akstron Nov 28, 2024
d8613f1
workflow fix
akstron Nov 28, 2024
a7853ec
Changed env variable name
akstron Nov 29, 2024
10bd9aa
lint fixes
akstron Nov 29, 2024
a682db2
lint fix
akstron Nov 29, 2024
35c26b5
test fix
akstron Nov 29, 2024
ddf4fc0
Workflow and test minor changes
akstron Nov 29, 2024
bbf3ac8
test fix
akstron Nov 30, 2024
a8e3aae
workflow changes
akstron Nov 30, 2024
a46bf37
Merge branch 'main' into create-database-scheme-cassandra
akstron Nov 30, 2024
c2b89e0
Apply suggestions from code review
yurishkuro Nov 30, 2024
de2d2ef
Update docs
yurishkuro Nov 30, 2024
100208c
refactor
yurishkuro Nov 30, 2024
37d34dd
clean-up imports
yurishkuro Nov 30, 2024
8d65894
clean-up
yurishkuro Nov 30, 2024
23a9b62
simplify
yurishkuro Nov 30, 2024
d925074
fix
yurishkuro Nov 30, 2024
154deb9
Fix workflow
yurishkuro Nov 30, 2024
a238455
Merge branch 'main' into create-database-scheme-cassandra
yurishkuro Nov 30, 2024
cc9db9c
rename
yurishkuro Nov 30, 2024
9d8ba06
Fix script
yurishkuro Nov 30, 2024
b0ac38a
fix naming for code coverage
yurishkuro Nov 30, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion .github/workflows/ci-e2e-cassandra.yml
Original file line number Diff line number Diff line change
@@ -22,14 +22,19 @@ jobs:
fail-fast: false
matrix:
jaeger-version: [v1, v2]
skip-apply-schema: [true, false]
version:
- distribution: cassandra
major: 4.x
schema: v004
- distribution: cassandra
major: 5.x
schema: v004
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }}
exclude:
# Exclude v1 as create schema on fly is available for v2 only
- jaeger-version: v1
skip-apply-schema: true
name: ${{ matrix.version.distribution }} ${{ matrix.version.major }} ${{ matrix.jaeger-version }} ${{ matrix.skip-apply-schema == true && 'auto' || 'manual' }}
steps:
- name: Harden Runner
uses: step-security/harden-runner@91182cccc01eb5e619899d80e4e971d6181294a7 # v2.10.1
@@ -45,6 +50,8 @@ jobs:
- name: Run cassandra integration tests
id: test-execution
run: bash scripts/cassandra-integration-test.sh ${{ matrix.version.major }} ${{ matrix.version.schema }} ${{ matrix.jaeger-version }}
env:
CASSANDRA_CREATE_SCHEMA: ${{ matrix.skip-apply-schema }}

- name: Upload coverage to codecov
uses: ./.github/actions/upload-codecov
4 changes: 3 additions & 1 deletion cmd/jaeger/config-cassandra.yaml
Original file line number Diff line number Diff line change
@@ -34,6 +34,7 @@ extensions:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
create: "${env:CASSANDRA_CREATE_SCHEMA:-false}"
connection:
auth:
basic:
@@ -44,7 +45,8 @@ extensions:
another_storage:
cassandra:
schema:
keyspace: "jaeger_v1_dc1"
keyspace: "jaeger_v1_dc1_archive"
create: "${env:CASSANDRA_CREATE_SCHEMA:-false}"
connection:
auth:
basic:
1 change: 1 addition & 0 deletions cmd/jaeger/internal/integration/cassandra_test.go
Original file line number Diff line number Diff line change
@@ -11,6 +11,7 @@ import (

func TestCassandraStorage(t *testing.T) {
integration.SkipUnlessEnv(t, "cassandra")

s := &E2EStorageIntegration{
ConfigFile: "../../config-cassandra.yaml",
StorageIntegration: integration.StorageIntegration{
101 changes: 97 additions & 4 deletions pkg/cassandra/config/config.go
Original file line number Diff line number Diff line change
@@ -6,6 +6,7 @@

import (
"context"
"errors"
"fmt"
"time"

@@ -58,6 +59,18 @@
// while connecting to the Cassandra Cluster. This is useful for connecting to clusters, like Azure Cosmos DB,
// that do not support SnappyCompression.
DisableCompression bool `mapstructure:"disable_compression"`
// CreateSchema tells if the schema ahould be created during session initialization based on the configs provided
CreateSchema bool `mapstructure:"create" valid:"optional"`
// Datacenter is the name for network topology
Datacenter string `mapstructure:"datacenter" valid:"optional"`
// TraceTTL is Time To Live (TTL) for the trace data. Should at least be 1 second
TraceTTL time.Duration `mapstructure:"trace_ttl" valid:"optional"`
// DependenciesTTL is Time To Live (TTL) for dependencies data. Should at least be 1 second
DependenciesTTL time.Duration `mapstructure:"dependencies_ttl" valid:"optional"`
// Replication factor for the db
ReplicationFactor int `mapstructure:"replication_factor" valid:"optional"`
// CompactionWindow of format tells the compaction window of the db. Should atleast be 1 minute
CompactionWindow time.Duration `mapstructure:"compaction_window" valid:"optional"`
}

type Query struct {
@@ -86,7 +99,13 @@
func DefaultConfiguration() Configuration {
return Configuration{
Schema: Schema{
Keyspace: "jaeger_v1_test",
CreateSchema: false,
Keyspace: "jaeger_dc1",
Datacenter: "dc1",
TraceTTL: 2 * 24 * time.Hour,
DependenciesTTL: 2 * 24 * time.Hour,
ReplicationFactor: 1,
CompactionWindow: time.Minute,
},
Connection: Connection{
Servers: []string{"127.0.0.1"},
@@ -106,6 +125,27 @@
if c.Schema.Keyspace == "" {
c.Schema.Keyspace = source.Schema.Keyspace
}

if c.Schema.Datacenter == "" {
c.Schema.Datacenter = source.Schema.Datacenter
}

if c.Schema.TraceTTL == 0 {
c.Schema.TraceTTL = source.Schema.TraceTTL
}

if c.Schema.DependenciesTTL == 0 {
c.Schema.DependenciesTTL = source.Schema.DependenciesTTL
}

if c.Schema.ReplicationFactor == 0 {
c.Schema.ReplicationFactor = source.Schema.ReplicationFactor
}

if c.Schema.CompactionWindow == 0 {
c.Schema.CompactionWindow = source.Schema.CompactionWindow
}

if c.Connection.ConnectionsPerHost == 0 {
c.Connection.ConnectionsPerHost = source.Connection.ConnectionsPerHost
}
@@ -134,19 +174,52 @@
NewSession() (cassandra.Session, error)
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
// createSession creates session from a configuration
func createSession(c *Configuration) (cassandra.Session, error) {
cluster, err := c.NewCluster()
if err != nil {
return nil, err
}

session, err := cluster.CreateSession()
if err != nil {
return nil, err
}

return gocqlw.WrapCQLSession(session), nil
}

// newSessionPrerequisites creates tables and types before creating a session
func (c *Configuration) newSessionPrerequisites() error {
if !c.Schema.CreateSchema {
return nil
}

cfg := *c // clone because we need to connect without specifying a keyspace
cfg.Schema.Keyspace = ""

session, err := createSession(&cfg)
if err != nil {
return err
}

Check warning on line 204 in pkg/cassandra/config/config.go

Codecov / codecov/patch

pkg/cassandra/config/config.go#L198-L204

Added lines #L198 - L204 were not covered by tests

sc := schemaCreator{
session: session,
schema: c.Schema,
}

return sc.createSchemaIfNotPresent()

Check warning on line 211 in pkg/cassandra/config/config.go

Codecov / codecov/patch

pkg/cassandra/config/config.go#L206-L211

Added lines #L206 - L211 were not covered by tests
}

// NewSession creates a new Cassandra session
func (c *Configuration) NewSession() (cassandra.Session, error) {
if err := c.newSessionPrerequisites(); err != nil {
return nil, err
}

Check warning on line 218 in pkg/cassandra/config/config.go

Codecov / codecov/patch

pkg/cassandra/config/config.go#L217-L218

Added lines #L217 - L218 were not covered by tests

return createSession(c)
}

// NewCluster creates a new gocql cluster from the configuration
func (c *Configuration) NewCluster() (*gocql.ClusterConfig, error) {
cluster := gocql.NewCluster(c.Connection.Servers...)
@@ -210,7 +283,27 @@
return fmt.Sprintf("%+v", *c)
}

func isValidTTL(duration time.Duration) bool {
return duration == 0 || duration >= time.Second
}

func (c *Configuration) Validate() error {
_, err := govalidator.ValidateStruct(c)
return err
if err != nil {
return err
}

if !isValidTTL(c.Schema.TraceTTL) {
return errors.New("trace_ttl can either be 0 or greater than or equal to 1 second")
}

if !isValidTTL(c.Schema.DependenciesTTL) {
return errors.New("dependencies_ttl can either be 0 or greater than or equal to 1 second")
}

Check warning on line 302 in pkg/cassandra/config/config.go

Codecov / codecov/patch

pkg/cassandra/config/config.go#L301-L302

Added lines #L301 - L302 were not covered by tests

if c.Schema.CompactionWindow < time.Minute {
return errors.New("compaction_window should at least be 1 minute")
}

return nil
}
19 changes: 19 additions & 0 deletions pkg/cassandra/config/config_test.go
Original file line number Diff line number Diff line change
@@ -5,6 +5,7 @@ package config

import (
"testing"
"time"

"github.com/gocql/gocql"
"github.com/stretchr/testify/assert"
@@ -43,6 +44,9 @@ func TestValidate_DoesNotReturnErrorWhenRequiredFieldsSet(t *testing.T) {
Connection: Connection{
Servers: []string{"localhost:9200"},
},
Schema: Schema{
CompactionWindow: time.Minute,
},
}

err := cfg.Validate()
@@ -94,3 +98,18 @@ func TestToString(t *testing.T) {
s := cfg.String()
assert.Contains(t, s, "Keyspace:test")
}

func TestConfigSchemaValidation(t *testing.T) {
cfg := DefaultConfiguration()
err := cfg.Validate()
require.NoError(t, err)

cfg.Schema.TraceTTL = time.Millisecond
err = cfg.Validate()
require.Error(t, err)

cfg.Schema.TraceTTL = time.Second
cfg.Schema.CompactionWindow = time.Minute - 1
err = cfg.Validate()
require.Error(t, err)
}
145 changes: 145 additions & 0 deletions pkg/cassandra/config/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright (c) 2024 The Jaeger Authors.
// SPDX-License-Identifier: Apache-2.0

package config

import (
"bytes"
"embed"
"errors"
"fmt"
"text/template"
"time"

"github.com/jaegertracing/jaeger/pkg/cassandra"
)

//go:embed v004-go-tmpl.cql.tmpl
var schemaFile embed.FS

type templateParams struct {
// Keyspace in which tables and types will be created for storage
Keyspace string
// Replication is the replication strategy used. Ex: "{'class': 'NetworkTopologyStrategy', 'replication_factor': '1' }"
Replication string
// CompactionWindowInMinutes is constructed from CompactionWindow for using in template
CompactionWindowInMinutes int64
// TraceTTLInSeconds is constructed from TraceTTL for using in template
TraceTTLInSeconds int64
// DependenciesTTLInSeconds is constructed from DependenciesTTL for using in template
DependenciesTTLInSeconds int64
}

type schemaCreator struct {
session cassandra.Session
schema Schema
}

func (sc *schemaCreator) constructTemplateParams() templateParams {
return templateParams{
Keyspace: sc.schema.Keyspace,
Replication: fmt.Sprintf("{'class': 'NetworkTopologyStrategy', 'replication_factor': '%v' }", sc.schema.ReplicationFactor),
CompactionWindowInMinutes: int64(sc.schema.CompactionWindow / time.Minute),
TraceTTLInSeconds: int64(sc.schema.TraceTTL / time.Second),
DependenciesTTLInSeconds: int64(sc.schema.DependenciesTTL / time.Second),
}

Check warning on line 45 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L38-L45

Added lines #L38 - L45 were not covered by tests
}

func (*schemaCreator) getQueryFileAsBytes(fileName string, params templateParams) ([]byte, error) {
tmpl, err := template.ParseFS(schemaFile, fileName)
if err != nil {
return nil, err
}

Check warning on line 52 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L48-L52

Added lines #L48 - L52 were not covered by tests

var result bytes.Buffer
err = tmpl.Execute(&result, params)
if err != nil {
return nil, err
}

Check warning on line 58 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L54-L58

Added lines #L54 - L58 were not covered by tests

return result.Bytes(), nil

Check warning on line 60 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L60

Added line #L60 was not covered by tests
}

func (*schemaCreator) getQueriesFromBytes(queryFile []byte) ([]string, error) {
lines := bytes.Split(queryFile, []byte("\n"))

var extractedLines [][]byte

for _, line := range lines {
// Remove any comments, if at the end of the line
commentIndex := bytes.Index(line, []byte(`--`))
if commentIndex != -1 {
// remove everything after comment
line = line[0:commentIndex]
}

trimmedLine := bytes.TrimSpace(line)

if len(trimmedLine) == 0 {
continue
}

extractedLines = append(extractedLines, trimmedLine)
}

var queries []string

// Construct individual queries strings
var queryString string
for _, line := range extractedLines {
queryString += string(line) + "\n"
if bytes.HasSuffix(line, []byte(";")) {
queries = append(queries, queryString)
queryString = ""
}
}

if len(queryString) > 0 {
return nil, errors.New(`invalid template`)
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would do it in generateSchemaIfNotPresent against casQueries, not here.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't quite get this. The logic here is that if queryString > 0, means that there is a query string which does not ends with ;, that's what I am validating here.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok then please make the error message self-explanatory


return queries, nil
}

func (sc *schemaCreator) getCassandraQueriesFromQueryStrings(queries []string) []cassandra.Query {
var casQueries []cassandra.Query

for _, query := range queries {
casQueries = append(casQueries, sc.session.Query(query))
}

Check warning on line 109 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L104-L109

Added lines #L104 - L109 were not covered by tests

return casQueries

Check warning on line 111 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L111

Added line #L111 was not covered by tests
}

func (sc *schemaCreator) contructSchemaQueries() ([]cassandra.Query, error) {
params := sc.constructTemplateParams()

queryFile, err := sc.getQueryFileAsBytes(`v004-go-tmpl.cql.tmpl`, params)
if err != nil {
return nil, err
}

Check warning on line 120 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L114-L120

Added lines #L114 - L120 were not covered by tests

queryStrings, err := sc.getQueriesFromBytes(queryFile)
if err != nil {
return nil, err
}

Check warning on line 125 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L122-L125

Added lines #L122 - L125 were not covered by tests

casQueries := sc.getCassandraQueriesFromQueryStrings(queryStrings)

return casQueries, nil

Check warning on line 129 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L127-L129

Added lines #L127 - L129 were not covered by tests
}

func (sc *schemaCreator) createSchemaIfNotPresent() error {
casQueries, err := sc.contructSchemaQueries()
if err != nil {
return err
}

Check warning on line 136 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L132-L136

Added lines #L132 - L136 were not covered by tests

for _, query := range casQueries {
if err := query.Exec(); err != nil {
return err
}

Check warning on line 141 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L138-L141

Added lines #L138 - L141 were not covered by tests
}

return nil

Check warning on line 144 in pkg/cassandra/config/schema.go

Codecov / codecov/patch

pkg/cassandra/config/schema.go#L144

Added line #L144 was not covered by tests
}
Loading