feat: Add DQM Logging on GRPC Server with FileLogStorage for Testing #2403

Merged on Apr 13, 2022 (97 commits)
Commits
65880a9
Make a proof of concept
kevjumba Mar 9, 2022
b32b157
Update
kevjumba Mar 11, 2022
1996fd9
revert feature store
kevjumba Mar 17, 2022
c6656d4
refactor
kevjumba Mar 17, 2022
17c8168
Add time
kevjumba Mar 17, 2022
a4efbce
Add time
kevjumba Mar 17, 2022
79d8ba7
clean up
kevjumba Mar 17, 2022
cd652bc
Add comment
kevjumba Mar 17, 2022
4a13e8e
Add pseudocode
kevjumba Mar 17, 2022
a3f8384
Refactor logging functionality to hide internals
kevjumba Mar 20, 2022
706ee80
Refactor
kevjumba Mar 20, 2022
1831003
Revert changes
kevjumba Mar 20, 2022
9e669c3
Add tests
kevjumba Mar 20, 2022
b972634
Add new timeout test
kevjumba Mar 21, 2022
42bdc8b
Fix python ci for m1 mac
kevjumba Mar 25, 2022
ab8d1e0
Fix lint
kevjumba Mar 29, 2022
f6c2c87
Working state
kevjumba Mar 30, 2022
b6b412d
Move offline log store
kevjumba Mar 30, 2022
cb1da99
refactor
kevjumba Mar 30, 2022
155fc77
Update logs
kevjumba Mar 30, 2022
86f2208
Update log storage
kevjumba Mar 30, 2022
746dae9
WOrking state
kevjumba Mar 30, 2022
514c2f6
Work
kevjumba Mar 31, 2022
161d81d
Add tests for filestorage
kevjumba Mar 31, 2022
beec6b2
Fix logging
kevjumba Mar 31, 2022
2a1d2ec
Add more tests
kevjumba Mar 31, 2022
9446e05
Fix
kevjumba Mar 31, 2022
31e0de8
Fix
kevjumba Mar 31, 2022
137d700
Clean up
kevjumba Mar 31, 2022
ee847cd
Update error
kevjumba Mar 31, 2022
ce12eee
semi working state
kevjumba Apr 1, 2022
79c700c
b state
kevjumba Apr 4, 2022
6cbf2c2
Update types to be public
kevjumba Apr 6, 2022
aefda92
Update structs to make fields public
kevjumba Apr 6, 2022
29f72b0
Fix
kevjumba Apr 6, 2022
c4dbb84
clean up
kevjumba Apr 6, 2022
5d7978f
Fix
kevjumba Apr 7, 2022
686c583
Fix go
kevjumba Apr 7, 2022
884b014
Fix issues
kevjumba Apr 7, 2022
c02cf83
Fix
kevjumba Apr 7, 2022
e89abc1
Fix tests
kevjumba Apr 7, 2022
559bb83
Fix
kevjumba Apr 7, 2022
0f2aca9
Working state
kevjumba Apr 8, 2022
c631ef8
Fix
kevjumba Apr 8, 2022
b0a2166
fix
kevjumba Apr 8, 2022
9ed79e8
fix
kevjumba Apr 8, 2022
e472104
Clean up code a bit
kevjumba Apr 8, 2022
629be18
Fixes
kevjumba Apr 8, 2022
8a789fb
Fix
kevjumba Apr 8, 2022
6357efd
Fix tests
kevjumba Apr 8, 2022
3168473
Fix
kevjumba Apr 8, 2022
00228f7
Clean up
kevjumba Apr 8, 2022
b692e81
Update schema functionality
kevjumba Apr 9, 2022
49cd91a
Remove xitongsys parquet reader
kevjumba Apr 9, 2022
4890116
Clean up
kevjumba Apr 11, 2022
71e1e56
Fix go mode
kevjumba Apr 11, 2022
050db82
Fix tests and errors and everything
kevjumba Apr 11, 2022
dd235ca
Fix tests
kevjumba Apr 11, 2022
7c92d93
Fix
kevjumba Apr 11, 2022
4265efd
Remove unused code
kevjumba Apr 11, 2022
79cbe42
Fix
kevjumba Apr 11, 2022
21e99bd
Last working commit
kevjumba Apr 11, 2022
1b173da
work
kevjumba Apr 11, 2022
b5484c3
Address some changes
kevjumba Apr 11, 2022
5458034
More addresses.
kevjumba Apr 11, 2022
17b2bf4
Fix more review comments
kevjumba Apr 11, 2022
0a1802d
Fix
kevjumba Apr 11, 2022
18c7d60
Fix
kevjumba Apr 12, 2022
a39c9b5
Rename
kevjumba Apr 12, 2022
a4587c3
Fix
kevjumba Apr 12, 2022
83c5f89
Add request id
kevjumba Apr 12, 2022
86e605d
More fixes
kevjumba Apr 12, 2022
fa14dae
Fix odfv
kevjumba Apr 12, 2022
9f81d04
Fix
kevjumba Apr 12, 2022
e87fe22
Fix
kevjumba Apr 12, 2022
bdf0c2b
Address other changes
kevjumba Apr 12, 2022
f59f98d
Reorder for optimization
kevjumba Apr 12, 2022
d185ccd
Fix
kevjumba Apr 12, 2022
3747a5d
Add more shcema tests
kevjumba Apr 12, 2022
e9bd35b
Fix tests
kevjumba Apr 12, 2022
89974b3
refactor to clean
kevjumba Apr 12, 2022
d51544b
Add initialized repo for testing
kevjumba Apr 13, 2022
e0a4ec6
Fix
kevjumba Apr 13, 2022
5f9a50e
Remove
kevjumba Apr 13, 2022
1a5d98e
Fix tests
kevjumba Apr 13, 2022
b4d94dc
Fix
kevjumba Apr 13, 2022
8d9d0f9
Fix tests
kevjumba Apr 13, 2022
4e65987
Fix
kevjumba Apr 13, 2022
9d4effa
Text
kevjumba Apr 13, 2022
e604750
Fix
kevjumba Apr 13, 2022
49fb8bc
Fix?
kevjumba Apr 13, 2022
72b1700
Fix
kevjumba Apr 13, 2022
b4f7f41
Fix
kevjumba Apr 13, 2022
f19d19d
remove entity map
pyalex Apr 13, 2022
0145e3d
remove Cache method from registry
pyalex Apr 13, 2022
cad1156
clean up pre-initialized repo
pyalex Apr 13, 2022
89960b2
git ignore full data directory in tests
pyalex Apr 13, 2022
2 changes: 1 addition & 1 deletion .gitignore
@@ -105,7 +105,7 @@ coverage.xml
.hypothesis/
.pytest_cache/
infra/scripts/*.conf
-go/internal/test/feature_repo
+go/cmd/server/logging/feature_repo/data/

# Translations
*.mo
30 changes: 14 additions & 16 deletions go.mod
@@ -8,12 +8,12 @@ require (
github.com/go-python/gopy v0.4.0
github.com/go-redis/redis/v8 v8.11.4
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.2.0
github.com/google/uuid v1.3.0
github.com/mattn/go-sqlite3 v1.14.12
github.com/spaolacci/murmur3 v1.1.0
github.com/stretchr/testify v1.7.0
google.golang.org/grpc v1.44.0
google.golang.org/protobuf v1.27.1
google.golang.org/grpc v1.45.0
google.golang.org/protobuf v1.28.0
)

require (
@@ -23,28 +23,26 @@ require (
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/goccy/go-json v0.7.10 // indirect
github.com/goccy/go-json v0.9.6 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/gonuts/commander v0.1.0 // indirect
github.com/gonuts/flag v0.1.0 // indirect
github.com/google/flatbuffers v2.0.5+incompatible // indirect
github.com/klauspost/asmfmt v1.3.1 // indirect
github.com/google/flatbuffers v2.0.6+incompatible // indirect
github.com/klauspost/asmfmt v1.3.2 // indirect
github.com/klauspost/compress v1.15.1 // indirect
github.com/klauspost/cpuid/v2 v2.0.9 // indirect
github.com/klauspost/cpuid/v2 v2.0.12 // indirect
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect
github.com/pierrec/lz4/v4 v4.1.12 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pierrec/lz4/v4 v4.1.14 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/zeebo/xxh3 v1.0.1 // indirect
golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 // indirect
golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect
golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect
golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 // indirect
golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/tools v0.1.10 // indirect
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 // indirect
google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect
)
53 changes: 34 additions & 19 deletions go.sum

Large diffs are not rendered by default.

Empty file.
Binary file not shown.
40 changes: 40 additions & 0 deletions go/cmd/server/logging/feature_repo/example.py
@@ -0,0 +1,40 @@
# This is an example feature definition file

from google.protobuf.duration_pb2 import Duration

from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService

# Read data from parquet files. Parquet is convenient for local development mode. For
# production, you can use your favorite DWH, such as BigQuery. See Feast documentation
# for more info.
driver_hourly_stats = FileSource(
    path="driver_stats.parquet",
    event_timestamp_column="event_timestamp",
    created_timestamp_column="created",
)

# Define an entity for the driver. You can think of entity as a primary key used to
# fetch features.
driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",)

# Our parquet files contain sample data that includes a driver_id column, timestamps and
# three feature column. Here we define a Feature View that will allow us to serve this
# data to our model online.
driver_hourly_stats_view = FeatureView(
    name="driver_hourly_stats",
    entities=["driver_id"],
    ttl=Duration(seconds=86400 * 365 * 10),
    features=[
        Feature(name="conv_rate", dtype=ValueType.FLOAT),
        Feature(name="acc_rate", dtype=ValueType.FLOAT),
        Feature(name="avg_daily_trips", dtype=ValueType.INT64),
    ],
    online=True,
    batch_source=driver_hourly_stats,
    tags={},
)

driver_stats_fs = FeatureService(
    name="test_service",
    features=[driver_hourly_stats_view]
)
5 changes: 5 additions & 0 deletions go/cmd/server/logging/feature_repo/feature_store.yaml
@@ -0,0 +1,5 @@
project: feature_repo
registry: data/registry.db
provider: local
online_store:
    path: data/online_store.db
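
Review note: `GetFileConfig` in filelogstorage.go reads a `path` entry like the one above from a parsed config section and type-asserts it to a string. When that assertion fails, the converted value is the empty string, so an error message built from it carries little information. The following is a minimal sketch of reporting the raw value instead, assuming the section is available as a `map[string]interface{}`; `pathFromConfig` is a hypothetical helper and not part of this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// pathFromConfig extracts the "path" entry from a parsed config section.
// The function name and the map type are assumptions made for illustration.
func pathFromConfig(section map[string]interface{}) (string, error) {
	raw, ok := section["path"]
	if !ok {
		return "", errors.New("config section has no \"path\" entry")
	}
	path, ok := raw.(string)
	if !ok {
		// Report the raw value and its dynamic type; the converted
		// string is empty whenever the assertion fails.
		return "", fmt.Errorf("path %v has type %T, want string", raw, raw)
	}
	return path, nil
}
```

With a section such as `{"path": 42}`, the returned error names both the offending value and its type, which may make a misconfigured feature_store.yaml easier to diagnose.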
86 changes: 86 additions & 0 deletions go/cmd/server/logging/filelogstorage.go
@@ -0,0 +1,86 @@
package logging

import (
	"errors"
	"fmt"
	"io"
	"os"
	"path/filepath"

	"github.com/apache/arrow/go/v8/arrow/array"
	"github.com/apache/arrow/go/v8/parquet"
	"github.com/apache/arrow/go/v8/parquet/pqarrow"
	"github.com/feast-dev/feast/go/internal/feast/registry"
)

type FileLogStorage struct {
	// Feast project name
	project string
	path    string
}

func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error) {
	fileConfig := OfflineLogStoreConfig{
		storeType: "file",
	}
	if onlineStorePath, ok := config.OfflineStore["path"]; ok {
		path, success := onlineStorePath.(string)
		if !success {
			return &fileConfig, fmt.Errorf("path, %s, cannot be converted to string", path)
		}
		fileConfig.path = path
	} else {
		return nil, errors.New("need path for file log storage")
	}
	return &fileConfig, nil
}

// This offline store is currently only used for testing. It will be instantiated during go unit tests to log to file
// and the parquet files will be cleaned up after the test is run.
func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error) {
	store := FileLogStorage{project: project}
	var absPath string
	var err error
	// TODO(kevjumba) remove this default catch.
	if offlineStoreConfig.path != "" {
		absPath, err = filepath.Abs(offlineStoreConfig.path)
	} else {
		return nil, errors.New("need path for file log storage")
	}
	if err != nil {
		return nil, err
	}
	store.path = absPath
	return &store, nil
}

func openLogFile(absPath string) (*os.File, error) {
	var _, err = os.Stat(absPath)

	// create file if not exists
	if os.IsNotExist(err) {
		var file, err = os.Create(absPath)
		if err != nil {
			return nil, err
		}
		return file, nil
	} else {
		return nil, fmt.Errorf("path %s already exists", absPath)
	}
}

func (f *FileLogStorage) FlushToStorage(tbl array.Table) error {
	w, err := openLogFile(f.path)
	var writer io.Writer = w
	if err != nil {
		return err
	}
	props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false))
	arrProps := pqarrow.DefaultWriterProps()
	err = pqarrow.WriteTable(tbl, writer, 100, props, arrProps)
	if err != nil {
		return err
	}
	return nil

}
70 changes: 70 additions & 0 deletions go/cmd/server/logging/filelogstorage_test.go
@@ -0,0 +1,70 @@
package logging

import (
	"context"
	"path/filepath"

	"testing"

	"github.com/apache/arrow/go/v8/arrow/array"
	"github.com/apache/arrow/go/v8/arrow/memory"
	"github.com/apache/arrow/go/v8/parquet/file"
	"github.com/apache/arrow/go/v8/parquet/pqarrow"
	"github.com/feast-dev/feast/go/internal/test"
	"github.com/stretchr/testify/assert"
)

func TestFlushToStorage(t *testing.T) {
	ctx := context.Background()
	table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults()
	defer table.Release()
	assert.Nil(t, err)
	offlineStoreConfig := OfflineLogStoreConfig{
		storeType: "file",
		path:      "./log.parquet",
	}
	fileStore, err := NewFileOfflineStore("test", &offlineStoreConfig)
	assert.Nil(t, err)
	err = fileStore.FlushToStorage(array.Table(table))
	assert.Nil(t, err)
	logPath, err := filepath.Abs(offlineStoreConfig.path)
	assert.Nil(t, err)
	pf, err := file.OpenParquetFile(logPath, false)
	assert.Nil(t, err)

	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	assert.Nil(t, err)

	tbl, err := reader.ReadTable(ctx)
	assert.Nil(t, err)
	tr := array.NewTableReader(tbl, -1)
	defer tbl.Release()

	defer tr.Release()
	for tr.Next() {
		rec := tr.Record()
		assert.NotNil(t, rec)
		for _, field := range rec.Schema().Fields() {
			assert.Contains(t, expectedSchema, field.Name)
			assert.Equal(t, field.Type, expectedSchema[field.Name])
		}
		values, err := test.GetProtoFromRecord(rec)

		assert.Nil(t, err)
		for name, val := range values {
			if name == "RequestId" {
				// Ensure there are request ids in record.
				assert.Greater(t, len(val.Val), 0)
			} else {
				assert.Equal(t, len(val.Val), len(expectedColumns[name].Val))
				for idx, featureVal := range val.Val {
					assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val)
				}
			}
		}
	}

	err = test.CleanUpFile(logPath)
	assert.Nil(t, err)

}