Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Portfolio bigtest #43

Merged
merged 19 commits into from
Aug 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/workflows/go.yml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- name: Set up Go
uses: actions/setup-go@v3
with:
go-version: 1.19
go-version: '1.20'

- name: Build
run: go build -v ./...
Expand Down Expand Up @@ -60,7 +60,7 @@ jobs:

- name: pkg/eval test coverage threshold
env:
TESTCOVERAGE_THRESHOLD: 80.89
TESTCOVERAGE_THRESHOLD: 80.99
run: |
echo "Quality Gate: checking test coverage is above threshold $TESTCOVERAGE_THRESHOLD %..."
go test -v ./pkg/eval/... -coverprofile coverage.out -covermode count
Expand Down
9 changes: 9 additions & 0 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,15 @@
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Generate portfolio bigtest data",
"type": "go",
"request": "launch",
"mode": "debug",
"cwd":"${workspaceFolder}/test/code/portfolio/bigtest",
"program": "${workspaceFolder}/test/code/portfolio/bigtest/generate_bigtest_data.go",
"args": [""]
},
{
"name": "Deploy create security group",
"type": "go",
Expand Down
2 changes: 1 addition & 1 deletion doc/testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

cd into any directory under pkg/ and run
```
go test - v
go test -v
```

To see test code coverage:
Expand Down
6 changes: 6 additions & 0 deletions pkg/eval/eval_ctx.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ func (eCtx *EvalCtx) EvalFunc(callExp *ast.CallExpr, funcName string, args []int
switch funcName {
case "math.Sqrt":
eCtx.Value, err = callMathSqrt(args)
case "math.Round":
eCtx.Value, err = callMathRound(args)
case "len":
eCtx.Value, err = callLen(args)
case "string":
Expand All @@ -472,6 +474,10 @@ func (eCtx *EvalCtx) EvalFunc(callExp *ast.CallExpr, funcName string, args []int
eCtx.Value, err = callTimeUnixMilli(args)
case "time.DiffMilli":
eCtx.Value, err = callTimeDiffMilli(args)
case "time.Before":
eCtx.Value, err = callTimeBefore(args)
case "time.After":
eCtx.Value, err = callTimeAfter(args)
case "time.FixedZone":
eCtx.Value, err = callTimeFixedZone(args)
case "re.MatchString":
Expand Down
12 changes: 12 additions & 0 deletions pkg/eval/math.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,15 @@ func callMathSqrt(args []interface{}) (interface{}, error) {

return math.Sqrt(argFloat), nil
}

// callMathRound implements the math.Round() expression function.
//
// The argument count is validated with checkArgs (exactly one argument is
// expected); the single argument is then converted with castToFloat64 and
// passed to math.Round. Any validation error is returned unchanged, and a
// conversion failure is reported together with the offending args.
func callMathRound(args []interface{}) (interface{}, error) {
	argCountErr := checkArgs("math.Round", 1, len(args))
	if argCountErr != nil {
		return nil, argCountErr
	}

	value, castErr := castToFloat64(args[0])
	if castErr != nil {
		return nil, fmt.Errorf("cannot evaluate math.Round(), invalid args %v: [%s]", args, castErr.Error())
	}

	rounded := math.Round(value)
	return rounded, nil
}
4 changes: 4 additions & 0 deletions pkg/eval/math_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,8 @@ func TestMathFunctions(t *testing.T) {
assertEvalError(t, `math.Sqrt("aa")`, "cannot evaluate math.Sqrt(), invalid args [aa]: [cannot cast aa(string) to float64, unsuported type]", varValuesMap)
assertFloatNan(t, "math.Sqrt(-1)", varValuesMap)
assertEvalError(t, "math.Sqrt(123,567)", "cannot evaluate math.Sqrt(), requires 1 args, 2 supplied", varValuesMap)

assertEqual(t, "math.Round(5.1)", 5.0, varValuesMap)
assertEvalError(t, `math.Round("aa")`, "cannot evaluate math.Round(), invalid args [aa]: [cannot cast aa(string) to float64, unsuported type]", varValuesMap)
assertEvalError(t, "math.Round(5,1)", "cannot evaluate math.Round(), requires 1 args, 2 supplied", varValuesMap)
}
26 changes: 26 additions & 0 deletions pkg/eval/time.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,3 +102,29 @@ func callTimeUnixMilli(args []interface{}) (interface{}, error) {

return arg0.UnixMilli(), nil
}

// callTimeBefore implements the time.Before() expression function.
//
// The argument count is validated with checkArgs (exactly two arguments are
// expected) and its error is returned unchanged. Both arguments must be
// time.Time values; the result is a bool telling whether the first instant
// is before the second one.
func callTimeBefore(args []interface{}) (interface{}, error) {
	argCountErr := checkArgs("time.Before", 2, len(args))
	if argCountErr != nil {
		return nil, argCountErr
	}

	left, leftIsTime := args[0].(time.Time)
	right, rightIsTime := args[1].(time.Time)
	if leftIsTime && rightIsTime {
		return left.Before(right), nil
	}

	// At least one argument is not a time.Time.
	return nil, fmt.Errorf("cannot evaluate time.Before(), invalid args %v", args)
}

// callTimeAfter implements the time.After() expression function.
//
// The argument count is validated with checkArgs (exactly two arguments are
// expected) and its error is returned unchanged. Both arguments must be
// time.Time values; the result is a bool telling whether the first instant
// is after the second one.
func callTimeAfter(args []interface{}) (interface{}, error) {
	argCountErr := checkArgs("time.After", 2, len(args))
	if argCountErr != nil {
		return nil, argCountErr
	}

	left, leftIsTime := args[0].(time.Time)
	right, rightIsTime := args[1].(time.Time)
	if leftIsTime && rightIsTime {
		return left.After(right), nil
	}

	// At least one argument is not a time.Time.
	return nil, fmt.Errorf("cannot evaluate time.After(), invalid args %v", args)
}
7 changes: 7 additions & 0 deletions pkg/eval/time_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@ import (

func TestTimeFunctions(t *testing.T) {
testTime := time.Date(2001, 1, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200))
testTimeUtc := time.Date(2001, 1, 1, 0, 0, 0, 0, time.UTC)
varValuesMap := VarValuesMap{
"t": map[string]interface{}{
"test_time": testTime}}
assertEqual(t, `time.Parse("2006-01-02T15:04:05.000-0700","2001-01-01T01:01:01.100-0200")`, testTime, varValuesMap)
assertEvalError(t, `time.Parse("2006-01-02T15:04:05.000-0700","2001-01-01T01:01:01.100-0200","aaa")`, "cannot evaluate time.Parse(), requires 2 args, 3 supplied", varValuesMap)
assertEvalError(t, `time.Parse("2006-01-02T15:04:05.000-0700",123)`, "cannot evaluate time.Parse(), invalid args [2006-01-02T15:04:05.000-0700 123]", varValuesMap)
assertEvalError(t, `time.Parse("2006-01-02T15:04:05.000-0700","2001-01-01T01:01:01")`, `parsing time "2001-01-01T01:01:01" as "2006-01-02T15:04:05.000-0700": cannot parse "" as ".000"`, varValuesMap)
assertEqual(t, `time.Parse("2006-01-02","2001-01-01")`, testTimeUtc, varValuesMap)

assertEqual(t, `time.Format(time.Date(2001, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), "2006-01-02T15:04:05.000-0700")`, testTime.Format("2006-01-02T15:04:05.000-0700"), varValuesMap)

Expand All @@ -29,6 +31,11 @@ func TestTimeFunctions(t *testing.T) {
assertEqual(t, `time.DiffMilli(time.Date(2002, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, int64(31536000000), varValuesMap)
assertEqual(t, `time.DiffMilli(time.Date(2000, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, int64(-31622400000), varValuesMap)

assertEqual(t, `time.Before(time.Date(2000, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, true, varValuesMap)
assertEqual(t, `time.After(time.Date(2002, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, true, varValuesMap)
assertEqual(t, `time.Before(time.Date(2002, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, false, varValuesMap)
assertEqual(t, `time.After(time.Date(2000, time.January, 1, 1, 1, 1, 100000000, time.FixedZone("", -7200)), t.test_time)`, false, varValuesMap)

assertEqual(t, `time.Unix(t.test_time)`, testTime.Unix(), varValuesMap)

assertEqual(t, `time.UnixMilli(t.test_time)`, testTime.UnixMilli(), varValuesMap)
Expand Down
6 changes: 2 additions & 4 deletions pkg/exe/daemon/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
FROM golang:1.19
FROM golang:1.20

# This image lacks pip and dateutil
RUN apt update
RUN apt install -y python3-venv python3-pip
RUN pip install --upgrade pip
RUN pip install python-dateutil --upgrade
RUN apt install -y python3-full python3-dateutil

WORKDIR /usr/src/capillaries

Expand Down
4 changes: 2 additions & 2 deletions pkg/exe/deploy/capideploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,9 +258,9 @@ func main() {
log.Fatalf(err.Error())
}

fmt.Printf("Creating instances, consider clearing known_hosts:\n")
fmt.Printf("Creating instances, consider clearing known_hosts to avoid ssh complaints:\n")
for _, i := range instances {
fmt.Printf("ssh-keygen -f ~/.ssh/known_hosts -R %s\n", i.BestIpAddress())
fmt.Printf("ssh-keygen -f ~/.ssh/known_hosts -R %s;\n", i.BestIpAddress())
}

for iNickname, _ := range instances {
Expand Down
2 changes: 1 addition & 1 deletion pkg/exe/webapi/docker/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM golang:1.19
FROM golang:1.20

WORKDIR /usr/src/capillaries

Expand Down
10 changes: 10 additions & 0 deletions pkg/proc/data_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func selectBatchFromDataTablePaged(logger *l.Logger,
if err := scanner.Scan(*rs.Rows[rs.RowCount]...); err != nil {
return nil, cql.WrapDbErrorWithQuery("cannot scan paged data row", q, err)
}
// We assume gocql creates only UTC timestamps, so this is not needed.
// If we ever catch a ts stored in our tables with a non-UTC tz, or gocql returning a non-UTC tz - investigate it. Sanitizing is the last resort and should be avoided.
// if err := rs.SanitizeScannedDatetimesToUtc(rs.RowCount); err != nil {
// return nil, cql.WrapDbErrorWithQuery("cannot sanitize datetimes", q, err)
// }
rs.RowCount++
}

Expand Down Expand Up @@ -152,6 +157,11 @@ func selectBatchPagedAllRowids(logger *l.Logger,
if err := scanner.Scan(*rs.Rows[rs.RowCount]...); err != nil {
return nil, cql.WrapDbErrorWithQuery("cannot scan all rows data row", q, err)
}
// We assume gocql creates only UTC timestamps, so this is not needed
// If we ever catch a ts stored in our tables with a non-UTC tz, or gocql returning a non-UTC tz - investigate it. Sanitizing is the last resort and should be avoided.
// if err := rs.SanitizeScannedDatetimesToUtc(rs.RowCount); err != nil {
// return nil, cql.WrapDbErrorWithQuery("cannot sanitize datetimes", q, err)
// }
rs.RowCount++
}
if err := scanner.Err(); err != nil {
Expand Down
20 changes: 18 additions & 2 deletions pkg/proc/proc_table_creator.go
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,9 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
defer instr.waitForWorkersAndCloseErrorsOut(logger, pCtx)

curStartLeftToken := startLeftToken
leftPageIdx := 0
for {
selectLeftBatchByTokenStartTime := time.Now()
lastRetrievedLeftToken, err := selectBatchFromTableByToken(logger,
pCtx,
rsLeft,
Expand All @@ -497,6 +499,9 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
if err != nil {
return bs, err
}

logger.DebugCtx(pCtx, "selectBatchFromTableByToken: leftPageIdx %d, queried tokens from %d to %d in %.3fs, retrieved %d rows", leftPageIdx, curStartLeftToken, endLeftToken, time.Since(selectLeftBatchByTokenStartTime).Seconds(), rsLeft.RowCount)

curStartLeftToken = lastRetrievedLeftToken + 1

if rsLeft.RowCount == 0 {
Expand Down Expand Up @@ -559,7 +564,9 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
sc.FieldRefs{sc.IdxKeyFieldRef()})

var idxPageState []byte
rightIdxPageIdx := 0
for {
selectIdxBatchStartTime := time.Now()
idxPageState, err = selectBatchFromIdxTablePaged(logger,
pCtx,
rsIdx,
Expand Down Expand Up @@ -589,14 +596,18 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
rowidsToFind[k] = struct{}{}
}

logger.DebugCtx(pCtx, "selectBatchFromIdxTablePaged: leftPageIdx %d, rightIdxPageIdx %d, queried %d keys in %.3fs, retrieved %d rowids", leftPageIdx, rightIdxPageIdx, len(keysToFind), time.Since(selectIdxBatchStartTime).Seconds(), len(rowidsToFind))

// Select from right table by rowid
rsRight := NewRowsetFromFieldRefs(
sc.FieldRefs{sc.RowidFieldRef(node.Lookup.TableCreator.Name)},
sc.FieldRefs{sc.RowidTokenFieldRef()},
srcRightFieldRefs)

var rightPageState []byte
rightDataPageIdx := 0
for {
selectBatchStartTime := time.Now()
rightPageState, err = selectBatchFromDataTablePaged(logger,
pCtx,
rsRight,
Expand All @@ -609,6 +620,8 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
return bs, err
}

logger.DebugCtx(pCtx, "selectBatchFromDataTablePaged: leftPageIdx %d, rightIdxPageIdx %d, rightDataPageIdx %d, queried %d rowids in %.3fs, retrieved %d rowids", leftPageIdx, rightIdxPageIdx, rightDataPageIdx, len(rowidsToFind), time.Since(selectBatchStartTime).Seconds(), rsRight.RowCount)

if rsRight.RowCount == 0 {
break
}
Expand Down Expand Up @@ -711,12 +724,14 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
if rsRight.RowCount < node.Lookup.RightLookupReadBatchSize || len(rightPageState) == 0 {
break
}
}
rightDataPageIdx++
} // for each data page

if rsIdx.RowCount < node.Lookup.IdxReadBatchSize || len(idxPageState) == 0 {
break
}
} // for each idx batch
rightIdxPageIdx++
} // for each idx page

if node.Lookup.IsGroup {
// Time to write the result of the grouped
Expand Down Expand Up @@ -860,6 +875,7 @@ func RunCreateTableRelForBatch(envConfig *env.EnvConfig,
if rsLeft.RowCount < leftBatchSize {
break
}
leftPageIdx++
} // for each source table batch

// Write leftovers regardless of tableRecordBatchCount == 0
Expand Down
15 changes: 15 additions & 0 deletions pkg/proc/rowset.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,3 +238,18 @@ func (rs *Rowset) ExportToVarsWithAlias(rowIdx int, vars *eval.VarValuesMap, use
}
return nil
}

// Force UTC TZ to each ts returned by gocql
// func (rs *Rowset) SanitizeScannedDatetimesToUtc(rowIdx int) error {
// for valIdx := 0; valIdx < len(rs.Fields); valIdx++ {
// if rs.Fields[valIdx].FieldType == sc.FieldTypeDateTime {
// origVolatile := (*rs.Rows[rowIdx])[valIdx]
// origDt, ok := origVolatile.(time.Time)
// if !ok {
// return fmt.Errorf("invalid type %t(%v), expected datetime", origVolatile, origVolatile)
// }
// (*rs.Rows[rowIdx])[valIdx] = origDt.In(time.UTC)
// }
// }
// return nil
// }
2 changes: 1 addition & 1 deletion pkg/sc/file_creator_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (creatorDef *FileCreatorDef) Deserialize(rawWriter json.RawMessage) error {
creatorDef.Csv.Separator = ","
}
} else {
return fmt.Errorf("cannot cannot detect file creator type")
return fmt.Errorf("cannot cannot detect file creator type: parquet dhould have column_name, csv should have header etc")
}

// Having
Expand Down
2 changes: 1 addition & 1 deletion pkg/sc/file_reader_def.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (frDef *FileReaderDef) Deserialize(rawReader json.RawMessage) error {
}

if frDef.ReaderFileType == ReaderFileTypeUnknown {
errors = append(errors, "cannot detect file reader type")
errors = append(errors, "cannot detect file reader type: parquet should have col_name, csv should have col_hdr or col_idx etc")
}

if len(errors) > 0 {
Expand Down
15 changes: 9 additions & 6 deletions pkg/storage/parquet.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,33 +208,36 @@ func ParquetReadDateTime(val interface{}, se *gp_parquet.SchemaElement) (time.Ti
if !isParquetDateTime(se) && !isParquetInt96Date(se) && !isParquetInt32Date(se) {
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime, schema %v", se)
}
// Important: all time constructors below create datetime objects with Local TZ.
// This is not good because our time.Format("2006-01-02") will use this TZ and produce a datetime for a local TZ, causing confusion.
// Only UTC times should be used internally.
switch typedVal := val.(type) {
case int32:
if isParquetInt32Date(se) {
// It's a number of days from UNIX epoch
return time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).AddDate(0, 0, int(typedVal)), nil
return time.Date(1970, 1, 1, 0, 0, 0, 0, time.UTC).AddDate(0, 0, int(typedVal)).In(time.UTC), nil
} else {
switch *se.ConvertedType {
case gp_parquet.ConvertedType_TIMESTAMP_MILLIS:
return time.UnixMilli(int64(typedVal)), nil
return time.UnixMilli(int64(typedVal)).In(time.UTC), nil
case gp_parquet.ConvertedType_TIMESTAMP_MICROS:
return time.UnixMicro(int64(typedVal)), nil
return time.UnixMicro(int64(typedVal)).In(time.UTC), nil
default:
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime from int32, unsupported converted type, schema %v", se)
}
}
case int64:
switch *se.ConvertedType {
case gp_parquet.ConvertedType_TIMESTAMP_MILLIS:
return time.UnixMilli(typedVal), nil
return time.UnixMilli(typedVal).In(time.UTC), nil
case gp_parquet.ConvertedType_TIMESTAMP_MICROS:
return time.UnixMicro(typedVal), nil
return time.UnixMicro(typedVal).In(time.UTC), nil
default:
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime from int64, unsupported converted type, schema %v", se)
}
case [12]byte:
// Deprecated parquet int96 timestamp
return gp.Int96ToTime(typedVal), nil
return gp.Int96ToTime(typedVal).In(time.UTC), nil
default:
return sc.DefaultDateTime(), fmt.Errorf("cannot read parquet datetime from %T, schema %v", se, typedVal)
}
Expand Down
14 changes: 11 additions & 3 deletions test/code/lookup/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,16 @@

Created using Ubuntu WSL. Other Linux flavors and MacOS may require edits.

`quicktest` - small number of items, CSV input/output
`bigtest` - large number of items. CSV and Parquet input/output
## lookup_quicktest vs lookup_bigtest

This test comes in two flavors.

lookup_quicktest has all data ready, it just has to be copied to /tmp/capi_*, and you can run the test. Root-level [copy_demo_data.sh](../../../copy_demo_data.sh) script does that, among other things.

lookup_bigtest is a variation of this test that uses:
- large number of orders
- parquet files for input and output
and requires test data to be generated - see [1_create_data.sh](./bigtest/1_create_data.sh).

## Workflow

Expand All @@ -30,7 +38,7 @@ See [integration tests](../../../doc/testing.md#integration-tests) section for g

## Possible edits

Play with number of total line items (see "-items=..." in [1_create_data.sh](quicktest/1_create_data.sh)).
Play with number of total line items (see "-items=..." in [1_create_data.sh](./quicktest/1_create_data.sh)).

## References:

Expand Down
Loading