Revert "materialize-redshift: compress checkpoints"
This reverts commit 17bef61.
williamhbaker committed Oct 2, 2024
1 parent 17bef61 commit 47e2a5a
Showing 10 changed files with 98 additions and 254 deletions.
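This revert removes the gzip compression of checkpoint values that commit 17bef61 introduced for the Redshift connector, returning fence installation to the shared sql.StdInstallFence helper (with a hex-then-base64 decode callback) and fence updates to the templated statement shown in the snapshot below. Persisted specs continue to be gzip-compressed and base64-encoded before being written to the VARBYTE column, as the restored PutSpec shows. For orientation, here is a minimal sketch of that compress-then-encode round trip; the helper names are illustrative, while the connector's own (now removed) compressBytes and maybeDecompressBytes helpers appear further down in materialize-redshift/client.go:

```go
package main

import (
	"bytes"
	"compress/gzip"
	"encoding/base64"
	"fmt"
	"io"
)

// compressAndEncode gzips raw bytes and base64-encodes the result, matching
// the write path used for VARBYTE-stored values.
func compressAndEncode(raw []byte) (string, error) {
	var gzb bytes.Buffer
	w := gzip.NewWriter(&gzb)
	if _, err := w.Write(raw); err != nil {
		return "", fmt.Errorf("compressing bytes: %w", err)
	}
	if err := w.Close(); err != nil {
		return "", fmt.Errorf("closing gzip writer: %w", err)
	}
	return base64.StdEncoding.EncodeToString(gzb.Bytes()), nil
}

// decodeAndMaybeDecompress base64-decodes a stored value and gunzips it only
// when the gzip magic bytes (0x1f, 0x8b) are present, so values written
// before compression was introduced still load unchanged.
func decodeAndMaybeDecompress(stored string) ([]byte, error) {
	b, err := base64.StdEncoding.DecodeString(stored)
	if err != nil {
		return nil, fmt.Errorf("base64 decode: %w", err)
	}
	if len(b) >= 2 && b[0] == 0x1f && b[1] == 0x8b {
		r, err := gzip.NewReader(bytes.NewReader(b))
		if err != nil {
			return nil, fmt.Errorf("opening gzip reader: %w", err)
		}
		defer r.Close()
		return io.ReadAll(r)
	}
	return b, nil // legacy uncompressed value
}

func main() {
	enc, _ := compressAndEncode([]byte(`{"example":"checkpoint"}`))
	dec, _ := decodeAndMaybeDecompress(enc)
	fmt.Println(string(dec)) // {"example":"checkpoint"}
}
```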
3 changes: 2 additions & 1 deletion materialize-motherduck/client.go
@@ -3,6 +3,7 @@ package main
import (
"context"
stdsql "database/sql"
"encoding/base64"
"errors"
"fmt"
"net/http"
@@ -201,7 +202,7 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
}

func (c *client) Close() {
3 changes: 2 additions & 1 deletion materialize-mysql/client.go
@@ -3,6 +3,7 @@ package main
import (
"context"
stdsql "database/sql"
"encoding/base64"
"errors"
"fmt"
"net"
@@ -173,7 +174,7 @@ func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) er
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
}

func (c *client) ExecStatements(ctx context.Context, statements []string) error {
3 changes: 2 additions & 1 deletion materialize-postgres/client.go
@@ -3,6 +3,7 @@ package main
import (
"context"
stdsql "database/sql"
"encoding/base64"
"errors"
"fmt"
"net"
@@ -178,7 +179,7 @@ func (c *client) ExecStatements(ctx context.Context, statements []string) error
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
return sql.StdInstallFence(ctx, c.db, checkpoints, fence)
return sql.StdInstallFence(ctx, c.db, checkpoints, fence, base64.StdEncoding.DecodeString)
}

func (c *client) Close() {
9 changes: 9 additions & 0 deletions materialize-redshift/.snapshots/TestSQLGeneration
@@ -146,6 +146,15 @@ WHERE "a-schema".key_value.key1 = r.key1 AND "a-schema".key_value.key2 = r.key2

--- End "a-schema".delta_updates deleteQuery ---

--- Begin Fence Update ---
UPDATE path."to".checkpoints
SET checkpoint = 'AAECAwQFBgcICQ=='
WHERE materialization = 'some/Materialization'
AND key_begin = 1122867
AND key_end = 4293844428
AND fence = 123;
--- End Fence Update ---

--- Begin "a-schema".key_value createLoadTable (no varchar length) ---
CREATE TEMPORARY TABLE flow_temp_table_0 (
key1 BIGINT,
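The new Fence Update block in the snapshot above is the rendered form of the templated statement that replaces the connector-specific updateFence helper removed below: the rendered UPDATE runs during commit, and exactly one affected row is required, otherwise this instance has been fenced off by another (see the driver.go hunk at the end of this diff). A minimal illustrative sketch of that check, using database/sql rather than the pgx transaction the connector actually uses:

```go
package fencing

import (
	"context"
	stdsql "database/sql"
	"errors"
	"fmt"
)

// applyFenceUpdate executes an already-rendered fence UPDATE statement inside
// a transaction. Exactly one row must be affected; any other count means a
// newer instance has bumped the fence and this one must abort its commit.
func applyFenceUpdate(ctx context.Context, txn *stdsql.Tx, fenceUpdate string) error {
	res, err := txn.ExecContext(ctx, fenceUpdate)
	if err != nil {
		return fmt.Errorf("updating fence: %w", err)
	}
	n, err := res.RowsAffected()
	if err != nil {
		return fmt.Errorf("reading affected row count: %w", err)
	}
	if n != 1 {
		return errors.New("this instance was fenced off by another")
	}
	return nil
}
```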
234 changes: 43 additions & 191 deletions materialize-redshift/client.go
@@ -6,6 +6,7 @@ import (
"context"
stdsql "database/sql"
"encoding/base64"
"encoding/hex"
"errors"
"fmt"
"io"
@@ -23,8 +24,9 @@ import (
pf "github.com/estuary/flow/go/protocols/flow"
"github.com/google/uuid"
"github.com/jackc/pgconn"
"github.com/jackc/pgx/v5"
log "github.com/sirupsen/logrus"

_ "github.com/jackc/pgx/v5/stdlib"
)

var _ sql.SchemaManager = (*client)(nil)
@@ -81,12 +83,14 @@ func (c *client) PutSpec(ctx context.Context, updateSpec sql.MetaSpecsUpdate) er
return fmt.Errorf("application logic error - specBytes was not a spec: %w", err)
}

compressed, err := compressBytes(specBytes)
if err != nil {
var gzb bytes.Buffer
w := gzip.NewWriter(&gzb)
if _, err := w.Write(specBytes); err != nil {
return fmt.Errorf("compressing spec bytes: %w", err)
} else if err := w.Close(); err != nil {
return fmt.Errorf("closing gzip writer: %w", err)
}

updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(compressed)
updateSpec.Parameters[1] = base64.StdEncoding.EncodeToString(gzb.Bytes())

_, err = c.db.ExecContext(ctx, updateSpec.ParameterizedQuery, updateSpec.Parameters...)
return err
@@ -255,46 +259,55 @@ func preReqs(ctx context.Context, conf any, tenant string) *sql.PrereqErr {
}

func (c *client) FetchSpecAndVersion(ctx context.Context, specs sql.Table, materialization pf.Materialization) (string, string, error) {
var version, spec string

if err := c.db.QueryRowContext(
ctx,
fmt.Sprintf(
"SELECT version, FROM_VARBYTE(spec, 'utf8') FROM %s WHERE materialization = %s;",
specs.Identifier,
specs.Keys[0].Placeholder,
),
materialization.String(),
).Scan(&version, &spec); err != nil {
specHex, version, err := sql.StdFetchSpecAndVersion(ctx, c.db, specs, materialization)
if err != nil {
return "", "", err
}

if specBytes, err := base64.StdEncoding.DecodeString(spec); err != nil {
specBytesFromHex, err := hex.DecodeString(specHex)
if err != nil {
return "", "", fmt.Errorf("hex.DecodeString: %w", err)
}

specBytes, err := base64.StdEncoding.DecodeString(string(specBytesFromHex))
if err != nil {
return "", "", fmt.Errorf("base64.DecodeString: %w", err)
} else if specBytes, err = maybeDecompressBytes(specBytes); err != nil {
return "", "", fmt.Errorf("decompressing spec: %w", err)
}

// Handle specs that were persisted prior to compressing their byte content, as well as current
// specs that are compressed.
if specBytes[0] == 0x1f && specBytes[1] == 0x8b { // Valid gzip header bytes
if r, err := gzip.NewReader(bytes.NewReader(specBytes)); err != nil {
return "", "", err
} else if specBytes, err = io.ReadAll(r); err != nil {
return "", "", fmt.Errorf("reading compressed specBytes: %w", err)
}
} else {
return base64.StdEncoding.EncodeToString(specBytes), version, nil
// Legacy spec that hasn't been re-persisted yet.
log.Info("loaded uncompressed spec")
}

return base64.StdEncoding.EncodeToString(specBytes), version, nil
}

func (c *client) ExecStatements(ctx context.Context, statements []string) error {
return c.withDB(func(db *stdsql.DB) error { return sql.StdSQLExecStatements(ctx, db, statements) })
}

func (c *client) InstallFence(ctx context.Context, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
if err := c.withDB(func(db *stdsql.DB) error {
var err = c.withDB(func(db *stdsql.DB) error {
var err error
fence, err = installFence(ctx, db, checkpoints, fence)
if err != nil {
return fmt.Errorf("installing fence: %w", err)
}
return nil
}); err != nil {
return sql.Fence{}, err
}
fence, err = sql.StdInstallFence(ctx, db, checkpoints, fence, func(fenceHex string) ([]byte, error) {
fenceHexBytes, err := hex.DecodeString(fenceHex)
if err != nil {
return nil, err
}

return fence, nil
return base64.StdEncoding.DecodeString(string(fenceHexBytes))
})
return err
})
return fence, err
}

func (c *client) Close() {
@@ -309,164 +322,3 @@ func (c *client) withDB(fn func(*stdsql.DB) error) error {
defer db.Close()
return fn(db)
}

func compressBytes(b []byte) ([]byte, error) {
var gzb bytes.Buffer
w := gzip.NewWriter(&gzb)
if _, err := w.Write(b); err != nil {
return nil, fmt.Errorf("compressing bytes: %w", err)
} else if err := w.Close(); err != nil {
return nil, fmt.Errorf("closing gzip writer: %w", err)
}
return gzb.Bytes(), nil
}

func maybeDecompressBytes(b []byte) ([]byte, error) {
if b[0] == 0x1f && b[1] == 0x8b { // Valid gzip header bytes
var out bytes.Buffer
if r, err := gzip.NewReader(bytes.NewReader(b)); err != nil {
return nil, fmt.Errorf("decompressing bytes: %w", err)
} else if _, err = io.Copy(&out, r); err != nil {
return nil, fmt.Errorf("reading decompressed bytes: %w", err)
} else if err := r.Close(); err != nil {
return nil, fmt.Errorf("closing gzip reader: %w", err)
}
return out.Bytes(), nil
} else {
log.Info("loaded uncompressed bytes")
return b, nil
}
}

// installFence is a modified version of sql.StdInstallFence that handles
// compression of the checkpoint and reading varbyte values from Redshift.
func installFence(ctx context.Context, db *stdsql.DB, checkpoints sql.Table, fence sql.Fence) (sql.Fence, error) {
// TODO(whb): With the historical usage of sql.StdInstallFence, we were actually
// base64 encoding the checkpoint bytes and then sending that UTF8 string to
// Redshift, which stores those characters as bytes in the VARBYTE column. A
// slightly more direct & efficient way to handle this would be to store the
// bytes directly using TO_VARBYTE(checkpoint, 'base64'). This would require
// handling for the pre-existing checkpoints that were encoded in the previous
// way, and is not being implemented right now.
var txn, err = db.BeginTx(ctx, nil)
if err != nil {
return sql.Fence{}, fmt.Errorf("db.BeginTx: %w", err)
}
defer func() {
if txn != nil {
_ = txn.Rollback()
}
}()

// Increment the fence value of _any_ checkpoint which overlaps our key range.
if _, err = txn.Exec(
fmt.Sprintf(`
UPDATE %s
SET fence=fence+1
WHERE materialization=%s
AND key_end>=%s
AND key_begin<=%s
;
`,
checkpoints.Identifier,
checkpoints.Keys[0].Placeholder,
checkpoints.Keys[1].Placeholder,
checkpoints.Keys[2].Placeholder,
),
fence.Materialization,
fence.KeyBegin,
fence.KeyEnd,
); err != nil {
return sql.Fence{}, fmt.Errorf("incrementing fence: %w", err)
}

// Read the checkpoint with the narrowest [key_begin, key_end] which fully overlaps our range.
var readBegin, readEnd uint32
var checkpoint string

if err = txn.QueryRow(
fmt.Sprintf(`
SELECT fence, key_begin, key_end, FROM_VARBYTE(checkpoint, 'utf8')
FROM %s
WHERE materialization=%s
AND key_begin<=%s
AND key_end>=%s
ORDER BY key_end - key_begin ASC
LIMIT 1
;
`,
checkpoints.Identifier,
checkpoints.Keys[0].Placeholder,
checkpoints.Keys[1].Placeholder,
checkpoints.Keys[2].Placeholder,
),
fence.Materialization,
fence.KeyBegin,
fence.KeyEnd,
).Scan(&fence.Fence, &readBegin, &readEnd, &checkpoint); err == stdsql.ErrNoRows {
// Set an invalid range, which compares as unequal to trigger an insertion below.
readBegin, readEnd = 1, 0
} else if err != nil {
return sql.Fence{}, fmt.Errorf("scanning fence and checkpoint: %w", err)
} else if base64Bytes, err := base64.StdEncoding.DecodeString(checkpoint); err != nil {
return sql.Fence{}, fmt.Errorf("base64.Decode(string(decompressed)): %w", err)
} else if fence.Checkpoint, err = maybeDecompressBytes(base64Bytes); err != nil {
return sql.Fence{}, fmt.Errorf("maybeDecompressBytes(fenceHexBytes): %w", err)
}

// If a checkpoint for this exact range doesn't exist then insert it now.
if readBegin == fence.KeyBegin && readEnd == fence.KeyEnd {
// Exists; no-op.
} else if compressedCheckpoint, err := compressBytes(fence.Checkpoint); err != nil {
return sql.Fence{}, fmt.Errorf("compressing checkpoint: %w", err)
} else if _, err = txn.Exec(
fmt.Sprintf(
"INSERT INTO %s (materialization, key_begin, key_end, fence, checkpoint) VALUES (%s, %s, %s, %s, %s);",
checkpoints.Identifier,
checkpoints.Keys[0].Placeholder,
checkpoints.Keys[1].Placeholder,
checkpoints.Keys[2].Placeholder,
checkpoints.Values[0].Placeholder,
checkpoints.Values[1].Placeholder,
),
fence.Materialization,
fence.KeyBegin,
fence.KeyEnd,
fence.Fence,
base64.StdEncoding.EncodeToString(compressedCheckpoint),
); err != nil {
return sql.Fence{}, fmt.Errorf("inserting fence: %w", err)
}

err = txn.Commit()
txn = nil // Disable deferred rollback.

if err != nil {
return sql.Fence{}, fmt.Errorf("txn.Commit: %w", err)
}
return fence, nil
}

// updateFence updates a fence and reports if the materialization instance was
// fenced off. It handles compression of the checkpoint, and is used instead of
// the typical templated fence update query because of that.
func updateFence(ctx context.Context, txn pgx.Tx, dialect sql.Dialect, fence sql.Fence) error {
if compressedCheckpoint, err := compressBytes(fence.Checkpoint); err != nil {
return fmt.Errorf("compressing checkpoint: %w", err)
} else if res, err := txn.Exec(ctx, fmt.Sprintf(
"UPDATE %s SET checkpoint = $1 WHERE materialization = $2 AND key_begin = $3 AND key_end = $4 AND fence = $5;",
dialect.Identifier(fence.TablePath...),
),
base64.StdEncoding.EncodeToString(compressedCheckpoint),
fence.Materialization,
fence.KeyBegin,
fence.KeyEnd,
fence.Fence,
); err != nil {
return fmt.Errorf("fetching fence update rows: %w", err)
} else if res.RowsAffected() != 1 {
return fmt.Errorf("this instance was fenced off by another")
}

return nil
}
19 changes: 15 additions & 4 deletions materialize-redshift/driver.go
@@ -739,11 +739,20 @@ func (d *transactor) Store(it *m.StoreIterator) (m.StartCommitFunc, error) {
return nil, m.FinishedOperation(fmt.Errorf("marshalling checkpoint: %w", err))
}

return nil, pf.RunAsyncOperation(func() error { return d.commit(ctx, varcharColumnUpdates) })
var fenceUpdate strings.Builder
if err := d.templates.updateFence.Execute(&fenceUpdate, d.fence); err != nil {
return nil, m.FinishedOperation(fmt.Errorf("evaluating fence update template: %w", err))
}

return nil, pf.RunAsyncOperation(func() error { return d.commit(ctx, fenceUpdate.String(), varcharColumnUpdates) })
}, nil
}

func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string][]string) error {
func (d *transactor) commit(
ctx context.Context,
fenceUpdate string,
varcharColumnUpdates map[string][]string,
) error {
defer func() {
for _, b := range d.bindings {
// Arrange to clean up any staged files once this commit attempt is
@@ -873,8 +882,10 @@ func (d *transactor) commit(ctx context.Context, varcharColumnUpdates map[string

log.Info("store: finished encoding and uploading of files")

if err := updateFence(ctx, txn, d.dialect, d.fence); err != nil {
return err
if fenceRes, err := txn.Exec(ctx, fenceUpdate); err != nil {
return fmt.Errorf("fetching fence update rows: %w", err)
} else if fenceRes.RowsAffected() != 1 {
return errors.New("this instance was fenced off by another")
} else if err := txn.Commit(ctx); err != nil {
return fmt.Errorf("committing store transaction: %w", err)
}