Skip to content

Commit

Permalink
materialize-snowflake: verify deletion of files succeeds
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Oct 8, 2024
1 parent 18f70c5 commit 0e6f78a
Showing 1 changed file with 18 additions and 11 deletions.
29 changes: 18 additions & 11 deletions materialize-snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (d *transactor) addBinding(target sql.Table) error {

const MaxConcurrentQueries = 5

func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage) error) error {
func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage) error) (returnErr error) {
var ctx = it.Context()

log.Info("load: starting encoding and uploading of files")
Expand Down Expand Up @@ -361,7 +361,9 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)
filesToCleanup = append(filesToCleanup, dir)
}
}
defer d.deleteFiles(ctx, filesToCleanup)
defer func() {
returnErr = d.deleteFiles(ctx, filesToCleanup)
}()

log.Info("load: finished encoding and uploading of files")

Expand Down Expand Up @@ -423,7 +425,7 @@ func (d *transactor) Load(it *m.LoadIterator, loaded func(int, json.RawMessage)

log.Info("load: finished loading")

return nil
return returnErr
}

func (d *transactor) pipeExists(ctx context.Context, pipeName string) (bool, error) {
Expand Down Expand Up @@ -668,7 +670,9 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
}

log.WithField("table", item.Table).Info("store: finished query")
d.deleteFiles(ctx, []string{item.StagedDir})
if err := d.deleteFiles(ctx, []string{item.StagedDir}); err != nil {
return fmt.Errorf("cleaning up files: %w", err)
}

return nil
})
Expand Down Expand Up @@ -800,7 +804,9 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
}

// All files have been processed for this pipe, we can skip to the next pipe
d.deleteFiles(ctx, []string{pipe.dir})
if err := d.deleteFiles(ctx, []string{pipe.dir}); err != nil {
return nil, fmt.Errorf("cleaning up files: %w", err)
}
continue
}

Expand All @@ -819,7 +825,9 @@ func (d *transactor) Acknowledge(ctx context.Context) (*pf.ConnectorState, error
}

if len(pipe.files) == 0 {
d.deleteFiles(ctx, []string{pipe.dir})
if err := d.deleteFiles(ctx, []string{pipe.dir}); err != nil {
return nil, fmt.Errorf("cleaning up files: %w", err)
}
continue
} else {
time.Sleep(retryDelay)
Expand Down Expand Up @@ -884,15 +892,14 @@ func (d *transactor) hasStateKey(stateKey string) bool {
return false
}

func (d *transactor) deleteFiles(ctx context.Context, files []string) {
func (d *transactor) deleteFiles(ctx context.Context, files []string) error {
for _, f := range files {
if _, err := d.store.conn.ExecContext(ctx, fmt.Sprintf("REMOVE %s", f)); err != nil {
log.WithFields(log.Fields{
"file": f,
"err": err,
}).Debug("deleteFiles failed")
return err
}
}

return nil
}

func (d *transactor) Destroy() {
Expand Down

0 comments on commit 0e6f78a

Please sign in to comment.