Skip to content

Commit

Permalink
materialize-redshift: fix logging of Store bindings with no data
Browse files Browse the repository at this point in the history
We were always calling `FinishedResourceCommit` for bindings even if they had no
data, which was resulting in bogus log lines. This fixes the logic to not invoke
the logging mechanisms for bindings with no data.
  • Loading branch information
williamhbaker committed Oct 8, 2024
1 parent 420733a commit df74b8f
Showing 1 changed file with 22 additions and 23 deletions.
45 changes: 22 additions & 23 deletions materialize-redshift/driver.go
Original file line number Diff line number Diff line change
Expand Up @@ -812,9 +812,10 @@ func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string
}

for _, b := range d.bindings {
if b.hasDeletes || b.hasStores {
d.be.StartedResourceCommit(b.target.Path)
if !b.hasDeletes && !b.hasStores {
continue
}
d.be.StartedResourceCommit(b.target.Path)

if b.hasDeletes {
// Create the temporary table for staging values to delete from the target table.
Expand All @@ -831,30 +832,28 @@ func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string
}
}

if !b.hasStores {
d.be.FinishedResourceCommit(b.target.Path)
continue
}

if b.mustMerge {
// Create the temporary table for staging values to merge into the target table.
// Redshift actually supports transactional DDL for creating tables, so this can be
// executed within the transaction.
if _, err := txn.Exec(ctx, b.createStoreTableSQL); err != nil {
return fmt.Errorf("creating store table: %w", err)
}
if b.hasStores {
if b.mustMerge {
// Create the temporary table for staging values to merge into the target table.
// Redshift actually supports transactional DDL for creating tables, so this can be
// executed within the transaction.
if _, err := txn.Exec(ctx, b.createStoreTableSQL); err != nil {
return fmt.Errorf("creating store table: %w", err)
}

if _, err := txn.Exec(ctx, b.copyIntoMergeTableSQL); err != nil {
return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err)
} else if _, err := txn.Exec(ctx, b.mergeIntoSQL); err != nil {
return fmt.Errorf("merging to table '%s': %w", b.target.Identifier, err)
}
} else {
// Can copy directly into the target table since all values are new.
if _, err := txn.Exec(ctx, b.copyIntoTargetTableSQL); err != nil {
return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err)
if _, err := txn.Exec(ctx, b.copyIntoMergeTableSQL); err != nil {
return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err)
} else if _, err := txn.Exec(ctx, b.mergeIntoSQL); err != nil {
return fmt.Errorf("merging to table '%s': %w", b.target.Identifier, err)
}
} else {
// Can copy directly into the target table since all values are new.
if _, err := txn.Exec(ctx, b.copyIntoTargetTableSQL); err != nil {
return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err)
}
}
}

d.be.FinishedResourceCommit(b.target.Path)
}

Expand Down

0 comments on commit df74b8f

Please sign in to comment.