diff --git a/.gitignore b/.gitignore index b285234c73..0f3165e841 100644 --- a/.gitignore +++ b/.gitignore @@ -105,7 +105,7 @@ coverage.xml .hypothesis/ .pytest_cache/ infra/scripts/*.conf -go/internal/test/feature_repo +go/cmd/server/logging/feature_repo/data/ # Translations *.mo diff --git a/go.mod b/go.mod index a42179f7bc..585072e9fb 100644 --- a/go.mod +++ b/go.mod @@ -8,12 +8,12 @@ require ( github.com/go-python/gopy v0.4.0 github.com/go-redis/redis/v8 v8.11.4 github.com/golang/protobuf v1.5.2 - github.com/google/uuid v1.2.0 + github.com/google/uuid v1.3.0 github.com/mattn/go-sqlite3 v1.14.12 github.com/spaolacci/murmur3 v1.1.0 github.com/stretchr/testify v1.7.0 - google.golang.org/grpc v1.44.0 - google.golang.org/protobuf v1.27.1 + google.golang.org/grpc v1.45.0 + google.golang.org/protobuf v1.28.0 ) require ( @@ -23,28 +23,26 @@ require ( github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/goccy/go-json v0.7.10 // indirect + github.com/goccy/go-json v0.9.6 // indirect github.com/golang/snappy v0.0.4 // indirect - github.com/gonuts/commander v0.1.0 // indirect - github.com/gonuts/flag v0.1.0 // indirect - github.com/google/flatbuffers v2.0.5+incompatible // indirect - github.com/klauspost/asmfmt v1.3.1 // indirect + github.com/google/flatbuffers v2.0.6+incompatible // indirect + github.com/klauspost/asmfmt v1.3.2 // indirect github.com/klauspost/compress v1.15.1 // indirect - github.com/klauspost/cpuid/v2 v2.0.9 // indirect + github.com/klauspost/cpuid/v2 v2.0.12 // indirect github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 // indirect github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 // indirect - github.com/pierrec/lz4/v4 v4.1.12 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/pierrec/lz4/v4 v4.1.14 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/zeebo/xxh3 v1.0.1 // indirect - golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 // indirect + github.com/zeebo/xxh3 v1.0.2 // indirect + golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 // indirect golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 // indirect - golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd // indirect - golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 // indirect + golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 // indirect + golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 // indirect golang.org/x/text v0.3.7 // indirect golang.org/x/tools v0.1.10 // indirect golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 // indirect - google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 // indirect + google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c // indirect ) diff --git a/go.sum b/go.sum index 0ee1c0b9c5..8d9b3bd7c5 100644 --- a/go.sum +++ b/go.sum @@ -115,8 +115,9 @@ github.com/go-redis/redis/v8 v8.11.4/go.mod h1:2Z2wHZXdQpCDXEGzqMockDpNyYvi2l4Px github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20210107165309-348f09dbbbc0/go.mod h1:fyg7847qk6SyHyPtNmDHnmrv/HOrqktSC+C9fM+CJOE= -github.com/goccy/go-json v0.7.10 
h1:ulhbuNe1JqE68nMRXXTJRrUu0uhouf0VevLINxQq4Ec= github.com/goccy/go-json v0.7.10/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= +github.com/goccy/go-json v0.9.6 h1:5/4CtRQdtsX0sal8fdVhTaiMN01Ri8BExZZ8iRmHQ6E= +github.com/goccy/go-json v0.9.6/go.mod h1:6MelG93GURQebXPDq3khkgXZkazVtN9CRI+MGFi0w8I= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= @@ -145,14 +146,13 @@ github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= -github.com/gonuts/commander v0.1.0 h1:EcDTiVw9oAVORFjQOEOuHQqcl6OXMyTgELocTq6zJ0I= github.com/gonuts/commander v0.1.0/go.mod h1:qkb5mSlcWodYgo7vs8ulLnXhfinhZsZcm6+H/z1JjgY= -github.com/gonuts/flag v0.1.0 h1:fqMv/MZ+oNGu0i9gp0/IQ/ZaPIDoAZBOBaJoV7viCWM= github.com/gonuts/flag v0.1.0/go.mod h1:ZTmTGtrSPejTo/SRNhCqwLTmiAgyBdCkLYhHrAoBdz4= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= -github.com/google/flatbuffers v2.0.5+incompatible h1:ANsW0idDAXIY+mNHzIHxWRfabV2x5LUEEIIWcwsYgB8= github.com/google/flatbuffers v2.0.5+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= +github.com/google/flatbuffers v2.0.6+incompatible h1:XHFReMv7nFFusa+CEokzWbzaYocKXI6C7hdU5Kgh9Lw= +github.com/google/flatbuffers v2.0.6+incompatible/go.mod h1:1AeVuKshWv4vARoZatz6mlQ0JxURH0Kv5+zNeJKJCa8= github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= @@ -168,6 +168,8 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+ github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs= github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY= github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= @@ -212,17 +214,20 @@ github.com/jung-kurt/gofpdf v1.0.0/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+ github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/asmfmt v1.3.1 h1:7xZi1N7s9gTLbqiM8KUv8TLyysavbTRGBT5/ly0bRtw= github.com/klauspost/asmfmt v1.3.1/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= 
+github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= +github.com/klauspost/asmfmt v1.3.2/go.mod h1:AG8TuvYojzulgDAMCnYn50l/5QV3Bs/tp6j0HLHbNSE= github.com/klauspost/compress v1.14.2/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.1 h1:y9FcTHGyrebwfP0ZZqFiaxTaiDnUrGkJkI+f583BL1A= github.com/klauspost/compress v1.15.1/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.12 h1:p9dKCg8i4gmOxtv35DvrYoWqYzQrvEVdjQ762Y0OqZE= +github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515/go.mod h1:+0opPa2QZZtGFBFZlji/RkVcI2GknAs/DXo4wKdlNEc= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.2.1 h1:Fmg33tUaq4/8ym9TJN1x7sLJnHVwhP33CNkpYV/7rwI= +github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= @@ -294,11 +299,11 @@ github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= -github.com/pierrec/lz4/v4 v4.1.12 h1:44l88ehTZAUGW4VlO1QC4zkilL99M6Y9MXNwEs0uzP8= github.com/pierrec/lz4/v4 v4.1.12/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pierrec/lz4/v4 v4.1.14 h1:+fL8AQEZtz/ijeNnpduH0bROTu0O3NZAlPjQxGn8LwE= +github.com/pierrec/lz4/v4 v4.1.14/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= @@ -362,8 +367,11 @@ github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtX github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -github.com/zeebo/xxh3 v1.0.1 h1:FMSRIbkrLikb/0hZxmltpg84VkqDAT5M8ufXynuhXsI= +github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= +github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.0.1/go.mod 
h1:8VHV24/3AZLn3b6Mlp/KuC33LWH687Wq6EnziEB+rsA= +github.com/zeebo/xxh3 v1.0.2 h1:xZmwmqxHZA8AI603jOQ0tMqmBr9lPeFwGg6d+xy9DC0= +github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg= go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk= @@ -402,8 +410,9 @@ golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190731235908-ec7cb31e5a56/go.mod h1:JhuoJpWY28nO4Vef9tZUw9qufEGTyX1+7lmHxV5q5G4= golang.org/x/exp v0.0.0-20191002040644-a1355ae1e2c3/go.mod h1:NOZ3BPKG0ec/BKJQgnvsSFpcKLM5xXVWnvZS97DWHgE= -golang.org/x/exp v0.0.0-20211216164055-b2b84827b756 h1:/5Bs7sWi0i3rOVO5KnM55OwugpsD4bRW1zywKoZjbkI= golang.org/x/exp v0.0.0-20211216164055-b2b84827b756/go.mod h1:b9TAUYHmRtqA6klRHApnXMnj+OyLce4yF5cZCUbk2ps= +golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4 h1:K3x+yU+fbot38x5bQbU2QqUAVyYLEktdNH2GxZLnM3U= +golang.org/x/exp v0.0.0-20220407100705-7b9b53b0aca4/go.mod h1:lgLbSvA5ygNOMpwM/9anMpWVlVJ7Z+cHWq/eFuinpGE= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= golang.org/x/image v0.0.0-20190802002840-cff245a6509b/go.mod h1:FeLwcggjj3mMvU+oOTbSwawSJRM1uh48EjtB4UJZlP0= @@ -427,7 +436,8 @@ golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzB golang.org/x/mod v0.1.1-0.20191209134235-331c550502dd/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= -golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57 h1:LQmS1nU0twXLA96Kt7U9qtHJEbBk3z6Q0V4UXjZkpr4= +golang.org/x/mod v0.5.1/go.mod h1:5OXOZSfqPIIbmVBIIKWRFfZjPR0E5r58TLhUjH0a2Ro= +golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20211013180041-c96bc1413d57/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3 h1:kQgndtyPBW/JIYERgdxfwMYh3AVStj88WQTlNDi2a+o= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= @@ -454,8 +464,9 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd h1:O7DYs+zxREGLKzKoMQrtrEacpb0ZVXA5rIwylE2Xchk= golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= +golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3 h1:EN5+DfgmRMvRUrMGERW2gQl3Vc+Z7ZMnI/xdEpPSf0c= +golang.org/x/net v0.0.0-20220407224826-aac1ed45d8e3/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod 
h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -499,8 +510,9 @@ golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27 h1:XDXtA5hveEEV8JB2l7nhMTp3t3cHp9ZpwcdjqyEWLlo= golang.org/x/sys v0.0.0-20220128215802-99c3d69c2c27/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12 h1:QyVthZKMsyaQwBTJE04jdNN0Pp5Fn9Qga0mrgxyERQM= +golang.org/x/sys v0.0.0-20220406163625-3f8b81556e12/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -534,7 +546,6 @@ golang.org/x/tools v0.0.0-20200117012304-6edc0a871e69/go.mod h1:TB2adYChydJhpapK golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= golang.org/x/tools v0.1.8-0.20211029000441-d6a9af8af023/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= -golang.org/x/tools v0.1.9 h1:j9KsMiaP1c3B0OTQGth0/k+miLGTgLsAFUCrF2vLcF8= golang.org/x/tools v0.1.9/go.mod h1:nABZi5QlRsZVlzPpHl034qft6wpY4eDcsTt5AaioBiU= golang.org/x/tools v0.1.10 h1:QjFRCZxdOhBJ/UNgnBZLbNV13DlbnK0quyivTnXJM20= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= @@ -561,8 +572,9 @@ google.golang.org/genproto v0.0.0-20190530194941-fb225487d101/go.mod h1:z3L6/3dT google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= -google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350 h1:YxHp5zqIcAShDEvRr5/0rVESVS+njYF68PSdazrNLJo= google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac h1:qSNTkEN+L2mvWcLgJOR+8bdHX9rN/IdU3A1Ghpfb1Rg= +google.golang.org/genproto v0.0.0-20220407144326-9054f6ed7bac/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.0/go.mod h1:chYK+tFQF0nDUGJgXMSgLCQk3phJEuONr2DCgLDdAQM= @@ -577,8 +589,9 @@ google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8 google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= google.golang.org/grpc v1.36.0/go.mod 
h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= -google.golang.org/grpc v1.44.0 h1:weqSxi/TMs1SqFRMHCtBgXRs8k3X39QIDEZ0pRcttUg= google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/grpc v1.45.0 h1:NEpgUqV3Z+ZjkqMsxMg11IaDrXY4RY6CQukSGK0uI1M= +google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -590,12 +603,14 @@ google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpAD google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.27.1 h1:SnqbnDw1V7RiZcXPx5MEeqPv2s79L9i7BJUlG/+RurQ= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= +google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/cheggaaa/pb.v1 v1.0.25/go.mod h1:V/YB90LKu/1FcN3WVnfiiE5oMCibMjukxqG/qStrOgw= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/go/cmd/server/logging/feature_repo/__init__.py b/go/cmd/server/logging/feature_repo/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/go/cmd/server/logging/feature_repo/driver_stats.parquet b/go/cmd/server/logging/feature_repo/driver_stats.parquet new file mode 100644 index 0000000000..a1e196df26 Binary files /dev/null and b/go/cmd/server/logging/feature_repo/driver_stats.parquet differ diff --git a/go/cmd/server/logging/feature_repo/example.py b/go/cmd/server/logging/feature_repo/example.py new file mode 100644 index 0000000000..f3ca612308 --- /dev/null +++ b/go/cmd/server/logging/feature_repo/example.py @@ -0,0 +1,40 @@ +# This is an example feature definition file + +from google.protobuf.duration_pb2 import Duration + +from feast import Entity, Feature, FeatureView, FileSource, ValueType, FeatureService + +# Read data from parquet files. Parquet is convenient for local development mode. For +# production, you can use your favorite DWH, such as BigQuery. See Feast documentation +# for more info. 
+driver_hourly_stats = FileSource( + path="driver_stats.parquet", + event_timestamp_column="event_timestamp", + created_timestamp_column="created", +) + +# Define an entity for the driver. You can think of entity as a primary key used to +# fetch features. +driver = Entity(name="driver_id", value_type=ValueType.INT64, description="driver id",) + +# Our parquet files contain sample data that includes a driver_id column, timestamps and +# three feature column. Here we define a Feature View that will allow us to serve this +# data to our model online. +driver_hourly_stats_view = FeatureView( + name="driver_hourly_stats", + entities=["driver_id"], + ttl=Duration(seconds=86400 * 365 * 10), + features=[ + Feature(name="conv_rate", dtype=ValueType.FLOAT), + Feature(name="acc_rate", dtype=ValueType.FLOAT), + Feature(name="avg_daily_trips", dtype=ValueType.INT64), + ], + online=True, + batch_source=driver_hourly_stats, + tags={}, +) + +driver_stats_fs = FeatureService( + name="test_service", + features=[driver_hourly_stats_view] +) \ No newline at end of file diff --git a/go/cmd/server/logging/feature_repo/feature_store.yaml b/go/cmd/server/logging/feature_repo/feature_store.yaml new file mode 100644 index 0000000000..3b48f43287 --- /dev/null +++ b/go/cmd/server/logging/feature_repo/feature_store.yaml @@ -0,0 +1,5 @@ +project: feature_repo +registry: data/registry.db +provider: local +online_store: + path: data/online_store.db \ No newline at end of file diff --git a/go/cmd/server/logging/filelogstorage.go b/go/cmd/server/logging/filelogstorage.go new file mode 100644 index 0000000000..19e9569e69 --- /dev/null +++ b/go/cmd/server/logging/filelogstorage.go @@ -0,0 +1,86 @@ +package logging + +import ( + "errors" + "fmt" + "io" + "os" + "path/filepath" + + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/parquet" + "github.com/apache/arrow/go/v8/parquet/pqarrow" + "github.com/feast-dev/feast/go/internal/feast/registry" +) + +type FileLogStorage struct { + // Feast project name + project string + path string +} + +func GetFileConfig(config *registry.RepoConfig) (*OfflineLogStoreConfig, error) { + fileConfig := OfflineLogStoreConfig{ + storeType: "file", + } + if onlineStorePath, ok := config.OfflineStore["path"]; ok { + path, success := onlineStorePath.(string) + if !success { + return &fileConfig, fmt.Errorf("path, %s, cannot be converted to string", path) + } + fileConfig.path = path + } else { + return nil, errors.New("need path for file log storage") + } + return &fileConfig, nil +} + +// This offline store is currently only used for testing. It will be instantiated during go unit tests to log to file +// and the parquet files will be cleaned up after the test is run. +func NewFileOfflineStore(project string, offlineStoreConfig *OfflineLogStoreConfig) (*FileLogStorage, error) { + store := FileLogStorage{project: project} + var absPath string + var err error + // TODO(kevjumba) remove this default catch. 
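+	// Note: filepath.Abs resolves a relative path against the server's current working directory, so a relative offline store path depends on where the process is started.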
+ if offlineStoreConfig.path != "" { + absPath, err = filepath.Abs(offlineStoreConfig.path) + } else { + return nil, errors.New("need path for file log storage") + } + if err != nil { + return nil, err + } + store.path = absPath + return &store, nil +} + +func openLogFile(absPath string) (*os.File, error) { + var _, err = os.Stat(absPath) + + // create file if not exists + if os.IsNotExist(err) { + var file, err = os.Create(absPath) + if err != nil { + return nil, err + } + return file, nil + } else { + return nil, fmt.Errorf("path %s already exists", absPath) + } +} + +func (f *FileLogStorage) FlushToStorage(tbl array.Table) error { + w, err := openLogFile(f.path) + var writer io.Writer = w + if err != nil { + return err + } + props := parquet.NewWriterProperties(parquet.WithDictionaryDefault(false)) + arrProps := pqarrow.DefaultWriterProps() + err = pqarrow.WriteTable(tbl, writer, 100, props, arrProps) + if err != nil { + return err + } + return nil + +} diff --git a/go/cmd/server/logging/filelogstorage_test.go b/go/cmd/server/logging/filelogstorage_test.go new file mode 100644 index 0000000000..1da7dd38ad --- /dev/null +++ b/go/cmd/server/logging/filelogstorage_test.go @@ -0,0 +1,70 @@ +package logging + +import ( + "context" + "path/filepath" + + "testing" + + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/apache/arrow/go/v8/parquet/file" + "github.com/apache/arrow/go/v8/parquet/pqarrow" + "github.com/feast-dev/feast/go/internal/test" + "github.com/stretchr/testify/assert" +) + +func TestFlushToStorage(t *testing.T) { + ctx := context.Background() + table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults() + defer table.Release() + assert.Nil(t, err) + offlineStoreConfig := OfflineLogStoreConfig{ + storeType: "file", + path: "./log.parquet", + } + fileStore, err := NewFileOfflineStore("test", &offlineStoreConfig) + assert.Nil(t, err) + err = fileStore.FlushToStorage(array.Table(table)) + assert.Nil(t, err) + logPath, err := filepath.Abs(offlineStoreConfig.path) + assert.Nil(t, err) + pf, err := file.OpenParquetFile(logPath, false) + assert.Nil(t, err) + + reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + assert.Nil(t, err) + + tbl, err := reader.ReadTable(ctx) + assert.Nil(t, err) + tr := array.NewTableReader(tbl, -1) + defer tbl.Release() + + defer tr.Release() + for tr.Next() { + rec := tr.Record() + assert.NotNil(t, rec) + for _, field := range rec.Schema().Fields() { + assert.Contains(t, expectedSchema, field.Name) + assert.Equal(t, field.Type, expectedSchema[field.Name]) + } + values, err := test.GetProtoFromRecord(rec) + + assert.Nil(t, err) + for name, val := range values { + if name == "RequestId" { + // Ensure there are request ids in record. 
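+				// Only the presence of RequestId values is asserted; their contents are not compared against expectedColumns.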
+ assert.Greater(t, len(val.Val), 0) + } else { + assert.Equal(t, len(val.Val), len(expectedColumns[name].Val)) + for idx, featureVal := range val.Val { + assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val) + } + } + } + } + + err = test.CleanUpFile(logPath) + assert.Nil(t, err) + +} diff --git a/go/cmd/server/logging/logging.go b/go/cmd/server/logging/logging.go new file mode 100644 index 0000000000..010644709a --- /dev/null +++ b/go/cmd/server/logging/logging.go @@ -0,0 +1,392 @@ +package logging + +import ( + "errors" + "fmt" + "log" + "time" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/feast-dev/feast/go/internal/feast" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" + gotypes "github.com/feast-dev/feast/go/types" + "google.golang.org/protobuf/types/known/timestamppb" +) + +const DEFAULT_LOG_FLUSH_INTERVAL = 100 * time.Millisecond +const DEFAULT_LOG_INSERT_TIMEOUT = 20 * time.Millisecond + +type Log struct { + // Example: val{int64_val: 5017}, val{int64_val: 1003} + EntityValue []*types.Value + + FeatureValues []*types.Value + FeatureStatuses []serving.FieldStatus + EventTimestamps []*timestamppb.Timestamp + RequestContext map[string]*types.Value + RequestId string +} + +type MemoryBuffer struct { + featureService *model.FeatureService + logs []*Log +} + +type LoggingService struct { + memoryBuffer *MemoryBuffer + logChannel chan *Log + fs *feast.FeatureStore + offlineLogStorage OfflineLogStorage + logInsertTTl time.Duration + logFlushInterval time.Duration +} + +func NewLoggingService(fs *feast.FeatureStore, logChannelCapacity int, featureServiceName string, enableLogProcessing bool) (*LoggingService, error) { + var featureService *model.FeatureService = nil + var err error + if fs != nil { + featureService, err = fs.GetFeatureService(featureServiceName) + if err != nil { + return nil, err + } + + } + + loggingService := &LoggingService{ + logChannel: make(chan *Log, logChannelCapacity), + memoryBuffer: &MemoryBuffer{ + logs: make([]*Log, 0), + featureService: featureService, + }, + fs: fs, + logInsertTTl: DEFAULT_LOG_INSERT_TIMEOUT, + logFlushInterval: DEFAULT_LOG_FLUSH_INTERVAL, + } + + if fs != nil { + offlineLogStorage, err := NewOfflineStore(fs.GetRepoConfig()) + loggingService.offlineLogStorage = offlineLogStorage + if err != nil { + return nil, err + } + } + + // Start goroutine to process logs + if enableLogProcessing { + go loggingService.processLogs() + + } + return loggingService, nil +} + +func (s *LoggingService) EmitLog(l *Log) error { + select { + case s.logChannel <- l: + return nil + case <-time.After(s.logInsertTTl): + return fmt.Errorf("could not add to log channel with capacity %d. Operation timed out. Current log channel length is %d", cap(s.logChannel), len(s.logChannel)) + } +} + +func (s *LoggingService) processLogs() { + // start a periodic flush + // TODO(kevjumba): set param so users can configure flushing duration + ticker := time.NewTicker(s.logFlushInterval) + defer ticker.Stop() + + for { + s.PerformPeriodicAppendToMemoryBufferAndLogFlush(ticker) + } +} + +// Select that either ingests new logs that are added to the logging channel, one at a time to add +// to the in-memory buffer or flushes all of them synchronously to the OfflineStorage on a time interval. 
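+// Each call handles exactly one event: it blocks until either the ticker fires (flush everything) or a single log arrives (append one); processLogs drives this in an infinite loop.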
+func (s *LoggingService) PerformPeriodicAppendToMemoryBufferAndLogFlush(t *time.Ticker) { + select { + case t := <-t.C: + s.flushLogsToOfflineStorage(t) + case new_log := <-s.logChannel: + log.Printf("Adding %s to memory.\n", new_log.FeatureValues) + s.memoryBuffer.logs = append(s.memoryBuffer.logs, new_log) + } +} + +// Acquires the logging schema from the feature service, converts the memory buffer array of rows of logs and flushes +// them to the offline storage. +func (s *LoggingService) flushLogsToOfflineStorage(t time.Time) error { + offlineStoreType, ok := getOfflineStoreType(s.fs.GetRepoConfig().OfflineStore) + if !ok { + return fmt.Errorf("could not get offline storage type for config: %s", s.fs.GetRepoConfig().OfflineStore) + } + if offlineStoreType == "file" { + entityMap, featureViews, odfvs, err := s.GetFcos() + if err != nil { + return err + } + schema, err := GetSchemaFromFeatureService(s.memoryBuffer.featureService, entityMap, featureViews, odfvs) + if err != nil { + return err + } + table, err := ConvertMemoryBufferToArrowTable(s.memoryBuffer, schema) + if err != nil { + return err + } + s.offlineLogStorage.FlushToStorage(table) + if err != nil { + return err + } + s.memoryBuffer.logs = s.memoryBuffer.logs[:0] + } else { + // Currently don't support any other offline flushing. + return errors.New("currently only file type is supported for offline log storage") + } + return nil +} + +// Takes memory buffer of logs in array row and converts them to columnar with generated fcoschema generated by GetFcoSchema +// and writes them to arrow table. +// Returns arrow table that contains all of the logs in columnar format. +func ConvertMemoryBufferToArrowTable(memoryBuffer *MemoryBuffer, fcoSchema *Schema) (array.Table, error) { + arrowMemory := memory.NewGoAllocator() + + columnNameToProtoValueArray := make(map[string][]*types.Value) + columnNameToStatus := make(map[string][]int32) + columnNameToTimestamp := make(map[string][]int64) + entityNameToEntityValues := make(map[string][]*types.Value) + + strBuilder := array.NewStringBuilder(arrowMemory) + + for _, l := range memoryBuffer.logs { + // EntityTypes maps an entity name to the specific type and also which index in the entityValues array it is + // e.g if an Entity Key is {driver_id, customer_id}, then the driver_id entitytype would be dtype=int64, index=0. + // It's in the order of the entities as given by the schema. 
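+		// e.g. with the single driver_id entity used by the example repo, every log contributes l.EntityValue[0] to the driver_id column.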
+ for idx, joinKey := range fcoSchema.Entities { + if _, ok := entityNameToEntityValues[joinKey]; !ok { + entityNameToEntityValues[joinKey] = make([]*types.Value, 0) + } + entityNameToEntityValues[joinKey] = append(entityNameToEntityValues[joinKey], l.EntityValue[idx]) + } + + // Contains both fv and odfv feature value types => they are processed in order of how the appear in the featureService + for idx, featureName := range fcoSchema.Features { + // populate the proto value arrays with values from memory buffer in separate columns one for each feature name + if _, ok := columnNameToProtoValueArray[featureName]; !ok { + columnNameToProtoValueArray[featureName] = make([]*types.Value, 0) + columnNameToStatus[featureName] = make([]int32, 0) + columnNameToTimestamp[featureName] = make([]int64, 0) + } + columnNameToProtoValueArray[featureName] = append(columnNameToProtoValueArray[featureName], l.FeatureValues[idx]) + columnNameToStatus[featureName] = append(columnNameToStatus[featureName], int32(l.FeatureStatuses[idx])) + columnNameToTimestamp[featureName] = append(columnNameToTimestamp[featureName], l.EventTimestamps[idx].AsTime().UnixNano()/int64(time.Millisecond)) + } + strBuilder.Append(l.RequestId) + } + + fields := make([]arrow.Field, 0) + columns := make([]array.Interface, 0) + for _, entityName := range fcoSchema.Entities { + protoArr := entityNameToEntityValues[entityName] + if len(protoArr) == 0 { + break + } + valArrowArray, err := gotypes.ProtoValuesToArrowArray(protoArr, arrowMemory, len(columnNameToProtoValueArray)) + if err != nil { + return nil, err + } + arrowType, err := gotypes.ValueTypeEnumToArrowType(fcoSchema.EntityTypes[entityName]) + if err != nil { + return nil, err + } + fields = append(fields, arrow.Field{ + Name: entityName, + Type: arrowType, + }) + columns = append(columns, valArrowArray) + } + + for _, featureName := range fcoSchema.Features { + + protoArr := columnNameToProtoValueArray[featureName] + if len(protoArr) == 0 { + break + } + arrowArray, err := gotypes.ProtoValuesToArrowArray(protoArr, arrowMemory, len(columnNameToProtoValueArray)) + if err != nil { + return nil, err + } + + arrowType, err := gotypes.ValueTypeEnumToArrowType(fcoSchema.FeaturesTypes[featureName]) + + if err != nil { + return nil, err + } + fields = append(fields, arrow.Field{ + Name: featureName, + Type: arrowType, + }) + columns = append(columns, arrowArray) + } + fields = append(fields, arrow.Field{ + Name: "RequestId", + Type: &arrow.StringType{}, + }) + + columns = append(columns, strBuilder.NewArray()) + schema := arrow.NewSchema( + fields, + nil, + ) + + result := array.Record(array.NewRecord(schema, columns, int64(len(memoryBuffer.logs)))) + + tbl := array.NewTableFromRecords(schema, []array.Record{result}) + return array.Table(tbl), nil +} + +type Schema struct { + Entities []string + Features []string + EntityTypes map[string]types.ValueType_Enum + FeaturesTypes map[string]types.ValueType_Enum +} + +func GetSchemaFromFeatureService(featureService *model.FeatureService, entityMap map[string]*model.Entity, featureViews []*model.FeatureView, onDemandFeatureViews []*model.OnDemandFeatureView) (*Schema, error) { + fvs := make(map[string]*model.FeatureView) + odFvs := make(map[string]*model.OnDemandFeatureView) + + joinKeys := make([]string, 0) + // All joinkeys in the featureService are put in here + joinKeysSet := make(map[string]interface{}) + entityJoinKeyToType := make(map[string]types.ValueType_Enum) + var entities []string + for _, featureView := range featureViews { + 
fvs[featureView.Base.Name] = featureView + entities = featureView.Entities + } + + for _, onDemandFeatureView := range onDemandFeatureViews { + odFvs[onDemandFeatureView.Base.Name] = onDemandFeatureView + } + + allFeatureTypes := make(map[string]types.ValueType_Enum) + features := make([]string, 0) + for _, featureProjection := range featureService.Projections { + // Create copies of FeatureView that may contains the same *FeatureView but + // each differentiated by a *FeatureViewProjection + featureViewName := featureProjection.Name + if fv, ok := fvs[featureViewName]; ok { + for _, f := range featureProjection.Features { + features = append(features, GetFullFeatureName(featureViewName, f.Name)) + allFeatureTypes[GetFullFeatureName(featureViewName, f.Name)] = f.Dtype + } + for _, entityName := range fv.Entities { + entity := entityMap[entityName] + if joinKeyAlias, ok := featureProjection.JoinKeyMap[entity.JoinKey]; ok { + joinKeysSet[joinKeyAlias] = nil + } else { + joinKeysSet[entity.JoinKey] = nil + } + } + } else if _, ok := odFvs[featureViewName]; ok { + for _, f := range featureProjection.Features { + // TODO(kevjumba) check in test here. + features = append(features, GetFullFeatureName(featureViewName, f.Name)) + allFeatureTypes[GetFullFeatureName(featureViewName, f.Name)] = f.Dtype + } + } else { + return nil, fmt.Errorf("no such feature view found in feature service %s", featureViewName) + } + } + + // Only get entities in the current feature service. + for _, entity := range entities { + if _, ok := joinKeysSet[entity]; ok { + joinKeys = append(joinKeys, entityMap[entity].JoinKey) + entityJoinKeyToType[entityMap[entity].JoinKey] = entityMap[entity].ValueType + } + } + + schema := &Schema{ + Entities: joinKeys, + Features: features, + EntityTypes: entityJoinKeyToType, + FeaturesTypes: allFeatureTypes, + } + return schema, nil +} + +func GetFullFeatureName(featureViewName string, featureName string) string { + return fmt.Sprintf("%s__%s", featureViewName, featureName) +} + +func (s *LoggingService) GetFcos() (map[string]*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView, error) { + odfvs, err := s.fs.ListOnDemandFeatureViews() + if err != nil { + return nil, nil, nil, err + } + fvs, err := s.fs.ListFeatureViews() + if err != nil { + return nil, nil, nil, err + } + entities, err := s.fs.ListEntities(true) + if err != nil { + return nil, nil, nil, err + } + entityMap := make(map[string]*model.Entity) + for _, entity := range entities { + entityMap[entity.Name] = entity + } + return entityMap, fvs, odfvs, nil +} + +func (l *LoggingService) GenerateLogs(featureService *model.FeatureService, joinKeyToEntityValues map[string][]*types.Value, features []*serving.GetOnlineFeaturesResponse_FeatureVector, requestData map[string]*types.RepeatedValue, requestId string) error { + if len(features) <= 0 { + return nil + } + + entitySet, featureViews, odfvs, err := l.GetFcos() + if err != nil { + return err + } + schema, err := GetSchemaFromFeatureService(featureService, entitySet, featureViews, odfvs) + + if err != nil { + return err + } + + numFeatures := len(schema.Features) + // Should be equivalent to how many entities there are(each feature row has (entity) number of features) + numRows := len(features[0].Values) + + for row_idx := 0; row_idx < numRows; row_idx++ { + featureValueLogRow := make([]*types.Value, numFeatures) + featureStatusLogRow := make([]serving.FieldStatus, numFeatures) + eventTimestampLogRow := make([]*timestamppb.Timestamp, numFeatures) + for idx := 0; idx < 
len(features); idx++ { + featureValueLogRow[idx] = features[idx].Values[row_idx] + featureStatusLogRow[idx] = features[idx].Statuses[row_idx] + eventTimestampLogRow[idx] = features[idx].EventTimestamps[row_idx] + } + valuesPerEntityRow := make([]*types.Value, 0) + // ensure that the entity values are in the order that the schema defines which is the order that ListEntities returns the entities + for _, joinKey := range schema.Entities { + valuesPerEntityRow = append(valuesPerEntityRow, joinKeyToEntityValues[joinKey][row_idx]) + } + newLog := Log{ + EntityValue: valuesPerEntityRow, + FeatureValues: featureValueLogRow, + FeatureStatuses: featureStatusLogRow, + EventTimestamps: eventTimestampLogRow, + RequestId: requestId, + } + err := l.EmitLog(&newLog) + if err != nil { + return err + } + } + return nil +} diff --git a/go/cmd/server/logging/logging_test.go b/go/cmd/server/logging/logging_test.go new file mode 100644 index 0000000000..68da0bf498 --- /dev/null +++ b/go/cmd/server/logging/logging_test.go @@ -0,0 +1,402 @@ +package logging + +import ( + "math/rand" + "reflect" + "testing" + "time" + + "github.com/apache/arrow/go/v8/arrow" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/internal/test" + "github.com/feast-dev/feast/go/protos/feast/serving" + "github.com/feast-dev/feast/go/protos/feast/types" + gotypes "github.com/feast-dev/feast/go/types" + "github.com/stretchr/testify/assert" + "google.golang.org/protobuf/types/known/timestamppb" +) + +func TestLoggingChannelTimeout(t *testing.T) { + // Pregenerated using `feast init`. + loggingService, err := NewLoggingService(nil, 1, "", false) + assert.Nil(t, err) + assert.Empty(t, loggingService.memoryBuffer.logs) + ts := timestamppb.New(time.Now()) + newLog := Log{ + FeatureStatuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, + EventTimestamps: []*timestamppb.Timestamp{ts, ts}, + } + loggingService.EmitLog(&newLog) + newTs := timestamppb.New(time.Now()) + + newLog2 := Log{ + FeatureStatuses: []serving.FieldStatus{serving.FieldStatus_PRESENT}, + EventTimestamps: []*timestamppb.Timestamp{newTs, newTs}, + } + err = loggingService.EmitLog(&newLog2) + // The channel times out and doesn't hang. 
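+	// The channel capacity is 1 and no consumer goroutine was started (enableLogProcessing=false), so this second EmitLog times out after logInsertTTl.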
+ assert.NotNil(t, err) +} + +func TestSchemaTypeRetrieval(t *testing.T) { + featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() + entityMap := make(map[string]*model.Entity) + expectedEntityNames := make([]string, 0) + expectedFeatureNames := make([]string, 0) + for _, entity := range entities { + entityMap[entity.Name] = entity + expectedEntityNames = append(expectedEntityNames, entity.Name) + } + for _, featureView := range featureViews { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) + } + } + for _, featureView := range odfvs { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) + } + } + + schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) + assert.Nil(t, err) + + assert.Equal(t, expectedFeatureNames, schema.Features) + assert.Equal(t, expectedEntityNames, schema.Entities) + for _, entityName := range expectedEntityNames { + assert.Contains(t, schema.EntityTypes, entityName) + } + assert.True(t, reflect.DeepEqual(schema.EntityTypes["driver_id"], types.ValueType_INT64)) + + types := []types.ValueType_Enum{*types.ValueType_INT64.Enum(), *types.ValueType_FLOAT.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum()} + for idx, featureName := range expectedFeatureNames { + assert.Contains(t, schema.FeaturesTypes, featureName) + assert.Equal(t, schema.FeaturesTypes[featureName], types[idx]) + } +} + +func TestSchemaRetrievalIgnoresEntitiesNotInFeatureService(t *testing.T) { + featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() + //Remove entities in featureservice + for _, featureView := range featureViews { + featureView.Entities = []string{} + } + entityMap := make(map[string]*model.Entity) + for _, entity := range entities { + entityMap[entity.Name] = entity + } + schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) + assert.Nil(t, err) + assert.Empty(t, schema.EntityTypes) +} + +func TestSchemaUsesOrderInFeatureService(t *testing.T) { + featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() + expectedEntityNames := make([]string, 0) + expectedFeatureNames := make([]string, 0) + entityMap := make(map[string]*model.Entity) + for _, entity := range entities { + entityMap[entity.Name] = entity + } + for _, entity := range entities { + entityMap[entity.Name] = entity + expectedEntityNames = append(expectedEntityNames, entity.Name) + } + // Source of truth for order of featureNames + for _, featureView := range featureViews { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) + } + } + for _, featureView := range odfvs { + for _, f := range featureView.Base.Features { + expectedFeatureNames = append(expectedFeatureNames, GetFullFeatureName(featureView.Base.Name, f.Name)) + } + } + + rand.Seed(time.Now().UnixNano()) + // Shuffle the featureNames in incorrect order + for _, featureView := range featureViews { + rand.Shuffle(len(featureView.Base.Features), func(i, j int) { + featureView.Base.Features[i], featureView.Base.Features[j] = featureView.Base.Features[j], featureView.Base.Features[i] + }) + } + for _, featureView := range 
odfvs { + rand.Shuffle(len(featureView.Base.Features), func(i, j int) { + featureView.Base.Features[i], featureView.Base.Features[j] = featureView.Base.Features[j], featureView.Base.Features[i] + }) + } + + schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) + assert.Nil(t, err) + + // Ensure the same results + assert.Equal(t, expectedFeatureNames, schema.Features) + assert.Equal(t, expectedEntityNames, schema.Entities) + for _, entityName := range expectedEntityNames { + assert.Contains(t, schema.EntityTypes, entityName) + } + assert.True(t, reflect.DeepEqual(schema.EntityTypes["driver_id"], types.ValueType_INT64)) + + types := []types.ValueType_Enum{*types.ValueType_INT64.Enum(), *types.ValueType_FLOAT.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum(), *types.ValueType_INT32.Enum(), *types.ValueType_DOUBLE.Enum()} + for idx, featureName := range expectedFeatureNames { + assert.Contains(t, schema.FeaturesTypes, featureName) + assert.Equal(t, schema.FeaturesTypes[featureName], types[idx]) + } +} + +func TestSerializeToArrowTable(t *testing.T) { + table, expectedSchema, expectedColumns, err := GetTestArrowTableAndExpectedResults() + assert.Nil(t, err) + defer table.Release() + tr := array.NewTableReader(table, -1) + + defer tr.Release() + for tr.Next() { + rec := tr.Record() + assert.NotNil(t, rec) + for _, field := range rec.Schema().Fields() { + assert.Contains(t, expectedSchema, field.Name) + assert.Equal(t, field.Type, expectedSchema[field.Name]) + } + values, err := test.GetProtoFromRecord(rec) + + assert.Nil(t, err) + for name, val := range values { + if name == "RequestId" { + continue + } + assert.Equal(t, len(val.Val), len(expectedColumns[name].Val)) + for idx, featureVal := range val.Val { + assert.Equal(t, featureVal.Val, expectedColumns[name].Val[idx].Val) + } + } + } +} + +// Initialize all dummy featureservice, entities and featureviews/on demand featureviews for testing. 
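+// The returned feature service references two regular feature views (int64/float32 and int32/double features) and one on-demand feature view, all keyed on the driver_id entity.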
+func InitializeFeatureRepoVariablesForTest() (*model.FeatureService, []*model.Entity, []*model.FeatureView, []*model.OnDemandFeatureView) { + f1 := test.CreateNewFeature( + "int64", + types.ValueType_INT64, + ) + f2 := test.CreateNewFeature( + "float32", + types.ValueType_FLOAT, + ) + projection1 := test.CreateNewFeatureViewProjection( + "featureView1", + "", + []*model.Feature{f1, f2}, + map[string]string{}, + ) + baseFeatureView1 := test.CreateBaseFeatureView( + "featureView1", + []*model.Feature{f1, f2}, + projection1, + ) + featureView1 := test.CreateFeatureView(baseFeatureView1, nil, []string{"driver_id"}) + entity1 := test.CreateNewEntity("driver_id", types.ValueType_INT64, "driver_id") + f3 := test.CreateNewFeature( + "int32", + types.ValueType_INT32, + ) + f4 := test.CreateNewFeature( + "double", + types.ValueType_DOUBLE, + ) + projection2 := test.CreateNewFeatureViewProjection( + "featureView2", + "", + []*model.Feature{f3, f4}, + map[string]string{}, + ) + baseFeatureView2 := test.CreateBaseFeatureView( + "featureView2", + []*model.Feature{f3, f4}, + projection2, + ) + featureView2 := test.CreateFeatureView(baseFeatureView2, nil, []string{"driver_id"}) + + f5 := test.CreateNewFeature( + "odfv_f1", + types.ValueType_INT32, + ) + f6 := test.CreateNewFeature( + "odfv_f2", + types.ValueType_DOUBLE, + ) + projection3 := test.CreateNewFeatureViewProjection( + "od_bf1", + "", + []*model.Feature{f5, f6}, + map[string]string{}, + ) + od_bf1 := test.CreateBaseFeatureView( + "od_bf1", + []*model.Feature{f5, f6}, + projection3, + ) + odfv := model.NewOnDemandFeatureViewFromBase(od_bf1) + featureService := test.CreateNewFeatureService( + "test_service", + "test_project", + nil, + nil, + []*model.FeatureViewProjection{projection1, projection2, projection3}, + ) + return featureService, []*model.Entity{entity1}, []*model.FeatureView{featureView1, featureView2}, []*model.OnDemandFeatureView{odfv} +} + +// Create dummy FeatureService, Entities, and FeatureViews add them to the logger and convert the logs to Arrow table. +// Returns arrow table, expected test schema, and expected columns. 
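+// Two Log rows (driver ids 1001 and 1003) are buffered so that both column ordering and per-row values can be checked.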
+func GetTestArrowTableAndExpectedResults() (array.Table, map[string]arrow.DataType, map[string]*types.RepeatedValue, error) { + featureService, entities, featureViews, odfvs := InitializeFeatureRepoVariablesForTest() + entityMap := make(map[string]*model.Entity) + for _, entity := range entities { + entityMap[entity.Name] = entity + } + schema, err := GetSchemaFromFeatureService(featureService, entityMap, featureViews, odfvs) + if err != nil { + return nil, nil, nil, err + } + + ts := timestamppb.New(time.Now()) + log1 := Log{ + EntityValue: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: 1001}}, + }, + FeatureValues: []*types.Value{ + /* normal feature values */ + {Val: &types.Value_Int64Val{Int64Val: rand.Int63()}}, + {Val: &types.Value_FloatVal{FloatVal: rand.Float32()}}, + {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, + {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, + /* odfv values */ + {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, + {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, + }, + FeatureStatuses: []serving.FieldStatus{ + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + }, + EventTimestamps: []*timestamppb.Timestamp{ + ts, ts, ts, ts, ts, ts, + }, + } + log2 := Log{ + EntityValue: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: 1003}}, + }, + FeatureValues: []*types.Value{ + /* normal feature values */ + {Val: &types.Value_Int64Val{Int64Val: rand.Int63()}}, + {Val: &types.Value_FloatVal{FloatVal: rand.Float32()}}, + {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, + {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, + /* odfv values */ + {Val: &types.Value_Int32Val{Int32Val: rand.Int31()}}, + {Val: &types.Value_DoubleVal{DoubleVal: rand.Float64()}}, + }, + FeatureStatuses: []serving.FieldStatus{ + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + serving.FieldStatus_PRESENT, + }, + EventTimestamps: []*timestamppb.Timestamp{ + ts, ts, ts, ts, ts, ts, + }, + } + + expectedSchema := make(map[string]arrow.DataType) + for joinKey, entityType := range schema.EntityTypes { + arrowType, err := gotypes.ValueTypeEnumToArrowType(entityType) + if err != nil { + return nil, nil, nil, err + } + expectedSchema[joinKey] = arrowType + } + expectedSchema["RequestId"] = arrow.BinaryTypes.String + for featureName, featureType := range schema.FeaturesTypes { + arrowType, err := gotypes.ValueTypeEnumToArrowType(featureType) + if err != nil { + return nil, nil, nil, err + } + expectedSchema[featureName] = arrowType + } + + expectedColumns := map[string]*types.RepeatedValue{ + "driver_id": { + Val: []*types.Value{ + log1.EntityValue[0], + log2.EntityValue[0]}, + }, + "featureView1__int64": { + Val: []*types.Value{ + log1.FeatureValues[0], + log2.FeatureValues[0]}, + }, + "featureView1__float32": { + Val: []*types.Value{ + log1.FeatureValues[1], + log2.FeatureValues[1]}, + }, + "featureView2__int32": { + Val: []*types.Value{ + log1.FeatureValues[2], + log2.FeatureValues[2]}, + }, + "featureView2__double": { + Val: []*types.Value{ + log1.FeatureValues[3], + log2.FeatureValues[3]}, + }, + "od_bf1__odfv_f1": { + Val: []*types.Value{ + log1.FeatureValues[4], + log2.FeatureValues[4]}, + }, + "od_bf1__odfv_f2": { + Val: []*types.Value{ + log1.FeatureValues[5], + log2.FeatureValues[5]}, + }, + } + 
loggingService, err := SetupLoggingServiceWithLogs([]*Log{&log1, &log2}) + if err != nil { + return nil, nil, nil, err + } + + table, err := ConvertMemoryBufferToArrowTable(loggingService.memoryBuffer, schema) + + if err != nil { + return nil, nil, nil, err + } + return table, expectedSchema, expectedColumns, nil +} + +func SetupLoggingServiceWithLogs(logs []*Log) (*LoggingService, error) { + loggingService, err := NewLoggingService(nil, len(logs), "", false) + if err != nil { + return nil, err + } + dummyTicker := time.NewTicker(10 * time.Second) + // stop the ticker so that the logs are not flushed to offline storage + dummyTicker.Stop() + for _, log := range logs { + loggingService.EmitLog(log) + } + // manually handle flushing logs + for i := 0; i < len(logs); i++ { + loggingService.PerformPeriodicAppendToMemoryBufferAndLogFlush(dummyTicker) + } + return loggingService, nil +} diff --git a/go/cmd/server/logging/offlinelogstorage.go b/go/cmd/server/logging/offlinelogstorage.go new file mode 100644 index 0000000000..1a0f414255 --- /dev/null +++ b/go/cmd/server/logging/offlinelogstorage.go @@ -0,0 +1,46 @@ +package logging + +import ( + "errors" + + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/feast-dev/feast/go/internal/feast/registry" +) + +type OfflineLogStoreConfig struct { + storeType string + project string + path string +} + +type OfflineLogStorage interface { + // Todo: Maybe we can add a must implement function that retrieves the correct config based on type + FlushToStorage(array.Table) error +} + +func getOfflineStoreType(offlineStoreConfig map[string]interface{}) (string, bool) { + if onlineStoreType, ok := offlineStoreConfig["storeType"]; !ok { + // Assume file for case of no specified. + return "", true + } else { + result, ok := onlineStoreType.(string) + return result, ok + } +} + +func NewOfflineStore(config *registry.RepoConfig) (OfflineLogStorage, error) { + offlineStoreType, _ := getOfflineStoreType(config.OfflineStore) + if offlineStoreType == "" { + // No offline store specified. 
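+		// Returning a nil OfflineLogStorage here is tolerated because flushLogsToOfflineStorage only dereferences the store when the configured type is "file".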
+ return nil, nil + } else if offlineStoreType == "file" { + fileConfig, err := GetFileConfig(config) + if err != nil { + return nil, err + } + offlineStore, err := NewFileOfflineStore(config.Project, fileConfig) + return offlineStore, err + } else { + return nil, errors.New("no offline storage besides file is currently supported") + } +} diff --git a/go/cmd/server/main.go b/go/cmd/server/main.go index 9dc6a5b966..33d56e0a7a 100644 --- a/go/cmd/server/main.go +++ b/go/cmd/server/main.go @@ -2,13 +2,15 @@ package main import ( "fmt" + "log" + "net" + "os" + + "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/protos/feast/serving" "google.golang.org/grpc" - "log" - "net" - "os" ) const ( @@ -46,12 +48,17 @@ func main() { if err != nil { log.Fatalln(err) } + // Disable logging for now + loggingService, err := logging.NewLoggingService(fs, 1000, "", false) + if err != nil { + log.Fatalln(err) + } defer fs.DestructOnlineStore() - startGrpcServer(fs, sockFile) + startGrpcServer(fs, loggingService, sockFile) } -func startGrpcServer(fs *feast.FeatureStore, sockFile string) { - server := newServingServiceServer(fs) +func startGrpcServer(fs *feast.FeatureStore, loggingService *logging.LoggingService, sockFile string) { + server := newServingServiceServer(fs, loggingService) log.Printf("Starting a gRPC server listening on %s\n", sockFile) lis, err := net.Listen("unix", sockFile) if err != nil { diff --git a/go/cmd/server/server.go b/go/cmd/server/server.go index 0d8c4362a2..3708689268 100644 --- a/go/cmd/server/server.go +++ b/go/cmd/server/server.go @@ -3,18 +3,22 @@ package main import ( "context" + "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/protos/feast/serving" + prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" + "github.com/google/uuid" ) type servingServiceServer struct { - fs *feast.FeatureStore + fs *feast.FeatureStore + loggingService *logging.LoggingService serving.UnimplementedServingServiceServer } -func newServingServiceServer(fs *feast.FeatureStore) *servingServiceServer { - return &servingServiceServer{fs: fs} +func newServingServiceServer(fs *feast.FeatureStore, loggingService *logging.LoggingService) *servingServiceServer { + return &servingServiceServer{fs: fs, loggingService: loggingService} } func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request *serving.GetFeastServingInfoRequest) (*serving.GetFeastServingInfoResponse, error) { @@ -27,11 +31,11 @@ func (s *servingServiceServer) GetFeastServingInfo(ctx context.Context, request // Metadata contains featurenames that corresponds to the number of rows in response.Results. // Results contains values including the value of the feature, the event timestamp, and feature status in a columnar format. 
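+// GenerateLogs below is passed resp.Results[len(request.Entities):], i.e. it assumes the entity value vectors occupy the leading positions of the response.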
func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *serving.GetOnlineFeaturesRequest) (*serving.GetOnlineFeaturesResponse, error) { + requestId := GenerateRequestId() featuresOrService, err := s.fs.ParseFeatures(request.GetKind()) if err != nil { return nil, err } - featureVectors, err := s.fs.GetOnlineFeatures( ctx, featuresOrService.FeaturesRefs, @@ -49,14 +53,20 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s FeatureNames: &serving.FeatureList{Val: make([]string, 0)}, }, } - - for _, vector := range featureVectors { + // Entities are currently part of the features as a value and the order that we add it to the resp MetaData + // Need to figure out a way to map the correct entities to the correct ordering + entityValuesMap := make(map[string][]*prototypes.Value, 0) + featureNames := make([]string, len(featureVectors)) + for idx, vector := range featureVectors { resp.Metadata.FeatureNames.Val = append(resp.Metadata.FeatureNames.Val, vector.Name) - + featureNames[idx] = vector.Name values, err := types.ArrowValuesToProtoValues(vector.Values) if err != nil { return nil, err } + if _, ok := request.Entities[vector.Name]; ok { + entityValuesMap[vector.Name] = values + } resp.Results = append(resp.Results, &serving.GetOnlineFeaturesResponse_FeatureVector{ Values: values, @@ -64,6 +74,13 @@ func (s *servingServiceServer) GetOnlineFeatures(ctx context.Context, request *s EventTimestamps: vector.Timestamps, }) } - + if featuresOrService.FeatureService != nil { + go s.loggingService.GenerateLogs(featuresOrService.FeatureService, entityValuesMap, resp.Results[len(request.Entities):], request.RequestContext, requestId) + } return resp, nil } + +func GenerateRequestId() string { + id := uuid.New() + return id.String() +} diff --git a/go/cmd/server/server_test.go b/go/cmd/server/server_test.go index 730558f1b6..9d4ffb50bf 100644 --- a/go/cmd/server/server_test.go +++ b/go/cmd/server/server_test.go @@ -2,13 +2,21 @@ package main import ( "context" - "github.com/feast-dev/feast/go/internal/feast/registry" "net" + "os" "path/filepath" "reflect" "runtime" "testing" + "time" + + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/apache/arrow/go/v8/arrow/memory" + "github.com/apache/arrow/go/v8/parquet/file" + "github.com/apache/arrow/go/v8/parquet/pqarrow" + "github.com/feast-dev/feast/go/cmd/server/logging" "github.com/feast-dev/feast/go/internal/feast" "github.com/feast-dev/feast/go/internal/test" "github.com/feast-dev/feast/go/protos/feast/serving" @@ -33,12 +41,27 @@ func getRepoPath(basePath string) string { } // Starts a new grpc server, registers the serving service and returns a client. -func getClient(ctx context.Context, basePath string) (serving.ServingServiceClient, func()) { +func getClient(ctx context.Context, offlineStoreType string, basePath string, enableLogging bool) (serving.ServingServiceClient, func()) { buffer := 1024 * 1024 listener := bufconn.Listen(buffer) server := grpc.NewServer() config, err := registry.NewRepoConfigFromFile(getRepoPath(basePath)) + + // TODO(kevjumba): either add this officially or talk in design review about what the correct solution for what do with path. + // Currently in python we use the path in FileSource but it is not specified in configuration unless it is using file_url? 
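The GenerateLogs call above relies on the layout of resp.Results: the first len(request.Entities) vectors are the echoed entity columns and the requested features follow, which is why the logger is handed resp.Results[len(request.Entities):]. A small illustration of that split, using plain strings in place of real feature vectors (the helper name is hypothetical):

package main

import "fmt"

// splitEntityAndFeatureColumns shows the resp.Results[len(entities):] convention:
// entity vectors come first, feature vectors after.
func splitEntityAndFeatureColumns(columns []string, numEntities int) (entities, features []string) {
	return columns[:numEntities], columns[numEntities:]
}

func main() {
	columns := []string{"driver_id", "conv_rate", "acc_rate", "avg_daily_trips"}
	entities, features := splitEntityAndFeatureColumns(columns, 1)
	fmt.Println(entities) // [driver_id]
	fmt.Println(features) // [conv_rate acc_rate avg_daily_trips]
}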
+ if enableLogging { + if config.OfflineStore == nil { + config.OfflineStore = map[string]interface{}{} + } + absPath, err := filepath.Abs(filepath.Join(getRepoPath(basePath), "log.parquet")) + if err != nil { + panic(err) + } + config.OfflineStore["path"] = absPath + config.OfflineStore["storeType"] = offlineStoreType + } + if err != nil { panic(err) } @@ -46,7 +69,13 @@ func getClient(ctx context.Context, basePath string) (serving.ServingServiceClie if err != nil { panic(err) } - serving.RegisterServingServiceServer(server, &servingServiceServer{fs: fs}) + loggingService, err := logging.NewLoggingService(fs, 1000, "test_service", enableLogging) + if err != nil { + panic(err) + } + servingServiceServer := newServingServiceServer(fs, loggingService) + + serving.RegisterServingServiceServer(server, servingServiceServer) go func() { if err := server.Serve(listener); err != nil { panic(err) @@ -70,11 +99,12 @@ func getClient(ctx context.Context, basePath string) (serving.ServingServiceClie func TestGetFeastServingInfo(t *testing.T) { ctx := context.Background() // Pregenerated using `feast init`. - dir := "." - err := test.SetupFeatureRepo(dir) + dir := "logging/" + err := test.SetupInitializedRepo(dir) assert.Nil(t, err) - defer test.CleanUpRepo(dir) - client, closer := getClient(ctx, dir) + defer test.CleanUpInitializedRepo(dir) + + client, closer := getClient(ctx, "", dir, false) defer closer() response, err := client.GetFeastServingInfo(ctx, &serving.GetFeastServingInfoRequest{}) assert.Nil(t, err) @@ -84,11 +114,12 @@ func TestGetFeastServingInfo(t *testing.T) { func TestGetOnlineFeaturesSqlite(t *testing.T) { ctx := context.Background() // Pregenerated using `feast init`. - dir := "." - err := test.SetupFeatureRepo(dir) + dir := "logging/" + err := test.SetupInitializedRepo(dir) assert.Nil(t, err) - defer test.CleanUpRepo(dir) - client, closer := getClient(ctx, dir) + defer test.CleanUpInitializedRepo(dir) + + client, closer := getClient(ctx, "", dir, false) defer closer() entities := make(map[string]*types.RepeatedValue) entities["driver_id"] = &types.RepeatedValue{ @@ -107,15 +138,15 @@ func TestGetOnlineFeaturesSqlite(t *testing.T) { Entities: entities, } response, err := client.GetOnlineFeatures(ctx, request) + assert.Nil(t, err) + assert.NotNil(t, response) expectedEntityValuesResp := []*types.Value{ {Val: &types.Value_Int64Val{Int64Val: 1001}}, {Val: &types.Value_Int64Val{Int64Val: 1003}}, {Val: &types.Value_Int64Val{Int64Val: 1005}}, } expectedFeatureNamesResp := []string{"driver_id", "conv_rate", "acc_rate", "avg_daily_trips"} - assert.Nil(t, err) - assert.NotNil(t, response) - rows, err := test.ReadParquet(filepath.Join(dir, "feature_repo", "data", "driver_stats.parquet")) + rows, err := test.ReadParquet(filepath.Join(dir, "feature_repo", "driver_stats.parquet")) assert.Nil(t, err) entityKeys := map[int64]bool{1001: true, 1003: true, 1005: true} correctFeatures := test.GetLatestFeatures(rows, entityKeys) @@ -138,3 +169,111 @@ func TestGetOnlineFeaturesSqlite(t *testing.T) { assert.True(t, reflect.DeepEqual(response.Metadata.FeatureNames.Val, expectedFeatureNamesResp)) } + +func TestGetOnlineFeaturesSqliteWithLogging(t *testing.T) { + ctx := context.Background() + // Pregenerated using `feast init`. 
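When logging is enabled, the test describes the offline log store entirely through the two keys written into config.OfflineStore above. A sketch of that map-building step as a hypothetical helper (the helper name and the hard-coded log.parquet file name follow the test, not a public API):

package main

import (
	"fmt"
	"path/filepath"
)

// testOfflineStoreConfig builds the same map the test assigns to
// config.OfflineStore: a store type plus an absolute path for the log file.
func testOfflineStoreConfig(repoPath, storeType string) (map[string]interface{}, error) {
	absPath, err := filepath.Abs(filepath.Join(repoPath, "log.parquet"))
	if err != nil {
		return nil, err
	}
	return map[string]interface{}{
		"storeType": storeType,
		"path":      absPath,
	}, nil
}

func main() {
	cfg, err := testOfflineStoreConfig("logging/feature_repo", "file")
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg["storeType"], cfg["path"])
}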
+ dir := "logging/" + err := test.SetupInitializedRepo(dir) + assert.Nil(t, err) + defer test.CleanUpInitializedRepo(dir) + + client, closer := getClient(ctx, "file", dir, true) + defer closer() + entities := make(map[string]*types.RepeatedValue) + entities["driver_id"] = &types.RepeatedValue{ + Val: []*types.Value{ + {Val: &types.Value_Int64Val{Int64Val: 1001}}, + {Val: &types.Value_Int64Val{Int64Val: 1003}}, + {Val: &types.Value_Int64Val{Int64Val: 1005}}, + }, + } + + request := &serving.GetOnlineFeaturesRequest{ + Kind: &serving.GetOnlineFeaturesRequest_FeatureService{ + FeatureService: "test_service", + }, + Entities: entities, + FullFeatureNames: true, + } + response, err := client.GetOnlineFeatures(ctx, request) + + assert.Nil(t, err) + assert.NotNil(t, response) + + // Get the featurenames without the entity names that are appended at the front. + featureNames := response.Metadata.FeatureNames.Val[len(request.Entities):] + // Generated expected log rows and values + // TODO(kevjumba): implement for timestamp and status + expectedLogValues, _, _ := GetExpectedLogRows(featureNames, response.Results[len(request.Entities):]) + expectedLogValues["driver_id"] = entities["driver_id"] + logPath, err := filepath.Abs(filepath.Join(dir, "feature_repo", "log.parquet")) + // Wait for logger to flush. + assert.Eventually(t, func() bool { + var _, err = os.Stat(logPath) + if os.IsNotExist(err) { + return false + } else { + return true + } + }, 1*time.Second, logging.DEFAULT_LOG_FLUSH_INTERVAL) + assert.Nil(t, err) + pf, err := file.OpenParquetFile(logPath, false) + assert.Nil(t, err) + + reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator) + assert.Nil(t, err) + + tbl, err := reader.ReadTable(ctx) + assert.Nil(t, err) + tr := array.NewTableReader(tbl, -1) + defer tbl.Release() + defer tr.Release() + for tr.Next() { + rec := tr.Record() + assert.NotNil(t, rec) + values, err := test.GetProtoFromRecord(rec) + + assert.Nil(t, err) + assert.Equal(t, len(values)-1 /*request id column not counted*/, len(expectedLogValues)) + // Need to iterate through and compare because certain values in types.RepeatedValues aren't accurately being compared. + for name, val := range values { + if name == "RequestId" { + // Ensure there are request ids for each entity. + assert.Equal(t, len(val.Val), len(response.Results[0].Values)) + } else { + assert.Equal(t, len(val.Val), len(expectedLogValues[name].Val)) + for idx, featureVal := range val.Val { + assert.Equal(t, featureVal.Val, expectedLogValues[name].Val[idx].Val) + } + } + + } + } + err = test.CleanUpFile(logPath) + assert.Nil(t, err) +} + +// Generate the expected log rows based on the resulting feature vector returned from GetOnlineFeatures. 
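The verification above waits for the asynchronous flush, then reads the parquet log back through the Arrow bindings. Pulled out of the test, the read path looks roughly like the following; the readLogTable helper and the sample path are illustrative, while the Arrow/parquet calls are the ones used in the test itself:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/apache/arrow/go/v8/arrow/array"
	"github.com/apache/arrow/go/v8/arrow/memory"
	"github.com/apache/arrow/go/v8/parquet/file"
	"github.com/apache/arrow/go/v8/parquet/pqarrow"
)

// readLogTable opens a parquet log file and iterates over its Arrow records,
// mirroring the verification loop in TestGetOnlineFeaturesSqliteWithLogging.
func readLogTable(ctx context.Context, path string) error {
	pf, err := file.OpenParquetFile(path, false)
	if err != nil {
		return err
	}
	reader, err := pqarrow.NewFileReader(pf, pqarrow.ArrowReadProperties{}, memory.DefaultAllocator)
	if err != nil {
		return err
	}
	tbl, err := reader.ReadTable(ctx)
	if err != nil {
		return err
	}
	defer tbl.Release()

	tr := array.NewTableReader(tbl, -1)
	defer tr.Release()
	for tr.Next() {
		rec := tr.Record()
		fmt.Println("rows in record:", rec.NumRows())
	}
	return nil
}

func main() {
	// The test writes its log next to the initialized feature repo.
	if err := readLogTable(context.Background(), "logging/feature_repo/log.parquet"); err != nil {
		log.Fatal(err)
	}
}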
+func GetExpectedLogRows(featureNames []string, results []*serving.GetOnlineFeaturesResponse_FeatureVector) (map[string]*types.RepeatedValue, [][]int32, [][]int64) { + numFeatures := len(featureNames) + numRows := len(results[0].Values) + featureValueLogRows := make(map[string]*types.RepeatedValue) + featureStatusLogRows := make([][]int32, numRows) + eventTimestampLogRows := make([][]int64, numRows) + for idx := 0; idx < len(results); idx++ { + valArray := make([]*types.Value, 0) + for row_idx := 0; row_idx < numRows; row_idx++ { + featureStatusLogRows[row_idx] = make([]int32, numFeatures) + eventTimestampLogRows[row_idx] = make([]int64, numFeatures) + valArray = append(valArray, results[idx].Values[row_idx]) + featureStatusLogRows[row_idx][idx] = int32(serving.FieldStatus_PRESENT) + eventTimestampLogRows[row_idx][idx] = results[idx].EventTimestamps[row_idx].AsTime().UnixNano() / int64(time.Millisecond) + + } + featureValueLogRows[featureNames[idx]] = &types.RepeatedValue{ + Val: valArray, + } + } + return featureValueLogRows, featureStatusLogRows, eventTimestampLogRows +} diff --git a/go/embedded/online_features.go b/go/embedded/online_features.go index 217e53d060..24a5489430 100644 --- a/go/embedded/online_features.go +++ b/go/embedded/online_features.go @@ -3,6 +3,8 @@ package embedded import ( "context" "fmt" + "log" + "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/cdata" @@ -14,7 +16,6 @@ import ( "github.com/feast-dev/feast/go/internal/feast/transformation" prototypes "github.com/feast-dev/feast/go/protos/feast/types" "github.com/feast-dev/feast/go/types" - "log" ) type OnlineFeatureService struct { @@ -69,7 +70,7 @@ func (s *OnlineFeatureService) GetEntityTypesMap(featureRefs []string) (map[stri // skip on demand feature views continue } - for entityName := range view.Entities { + for _, entityName := range view.Entities { entity := entitiesByName[entityName] joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } @@ -98,7 +99,7 @@ func (s *OnlineFeatureService) GetEntityTypesMapByFeatureService(featureServiceN // skip on demand feature views continue } - for entityName := range view.Entities { + for _, entityName := range view.Entities { entity := entitiesByName[entityName] joinKeyTypes[entity.JoinKey] = int32(entity.ValueType.Number()) } diff --git a/go/internal/feast/featurestore.go b/go/internal/feast/featurestore.go index 8eacc4a943..5e10f4978e 100644 --- a/go/internal/feast/featurestore.go +++ b/go/internal/feast/featurestore.go @@ -3,6 +3,7 @@ package feast import ( "context" "errors" + "github.com/apache/arrow/go/v8/arrow/memory" "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/internal/feast/onlineserving" @@ -10,7 +11,6 @@ import ( "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/internal/feast/transformation" "github.com/feast-dev/feast/go/protos/feast/serving" - "github.com/feast-dev/feast/go/protos/feast/types" prototypes "github.com/feast-dev/feast/go/protos/feast/types" ) @@ -29,6 +29,14 @@ type Features struct { FeatureService *model.FeatureService } +func (fs *FeatureStore) Registry() *registry.Registry { + return fs.registry +} + +func (fs *FeatureStore) GetRepoConfig() *registry.RepoConfig { + return fs.config +} + // NewFeatureStore constructs a feature store fat client using the // repo config (contents of feature_store.yaml converted to JSON map). 
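The GetExpectedLogRows helper above reduces each event timestamp to epoch milliseconds before it is recorded. The conversion on its own, for an arbitrary time value (the toEpochMillis name is illustrative):

package main

import (
	"fmt"
	"time"

	"google.golang.org/protobuf/types/known/timestamppb"
)

// toEpochMillis matches the conversion used for eventTimestampLogRows:
// protobuf Timestamp -> time.Time -> nanoseconds -> milliseconds.
func toEpochMillis(ts *timestamppb.Timestamp) int64 {
	return ts.AsTime().UnixNano() / int64(time.Millisecond)
}

func main() {
	ts := timestamppb.New(time.Date(2022, time.April, 1, 12, 0, 0, 0, time.UTC))
	fmt.Println(toEpochMillis(ts)) // 1648814400000
}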
func NewFeatureStore(config *registry.RepoConfig, callback transformation.TransformationCallback) (*FeatureStore, error) { @@ -114,7 +122,7 @@ func (fs *FeatureStore) GetOnlineFeatures( entitylessCase := false for _, featureView := range featureViews { - if _, ok := featureView.Entities[model.DUMMY_ENTITY_NAME]; ok { + if featureView.HasEntity(model.DUMMY_ENTITY_NAME) { entitylessCase = true break } @@ -248,6 +256,10 @@ func (fs *FeatureStore) ListEntities(hideDummyEntity bool) ([]*model.Entity, err return entities, nil } +func (fs *FeatureStore) ListOnDemandFeatureViews() ([]*model.OnDemandFeatureView, error) { + return fs.registry.ListOnDemandFeatureViews(fs.config.Project) +} + /* Group feature views that share the same set of join keys. For each group, we store only unique rows and save indices to retrieve those rows for each requested feature @@ -258,20 +270,20 @@ func (fs *FeatureStore) GetFeatureView(featureViewName string, hideDummyEntity b if err != nil { return nil, err } - if _, ok := fv.Entities[model.DUMMY_ENTITY_NAME]; ok && hideDummyEntity { - fv.Entities = make(map[string]struct{}) + if fv.HasEntity(model.DUMMY_ENTITY_NAME) && hideDummyEntity { + fv.Entities = []string{} } return fv, nil } -func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*types.EntityKey, +func (fs *FeatureStore) readFromOnlineStore(ctx context.Context, entityRows []*prototypes.EntityKey, requestedFeatureViewNames []string, requestedFeatureNames []string, ) ([][]onlinestore.FeatureData, error) { numRows := len(entityRows) - entityRowsValue := make([]*types.EntityKey, numRows) + entityRowsValue := make([]*prototypes.EntityKey, numRows) for index, entityKey := range entityRows { - entityRowsValue[index] = &types.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues} + entityRowsValue[index] = &prototypes.EntityKey{JoinKeys: entityKey.JoinKeys, EntityValues: entityKey.EntityValues} } return fs.onlineStore.OnlineRead(ctx, entityRowsValue, requestedFeatureViewNames, requestedFeatureNames) } diff --git a/go/internal/feast/featurestore_test.go b/go/internal/feast/featurestore_test.go index 00babcc385..c8f9049c4a 100644 --- a/go/internal/feast/featurestore_test.go +++ b/go/internal/feast/featurestore_test.go @@ -2,13 +2,14 @@ package feast import ( "context" + "path/filepath" + "runtime" + "testing" + "github.com/feast-dev/feast/go/internal/feast/onlinestore" "github.com/feast-dev/feast/go/internal/feast/registry" "github.com/feast-dev/feast/go/protos/feast/types" "github.com/stretchr/testify/assert" - "path/filepath" - "runtime" - "testing" ) // Return absolute path to the test_repo registry regardless of the working directory diff --git a/go/internal/feast/model/basefeatureview.go b/go/internal/feast/model/basefeatureview.go index 46d8663609..28ef7231fd 100644 --- a/go/internal/feast/model/basefeatureview.go +++ b/go/internal/feast/model/basefeatureview.go @@ -2,6 +2,7 @@ package model import ( "fmt" + "github.com/feast-dev/feast/go/protos/feast/core" ) diff --git a/go/internal/feast/model/featureview.go b/go/internal/feast/model/featureview.go index bdb86f0ace..85fc7a60ee 100644 --- a/go/internal/feast/model/featureview.go +++ b/go/internal/feast/model/featureview.go @@ -15,10 +15,9 @@ const ( var DUMMY_ENTITY types.Value = types.Value{Val: &types.Value_StringVal{StringVal: DUMMY_ENTITY_VAL}} type FeatureView struct { - Base *BaseFeatureView - Ttl *durationpb.Duration - // Make entities set so that search for dummy entity is faster - Entities 
map[string]struct{} + Base *BaseFeatureView + Ttl *durationpb.Duration + Entities []string } func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { @@ -26,12 +25,9 @@ func NewFeatureViewFromProto(proto *core.FeatureView) *FeatureView { Ttl: &(*proto.Spec.Ttl), } if len(proto.Spec.Entities) == 0 { - featureView.Entities = map[string]struct{}{DUMMY_ENTITY_NAME: {}} + featureView.Entities = []string{DUMMY_ENTITY_NAME} } else { - featureView.Entities = make(map[string]struct{}) - for _, entityName := range proto.Spec.Entities { - featureView.Entities[entityName] = struct{}{} - } + featureView.Entities = proto.Spec.Entities } return featureView } @@ -44,3 +40,12 @@ func (fs *FeatureView) NewFeatureViewFromBase(base *BaseFeatureView) *FeatureVie } return featureView } + +func (fs *FeatureView) HasEntity(lookup string) bool { + for _, entityName := range fs.Entities { + if entityName == lookup { + return true + } + } + return false +} diff --git a/go/internal/feast/model/ondemandfeatureview.go b/go/internal/feast/model/ondemandfeatureview.go index b8d28c6d7e..7d2f7a2206 100644 --- a/go/internal/feast/model/ondemandfeatureview.go +++ b/go/internal/feast/model/ondemandfeatureview.go @@ -48,6 +48,12 @@ func (fs *OnDemandFeatureView) NewWithProjection(projection *FeatureViewProjecti return featureView, nil } +func NewOnDemandFeatureViewFromBase(base *BaseFeatureView) *OnDemandFeatureView { + + featureView := &OnDemandFeatureView{Base: base} + return featureView +} + func (fs *OnDemandFeatureView) ProjectWithFeatures(featureNames []string) (*OnDemandFeatureView, error) { return fs.NewWithProjection(fs.Base.ProjectWithFeatures(featureNames)) } diff --git a/go/internal/feast/onlineserving/serving.go b/go/internal/feast/onlineserving/serving.go index ebf0f3f324..381ba5f0f2 100644 --- a/go/internal/feast/onlineserving/serving.go +++ b/go/internal/feast/onlineserving/serving.go @@ -4,6 +4,9 @@ import ( "crypto/sha256" "errors" "fmt" + "sort" + "strings" + "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/memory" "github.com/feast-dev/feast/go/internal/feast/model" @@ -14,8 +17,6 @@ import ( "github.com/golang/protobuf/proto" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" - "sort" - "strings" ) /* @@ -249,7 +250,7 @@ func GetEntityMaps(requestedFeatureViews []*FeatureViewAndRefs, entities []*mode joinKeyToAliasMap = map[string]string{} } - for entityName := range featureView.Entities { + for _, entityName := range featureView.Entities { joinKey := entitiesByName[entityName].JoinKey entityNameToJoinKeyMap[entityName] = joinKey @@ -516,8 +517,8 @@ func GroupFeatureRefs(requestedFeatureViews []*FeatureViewAndRefs, joinKeys := make([]string, 0) fv := featuresAndView.View featureNames := featuresAndView.FeatureRefs - for entity := range fv.Entities { - joinKeys = append(joinKeys, entityNameToJoinKeyMap[entity]) + for _, entityName := range fv.Entities { + joinKeys = append(joinKeys, entityNameToJoinKeyMap[entityName]) } groupKeyBuilder := make([]string, 0) diff --git a/go/internal/feast/onlineserving/serving_test.go b/go/internal/feast/onlineserving/serving_test.go index d33ec87b82..2f4cf8eaba 100644 --- a/go/internal/feast/onlineserving/serving_test.go +++ b/go/internal/feast/onlineserving/serving_test.go @@ -1,13 +1,14 @@ package onlineserving import ( + "testing" + "github.com/feast-dev/feast/go/internal/feast/model" "github.com/feast-dev/feast/go/protos/feast/core" 
"github.com/feast-dev/feast/go/protos/feast/types" "github.com/stretchr/testify/assert" "google.golang.org/protobuf/types/known/durationpb" "google.golang.org/protobuf/types/known/timestamppb" - "testing" ) func TestGroupingFeatureRefs(t *testing.T) { @@ -18,19 +19,19 @@ func TestGroupingFeatureRefs(t *testing.T) { NameAlias: "aliasViewA", }, }, - Entities: map[string]struct{}{"driver": {}, "customer": {}}, + Entities: []string{"driver", "customer"}, } viewB := &model.FeatureView{ Base: &model.BaseFeatureView{Name: "viewB"}, - Entities: map[string]struct{}{"driver": {}, "customer": {}}, + Entities: []string{"driver", "customer"}, } viewC := &model.FeatureView{ Base: &model.BaseFeatureView{Name: "viewC"}, - Entities: map[string]struct{}{"driver": {}}, + Entities: []string{"driver"}, } viewD := &model.FeatureView{ Base: &model.BaseFeatureView{Name: "viewD"}, - Entities: map[string]struct{}{"customer": {}}, + Entities: []string{"customer"}, } refGroups, _ := GroupFeatureRefs( []*FeatureViewAndRefs{ @@ -103,11 +104,11 @@ func TestGroupingFeatureRefsWithJoinKeyAliases(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - Entities: map[string]struct{}{"location": {}}, + Entities: []string{"location"}, } viewB := &model.FeatureView{ Base: &model.BaseFeatureView{Name: "viewB"}, - Entities: map[string]struct{}{"location": {}}, + Entities: []string{"location"}, } refGroups, _ := GroupFeatureRefs( @@ -162,7 +163,7 @@ func TestGroupingFeatureRefsWithMissingKey(t *testing.T) { JoinKeyMap: map[string]string{"location_id": "destination_id"}, }, }, - Entities: map[string]struct{}{"location": {}}, + Entities: []string{"location"}, } _, err := GroupFeatureRefs( diff --git a/go/internal/feast/onlinestore/sqliteonlinestore.go b/go/internal/feast/onlinestore/sqliteonlinestore.go index 09c8eb97b4..f8c5325545 100644 --- a/go/internal/feast/onlinestore/sqliteonlinestore.go +++ b/go/internal/feast/onlinestore/sqliteonlinestore.go @@ -82,9 +82,6 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. featureNamesToIdx[name] = idx } - for idx := 0; idx < len(entityKeys); idx++ { - results[idx] = make([]FeatureData, featureCount) - } for _, featureViewName := range featureViewNames { query_string := fmt.Sprintf(`SELECT entity_key, feature_name, Value, event_ts FROM %s @@ -108,7 +105,11 @@ func (s *SqliteOnlineStore) OnlineRead(ctx context.Context, entityKeys []*types. 
if err := proto.Unmarshal(valueString, &value); err != nil { return nil, errors.New("error converting parsed value to types.Value") } - results[entityNameToEntityIndex[hashSerializedEntityKey(&entity_key)]][featureNamesToIdx[feature_name]] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: feature_name}, + rowIdx := entityNameToEntityIndex[hashSerializedEntityKey(&entity_key)] + if results[rowIdx] == nil { + results[rowIdx] = make([]FeatureData, featureCount) + } + results[rowIdx][featureNamesToIdx[feature_name]] = FeatureData{Reference: serving.FeatureReferenceV2{FeatureViewName: featureViewName, FeatureName: feature_name}, Timestamp: *timestamppb.New(event_ts), Value: types.Value{Val: value.Val}, } diff --git a/go/internal/feast/onlinestore/sqliteonlinestore_test.go b/go/internal/feast/onlinestore/sqliteonlinestore_test.go index 9d45992d6e..cbee9cd91c 100644 --- a/go/internal/feast/onlinestore/sqliteonlinestore_test.go +++ b/go/internal/feast/onlinestore/sqliteonlinestore_test.go @@ -2,20 +2,21 @@ package onlinestore import ( "context" - "github.com/feast-dev/feast/go/internal/feast/registry" "path/filepath" "reflect" "testing" + "github.com/feast-dev/feast/go/internal/feast/registry" + "github.com/feast-dev/feast/go/internal/test" "github.com/feast-dev/feast/go/protos/feast/types" "github.com/stretchr/testify/assert" ) -func TestSqliteSetup(t *testing.T) { +func TestSqliteAndFeatureRepoSetup(t *testing.T) { dir := "../../test" feature_repo_path := filepath.Join(dir, "feature_repo") - err := test.SetupFeatureRepo(dir) + err := test.SetupCleanFeatureRepo(dir) assert.Nil(t, err) defer test.CleanUpRepo(dir) config, err := registry.NewRepoConfigFromFile(feature_repo_path) @@ -34,7 +35,7 @@ func TestSqliteSetup(t *testing.T) { func TestSqliteOnlineRead(t *testing.T) { dir := "../../test" feature_repo_path := filepath.Join(dir, "feature_repo") - test.SetupFeatureRepo(dir) + test.SetupCleanFeatureRepo(dir) defer test.CleanUpRepo(dir) config, err := registry.NewRepoConfigFromFile(feature_repo_path) assert.Nil(t, err) diff --git a/go/internal/feast/registry/registry.go b/go/internal/feast/registry/registry.go index e47034aaf7..38cf167a9f 100644 --- a/go/internal/feast/registry/registry.go +++ b/go/internal/feast/registry/registry.go @@ -3,11 +3,12 @@ package registry import ( "errors" "fmt" - "github.com/feast-dev/feast/go/internal/feast/model" "net/url" "sync" "time" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/core" ) diff --git a/go/internal/test/go_integration_test_utils.go b/go/internal/test/go_integration_test_utils.go index dcaa1bc3d5..d66a546193 100644 --- a/go/internal/test/go_integration_test_utils.go +++ b/go/internal/test/go_integration_test_utils.go @@ -3,8 +3,11 @@ package test import ( "context" "fmt" - "github.com/apache/arrow/go/v8/arrow/array" + "log" + "github.com/apache/arrow/go/v8/arrow/memory" + "google.golang.org/protobuf/types/known/durationpb" + "google.golang.org/protobuf/types/known/timestamppb" "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/parquet/file" @@ -14,6 +17,11 @@ import ( "os/exec" "path/filepath" "time" + + "github.com/apache/arrow/go/v8/arrow/array" + "github.com/feast-dev/feast/go/internal/feast/model" + "github.com/feast-dev/feast/go/protos/feast/types" + gotypes "github.com/feast-dev/feast/go/types" ) type Row struct { @@ -78,7 +86,7 @@ func GetLatestFeatures(Rows []*Row, entities map[int64]bool) map[int64]*Row { return 
correctFeatureRows } -func SetupFeatureRepo(basePath string) error { +func SetupCleanFeatureRepo(basePath string) error { cmd := exec.Command("feast", "init", "feature_repo") path, err := filepath.Abs(basePath) cmd.Env = os.Environ() @@ -93,15 +101,12 @@ func SetupFeatureRepo(basePath string) error { } applyCommand := exec.Command("feast", "apply") applyCommand.Env = os.Environ() - feature_repo_path, err := filepath.Abs(filepath.Join(path, "feature_repo")) - if err != nil { - return err - } - applyCommand.Dir = feature_repo_path - err = applyCommand.Run() + featureRepoPath, err := filepath.Abs(filepath.Join(path, "feature_repo")) if err != nil { return err } + applyCommand.Dir = featureRepoPath + applyCommand.Run() t := time.Now() formattedTime := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", @@ -109,22 +114,140 @@ func SetupFeatureRepo(basePath string) error { t.Hour(), t.Minute(), t.Second()) materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime) materializeCommand.Env = os.Environ() - materializeCommand.Dir = feature_repo_path + materializeCommand.Dir = featureRepoPath err = materializeCommand.Run() if err != nil { return err } + return nil } -func CleanUpRepo(basePath string) error { - feature_repo_path, err := filepath.Abs(filepath.Join(basePath, "feature_repo")) +func SetupInitializedRepo(basePath string) error { + path, err := filepath.Abs(basePath) + if err != nil { + return err + } + applyCommand := exec.Command("feast", "apply") + applyCommand.Env = os.Environ() + featureRepoPath, err := filepath.Abs(filepath.Join(path, "feature_repo")) + if err != nil { + return err + } + // var stderr bytes.Buffer + // var stdout bytes.Buffer + applyCommand.Dir = featureRepoPath + err = applyCommand.Run() if err != nil { return err + } - err = os.RemoveAll(feature_repo_path) + t := time.Now() + + formattedTime := fmt.Sprintf("%d-%02d-%02dT%02d:%02d:%02d", + t.Year(), t.Month(), t.Day(), + t.Hour(), t.Minute(), t.Second()) + + materializeCommand := exec.Command("feast", "materialize-incremental", formattedTime) + materializeCommand.Env = os.Environ() + materializeCommand.Dir = featureRepoPath + out, err := materializeCommand.Output() if err != nil { + log.Println(string(out)) return err } return nil } + +func CleanUpInitializedRepo(basePath string) { + featureRepoPath, err := filepath.Abs(filepath.Join(basePath, "feature_repo")) + if err != nil { + log.Fatal(err) + } + + err = os.Remove(filepath.Join(featureRepoPath, "data", "registry.db")) + if err != nil { + log.Fatal(err) + } + err = os.Remove(filepath.Join(featureRepoPath, "data", "online_store.db")) + if err != nil { + log.Fatal(err) + } +} + +func CleanUpRepo(basePath string) { + featureRepoPath, err := filepath.Abs(filepath.Join(basePath, "feature_repo")) + if err != nil { + log.Fatal(err) + } + err = os.RemoveAll(featureRepoPath) + if err != nil { + log.Fatal(err) + } +} + +func GetProtoFromRecord(rec array.Record) (map[string]*types.RepeatedValue, error) { + r := make(map[string]*types.RepeatedValue) + schema := rec.Schema() + for idx, column := range rec.Columns() { + field := schema.Field(idx) + values, err := gotypes.ArrowValuesToProtoValues(column) + if err != nil { + return nil, err + } + r[field.Name] = &types.RepeatedValue{Val: values} + } + return r, nil +} + +func CleanUpFile(absPath string) error { + return os.Remove(absPath) +} + +func CreateBaseFeatureView(name string, features []*model.Feature, projection *model.FeatureViewProjection) *model.BaseFeatureView { + return &model.BaseFeatureView{ + 
Name: name, + Features: features, + Projection: projection, + } +} + +func CreateNewEntity(name string, valueType types.ValueType_Enum, joinKey string) *model.Entity { + return &model.Entity{ + Name: name, + ValueType: valueType, + JoinKey: joinKey, + } +} + +func CreateNewFeature(name string, dtype types.ValueType_Enum) *model.Feature { + return &model.Feature{Name: name, + Dtype: dtype, + } +} + +func CreateNewFeatureService(name string, project string, createdTimestamp *timestamppb.Timestamp, lastUpdatedTimestamp *timestamppb.Timestamp, projections []*model.FeatureViewProjection) *model.FeatureService { + return &model.FeatureService{ + Name: name, + Project: project, + CreatedTimestamp: createdTimestamp, + LastUpdatedTimestamp: lastUpdatedTimestamp, + Projections: projections, + } +} + +func CreateNewFeatureViewProjection(name string, nameAlias string, features []*model.Feature, joinKeyMap map[string]string) *model.FeatureViewProjection { + return &model.FeatureViewProjection{Name: name, + NameAlias: nameAlias, + Features: features, + JoinKeyMap: joinKeyMap, + } +} + +func CreateFeatureView(base *model.BaseFeatureView, ttl *durationpb.Duration, entities []string) *model.FeatureView { + return &model.FeatureView{ + Base: base, + Ttl: ttl, + Entities: entities, + } +} diff --git a/go/types/typeconversion.go b/go/types/typeconversion.go index 798d352b37..c02768c755 100644 --- a/go/types/typeconversion.go +++ b/go/types/typeconversion.go @@ -2,6 +2,7 @@ package types import ( "fmt" + "github.com/apache/arrow/go/v8/arrow" "github.com/apache/arrow/go/v8/arrow/array" "github.com/apache/arrow/go/v8/arrow/memory" @@ -48,6 +49,46 @@ func ProtoTypeToArrowType(sample *types.Value) (arrow.DataType, error) { } } +func ValueTypeEnumToArrowType(t types.ValueType_Enum) (arrow.DataType, error) { + switch t { + case types.ValueType_BYTES: + return arrow.BinaryTypes.Binary, nil + case types.ValueType_STRING: + return arrow.BinaryTypes.String, nil + case types.ValueType_INT32: + return arrow.PrimitiveTypes.Int32, nil + case types.ValueType_INT64: + return arrow.PrimitiveTypes.Int64, nil + case types.ValueType_FLOAT: + return arrow.PrimitiveTypes.Float32, nil + case types.ValueType_DOUBLE: + return arrow.PrimitiveTypes.Float64, nil + case types.ValueType_BOOL: + return arrow.FixedWidthTypes.Boolean, nil + case types.ValueType_BOOL_LIST: + return arrow.ListOf(arrow.FixedWidthTypes.Boolean), nil + case types.ValueType_STRING_LIST: + return arrow.ListOf(arrow.BinaryTypes.String), nil + case types.ValueType_BYTES_LIST: + return arrow.ListOf(arrow.BinaryTypes.Binary), nil + case types.ValueType_INT32_LIST: + return arrow.ListOf(arrow.PrimitiveTypes.Int32), nil + case types.ValueType_INT64_LIST: + return arrow.ListOf(arrow.PrimitiveTypes.Int64), nil + case types.ValueType_FLOAT_LIST: + return arrow.ListOf(arrow.PrimitiveTypes.Float32), nil + case types.ValueType_DOUBLE_LIST: + return arrow.ListOf(arrow.PrimitiveTypes.Float64), nil + case types.ValueType_UNIX_TIMESTAMP: + return arrow.FixedWidthTypes.Time64ns, nil + case types.ValueType_UNIX_TIMESTAMP_LIST: + return arrow.ListOf(arrow.FixedWidthTypes.Time64ns), nil + default: + return nil, + fmt.Errorf("unsupported value type enum in enum to arrow type conversion: %s", t) + } +} + func copyProtoValuesToArrowArray(builder array.Builder, values []*types.Value) error { switch fieldBuilder := builder.(type) { case *array.BooleanBuilder: