Skip to content

Commit

Permalink
Portfolio bigtest (#43)
Browse files Browse the repository at this point in the history
Testing: portfolio bigtest fully functional; Cassandra nodes with ramdisk
  • Loading branch information
kleineshertz authored Aug 17, 2023
1 parent df9990c commit d9b1614
Show file tree
Hide file tree
Showing 54 changed files with 2,052 additions and 3,998 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: '1.20'

- name: Build
run: go build -v ./...
Expand Down Expand Up @@ -60,7 +60,7 @@ jobs:
- name: pkg/eval test coverage threshold
env:
TESTCOVERAGE_THRESHOLD: 80.89
TESTCOVERAGE_THRESHOLD: 80.99
run: |
echo "Quality Gate: checking test coverage is above threshold $TESTCOVERAGE_THRESHOLD %..."
go test -v ./pkg/eval/... -coverprofile coverage.out -covermode count
Expand Down
9 changes: 9 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Generate portfolio bigtest data",
"type": "go",
"request": "launch",
"mode": "debug",
"cwd":"${workspaceFolder}/test/code/portfolio/bigtest",
"program": "${workspaceFolder}/test/code/portfolio/bigtest/generate_bigtest_data.go",
"args": [""]
},
{
"name": "Deploy create security group",
"type": "go",
Expand Down
2 changes: 1 addition & 1 deletion doc/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

cd into any directory under pkg/ and run
```
go test - v
go test -v
```

To see test code coverage:
Expand Down
6 changes: 6 additions & 0 deletions pkg/eval/eval_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (eCtx *EvalCtx) EvalFunc(callExp *ast.CallExpr, funcName string, args []int
switch funcName {
case "math.Sqrt":
eCtx.Value, err = callMathSqrt(args)
case "math.Round":
eCtx.Value, err = callMathRound(args)
case "len":
eCtx.Value, err = callLen(args)
case "string":
Expand All @@ -472,6 +474,10 @@ func (eCtx *EvalCtx) EvalFunc(callExp *ast.CallExpr, funcName string, args []int
eCtx.Value, err = callTimeUnixMilli(args)
case "time.DiffMilli":
eCtx.Value, err = callTimeDiffMilli(args)
case "time.Before":
eCtx.Value, err = callTimeBefore(args)
case "time.After":
eCtx.Value, err = callTimeAfter(args)
case "time.FixedZone":
eCtx.Value, err = callTimeFixedZone(args)
case "re.MatchString":
Expand Down
12 changes: 12 additions & 0 deletions pkg/eval/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@ func callMathSqrt(args []interface{}) (interface{}, error) {

return math.Sqrt(argFloat), nil
}

// callMathRound evaluates the script function math.Round(x).
// It requires exactly one argument castable to float64 and returns the
// nearest integer as a float64, rounding half away from zero (math.Round).
func callMathRound(args []interface{}) (interface{}, error) {
	if argErr := checkArgs("math.Round", 1, len(args)); argErr != nil {
		return nil, argErr
	}
	v, castErr := castToFloat64(args[0])
	if castErr != nil {
		// Echo the raw argument list so script authors can see what was supplied.
		return nil, fmt.Errorf("cannot evaluate math.Round(), invalid args %v: [%s]", args, castErr.Error())
	}
	return math.Round(v), nil
}
4 changes: 4 additions & 0 deletions pkg/eval/math_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ func TestMathFunctions(t *testing.T) {
assertEvalError(t, `math.Sqrt("aa")`, "cannot evaluate math.Sqrt(), invalid args [aa]: [cannot cast aa(string) to float64, unsuported type]", varValuesMap)
assertFloatNan(t, "math.Sqrt(-1)", varValuesMap)
assertEvalError(t, "math.Sqrt(123,567)", "cannot evaluate math.Sqrt(), requires 1 args, 2 supplied", varValuesMap)

assertEqual(t, "math.Round(5.1)", 5.0, varValuesMap)
assertEvalError(t, `math.Round("aa")`, "cannot evaluate math.Round(), invalid args [aa]: [cannot cast aa(string) to float64, unsuported type]", varValuesMap)
assertEvalError(t, "math.Round(5,1)", "cannot evaluate math.Round(), requires 1 args, 2 supplied", varValuesMap)
}
26 changes: 26 additions & 0 deletions pkg/eval/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,29 @@ func callTimeUnixMilli(args []interface{}) (interface{}, error) {

return arg0.UnixMilli(), nil
}

// callTimeBefore evaluates the script function time.Before(t1, t2).
// Both arguments must be time.Time values; it returns true when t1 is
// strictly earlier than t2, delegating to time.Time.Before.
func callTimeBefore(args []interface{}) (interface{}, error) {
	if err := checkArgs("time.Before", 2, len(args)); err != nil {
		return nil, err
	}
	t0, firstOk := args[0].(time.Time)
	t1, secondOk := args[1].(time.Time)
	if firstOk && secondOk {
		return t0.Before(t1), nil
	}
	return nil, fmt.Errorf("cannot evaluate time.Before(), invalid args %v", args)
}

// callTimeAfter evaluates the script function time.After(t1, t2).
// Both arguments must be time.Time values; it returns true when t1 is
// strictly later than t2, delegating to time.Time.After.
func callTimeAfter(args []interface{}) (interface{}, error) {
	if err := checkArgs("time.After", 2, len(args)); err != nil {
		return nil, err
	}
	t0, firstOk := args[0].(time.Time)
	t1, secondOk := args[1].(time.Time)
	if firstOk && secondOk {
		return t0.After(t1), nil
	}
	return nil, fmt.Errorf("cannot evaluate time.After(), invalid args %v", args)
}
7 changes: 7 additions & 0 deletions pkg/eval/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (

func TestTimeFunctions(t *testing.T) {
testTime := time.Date(2001, 1, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200))
testTimeUtc := time.Date(2001, 1, 1, 0, 0, 0, 0, time.UTC)
varValuesMap := VarValuesMap{
"t": map[string]interface{}{
"test_time": testTime}}
assertEqual(t, `time.Parse("2006-01-02T15:04:05.000-0700","2001-01-01T01:01:01.100-0200")`, testTime, varValuesMap)
assertEvalError(t, `time.Parse("2006-01-02T15:04:05.000-0700","2001-01-01T01:01:01.100-0200","aaa")`, "cannot evaluate time.Parse(), requires 2 args, 3 supplied", varValuesMap)
assertEvalError(t, `time.Parse("2006-01-02T15:04:05.000-0700",123)`, "cannot evaluate time.Parse(), invalid args [2006-01-02T15:04:05.000-0700 123]", varValuesMap)
assertEvalError(t, `time.Parse("2006-01-02T15:04:05.000-0700","2001-01-01T01:01:01")`, `parsing time "2001-01-01T01:01:01" as "2006-01-02T15:04:05.000-0700": cannot parse "" as ".000"`, varValuesMap)
assertEqual(t, `time.Parse("2006-01-02","2001-01-01")`, testTimeUtc, varValuesMap)

assertEqual(t, `time.Format(time.Date(2001, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), "2006-01-02T15:04:05.000-0700")`, testTime.Format("2006-01-02T15:04:05.000-0700"), varValuesMap)

Expand All @@ -29,6 +31,11 @@ func TestTimeFunctions(t *testing.T) {
assertEqual(t, `time.DiffMilli(time.Date(2002, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, int64(31536000000), varValuesMap)
assertEqual(t, `time.DiffMilli(time.Date(2000, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, int64(-31622400000), varValuesMap)

assertEqual(t, `time.Before(time.Date(2000, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, true, varValuesMap)
assertEqual(t, `time.After(time.Date(2002, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, true, varValuesMap)
assertEqual(t, `time.Before(time.Date(2002, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, false, varValuesMap)
assertEqual(t, `time.After(time.Date(2000, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, false, varValuesMap)

assertEqual(t, `time.Unix(t.test_time)`, testTime.Unix(), varValuesMap)

assertEqual(t, `time.UnixMilli(t.test_time)`, testTime.UnixMilli(), varValuesMap)
Expand Down
6 changes: 2 additions & 4 deletions pkg/exe/daemon/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
FROM golang:1.19
FROM golang:1.20

# These image lacks pip and dateutil
RUN apt update
RUN apt install -y python3-venv python3-pip
RUN pip install --upgrade pip
RUN pip install python-dateutil --upgrade
RUN apt install -y python3-full python3-dateutil

WORKDIR /usr/src/capillaries

Expand Down
4 changes: 2 additions & 2 deletions pkg/exe/deploy/capideploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ func main() {
log.Fatalf(err.Error())
}

fmt.Printf("Creating instances, consider clearing known_hosts:\n")
fmt.Printf("Creating instances, consider clearing known_hosts to avoid ssh complaints:\n")
for _, i := range instances {
fmt.Printf("ssh-keygen -f ~/.ssh/known_hosts -R %s\n", i.BestIpAddress())
fmt.Printf("ssh-keygen -f ~/.ssh/known_hosts -R %s;\n", i.BestIpAddress())
}

for iNickname, _ := range instances {
Expand Down
2 changes: 1 addition & 1 deletion pkg/exe/webapi/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.19
FROM golang:1.20

WORKDIR /usr/src/capillaries

Expand Down
10 changes: 10 additions & 0 deletions pkg/proc/data_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func selectBatchFromDataTablePaged(logger *l.Logger,
if err := scanner.Scan(*rs.Rows[rs.RowCount]...); err != nil {
return nil, cql.WrapDbErrorWithQuery("cannot scan paged data row", q, err)
}
// We assume gocql creates only UTC timestamps, so this is not needed.
// If we ever catch a ts stored in our tables with a non-UTC tz, or gocql returning a non-UTC tz - investigate it. Sanitizing is the last resort and should be avoided.
// if err := rs.SanitizeScannedDatetimesToUtc(rs.RowCount); err != nil {
// return nil, cql.WrapDbErrorWithQuery("cannot sanitize datetimes", q, err)
// }
rs.RowCount++
}

Expand Down Expand Up @@ -152,6 +157,11 @@ func selectBatchPagedAllRowids(logger *l.Logger,
if err := scanner.Scan(*rs.Rows[rs.RowCount]...); err != nil {
return nil, cql.WrapDbErrorWithQuery("cannot scan all rows data row", q, err)
}
// We assume gocql creates only UTC timestamps, so this is not needed
// If we ever catch a ts stored in our tables with a non-UTC tz, or gocql returning a non-UTC tz - investigate it. Sanitizing is the last resort and should be avoided.
// if err := rs.SanitizeScannedDatetimesToUtc(rs.RowCount); err != nil {
// return nil, cql.WrapDbErrorWithQuery("cannot sanitize datetimes", q, err)
// }
rs.RowCount++
}
if err := scanner.Err(); err != nil {
Expand Down
20 changes: 18 additions & 2 deletions pkg/proc/proc_table_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
defer instr.waitForWorkersAndCloseErrorsOut(logger, pCtx)

curStartLeftToken := startLeftToken
leftPageIdx := 0
for {
selectLeftBatchByTokenStartTime := time.Now()
lastRetrievedLeftToken, err := selectBatchFromTableByToken(logger,
pCtx,
rsLeft,
Expand All @@ -497,6 +499,9 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
if err != nil {
return bs, err
}

logger.DebugCtx(pCtx, "selectBatchFromTableByToken: leftPageIdx %d, queried tokens from %d to %d in %.3fs, retrieved %d rows", leftPageIdx, curStartLeftToken, endLeftToken, time.Since(selectLeftBatchByTokenStartTime).Seconds(), rsLeft.RowCount)

curStartLeftToken = lastRetrievedLeftToken + 1

if rsLeft.RowCount == 0 {
Expand Down Expand Up @@ -559,7 +564,9 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
sc.FieldRefs{sc.IdxKeyFieldRef()})

var idxPageState []byte
rightIdxPageIdx := 0
for {
selectIdxBatchStartTime := time.Now()
idxPageState, err = selectBatchFromIdxTablePaged(logger,
pCtx,
rsIdx,
Expand Down Expand Up @@ -589,14 +596,18 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
rowidsToFind[k] = struct{}{}
}

logger.DebugCtx(pCtx, "selectBatchFromIdxTablePaged: leftPageIdx %d, rightIdxPageIdx %d, queried %d keys in %.3fs, retrieved %d rowids", leftPageIdx, rightIdxPageIdx, len(keysToFind), time.Since(selectIdxBatchStartTime).Seconds(), len(rowidsToFind))

// Select from right table by rowid
rsRight := NewRowsetFromFieldRefs(
sc.FieldRefs{sc.RowidFieldRef(node.Lookup.TableCreator.Name)},
sc.FieldRefs{sc.RowidTokenFieldRef()},
srcRightFieldRefs)

var rightPageState []byte
rightDataPageIdx := 0
for {
selectBatchStartTime := time.Now()
rightPageState, err = selectBatchFromDataTablePaged(logger,
pCtx,
rsRight,
Expand All @@ -609,6 +620,8 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
return bs, err
}

logger.DebugCtx(pCtx, "selectBatchFromDataTablePaged: leftPageIdx %d, rightIdxPageIdx %d, rightDataPageIdx %d, queried %d rowids in %.3fs, retrieved %d rowids", leftPageIdx, rightIdxPageIdx, rightDataPageIdx, len(rowidsToFind), time.Since(selectBatchStartTime).Seconds(), rsRight.RowCount)

if rsRight.RowCount == 0 {
break
}
Expand Down Expand Up @@ -711,12 +724,14 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
if rsRight.RowCount < node.Lookup.RightLookupReadBatchSize || len(rightPageState) == 0 {
break
}
}
rightDataPageIdx++
} // for each data page

if rsIdx.RowCount < node.Lookup.IdxReadBatchSize || len(idxPageState) == 0 {
break
}
} // for each idx batch
rightIdxPageIdx++
} // for each idx page

if node.Lookup.IsGroup {
// Time to write the result of the grouped
Expand Down Expand Up @@ -860,6 +875,7 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
if rsLeft.RowCount < leftBatchSize {
break
}
leftPageIdx++
} // for each source table batch

// Write leftovers regardless of tableRecordBatchCount == 0
Expand Down
15 changes: 15 additions & 0 deletions pkg/proc/rowset.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,18 @@ func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, use
}
return nil
}

// Force UTC TZ to each ts returned by gocql
// func (rs *Rowset) SanitizeScannedDatetimesToUtc(rowIdx int) error {
// for valIdx := 0; valIdx < len(rs.Fields); valIdx++ {
// if rs.Fields[valIdx].FieldType == sc.FieldTypeDateTime {
// origVolatile := (*rs.Rows[rowIdx])[valIdx]
// origDt, ok := origVolatile.(time.Time)
// if !ok {
// return fmt.Errorf("invalid type %t(%v), expected datetime", origVolatile, origVolatile)
// }
// (*rs.Rows[rowIdx])[valIdx] = origDt.In(time.UTC)
// }
// }
// return nil
// }
2 changes: 1 addition & 1 deletion pkg/sc/file_creator_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error {
creatorDef.Csv.Separator = ","
}
} else {
return fmt.Errorf("cannot cannot detect file creator type")
return fmt.Errorf("cannot detect file creator type: parquet should have column_name, csv should have header etc")
}

// Having
Expand Down
2 changes: 1 addition & 1 deletion pkg/sc/file_reader_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (frDef *FileReaderDef) Deserialize(rawReader json.RawMessage) error {
}

if frDef.ReaderFileType == ReaderFileTypeUnknown {
errors = append(errors, "cannot detect file reader type")
errors = append(errors, "cannot detect file reader type: parquet should have col_name, csv should have col_hdr or col_idx etc")
}

if len(errors) > 0 {
Expand Down
15 changes: 9 additions & 6 deletions pkg/storage/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,36 @@ func ParquetReadDateTime(val interface{}, se *gp_parquet.SchemaElement) (time.Ti
if !isParquetDateTime(se) && !isParquetInt96Date(se) && !isParquetInt32Date(se) {
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime, schema %v", se)
}
// Important: all time constructor below createdatetime objects with Local TZ.
// This is not good because our time.Format("2006-01-02") will use this TZ and produce a datetime for a local TZ, causing confusion.
// Only UTC times should be used internally.
switch typedVal := val.(type) {
case int32:
if isParquetInt32Date(se) {
// It's a number of days from UNIX epoch
return time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).AddDate(0, 0, int(typedVal)), nil
return time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).AddDate(0, 0, int(typedVal)).In(time.UTC), nil
} else {
switch *se.ConvertedType {
case gp_parquet.ConvertedType_TIMESTAMP_MILLIS:
return time.UnixMilli(int64(typedVal)), nil
return time.UnixMilli(int64(typedVal)).In(time.UTC), nil
case gp_parquet.ConvertedType_TIMESTAMP_MICROS:
return time.UnixMicro(int64(typedVal)), nil
return time.UnixMicro(int64(typedVal)).In(time.UTC), nil
default:
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime from int32, unsupported converted type, schema %v", se)
}
}
case int64:
switch *se.ConvertedType {
case gp_parquet.ConvertedType_TIMESTAMP_MILLIS:
return time.UnixMilli(typedVal), nil
return time.UnixMilli(typedVal).In(time.UTC), nil
case gp_parquet.ConvertedType_TIMESTAMP_MICROS:
return time.UnixMicro(typedVal), nil
return time.UnixMicro(typedVal).In(time.UTC), nil
default:
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime from int64, unsupported converted type, schema %v", se)
}
case [12]byte:
// Deprecated parquet int96 timestamp
return gp.Int96ToTime(typedVal), nil
return gp.Int96ToTime(typedVal).In(time.UTC), nil
default:
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime from %T, schema %v", se, typedVal)
}
Expand Down
14 changes: 11 additions & 3 deletions test/code/lookup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@

Created using Ubuntu WSL. Other Linux flavors and MacOS may require edits.

`quicktest` - small number of items, CSV input/output
`bigtest` - large number of items. CSV and Parquet input/output
## lookup_quicktest vs lookup_bigtest

This test comes in two flavors.

lookup_quicktest has all data ready, it just has to be copied to /tmp/capi_*, and you can run the test. Root-level [copy_demo_data.sh](../../../copy_demo_data.sh) script does that, among other things.

lookup_bigtest is a variation of this test that uses:
- large number of orders
- parquet files for input and output
and requires test data to be generated - see [1_create_data.sh](./bigtest/1_create_data.sh).

## Workflow

Expand All @@ -30,7 +38,7 @@ See [integration tests](../../../doc/testing.md#integration-tests) section for g

## Possible edits

Play with number of total line items (see "-items=..." in [1_create_data.sh](quicktest/1_create_data.sh)).
Play with number of total line items (see "-items=..." in [1_create_data.sh](./quicktest/1_create_data.sh)).

## References:

Expand Down
Loading

0 comments on commit d9b1614

Please sign in to comment.