diff --git a/materialize-redshift/driver.go b/materialize-redshift/driver.go index c3f9389d5..45f051ee3 100644 --- a/materialize-redshift/driver.go +++ b/materialize-redshift/driver.go @@ -812,9 +812,10 @@ func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string } for _, b := range d.bindings { - if b.hasDeletes || b.hasStores { - d.be.StartedResourceCommit(b.target.Path) + if !b.hasDeletes && !b.hasStores { + continue } + d.be.StartedResourceCommit(b.target.Path) if b.hasDeletes { // Create the temporary table for staging values to delete from the target table. @@ -831,30 +832,28 @@ func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string } } - if !b.hasStores { - d.be.FinishedResourceCommit(b.target.Path) - continue - } - - if b.mustMerge { - // Create the temporary table for staging values to merge into the target table. - // Redshift actually supports transactional DDL for creating tables, so this can be - // executed within the transaction. - if _, err := txn.Exec(ctx, b.createStoreTableSQL); err != nil { - return fmt.Errorf("creating store table: %w", err) - } + if b.hasStores { + if b.mustMerge { + // Create the temporary table for staging values to merge into the target table. + // Redshift actually supports transactional DDL for creating tables, so this can be + // executed within the transaction. + if _, err := txn.Exec(ctx, b.createStoreTableSQL); err != nil { + return fmt.Errorf("creating store table: %w", err) + } - if _, err := txn.Exec(ctx, b.copyIntoMergeTableSQL); err != nil { - return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err) - } else if _, err := txn.Exec(ctx, b.mergeIntoSQL); err != nil { - return fmt.Errorf("merging to table '%s': %w", b.target.Identifier, err) - } - } else { - // Can copy directly into the target table since all values are new. - if _, err := txn.Exec(ctx, b.copyIntoTargetTableSQL); err != nil { - return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err) + if _, err := txn.Exec(ctx, b.copyIntoMergeTableSQL); err != nil { + return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err) + } else if _, err := txn.Exec(ctx, b.mergeIntoSQL); err != nil { + return fmt.Errorf("merging to table '%s': %w", b.target.Identifier, err) + } + } else { + // Can copy directly into the target table since all values are new. + if _, err := txn.Exec(ctx, b.copyIntoTargetTableSQL); err != nil { + return handleCopyIntoErr(ctx, txn, d.cfg.Bucket, b.storeFile.prefix, b.target.Identifier, err) + } } } + d.be.FinishedResourceCommit(b.target.Path) }