Skip to content

Commit

Permalink
Merge pull request #26 from mimiro-io/feat/incremental
Browse files Browse the repository at this point in the history
Feat/incremental
  • Loading branch information
gra-moore authored Oct 16, 2023
2 parents 9055388 + 9807e83 commit 52dd0e4
Show file tree
Hide file tree
Showing 19 changed files with 403 additions and 312 deletions.
5 changes: 2 additions & 3 deletions cmd/flake/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func main() {
var cfg *internal.Config = &internal.Config{}
cfg := &internal.Config{}
if err := cfg.ServerFlags().Parse(os.Args); err != nil {
panic(err)
}
Expand Down Expand Up @@ -56,5 +56,4 @@ func main() {
if err := s.E.Shutdown(ctx); err != nil {
internal.LOG.Fatal().Err(err).Msg(err.Error())
}

}
}
24 changes: 12 additions & 12 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/juliangruber/go-intersect v1.1.0
github.com/labstack/echo/v4 v4.11.2
github.com/lestrrat-go/jwx/v2 v2.0.13
github.com/mimiro-io/common-datalayer v0.1.3
github.com/mimiro-io/common-datalayer v0.1.4
github.com/mimiro-io/entity-graph-data-model v0.7.0
github.com/mimiro-io/internal-go-util v0.0.0-20230307120700-77b59ac32055
github.com/onsi/ginkgo/v2 v2.12.0
Expand All @@ -30,25 +30,25 @@ require (
github.com/andybalholm/brotli v1.0.5 // indirect
github.com/apache/arrow/go/v12 v12.0.1 // indirect
github.com/apache/thrift v0.19.0 // indirect
github.com/aws/aws-sdk-go-v2 v1.21.1 // indirect
github.com/aws/aws-sdk-go-v2 v1.21.2 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.4.14 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.42 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.89 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.42 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.36 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.5 // indirect
github.com/aws/aws-sdk-go-v2/credentials v1.13.43 // indirect
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.11.90 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.1.43 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.4.37 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.1.6 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.9.15 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.37 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.36 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.5 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.40.1 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.1.38 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.9.37 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.15.6 // indirect
github.com/aws/aws-sdk-go-v2/service/s3 v1.40.2 // indirect
github.com/aws/smithy-go v1.15.0 // indirect
github.com/bcicen/jstream v1.0.1 // indirect
github.com/danieljoos/wincred v1.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dvsekhvalnov/jose2go v1.5.0 // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/gabriel-vasile/mimetype v1.4.2 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/goccy/go-json v0.10.2 // indirect
Expand Down
152 changes: 36 additions & 116 deletions go.sum

Large diffs are not rendered by default.

24 changes: 12 additions & 12 deletions internal/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,26 @@ type Config struct {
LogType string
LogLevel string
ServiceName string
PrivateCert string //deprecated
PrivateCert string // deprecated
SnowflakeUser string
SnowflakePassword string
SnowflakeAccount string
SnowflakeDb string
SnowflakeDB string
SnowflakeSchema string
SnowflakeWarehouse string
SnowflakeUri string
SnowflakeURI string
SnowflakePrivateKey string
Port int
JwtWellKnown string
TokenIssuer string
TokenAudience string
//NodePublicKey string
// NodePublicKey string
Authenticator string
MemoryHeadroom int
DsMappings []*common_datalayer.DatasetDefinition
ConfigLocation string
ConfigLoaderInterval int
ConfigLoaderClientId string
ConfigLoaderClientID string
ConfigLoaderClientSecret string
ConfigLoaderAudience string
ConfigLoaderGrantType string
Expand All @@ -50,15 +50,15 @@ func (c *Config) common() *flag.FlagSet {
fs.StringVar(&c.SnowflakeUser, "snowflake-user", "", "Snowflake username. Required.")
fs.StringVar(&c.SnowflakePassword, "snowflake-password", "", "Snowflake password. Required.")
fs.StringVar(&c.SnowflakeAccount, "snowflake-account", "", "Snowflake account to use.")
fs.StringVar(&c.SnowflakeDb, "snowflake-db", "", "Snowflake db to write to.")
fs.StringVar(&c.SnowflakeDB, "snowflake-db", "", "Snowflake db to write to.")
fs.StringVar(&c.SnowflakeSchema, "snowflake-schema", "", "Snowflake schema if set.")
fs.StringVar(&c.SnowflakeWarehouse, "snowflake-warehouse", "", "Snowflake warehouse")
fs.StringVar(&c.SnowflakeUri, "snowflake-connection-string", "", "Alternative if more parameters are needed.")
fs.StringVar(&c.SnowflakeURI, "snowflake-connection-string", "", "Alternative if more parameters are needed.")
fs.StringVar(&c.SnowflakePrivateKey, "snowflake-private-key", "", "base64 encoded private key.")
fs.StringVar(&c.PrivateCert, "private-cert", "", "deprecated, use snowflake-private-key.")
fs.IntVar(&c.ConfigLoaderInterval, "config-loader-interval", 60, "Interval in seconds to reload config file")
fs.StringVar(&c.ConfigLocation, "config-location", "", "Location of config file. file:// or http://")
fs.StringVar(&c.ConfigLoaderClientId, "config-loader-client-id", "", "Client id for config loader")
fs.StringVar(&c.ConfigLoaderClientID, "config-loader-client-id", "", "Client id for config loader")
fs.StringVar(&c.ConfigLoaderClientSecret, "config-loader-client-secret", "", "Client secret for config loader")
fs.StringVar(&c.ConfigLoaderAudience, "config-loader-audience", "", "Audience for config loader")
fs.StringVar(&c.ConfigLoaderGrantType, "config-loader-grant-type", "", "Grant type for config loader")
Expand All @@ -79,7 +79,7 @@ func (c *Config) ServerFlags() *flag.FlagSet {
fs.StringVar(&c.JwtWellKnown, "well-known", "", "url to well-known.json endpoint")
fs.StringVar(&c.TokenIssuer, "issuer", "", "either a jwt issuer or a node:<id> issuer if public key is set")
fs.StringVar(&c.TokenAudience, "audience", "", "either a jwt audience or a node:<id> audience if public key is set")
//fs.StringVar(&c.NodePublicKey, "public-key", "", "DataHub public key. Enables public key access.")
// fs.StringVar(&c.NodePublicKey, "public-key", "", "DataHub public key. Enables public key access.")
fs.StringVar(&c.Authenticator, "authenticator", "jwt", "middleware for authentication. 'noop' disables auth, jwt enables it")
return fs
}
Expand All @@ -94,9 +94,9 @@ func (c *Config) LoadEnv() error {
"SnowflakeUser:SNOWFLAKE_USER",
"SnowflakePassword:SNOWFLAKE_PASSWORD",
"SnowflakeAccount:SNOWFLAKE_ACCOUNT",
"SnowflakeDb:SNOWFLAKE_DB",
"SnowflakeDB:SNOWFLAKE_DB",
"SnowflakeSchema:SNOWFLAKE_SCHEMA",
"SnowflakeUri:SNOWFLAKE_CONNECTION_STRING",
"SnowflakeURI:SNOWFLAKE_CONNECTION_STRING",
"SnowflakePrivateKey:SNOWFLAKE_PRIVATE_KEY",
"Port:PORT",
"MemoryHeadroom:MEMORY_HEADROOM",
Expand All @@ -108,7 +108,7 @@ func (c *Config) LoadEnv() error {
"PrivateCert:PRIVATE_CERT",
"ConfigLocation:CONFIG_LOCATION",
"ConfigLoaderInterval:CONFIG_LOADER_INTERVAL",
"ConfigLoaderClientId:CONFIG_LOADER_CLIENT_ID",
"ConfigLoaderClientID:CONFIG_LOADER_CLIENT_ID",
"ConfigLoaderClientSecret:CONFIG_LOADER_CLIENT_SECRET",
"ConfigLoaderAudience:CONFIG_LOADER_AUDIENCE",
"ConfigLoaderGrantType:CONFIG_LOADER_GRANT_TYPE",
Expand Down
30 changes: 14 additions & 16 deletions internal/config_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,10 @@ func StartConfigLoader(cfg *Config) *ConfigLoader {
}
LOG.Info().Msg("Starting config loader")
go func() {
c.update(cfg)
for {
select {
case <-c.ticker.C:
c.update(cfg)
}
<-c.ticker.C // block until signal
c.update(cfg)
}
}()
return c
Expand All @@ -48,8 +47,8 @@ func newConfigLoader(cfg *Config) *ConfigLoader {
c.httpClient = &http.Client{
Timeout: 10 * time.Second,
}
c.loadConfig = c.loadUrl(
cfg.ConfigLoaderClientId,
c.loadConfig = c.loadURL(
cfg.ConfigLoaderClientID,
cfg.ConfigLoaderClientSecret,
cfg.ConfigLoaderAudience,
cfg.ConfigLoaderGrantType,
Expand All @@ -76,22 +75,21 @@ func (c *ConfigLoader) update(cfg *Config) bool {
LOG.Debug().Msg("Config unchanged")
return false
}

}

func (c *ConfigLoader) Stop() {
c.ticker.Stop()
}

func (c *ConfigLoader) loadUrl(clientId, clientSecret, audience, grantType, endPoint string) func(configEndpoint string) ([]*common_datalayer.DatasetDefinition, error) {
func (c *ConfigLoader) loadURL(clientID, clientSecret, audience, grantType, endPoint string) func(configEndpoint string) ([]*common_datalayer.DatasetDefinition, error) {
return func(configEndpoint string) ([]*common_datalayer.DatasetDefinition, error) {
req, err := http.NewRequest("GET", configEndpoint, nil) //
if err != nil {
return nil, err
}
now := time.Now()
if c.cachedToken == "" || now.After(c.cacheUntil) {
res, err2 := c.fetchNewConfigToken(clientId, clientSecret, audience, grantType, endPoint)
res, err2 := c.fetchNewConfigToken(clientID, clientSecret, audience, grantType, endPoint)
if err2 != nil {
LOG.Error().Err(err2).Msg("Unable to fetch new config token")
return nil, err2
Expand Down Expand Up @@ -130,7 +128,7 @@ func (c *ConfigLoader) loadFile(location string) ([]*common_datalayer.DatasetDef
}

type content struct {
Id string `json:"id"`
ID string `json:"id"`
Data common_datalayer.Config `json:"data"`
}

Expand Down Expand Up @@ -167,9 +165,9 @@ type cnfAuthResponse struct {
TokenType string `json:"token_type"`
}

func (c *ConfigLoader) fetchNewConfigToken(clientId, clientSecret, audience, grantType, endpoint string) (*cnfAuthResponse, error) {
func (c *ConfigLoader) fetchNewConfigToken(clientID, clientSecret, audience, grantType, endpoint string) (*cnfAuthResponse, error) {
requestBody, err := json.Marshal(map[string]string{
"client_id": clientId,
"client_id": clientID,
"client_secret": clientSecret,
"audience": audience,
"grant_type": grantType,
Expand All @@ -192,9 +190,9 @@ func (c *ConfigLoader) fetchNewConfigToken(clientId, clientSecret, audience, gra
return nil, err
}
if res.StatusCode != 200 {
b, err := io.ReadAll(res.Body)
if err != nil {
return nil, err
b, err2 := io.ReadAll(res.Body)
if err2 != nil {
return nil, err2
}
return nil, fmt.Errorf("not authorized. status=%v, err=%v", res.Status, string(b))
}
Expand All @@ -205,4 +203,4 @@ func (c *ConfigLoader) fetchNewConfigToken(clientId, clientSecret, audience, gra
return nil, err
}
return response, nil
}
}
11 changes: 6 additions & 5 deletions internal/config_loader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
var _ = Describe("The config loader", func() {
It("should load config", func() {
c := &ConfigLoader{}
//LoadLogger("console", "test", "debug")
// LoadLogger("console", "test", "debug")
c.loadConfig = c.loadFile
cfg := &Config{ConfigLocation: "../testdata/config.json"}
res := c.update(cfg)
Expand All @@ -28,7 +28,7 @@ var _ = Describe("The config loader", func() {

It("should map all fields correctly", func() {
c := &ConfigLoader{}
//LoadLogger("console", "test", "debug")
// LoadLogger("console", "test", "debug")
c.loadConfig = c.loadFile
cfg := &Config{ConfigLocation: "../testdata/config.json"}
res := c.update(cfg)
Expand All @@ -40,12 +40,12 @@ var _ = Describe("The config loader", func() {
Expect(cfg.DsMappings[0].SourceConfig[Schema]).To(Equal("datahub"))
Expect(cfg.DsMappings[0].SourceConfig[Database]).To(Equal("raw"))
Expect(cfg.DsMappings[0].SourceConfig[RawColumn]).To(Equal("DB_ENTITY"))
//Expect(cfg.DsMappings[0].SourceConfig[DefaultType]).To(Equal("http://data.mimiro.io/Enthusiasm"))
// Expect(cfg.DsMappings[0].SourceConfig[DefaultType]).To(Equal("http://data.mimiro.io/Enthusiasm"))
})

It("should unpack datahub content format", func() {
c := &ConfigLoader{}
//LoadLogger("console", "test", "debug")
// LoadLogger("console", "test", "debug")
c.loadConfig = c.loadFile
cfg := &Config{ConfigLocation: "../testdata/content.json"}
res := c.update(cfg)
Expand All @@ -55,4 +55,5 @@ var _ = Describe("The config loader", func() {
Expect(cfg.DsMappings[0].DatasetName).To(Equal("customer"))
Expect(cfg.DsMappings[0].SourceConfig[TableName]).To(Equal("customers"))
})
})
})

47 changes: 23 additions & 24 deletions internal/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,38 +18,42 @@ type Dataset struct {
log zerolog.Logger
sf *Snowflake
lock sync.Mutex
//m statsd.ClientInterface
// m statsd.ClientInterface
}

func NewDataset(cfg *Config, sf *Snowflake) *Dataset {
return &Dataset{
cfg: cfg,
log: LOG.With().Str("logger", "dataset").Logger(),
sf: sf,
//m: m,
// m: m,
}
}

func (ds *Dataset) WriteFs(ctx context.Context, info dsInfo, reader io.Reader) error {
var stage string
if info.fsStart && info.fsId != "" {
if info.fsStart && info.fsID != "" {
var err error
stage, err = ds.sf.mkStage(info.fsId, info.name)
stage, err = ds.sf.mkStage(info.fsID, info.name)
if err != nil {
refreshed, err2 := ds.tryRefresh(err)
if err2 != nil {
return err2
}
if refreshed {
stage, err = ds.sf.mkStage(info.fsId, info.name)
stage, err = ds.sf.mkStage(info.fsID, info.name)
if err != nil {
ds.log.Error().Err(err).Str("stage", stage).Msg("Failed to create stage, even after login refresh")
return err
}
} else {
ds.log.Error().Str("stage", stage).Msg("Failed to create stage, even after login refresh")
ds.log.Error().Err(err).Str("stage", stage).Msg("Failed to create stage, tis is not a refresh issue")
return err
}
}
ds.log.Info().Str("stage", stage).Msg("Created stage")
} else {
stage = ds.sf.getStage(info.fsId, info.name)
stage = ds.sf.getStage(info.fsID, info.name)
}
var batchSize int64 = 50000
if s, ok := ctx.Value("batchSize").(int64); ok {
Expand All @@ -72,8 +76,7 @@ func (ds *Dataset) WriteFs(ctx context.Context, info dsInfo, reader io.Reader) e
read++
}
if read == batchSize {
var err error
err = ds.safePut(info.name, stage, entityContext, entities)
err := ds.safePut(info.name, stage, entityContext, entities)
if err != nil {
return err
}
Expand Down Expand Up @@ -176,17 +179,15 @@ func (ds *Dataset) safePut(dataset string, stage string, entityContext *uda.Cont
}
if refreshed {
if _, err3 := ds.sf.putEntities(dataset, stage, entities, entityContext); err3 != nil {
if err3 != nil {
return err3 // give up at this point
}
return err3 // give up at this point
}

} else {
return err
}
}
return nil
}

func (ds *Dataset) safeEnsureStageAndPut(ctx context.Context, dataset string, entityContext *uda.Context, entities []*Entity, files []string) (int64, []*Entity, []string, error) {
if f, err := ds.sf.EnsureStageAndPut(ctx, dataset, entityContext, entities); err != nil {
refreshed, err2 := ds.tryRefresh(err)
Expand All @@ -195,14 +196,11 @@ func (ds *Dataset) safeEnsureStageAndPut(ctx context.Context, dataset string, en
return 0, nil, nil, err2
}
if refreshed {
if f, err3 := ds.sf.EnsureStageAndPut(ctx, dataset, entityContext, entities); err3 != nil {
if err3 != nil {
return 0, nil, nil, err3 // give up at this point
}
if f2, err3 := ds.sf.EnsureStageAndPut(ctx, dataset, entityContext, entities); err3 != nil {
return 0, nil, nil, err3 // give up at this point
} else {
files = append(files, f...)
files = append(files, f2...)
}

} else {
return 0, nil, nil, err
}
Expand Down Expand Up @@ -257,11 +255,12 @@ func (ds *Dataset) ReadAll(ctx context.Context, writer io.Writer, dsInfo dsInfo)
}

const (
TableName = "table_name"
Schema = "schema"
Database = "database"
RawColumn = "raw_column"
//DefaultType = "default_type"
TableName = "table_name"
Schema = "schema"
Database = "database"
RawColumn = "raw_column"
SinceColumn = "since_column"
// DefaultType = "default_type"
)

// if there is no read config for the given dataset name, make an attempt
Expand Down
Loading

0 comments on commit 52dd0e4

Please sign in to comment.