Skip to content

Commit

Permalink
Prepared insert (#30)
Browse files Browse the repository at this point in the history
Prepared inserts
  • Loading branch information
kleineshertz authored May 10, 2023
1 parent 15b2208 commit 7087c4c
Show file tree
Hide file tree
Showing 20 changed files with 256 additions and 74 deletions.
2 changes: 2 additions & 0 deletions .gitattributes
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
test/data/cfg/py_calc_quicktest/py/* linguist-documentation
test/data/cfg/portfolio_quicktest/py/* linguist-documentation
2 changes: 1 addition & 1 deletion .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ jobs:
- name: pkg/cql test coverage threshold
env:
TESTCOVERAGE_THRESHOLD: 50.79
TESTCOVERAGE_THRESHOLD: 44.09
run: |
echo "Quality Gate: checking test coverage is above threshold $TESTCOVERAGE_THRESHOLD %..."
go test -v ./pkg/cql/... -coverprofile coverage.out -covermode count
Expand Down
2 changes: 1 addition & 1 deletion .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@
"mode": "debug",
"cwd":"${workspaceFolder}/pkg/exe/toolbelt",
"program": "${workspaceFolder}/pkg/exe/toolbelt/capitoolbelt.go",
"args": ["exec_node","-script_file=/tmp/capi_cfg/portfolio_quicktest/script.json", "-params_file=/tmp/capi_cfg/portfolio_quicktest/script_params.json", "-keyspace=portfolio_quicktest", "-node_id=calc_account_period_perf"]
"args": ["exec_node","-script_file=/tmp/capi_cfg/lookup_quicktest/script.json", "-params_file=/tmp/capi_cfg/lookup_quicktest/script_params_one_run.json", "-keyspace=lookup_quicktest", "-node_id=read_orders"]
},
{
"name": "Webapi",
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ docker-compose -p "test_capillaries_containers" up

Wait until all containers are started and Cassandra is fully initialized (it will log something like `Created default superuser role 'cassandra'`). Now Capillaries is ready to process data.

Navigate to `http://localhost:8080`, click "New run" and start a new data processing run with the following parameters:
Navigate to `http://localhost:8080`, click "New run" and start a new data processing run with the following parameters (no tabs or spaces allowed):

| Field | Value |
|- | - |
Expand Down
4 changes: 2 additions & 2 deletions doc/binconfig.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,10 @@ As is, passed to gocql.PasswordAuthenticator
The string passed to `CREATE KEYSPACE IF NOT EXISTS <keyspace_name> WITH REPLICATION = ...` when a [keyspace](glossary.md#keyspace) is created

### writer_workers
Capillaries processors that write to [data tables](glossary.md#data-table) produce data at a rate much higher than a single-thread code writing to Cassandra can handle. Capillaries inserts into data and index from multiple threads, and the number of those threads is specified here. 10-20 threads may be considered conservative, 50 threads is more aggressive. Choose these settings according to your hardware environment specifics.
Capillaries processors that write to [data tables](glossary.md#data-table) produce data at a rate much higher than a single-thread code writing to Cassandra can handle. Capillaries inserts into data and index from multiple threads, and the number of those threads is specified here. 10 threads may be considered conservative, 50 threads is aggressive. Choose these settings according to your hardware environment specifics.

### min_inserter_rate
If average number of records all writer workers used by a single worker thread from the thread pool (see [thread_pool_size](#thread_pool_size)) falls below this value, chances are that Cassandra cluster performance has degraded substantially, and the user will wait for the results for too long. In this case, table inserter throws an error and the [batch](./glossary.md#data-batch) is marked as failed.
If average number of records written per second by all writer workers used by a single worker thread from the thread pool (see [thread_pool_size](#thread_pool_size)) falls below this value, chances are that Cassandra cluster performance has degraded substantially, and the user will wait for the results for too long. In this case, table inserter throws an error and the [batch](./glossary.md#data-batch) is marked as failed.

### num_conns
Passed to gocql.ClusterConfig.NumConns
Expand Down
22 changes: 12 additions & 10 deletions doc/qna.md
Original file line number Diff line number Diff line change
Expand Up @@ -75,9 +75,9 @@ Yes. See Capillaries [100% Docker-based demo](started.md#run-100-dockerized-capi

Q. Can I run Capillaries against cloud-based Cassandra?

A. While processing [nodes](glossary.md#script-node) that create [tables](glossary.md#table), Capillaries creates [keyspaces](glossary.md#keyspace) and [tables](glossary.md#table) on-the-fly as a [processor](glossary.md#processor) handles the node. As of this writing (2022), Azure CosmosDB and AWS Keyspaces have notoriously high latency. For example, Azure can complete "CREATE TABLE" command successfully, but an "INSERT" command executed immediately after that may return an error saying that the table does not exist.
A. As of this writing (2022), Azure CosmosDB and AWS Keyspaces have notoriously high latency. For example, Azure can complete "CREATE TABLE" command successfully, but an "INSERT" command executed immediately after that may return an error saying that the table does not exist.

This situation can be potentially mitigated by creating all tables for a specific [run](glossary.md#run) in advance. A [toolbelt](glossary.md#toolbelt) command producing CQL statements that creates all tables for a [run](glossary.md#run) may look like this:
This situation can be mitigated to some extent by creating all tables for a specific [run](glossary.md#run) in advance and verifying that all tables are in place. A [toolbelt](glossary.md#toolbelt) command producing CQL statements that creates all tables for a [run](glossary.md#run) may look like this:

```
go run capitoolbelt.go get_table_cql -script_file=... -params_file=... -keyspace=... -run_id=... -start_nodes=...
Expand All @@ -91,20 +91,22 @@ Bottom line: Capillaries' use of cloud-based Cassandra is questionable at the mo

## What's next?

Q. What are the potential directions to improve Capillaries?
Q. What are the potential directions for improvement?

A. Here are some, in no particular order:

1. Read/write from/to other file formats, maybe databases.
1. Performance enhancements, espcecially those related to the efficient use of Cassandra.

2. Creating node configuration is a tedious job. Consider adding a toolbelt command that takes a CSV file as an input and generates JSON for a corresponding file_table/table_file node.
2. Read/write from/to other file formats, maybe databases.

3. Is the lack of NULL support a deal-breaker?
3. Creating node configuration is a tedious job. Consider adding a toolbelt command that takes a CSV file as an input and generates JSON for a corresponding file_table/table_file node.

4. Need a strategy to mitigate potential security threats introduced by py_calc. SELinux/AppArmor?
4. Is the lack of NULL support a deal-breaker?

5. Keep an eye on Azure/AWS/GCP progress with Cassandra-compatible databases (latency!) and RabbitMQ offerings.
5. Need a strategy to mitigate potential security threats introduced by py_calc. SELinux/AppArmor?

6. Select distinct field values from a table: it can be implemented easily using a set, but it will not scale and it will be limited by the size of the map. Alternatively, it can be implemented using Cassandra features, but it will require Capillaries to support tables without [rowid](glossary.md#rowid) (so the unique values are stored in a partitioning key field).
6. Keep an eye on Azure/AWS/GCP progress with Cassandra-compatible databases (latency!) and RabbitMQ offerings.

7. Keep adding support for Go library functions
7. Select distinct field values from a table: it can be implemented easily using a set, but it will not scale and it will be limited by the size of the map. Alternatively, it can be implemented using Cassandra features, but it will require Capillaries to support tables without [rowid](glossary.md#rowid) (so the unique values are stored in a partitioning key field).

8. Keep adding support for Go library functions
2 changes: 1 addition & 1 deletion pkg/api/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func GetTablesCql(script *sc.ScriptDef, keyspace string, runId int16, startNodeN
sb.WriteString(fmt.Sprintf("%s\n", wfmodel.GetCreateTableCql(reflect.TypeOf(wfmodel.RunProperties{}), keyspace, wfmodel.TableNameRunAffectedNodes)))
sb.WriteString(fmt.Sprintf("%s\n", wfmodel.GetCreateTableCql(reflect.TypeOf(wfmodel.RunCounter{}), keyspace, wfmodel.TableNameRunCounter)))
qb := cql.QueryBuilder{}
sb.WriteString(fmt.Sprintf("%s\n", qb.Keyspace(keyspace).Write("ks", keyspace).Write("last_run", 0).Insert(wfmodel.TableNameRunCounter, cql.IgnoreIfExists)))
sb.WriteString(fmt.Sprintf("%s\n", qb.Keyspace(keyspace).Write("ks", keyspace).Write("last_run", 0).InsertUnpreparedQuery(wfmodel.TableNameRunCounter, cql.IgnoreIfExists)))

for _, nodeName := range script.GetAffectedNodes(startNodeNames) {
node, ok := script.ScriptNodes[nodeName]
Expand Down
92 changes: 89 additions & 3 deletions pkg/cql/cql_query_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package cql

import (
"fmt"
"math"
"strings"
"time"

"github.com/capillariesio/capillaries/pkg/sc"
"github.com/shopspring/decimal"
"gopkg.in/inf.v0"
)

type IfNotExistsType int
Expand Down Expand Up @@ -55,6 +58,17 @@ func valueToString(value interface{}, quotePolicy QuotePolicyType) string {
}
}

func valueToCqlParam(value interface{}) interface{} {
switch v := value.(type) {
case decimal.Decimal:
f, _ := v.Float64()
scaled := int64(math.Round(f * 100))
return inf.NewDec(scaled, 2)
default:
return v
}
}

type queryBuilderColumnDefs struct {
Columns [256]string
Types [256]string
Expand Down Expand Up @@ -82,6 +96,32 @@ func (cd *queryBuilderColumnDefs) add(column string, fieldType sc.TableFieldType
cd.Len++
}

type queryBuilderPreparedColumnData struct {
Columns [256]string
Values [256]interface{}
ColumnIdxMap map[string]int
ValueIdxMap map[string]int
}

func (cd *queryBuilderPreparedColumnData) addColumnName(column string) error {
if _, ok := cd.ColumnIdxMap[column]; ok {
return fmt.Errorf("cannot add same column %s to a prepared query twice: %v", column, cd.Columns)
}
curColCount := len(cd.ColumnIdxMap)
cd.Columns[curColCount] = column
cd.ColumnIdxMap[column] = curColCount
return nil
}
func (cd *queryBuilderPreparedColumnData) addColumnValue(column string, value interface{}) error {
colIdx, ok := cd.ColumnIdxMap[column]
if !ok {
return fmt.Errorf("cannot set value for non-prepared column %s, available columns are %v", column, cd.Columns)
}
cd.Values[colIdx] = valueToCqlParam(value)
cd.ValueIdxMap[column] = colIdx
return nil
}

type queryBuilderColumnData struct {
Columns [256]string
Values [256]string
Expand Down Expand Up @@ -144,13 +184,21 @@ type QueryBuilder struct {
PartitionKeyColumns []string
ClusteringKeyColumns []string
ColumnData queryBuilderColumnData
PreparedColumnData queryBuilderPreparedColumnData
Conditions queryBuilderConditions
IfConditions queryBuilderConditions
SelectLimit int
FormattedKeyspace string
OrderByColumns []string
}

func NewQB() *QueryBuilder {
var qb QueryBuilder
qb.PreparedColumnData.ColumnIdxMap = map[string]int{}
qb.PreparedColumnData.ValueIdxMap = map[string]int{}
return &qb
}

func (qb *QueryBuilder) ColumnDef(column string, fieldType sc.TableFieldType) *QueryBuilder {
qb.ColumnDefs.add(column, fieldType)
return qb
Expand Down Expand Up @@ -192,6 +240,14 @@ func (qb *QueryBuilder) Write(column string, value interface{}) *QueryBuilder {
return qb
}

func (qb *QueryBuilder) WritePreparedColumn(column string) error {
return qb.PreparedColumnData.addColumnName(column)
}

func (qb *QueryBuilder) WritePreparedValue(column string, value interface{}) error {
return qb.PreparedColumnData.addColumnValue(column, value)
}

/*
WriteForceUnquote - add a column for INSERT or UPDATE
*/
Expand Down Expand Up @@ -246,10 +302,10 @@ Insert - build INSERT query
*/
const RunIdForEmptyRun = -1

func (qb *QueryBuilder) Insert(tableName string, ifNotExists IfNotExistsType) string {
return qb.InsertRun(tableName, RunIdForEmptyRun, ifNotExists)
func (qb *QueryBuilder) InsertUnpreparedQuery(tableName string, ifNotExists IfNotExistsType) string {
return qb.insertRunUnpreparedQuery(tableName, RunIdForEmptyRun, ifNotExists)
}
func (qb *QueryBuilder) InsertRun(tableName string, runId int16, ifNotExists IfNotExistsType) string {
func (qb *QueryBuilder) insertRunUnpreparedQuery(tableName string, runId int16, ifNotExists IfNotExistsType) string {
ifNotExistsStr := ""
if ifNotExists == IgnoreIfExists {
ifNotExistsStr = "IF NOT EXISTS"
Expand All @@ -267,6 +323,36 @@ func (qb *QueryBuilder) InsertRun(tableName string, runId int16, ifNotExists IfN
return q
}

func (qb *QueryBuilder) InsertRunPreparedQuery(tableName string, runId int16, ifNotExists IfNotExistsType) (string, error) {
ifNotExistsStr := ""
if ifNotExists == IgnoreIfExists {
ifNotExistsStr = "IF NOT EXISTS"
}
columnCount := len(qb.PreparedColumnData.ColumnIdxMap)
paramArray := make([]string, columnCount)
for paramIdx := 0; paramIdx < columnCount; paramIdx++ {
paramArray[paramIdx] = "?"
}
q := fmt.Sprintf("INSERT INTO %s%s%s ( %s ) VALUES ( %s ) %s;",
qb.FormattedKeyspace,
tableName,
RunIdSuffix(runId),
strings.Join(qb.PreparedColumnData.Columns[:columnCount], ", "),
strings.Join(paramArray, ", "),
ifNotExistsStr)
if runId == 0 {
return "", fmt.Errorf("invalid runId=0 in %s", q)
}
return q, nil
}

func (qb *QueryBuilder) InsertRunParams() ([]interface{}, error) {
if len(qb.PreparedColumnData.ColumnIdxMap) != len(qb.PreparedColumnData.ValueIdxMap) {
return nil, fmt.Errorf("cannot produce insert params, length mismatch: columns %v, values %v", qb.PreparedColumnData.ColumnIdxMap, qb.PreparedColumnData.ValueIdxMap)
}
return qb.PreparedColumnData.Values[:len(qb.PreparedColumnData.ValueIdxMap)], nil
}

/*
Select - build SELECT query
*/
Expand Down
28 changes: 27 additions & 1 deletion pkg/cql/cql_query_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,42 @@ import (
"testing"

"github.com/capillariesio/capillaries/pkg/sc"
"github.com/shopspring/decimal"
"gopkg.in/inf.v0"
)

func TestValueToCqlParam(t *testing.T) {
// Simple
expected := "1.23"
actual := valueToCqlParam(decimal.NewFromFloat(1.23)).(*inf.Dec).String()
if actual != expected {
t.Errorf("Unmatch:\n%v\n%v\n", expected, actual)
}

// big round up
expected = "1.24"
actual = valueToCqlParam(decimal.NewFromFloat(1.235)).(*inf.Dec).String()
if actual != expected {
t.Errorf("Unmatch:\n%v\n%v\n", expected, actual)
}

// small round down
expected = "0.03"
actual = valueToCqlParam(decimal.NewFromFloat(0.0345)).(*inf.Dec).String()
if actual != expected {
t.Errorf("Unmatch:\n%v\n%v\n", expected, actual)
}

}

func TestInsert(t *testing.T) {
const q = "INSERT INTO table1_00123 ( col1, col2, col3 ) VALUES ( 'val1', 2, now() ) IF NOT EXISTS;"
qb := QueryBuilder{}
s := qb.
Write("col1", "val1").
Write("col2", 2).
WriteForceUnquote("col3", "now()").
InsertRun("table1", 123, IgnoreIfExists)
insertRunUnpreparedQuery("table1", 123, IgnoreIfExists)
if s != q {
t.Errorf("Unmatch:\n%v\n%v\n", q, s)
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/cql/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,9 @@ func NewSession(envConfig *env.EnvConfig, keyspace string, createKeyspace Create
dataCluster.NumConns = envConfig.Cassandra.NumConns
dataCluster.Timeout = time.Duration(envConfig.Cassandra.Timeout * int(time.Millisecond))
dataCluster.ConnectTimeout = time.Duration(envConfig.Cassandra.ConnectTimeout * int(time.Millisecond))
dataCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy()) // TODO: consider making it configurable
// Token-aware policy should give better perf results when used together with prepared queries, and Capillaries chatty inserts are killing Cassandra.
// TODO: consider making it configurable
dataCluster.PoolConfig.HostSelectionPolicy = gocql.TokenAwareHostPolicy(gocql.RoundRobinHostPolicy())
// When testing, we load Cassandra cluster at 100%. There will be "Operation timed out - received only 0 responses" errors.
// It's up to admins how to handle the load, but we should not give up quickly in any case. Make it 3 attempts.
dataCluster.RetryPolicy = &gocql.SimpleRetryPolicy{NumRetries: 3}
Expand Down Expand Up @@ -93,7 +95,7 @@ func NewSession(envConfig *env.EnvConfig, keyspace string, createKeyspace Create
Keyspace(keyspace).
Write("ks", keyspace).
Write("last_run", 0)
q := qb.Insert(wfmodel.TableNameRunCounter, IgnoreIfExists) // If not exists. Insert only once.
q := qb.InsertUnpreparedQuery(wfmodel.TableNameRunCounter, IgnoreIfExists) // If not exists. Insert only once.
err = cqlSession.Query(q).Exec()
if err != nil {
return nil, WrapDbErrorWithQuery("cannot initialize run counter", q, err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/exe/daemon/capidaemon.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"username": "cassandra",
"password": "cassandra",
"keyspace_replication_config": "{ 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 }",
"writer_workers": 50,
"writer_workers": 20,
"min_inserter_rate": 10,
"num_conns": 2,
"timeout": 5000,
Expand Down
2 changes: 1 addition & 1 deletion pkg/exe/toolbelt/capitoolbelt.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"username": "cassandra",
"password": "cassandra",
"keyspace_replication_config": "{ 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 }",
"writer_workers": 50,
"writer_workers": 20,
"min_inserter_rate": 10,
"num_conns": 2,
"timeout": 5000,
Expand Down
2 changes: 1 addition & 1 deletion pkg/exe/webapi/capiwebapi.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"username": "cassandra",
"password": "cassandra",
"keyspace_replication_config": "{ 'class' : 'NetworkTopologyStrategy', 'datacenter1' : 1 }",
"writer_workers": 50,
"writer_workers": 20,
"min_inserter_rate": 10,
"num_conns": 2,
"timeout": 5000,
Expand Down
Loading

0 comments on commit 7087c4c

Please sign in to comment.