Skip to content

Commit

Permalink
fix: ignore processed relations
Browse files: browse the repository at this point in the history
  • Loading branch information
thoas committed Jun 4, 2024
1 parent 0764311 commit 97ee98b
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 16 deletions.
9 changes: 5 additions & 4 deletions etl/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,10 +187,11 @@ func (e *Engine) newLoader() *loader {

// newExtractor returns a pointer to a new extractor that reuses the engine's
// schema, dialect and logger and starts with a freshly allocated, empty
// extract map.
//
// NOTE(review): this listing is a rendered diff without +/- markers, so the
// field list below appears twice. Presumably the first four fields are the
// removed lines and the following five are their replacement (the commit
// header reports "5 additions & 4 deletions" for this file); the replacement
// additionally initialises processedRelations as an empty map[string]struct{}.
// Confirm against the actual file before relying on this.
func (e *Engine) newExtractor() *extractor {
return &extractor{
extract: make(extract),
schema: e.schema,
dialect: e.dialect,
logger: e.logger,
extract: make(extract),
schema: e.schema,
dialect: e.dialect,
logger: e.logger,
processedRelations: make(map[string]struct{}),
}
}

Expand Down
42 changes: 30 additions & 12 deletions etl/extractor.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,11 @@ type (
extract map[string]entry
entry map[string]resultSet
extractor struct {
extract extract
dialect dialect.Dialect
schema map[string]config.Schema
logger *zap.Logger
extract extract
dialect dialect.Dialect
schema map[string]config.Schema
logger *zap.Logger
processedRelations map[string]struct{}
}
)

Expand Down Expand Up @@ -57,8 +58,7 @@ func (e *extractor) handleReferenceKeys(ctx context.Context, depth int, table di
Where(lk.Condition(referenceKey.ColumnName).Equal(value)).
Query()

e.logger.Debug(depthF(depth+1, "Fetch reference key"),
zap.String("reference_key", fmt.Sprintf("%s = %v", referenceKey, value)),
e.logger.Debug(depthF(depth+1, fmt.Sprintf("Fetch reference key %s = %v", referenceKey, value)),
zap.String("table_name", table.Name),
)

Expand Down Expand Up @@ -93,30 +93,46 @@ func (e *extractor) handleRow(ctx context.Context, depth int, table dialect.Tabl

primaryKey := primaryKeys[0]

e.logger.Debug(depthF(depth, "Retrieve relation"),
zap.String("relation", fmt.Sprintf("%s = %v", primaryKey, row[primaryKey.Name])))
relationKey := fmt.Sprintf("%s = %v", primaryKey, row[primaryKey.Name])

if _, ok := e.processedRelations[relationKey]; ok {
e.logger.Debug(depthF(depth, fmt.Sprintf("Relation %s already processed", relationKey)))
return nil
}

e.processedRelations[relationKey] = struct{}{}
e.logger.Debug(depthF(depth, fmt.Sprintf("Retrieve relation %s", relationKey)))

for k, v := range row {
if v == nil {
continue
}

if foreignKey, ok := foreignKeys[k]; ok {
foreignRelationKey := fmt.Sprintf("%s = %v", foreignKey, v)

if _, ok := e.processedRelations[foreignRelationKey]; ok {
e.logger.Debug(depthF(depth, fmt.Sprintf("Foreign relation %s already processed", foreignRelationKey)))
continue
}

e.logger.Debug(depthF(depth+1, fmt.Sprintf("Fetch foreign key %s = %v", foreignKey, v)))
query, args := lk.Select(lk.Raw("*")).
From(foreignKey.ReferencedTable.Name).
Where(lk.Condition(foreignKey.ReferencedColumnName).Equal(v)).
Query()

e.logger.Debug(depthF(depth+1, "Fetch foreign key"),
zap.String("foreign_key", fmt.Sprintf("%s = %v", foreignKey, v)))

if _, err := e.handle(ctx, depth+2, e.schema[foreignKey.ReferencedTable.Name], query, args...); err != nil {
return fmt.Errorf("unable to handle table %s from foreign key %s: %w", foreignKey.ReferencedTable.Name, foreignKey.Name, err)
}
}
}

return e.handleReferenceKeys(ctx, depth, table, row)
if err := e.handleReferenceKeys(ctx, depth, table, row); err != nil {
return err
}

return nil
}

func (e *extractor) Handle(ctx context.Context, schema config.Schema, query string, args ...interface{}) (extract, error) {
Expand Down Expand Up @@ -144,6 +160,8 @@ func (e *extractor) handle(ctx context.Context, depth int, schema config.Schema,
return nil, fmt.Errorf("unable to retrieve results: %w", err)
}

e.logger.Debug(depthF(depth, fmt.Sprintf("-> %d results", len(results))))

e.extract[tableName][cacheKey] = results

for i := range results {
Expand Down

0 comments on commit 97ee98b

Please sign in to comment.