Skip to content

Commit

Permalink
Merge pull request #582 from bryanhuhta/mysql-revision-quantization
Browse files Browse the repository at this point in the history
Implement revision quantization for MySQL
  • Loading branch information
ecordell authored May 4, 2022
2 parents a86dccf + 2000d3e commit 2fe0a05
Show file tree
Hide file tree
Showing 8 changed files with 464 additions and 241 deletions.
2 changes: 2 additions & 0 deletions internal/datastore/common/revisions/optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
lastRevision := cor.lastQuantizedRevision.get()
if localNow.Before(lastRevision.validThrough) {
log.Debug().Time("now", localNow).Time("valid", lastRevision.validThrough).Msg("returning cached revision")
span.AddEvent("returning cached revision")
return lastRevision.revision, nil
}

lastQuantizedRevision, err, _ := cor.updateGroup.Do("", func() (interface{}, error) {
log.Debug().Time("now", localNow).Time("valid", lastRevision.validThrough).Msg("computing new revision")
span.AddEvent("computing new revision")

optimized, validFor, err := cor.optimizedFunc(ctx)
if err != nil {
Expand Down
267 changes: 60 additions & 207 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package mysql
import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
"time"

sq "github.com/Masterminds/squirrel"
Expand All @@ -14,14 +12,14 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/authzed/spicedb/internal/datastore"
"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/common/revisions"
"github.com/authzed/spicedb/internal/datastore/mysql/migrations"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)
Expand All @@ -41,8 +39,6 @@ const (
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"

errRevision = "unable to find revision: %w"
errCheckRevision = "unable to check revision: %w"
errUnableToInstantiate = "unable to instantiate datastore: %w"
errUnableToQueryTuples = "unable to query tuples: %w"
errUnableToWriteTuples = "unable to write tuples: %w"
Expand Down Expand Up @@ -123,32 +119,65 @@ func NewMySQLDatastore(uri string, options ...Option) (*Datastore, error) {

// used for seeding the initial relation_tuple_transaction. using INSERT IGNORE on a known
// ID value makes this idempotent (i.e. safe to execute concurrently).
createBaseTxn := fmt.Sprintf("INSERT IGNORE INTO %s (id) VALUES (1)", driver.RelationTupleTransaction())
createBaseTxn := fmt.Sprintf("INSERT IGNORE INTO %s (id, timestamp) VALUES (1, FROM_UNIXTIME(1))", driver.RelationTupleTransaction())

gcCtx, cancelGc := context.WithCancel(context.Background())
querySplitter := common.TupleQuerySplitter{
Executor: newMySQLExecutor(db),
UsersetBatchSize: config.splitAtUsersetCount,
}

store := &Datastore{
db: db,
driver: driver,
url: uri,
revisionFuzzingTimedelta: config.revisionFuzzingTimedelta,
gcWindowInverted: -1 * config.gcWindow,
gcInterval: config.gcInterval,
gcMaxOperationTime: config.gcMaxOperationTime,
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
createTxn: createTxn,
createBaseTxn: createBaseTxn,
QueryBuilder: queryBuilder,
querySplitter: &querySplitter,
analyzeBeforeStats: config.analyzeBeforeStats,
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

quantizationPeriodNanos := config.revisionQuantization.Nanoseconds()
if quantizationPeriodNanos < 1 {
quantizationPeriodNanos = 1
}

gcWindowInverted := -1 * config.gcWindow

revisionQuery := fmt.Sprintf(
querySelectRevision,
colID,
driver.RelationTupleTransaction(),
colTimestamp,
quantizationPeriodNanos,
)

validTransactionQuery := fmt.Sprintf(
queryValidTransaction,
colID,
driver.RelationTupleTransaction(),
colTimestamp,
gcWindowInverted.Seconds(),
)

store := &Datastore{
db: db,
driver: driver,
url: uri,
revisionQuantization: config.revisionQuantization,
gcWindowInverted: gcWindowInverted,
gcInterval: config.gcInterval,
gcMaxOperationTime: config.gcMaxOperationTime,
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
createTxn: createTxn,
createBaseTxn: createBaseTxn,
QueryBuilder: queryBuilder,
querySplitter: &querySplitter,
analyzeBeforeStats: config.analyzeBeforeStats,
CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions(
maxRevisionStaleness,
),
}

store.SetOptimizedRevisionFunc(store.optimizedRevisionFunc)

ctx, cancel := context.WithTimeout(context.Background(), seedingTimeout)
defer cancel()
err = store.seedDatabase(ctx)
Expand Down Expand Up @@ -237,11 +266,14 @@ type Datastore struct {
url string
analyzeBeforeStats bool

revisionFuzzingTimedelta time.Duration
gcWindowInverted time.Duration
gcInterval time.Duration
gcMaxOperationTime time.Duration
watchBufferLength uint16
revisionQuantization time.Duration
gcWindowInverted time.Duration
gcInterval time.Duration
gcMaxOperationTime time.Duration
watchBufferLength uint16

optimizedRevisionQuery string
validTransactionQuery string

gcGroup *errgroup.Group
gcCtx context.Context
Expand All @@ -251,6 +283,7 @@ type Datastore struct {
createBaseTxn string

*QueryBuilder
*revisions.CachedOptimizedRevisions
}

// Close closes the data store.
Expand Down Expand Up @@ -538,186 +571,6 @@ func (mds *Datastore) seedDatabase(ctx context.Context) error {
return nil
}

// HeadRevision returns the revision built from the transaction ID that
// loadRevision reports.
//
// It returns datastore.NoRevision together with the error when the lookup
// fails, and datastore.NoRevision with a nil error when the loaded
// transaction ID is 0.
func (mds *Datastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
	// This deviates slightly from the PSQL implementation in order to support
	// seeding the database at runtime instead of through the migrate command.
	ctx, span := tracer.Start(ctx, "HeadRevision")
	defer span.End()

	txID, err := mds.loadRevision(ctx)
	switch {
	case err != nil:
		return datastore.NoRevision, err
	case txID == 0:
		return datastore.NoRevision, nil
	default:
		return revisionFromTransaction(txID), nil
	}
}

// OptimizedRevision picks a revision from the range computed over the
// revision fuzzing window.
//
// When computeRevisionRange reports sql.ErrNoRows the revision from
// loadRevision is returned instead. When the range is empty (lower == upper)
// the upper bound is returned. Otherwise a pseudo-random transaction ID in
// [lower, upper) is chosen. Any other range error is wrapped with
// errRevision.
func (mds *Datastore) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
	// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
	ctx, span := tracer.Start(ctx, "OptimizedRevision")
	defer span.End()

	lower, upper, err := mds.computeRevisionRange(ctx, -1*mds.revisionFuzzingTimedelta)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// Nothing falls inside the fuzzing window; fall back to the loaded revision.
		txID, loadErr := mds.loadRevision(ctx)
		if loadErr != nil {
			return datastore.NoRevision, loadErr
		}
		return revisionFromTransaction(txID), nil
	case err != nil:
		return datastore.NoRevision, fmt.Errorf(errRevision, err)
	}

	width := upper - lower
	if width == 0 {
		return revisionFromTransaction(upper), nil
	}

	offset := uint64(rand.Intn(int(width)))
	return revisionFromTransaction(offset + lower), nil
}

// CheckRevision validates the given revision against the revision range
// computed over the garbage collection window.
//
// A revision below the range yields a datastore.RevisionStale error and one
// above it yields datastore.RevisionInFuture. When the range query reports
// sql.ErrNoRows, the revision is compared against the single value returned
// by the GetLastRevision query instead; if that query also has no rows the
// error carries datastore.CouldNotDetermineRevision. Other failures are
// wrapped with errCheckRevision.
func (mds *Datastore) CheckRevision(ctx context.Context, revision datastore.Revision) error {
	// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
	ctx, span := tracer.Start(ctx, "CheckRevision")
	defer span.End()

	target := transactionFromRevision(revision)

	lower, upper, rangeErr := mds.computeRevisionRange(ctx, mds.gcWindowInverted)
	switch {
	case rangeErr == nil:
		switch {
		case target < lower:
			return datastore.NewInvalidRevisionErr(revision, datastore.RevisionStale)
		case target > upper:
			return datastore.NewInvalidRevisionErr(revision, datastore.RevisionInFuture)
		}
		return nil
	case !errors.Is(rangeErr, sql.ErrNoRows):
		return fmt.Errorf(errCheckRevision, rangeErr)
	}

	// There are no unexpired rows, so compare against the last revision.
	stmt, params, buildErr := mds.GetLastRevision.ToSql()
	if buildErr != nil {
		return fmt.Errorf(errCheckRevision, buildErr)
	}

	var highest uint64
	row := mds.db.QueryRowContext(datastore.SeparateContextWithTracing(ctx), stmt, params...)
	scanErr := row.Scan(&highest)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision)
	case scanErr != nil:
		return fmt.Errorf(errCheckRevision, scanErr)
	}

	switch {
	case target < highest:
		return datastore.NewInvalidRevisionErr(revision, datastore.RevisionStale)
	case target > highest:
		return datastore.NewInvalidRevisionErr(revision, datastore.RevisionInFuture)
	}

	return nil
}

// loadRevision runs the GetLastRevision query and returns the scanned value
// as a transaction ID.
//
// It returns 0 with a nil error when the query yields no rows or a NULL
// value; this is needed for runtime seeding of the first transaction. Query
// construction and scan failures are wrapped with errRevision.
func (mds *Datastore) loadRevision(ctx context.Context) (uint64, error) {
	// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
	ctx, span := tracer.Start(ctx, "loadRevision")
	defer span.End()

	stmt, params, buildErr := mds.GetLastRevision.ToSql()
	if buildErr != nil {
		return 0, fmt.Errorf(errRevision, buildErr)
	}

	var latest sql.NullInt64
	row := mds.db.QueryRowContext(datastore.SeparateContextWithTracing(ctx), stmt, params...)
	scanErr := row.Scan(&latest)
	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return 0, nil
	case scanErr != nil:
		return 0, fmt.Errorf(errRevision, scanErr)
	case !latest.Valid:
		// There are no rows in the relation tuple transaction table.
		return 0, nil
	}

	return uint64(latest.Int64), nil
}

// computeRevisionRange returns the two values produced by the
// GetRevisionRange query, restricted to rows whose timestamp column is at or
// after NOW(6) shifted by windowInverted (passed to TIMESTAMPADD in seconds).
//
// It returns sql.ErrNoRows when either scanned value is NULL. Query
// construction and scan errors are returned unwrapped.
//
// This differs from the PSQL implementation in that it avoids one extra
// query; the same approach could potentially be applied there as well.
func (mds *Datastore) computeRevisionRange(ctx context.Context, windowInverted time.Duration) (uint64, uint64, error) {
	ctx, span := tracer.Start(ctx, "computeRevisionRange")
	defer span.End()

	// The %.6f verb supports a window with up to microsecond resolution.
	windowSeconds := windowInverted.Seconds()
	cutoffClause := fmt.Sprintf("%s >= TIMESTAMPADD(SECOND, %.6f, NOW(6))", colTimestamp, windowSeconds)

	stmt, params, err := mds.GetRevisionRange.Where(cutoffClause).Limit(1).ToSql()
	if err != nil {
		return 0, 0, err
	}

	var low, high sql.NullInt64
	row := mds.db.QueryRowContext(datastore.SeparateContextWithTracing(ctx), stmt, params...)
	if scanErr := row.Scan(&low, &high); scanErr != nil {
		return 0, 0, scanErr
	}

	span.AddEvent("DB returned revision range")

	if low.Valid && high.Valid {
		return uint64(low.Int64), uint64(high.Int64), nil
	}

	// A NULL bound is reported to callers as "no rows".
	return 0, 0, sql.ErrNoRows
}

// createNewTransaction executes the datastore's createTxn statement inside
// the supplied SQL transaction and returns the ID of the inserted row as
// reported by the driver's LastInsertId.
//
// Errors from executing the statement or from reading the last insert ID are
// wrapped with a "createNewTransaction" prefix.
func (mds *Datastore) createNewTransaction(ctx context.Context, tx *sql.Tx) (newTxnID uint64, err error) {
	ctx, span := tracer.Start(ctx, "createNewTransaction")
	defer span.End()

	// The statement is a prebuilt string, so there is no build step that can
	// fail before execution.
	result, err := tx.ExecContext(ctx, mds.createTxn)
	if err != nil {
		return 0, fmt.Errorf("createNewTransaction: %w", err)
	}

	lastInsertID, err := result.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("createNewTransaction: failed to get last inserted id: %w", err)
	}

	return uint64(lastInsertID), nil
}

// revisionFromTransaction converts a transaction ID into a datastore.Revision
// by casting it to int64 and building a decimal from that integer.
//
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
func revisionFromTransaction(txID uint64) datastore.Revision {
	signedID := int64(txID)
	return decimal.NewFromInt(signedID)
}

// transactionFromRevision converts a datastore.Revision into a transaction ID
// by taking the revision's integer part and casting it to uint64.
//
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
func transactionFromRevision(revision datastore.Revision) uint64 {
	integerPart := revision.IntPart()
	return uint64(integerPart)
}

// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
func filterToLivingObjects(original sq.SelectBuilder, revision datastore.Revision) sq.SelectBuilder {
return original.Where(sq.LtOrEq{colCreatedTxn: transactionFromRevision(revision)}).
Expand Down
Loading

0 comments on commit 2fe0a05

Please sign in to comment.