Skip to content

Commit

Permalink
Return serialized cursor along with unserialized cursor
Browse files Browse the repository at this point in the history
  • Loading branch information
notfelineit committed Apr 25, 2024
1 parent aec18df commit 75ca3f5
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 16 deletions.
19 changes: 15 additions & 4 deletions cmd/airbyte-source/read.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"os"

"github.com/planetscale/airbyte-source/cmd/internal"
psdbconnectv1alpha1 "github.com/planetscale/airbyte-source/proto/psdbconnect/v1alpha1"
"github.com/spf13/cobra"
)

Expand Down Expand Up @@ -107,10 +108,20 @@ func ReadCommand(ch *Helper) *cobra.Command {
}

for shardName, shardState := range streamState.Shards {
tc := shardState.Cursor
if err != nil {
ch.Logger.Error(fmt.Sprintf("invalid cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
var tc *psdbconnectv1alpha1.TableCursor

if shardState.UnserializedCursor != nil {
// If the user specified an unserialized cursor starting point,
// prefer that over the serialized cursor
tc = shardState.UnserializedCursor
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("using unserialized cursor for stream %s", streamStateKey))
} else {
tc, err = shardState.SerializedCursorToTableCursor(table)
ch.Logger.Log(internal.LOGLEVEL_INFO, fmt.Sprintf("using serialized cursor for stream %s", streamStateKey))
if err != nil {
ch.Logger.Error(fmt.Sprintf("invalid serialized cursor for stream %v, failed with [%v]", streamStateKey, err))
os.Exit(1)
}
}

sc, err := ch.Database.Read(context.Background(), cmd.OutOrStdout(), psc, table, tc)
Expand Down
16 changes: 10 additions & 6 deletions cmd/internal/planetscale_connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,17 @@ func (psc PlanetScaleSource) GetInitialState(keyspaceOrDatabase string, shards [
}

for _, shard := range shards {
shardCursors.Shards[shard] = &SerializedCursor{
Cursor: &psdbconnect.TableCursor{
Shard: shard,
Keyspace: keyspaceOrDatabase,
Position: "",
},
cursor, _ := TableCursorToSerializedCursor(&psdbconnect.TableCursor{
Shard: shard,
Keyspace: keyspaceOrDatabase,
Position: "",
})
cursor.UnserializedCursor = &psdbconnect.TableCursor{
Shard: shard,
Keyspace: keyspaceOrDatabase,
Position: "",
}
shardCursors.Shards[shard] = cursor
}

return shardCursors, nil
Expand Down
11 changes: 6 additions & 5 deletions cmd/internal/planetscale_edge_database.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ func (p PlanetScaleEdgeDatabase) ListShards(ctx context.Context, psc PlanetScale
func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps PlanetScaleSource, s ConfiguredStream, lastKnownPosition *psdbconnect.TableCursor) (*SerializedCursor, error) {
var (
err error
sErr error
currentSerializedCursor *SerializedCursor
)

Expand All @@ -190,17 +191,17 @@ func (p PlanetScaleEdgeDatabase) Read(ctx context.Context, w io.Writer, ps Plane
// the current vgtid is the same as the last synced vgtid, no new rows.
if latestCursorPosition == currentPosition.Position {
p.Logger.Log(LOGLEVEL_INFO, preamble+"no new rows found, exiting")
return &SerializedCursor{
Cursor: currentPosition,
}, nil
return TableCursorToSerializedCursor(currentPosition)
}
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf("new rows found, syncing rows for %v", readDuration))
p.Logger.Log(LOGLEVEL_INFO, fmt.Sprintf(preamble+"syncing rows with cursor [%v]", currentPosition))

currentPosition, err = p.sync(ctx, currentPosition, latestCursorPosition, table, ps, tabletType, readDuration)
if currentPosition.Position != "" {
currentSerializedCursor = &SerializedCursor{
Cursor: currentPosition,
currentSerializedCursor, sErr = TableCursorToSerializedCursor(currentPosition)
if sErr != nil {
// if we failed to serialize here, we should bail.
return currentSerializedCursor, errors.Wrap(sErr, "unable to serialize current position")
}
}
if err != nil {
Expand Down
3 changes: 2 additions & 1 deletion cmd/internal/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func TableCursorToSerializedCursor(cursor *psdbconnect.TableCursor) (*Serialized
}

sc := &SerializedCursor{
Cursor: base64.StdEncoding.EncodeToString(d),
Cursor: base64.StdEncoding.EncodeToString(d),
UnserializedCursor: cursor,
}
return sc, nil
}
Expand Down

0 comments on commit 75ca3f5

Please sign in to comment.