Skip to content

Commit

Permalink
datastore/postgres: write only xid, drop support for ids
Browse files Browse the repository at this point in the history
  • Loading branch information
jakedt committed Sep 28, 2022
1 parent 742b83d commit 2fc97bf
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 28 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package migrations

import (
"context"

"github.com/jackc/pgx/v4"
)

var dropIDStmts = []string{
`ALTER TABLE relation_tuple_transaction
DROP COLUMN id;`,
`ALTER TABLE namespace_config
DROP COLUMN id,
DROP COLUMN created_transaction,
DROP COLUMN deleted_transaction;`,
`ALTER TABLE relation_tuple
DROP COLUMN id,
DROP COLUMN created_transaction,
DROP COLUMN deleted_transaction;`,
}

func init() {
if err := DatabaseMigrations.Register("drop-bigserial-ids", "add-xid-constraints",
noNonatomicMigration,
func(ctx context.Context, tx pgx.Tx) error {
for _, stmt := range dropIDStmts {
if _, err := tx.Exec(ctx, stmt); err != nil {
return err
}
}

return nil
}); err != nil {
panic("failed to register migration: " + err.Error())
}
}
11 changes: 3 additions & 8 deletions internal/datastore/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ const (
colTimestamp = "timestamp"
colNamespace = "namespace"
colConfig = "serialized_config"
colCreatedTxn = "created_transaction"
colDeletedTxn = "deleted_transaction"
colCreatedXid = "created_xid"
colDeletedXid = "deleted_xid"
colSnapshot = "snapshot"
Expand All @@ -53,7 +51,7 @@ const (

errUnableToInstantiate = "unable to instantiate datastore: %w"

createTxn = "INSERT INTO relation_tuple_transaction DEFAULT VALUES RETURNING id, xid"
createTxn = "INSERT INTO relation_tuple_transaction DEFAULT VALUES RETURNING xid"

snapshotAlive = "pg_visible_in_snapshot(%s, (SELECT %s FROM %s WHERE %s = %s)) = %s"

Expand All @@ -67,8 +65,7 @@ const (
pgSerializationFailure = "40001"
pgUniqueConstraintViolation = "23505"

livingTupleConstraintOld = "uq_relation_tuple_living"
livingTupleConstraint = "uq_relation_tuple_living_xid"
livingTupleConstraint = "uq_relation_tuple_living_xid"
)

func init() {
Expand Down Expand Up @@ -270,11 +267,10 @@ func (pgd *pgDatastore) ReadWriteTx(
) (datastore.Revision, error) {
var err error
for i := uint8(0); i <= pgd.maxRetries; i++ {
var newTxnID uint64
var newXID XID8
err = pgd.dbpool.BeginTxFunc(ctx, pgx.TxOptions{IsoLevel: pgx.Serializable}, func(tx pgx.Tx) error {
var err error
newTxnID, newXID, err = createNewTransaction(ctx, tx)
newXID, err = createNewTransaction(ctx, tx)
if err != nil {
return err
}
Expand All @@ -296,7 +292,6 @@ func (pgd *pgDatastore) ReadWriteTx(
},
ctx,
tx,
newTxnID,
newXID,
}

Expand Down
2 changes: 1 addition & 1 deletion internal/datastore/postgres/postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ func TransactionTimestampsTest(t *testing.T, ds datastore.Datastore) {
tx, err := pgd.dbpool.Begin(ctx)
require.NoError(err)

_, txXID, err := createNewTransaction(ctx, tx)
txXID, err := createNewTransaction(ctx, tx)
require.NoError(err)

err = tx.Commit(ctx)
Expand Down
22 changes: 5 additions & 17 deletions internal/datastore/postgres/readwrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ var (
writeNamespace = psql.Insert(tableNamespace).Columns(
colNamespace,
colConfig,
colCreatedTxn,
)

deleteNamespace = psql.Update(tableNamespace).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
Expand All @@ -43,18 +42,16 @@ var (
colUsersetNamespace,
colUsersetObjectID,
colUsersetRelation,
colCreatedTxn,
)

deleteTuple = psql.Update(tableTuple).Where(sq.Eq{colDeletedXid: liveDeletedTxnID})
)

type pgReadWriteTXN struct {
*pgReader
ctx context.Context
tx pgx.Tx
newTxnID uint64
newXID XID8
ctx context.Context
tx pgx.Tx
newXID XID8
}

func (rwt *pgReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleUpdate) error {
Expand Down Expand Up @@ -86,7 +83,6 @@ func (rwt *pgReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleUpd
tpl.Subject.Namespace,
tpl.Subject.ObjectId,
tpl.Subject.Relation,
rwt.newTxnID,
)
bulkWriteHasValues = true
}
Expand All @@ -95,7 +91,6 @@ func (rwt *pgReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleUpd
if len(deleteClauses) > 0 {
sql, args, err := deleteTuple.
Where(deleteClauses).
Set(colDeletedTxn, rwt.newTxnID).
Set(colDeletedXid, rwt.newXID).
ToSql()
if err != nil {
Expand All @@ -120,10 +115,6 @@ func (rwt *pgReadWriteTXN) WriteRelationships(mutations []*core.RelationTupleUpd
return cerr
}

if cerr := pgxcommon.ConvertToWriteConstraintError(livingTupleConstraintOld, err); cerr != nil {
return cerr
}

return fmt.Errorf(errUnableToWriteRelationships, err)
}
}
Expand Down Expand Up @@ -163,7 +154,7 @@ func (rwt *pgReadWriteTXN) DeleteRelationships(filter *v1.RelationshipFilter) er

span.SetAttributes(tracerAttributes...)

sql, args, err := query.Set(colDeletedTxn, rwt.newTxnID).Set(colDeletedXid, rwt.newXID).ToSql()
sql, args, err := query.Set(colDeletedXid, rwt.newXID).ToSql()
if err != nil {
return fmt.Errorf(errUnableToDeleteRelationships, err)
}
Expand Down Expand Up @@ -191,14 +182,13 @@ func (rwt *pgReadWriteTXN) WriteNamespaces(newConfigs ...*core.NamespaceDefiniti
span.AddEvent("Serialized namespace config")

deletedNamespaceClause = append(deletedNamespaceClause, sq.Eq{colNamespace: newNamespace.Name})
writeQuery = writeQuery.Values(newNamespace.Name, serialized, rwt.newTxnID)
writeQuery = writeQuery.Values(newNamespace.Name, serialized)
writtenNamespaceNames = append(writtenNamespaceNames, newNamespace.Name)
}

span.SetAttributes(common.ObjNamespaceNameKey.StringSlice(writtenNamespaceNames))

delSQL, delArgs, err := deleteNamespace.
Set(colDeletedTxn, rwt.newTxnID).
Set(colDeletedXid, rwt.newXID).
Where(sq.And{sq.Eq{colDeletedXid: liveDeletedTxnID}, deletedNamespaceClause}).
ToSql()
Expand Down Expand Up @@ -241,7 +231,6 @@ func (rwt *pgReadWriteTXN) DeleteNamespace(nsName string) error {
}

delSQL, delArgs, err := deleteNamespace.
Set(colDeletedTxn, rwt.newTxnID).
Set(colDeletedXid, rwt.newXID).
Where(sq.Eq{colNamespace: nsName, colCreatedXid: createdAt}).
ToSql()
Expand All @@ -255,7 +244,6 @@ func (rwt *pgReadWriteTXN) DeleteNamespace(nsName string) error {
}

deleteTupleSQL, deleteTupleArgs, err := deleteNamespaceTuples.
Set(colDeletedTxn, rwt.newTxnID).
Set(colDeletedXid, rwt.newXID).
Where(sq.Eq{colNamespace: nsName}).
ToSql()
Expand Down
4 changes: 2 additions & 2 deletions internal/datastore/postgres/revisions.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,10 @@ func transactionFromRevision(revision datastore.Revision) XID8 {
return xid
}

func createNewTransaction(ctx context.Context, tx pgx.Tx) (newTxnID uint64, newXID XID8, err error) {
func createNewTransaction(ctx context.Context, tx pgx.Tx) (newXID XID8, err error) {
ctx, span := tracer.Start(ctx, "createNewTransaction")
defer span.End()

err = tx.QueryRow(ctx, createTxn).Scan(&newTxnID, &newXID)
err = tx.QueryRow(ctx, createTxn).Scan(&newXID)
return
}

0 comments on commit 2fc97bf

Please sign in to comment.