Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement revision quantization for MySQL #582

Merged
merged 9 commits into from
May 4, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/datastore/common/revisions/optimized.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,13 @@ func (cor *CachedOptimizedRevisions) OptimizedRevision(ctx context.Context) (dat
lastRevision := cor.lastQuantizedRevision.get()
if localNow.Before(lastRevision.validThrough) {
log.Debug().Time("now", localNow).Time("valid", lastRevision.validThrough).Msg("returning cached revision")
span.AddEvent("returning cached revision")
return lastRevision.revision, nil
}

lastQuantizedRevision, err, _ := cor.updateGroup.Do("", func() (interface{}, error) {
log.Debug().Time("now", localNow).Time("valid", lastRevision.validThrough).Msg("computing new revision")
span.AddEvent("computing new revision")

optimized, validFor, err := cor.optimizedFunc(ctx)
if err != nil {
Expand Down
267 changes: 60 additions & 207 deletions internal/datastore/mysql/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,7 @@ package mysql
import (
"context"
"database/sql"
"errors"
"fmt"
"math/rand"
"time"

sq "github.com/Masterminds/squirrel"
Expand All @@ -14,14 +12,14 @@ import (
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"github.com/rs/zerolog/log"
"github.com/shopspring/decimal"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"

"github.com/authzed/spicedb/internal/datastore"
"github.com/authzed/spicedb/internal/datastore/common"
"github.com/authzed/spicedb/internal/datastore/common/revisions"
"github.com/authzed/spicedb/internal/datastore/mysql/migrations"
core "github.com/authzed/spicedb/pkg/proto/core/v1"
)
Expand All @@ -41,8 +39,6 @@ const (
colUsersetObjectID = "userset_object_id"
colUsersetRelation = "userset_relation"

errRevision = "unable to find revision: %w"
errCheckRevision = "unable to check revision: %w"
errUnableToInstantiate = "unable to instantiate datastore: %w"
errUnableToQueryTuples = "unable to query tuples: %w"
errUnableToWriteTuples = "unable to write tuples: %w"
Expand Down Expand Up @@ -123,32 +119,65 @@ func NewMySQLDatastore(uri string, options ...Option) (*Datastore, error) {

// used for seeding the initial relation_tuple_transaction. using INSERT IGNORE on a known
// ID value makes this idempotent (i.e. safe to execute concurrently).
createBaseTxn := fmt.Sprintf("INSERT IGNORE INTO %s (id) VALUES (1)", driver.RelationTupleTransaction())
createBaseTxn := fmt.Sprintf("INSERT IGNORE INTO %s (id, timestamp) VALUES (1, FROM_UNIXTIME(1))", driver.RelationTupleTransaction())

gcCtx, cancelGc := context.WithCancel(context.Background())
querySplitter := common.TupleQuerySplitter{
Executor: newMySQLExecutor(db),
UsersetBatchSize: config.splitAtUsersetCount,
}

store := &Datastore{
db: db,
driver: driver,
url: uri,
revisionFuzzingTimedelta: config.revisionFuzzingTimedelta,
gcWindowInverted: -1 * config.gcWindow,
gcInterval: config.gcInterval,
gcMaxOperationTime: config.gcMaxOperationTime,
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
createTxn: createTxn,
createBaseTxn: createBaseTxn,
QueryBuilder: queryBuilder,
querySplitter: &querySplitter,
analyzeBeforeStats: config.analyzeBeforeStats,
maxRevisionStaleness := time.Duration(float64(config.revisionQuantization.Nanoseconds())*
config.maxRevisionStalenessPercent) * time.Nanosecond

quantizationPeriodNanos := config.revisionQuantization.Nanoseconds()
if quantizationPeriodNanos < 1 {
quantizationPeriodNanos = 1
}

gcWindowInverted := -1 * config.gcWindow

revisionQuery := fmt.Sprintf(
querySelectRevision,
colID,
driver.RelationTupleTransaction(),
colTimestamp,
quantizationPeriodNanos,
)

validTransactionQuery := fmt.Sprintf(
queryValidTransaction,
colID,
driver.RelationTupleTransaction(),
colTimestamp,
gcWindowInverted.Seconds(),
)

store := &Datastore{
db: db,
driver: driver,
url: uri,
revisionQuantization: config.revisionQuantization,
gcWindowInverted: gcWindowInverted,
gcInterval: config.gcInterval,
gcMaxOperationTime: config.gcMaxOperationTime,
gcCtx: gcCtx,
cancelGc: cancelGc,
watchBufferLength: config.watchBufferLength,
optimizedRevisionQuery: revisionQuery,
validTransactionQuery: validTransactionQuery,
createTxn: createTxn,
createBaseTxn: createBaseTxn,
QueryBuilder: queryBuilder,
querySplitter: &querySplitter,
analyzeBeforeStats: config.analyzeBeforeStats,
CachedOptimizedRevisions: revisions.NewCachedOptimizedRevisions(
maxRevisionStaleness,
),
}

store.SetOptimizedRevisionFunc(store.optimizedRevisionFunc)

ctx, cancel := context.WithTimeout(context.Background(), seedingTimeout)
defer cancel()
err = store.seedDatabase(ctx)
Expand Down Expand Up @@ -237,11 +266,14 @@ type Datastore struct {
url string
analyzeBeforeStats bool

revisionFuzzingTimedelta time.Duration
gcWindowInverted time.Duration
gcInterval time.Duration
gcMaxOperationTime time.Duration
watchBufferLength uint16
revisionQuantization time.Duration
gcWindowInverted time.Duration
gcInterval time.Duration
gcMaxOperationTime time.Duration
watchBufferLength uint16

optimizedRevisionQuery string
validTransactionQuery string

gcGroup *errgroup.Group
gcCtx context.Context
Expand All @@ -251,6 +283,7 @@ type Datastore struct {
createBaseTxn string

*QueryBuilder
*revisions.CachedOptimizedRevisions
}

// Close closes the data store.
Expand Down Expand Up @@ -538,186 +571,6 @@ func (mds *Datastore) seedDatabase(ctx context.Context) error {
return nil
}

// HeadRevision reports the revision of the latest transaction returned by
// loadRevision. When loadRevision yields transaction ID 0 (no transaction
// found), datastore.NoRevision is returned with a nil error.
//
// The implementation deviates slightly from the PSQL implementation in order
// to support database seeding at runtime, instead of through the migrate
// command.
func (mds *Datastore) HeadRevision(ctx context.Context) (datastore.Revision, error) {
	ctx, span := tracer.Start(ctx, "HeadRevision")
	defer span.End()

	latestTxID, err := mds.loadRevision(ctx)
	switch {
	case err != nil:
		return datastore.NoRevision, err
	case latestTxID == 0:
		// Nothing has been written yet, so there is no head to report.
		return datastore.NoRevision, nil
	default:
		return revisionFromTransaction(latestTxID), nil
	}
}

// OptimizedRevision picks a revision from the range of transactions created
// within the revision fuzzing window. If that range is empty
// (computeRevisionRange reports sql.ErrNoRows), it falls back to the
// transaction returned by loadRevision. Any other failure while computing the
// range is wrapped with errRevision.
func (mds *Datastore) OptimizedRevision(ctx context.Context) (datastore.Revision, error) {
	// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
	ctx, span := tracer.Start(ctx, "OptimizedRevision")
	defer span.End()

	fuzzingWindow := -1 * mds.revisionFuzzingTimedelta
	lower, upper, err := mds.computeRevisionRange(ctx, fuzzingWindow)

	switch {
	case errors.Is(err, sql.ErrNoRows):
		// No transaction falls inside the window: use the latest one instead.
		latestTxID, loadErr := mds.loadRevision(ctx)
		if loadErr != nil {
			return datastore.NoRevision, loadErr
		}
		return revisionFromTransaction(latestTxID), nil
	case err != nil:
		return datastore.NoRevision, fmt.Errorf(errRevision, err)
	}

	// A single-transaction range leaves nothing to randomize (and rand.Intn
	// panics on a non-positive argument).
	if upper == lower {
		return revisionFromTransaction(upper), nil
	}

	// rand.Intn(n) yields a value in [0, n), so the pick lands in [lower, upper).
	width := int(upper - lower)
	offset := uint64(rand.Intn(width))
	return revisionFromTransaction(offset + lower), nil
}

// CheckRevision validates that the given revision is usable.
//
// The revision is first compared against the range of transactions created
// inside the garbage collection window: a transaction below the range is
// reported as datastore.RevisionStale, one above it as
// datastore.RevisionInFuture. When no transaction falls inside the window, the
// revision must match the last transaction exactly. If the last transaction
// cannot be determined, datastore.CouldNotDetermineRevision is reported.
// Unexpected query failures are wrapped with errCheckRevision.
func (mds *Datastore) CheckRevision(ctx context.Context, revision datastore.Revision) error {
	// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
	ctx, span := tracer.Start(ctx, "CheckRevision")
	defer span.End()

	revisionTx := transactionFromRevision(revision)

	// validate reports whether revisionTx lies within [oldest, newest].
	validate := func(oldest, newest uint64) error {
		switch {
		case revisionTx < oldest:
			return datastore.NewInvalidRevisionErr(revision, datastore.RevisionStale)
		case revisionTx > newest:
			return datastore.NewInvalidRevisionErr(revision, datastore.RevisionInFuture)
		default:
			return nil
		}
	}

	lower, upper, err := mds.computeRevisionRange(ctx, mds.gcWindowInverted)
	if err == nil {
		return validate(lower, upper)
	}

	if !errors.Is(err, sql.ErrNoRows) {
		return fmt.Errorf(errCheckRevision, err)
	}

	// There are no unexpired rows
	query, args, err := mds.GetLastRevision.ToSql()
	if err != nil {
		return fmt.Errorf(errCheckRevision, err)
	}

	// Scan into a nullable value, as loadRevision does for the same query:
	// the result is NULL when the transaction table holds no rows, and
	// scanning NULL into a plain uint64 would fail with a conversion error
	// instead of reporting that the revision could not be determined.
	var highest sql.NullInt64
	err = mds.db.QueryRowContext(
		datastore.SeparateContextWithTracing(ctx), query, args...,
	).Scan(&highest)
	if errors.Is(err, sql.ErrNoRows) {
		return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision)
	}
	if err != nil {
		return fmt.Errorf(errCheckRevision, err)
	}
	if !highest.Valid {
		return datastore.NewInvalidRevisionErr(revision, datastore.CouldNotDetermineRevision)
	}

	// Only the last transaction is acceptable once everything has expired.
	last := uint64(highest.Int64)
	return validate(last, last)
}

// loadRevision runs the GetLastRevision query and returns the resulting
// transaction ID. It returns 0 with a nil error when the query yields no row
// or a NULL value, which supports the case of no revisions at all (needed for
// runtime seeding of the first transaction). Other failures are wrapped with
// errRevision.
func (mds *Datastore) loadRevision(ctx context.Context) (uint64, error) {
	// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
	ctx, span := tracer.Start(ctx, "loadRevision")
	defer span.End()

	sqlText, sqlArgs, err := mds.GetLastRevision.ToSql()
	if err != nil {
		return 0, fmt.Errorf(errRevision, err)
	}

	var latest sql.NullInt64
	row := mds.db.QueryRowContext(datastore.SeparateContextWithTracing(ctx), sqlText, sqlArgs...)
	scanErr := row.Scan(&latest)

	switch {
	case errors.Is(scanErr, sql.ErrNoRows):
		return 0, nil
	case scanErr != nil:
		return 0, fmt.Errorf(errRevision, scanErr)
	case !latest.Valid:
		// there are no rows in the relation tuple transaction table
		return 0, nil
	}

	return uint64(latest.Int64), nil
}

// computeRevisionRange returns the lower and upper transaction IDs produced by
// the GetRevisionRange query, restricted to transactions whose timestamp is at
// or after NOW(6) shifted by windowInverted (pass a negative duration to look
// into the past). sql.ErrNoRows is returned when either bound comes back NULL.
//
// This differs from the PSQL implementation in that it avoids one extra query;
// the same approach could potentially be applied to the PSQL implementation.
func (mds *Datastore) computeRevisionRange(ctx context.Context, windowInverted time.Duration) (uint64, uint64, error) {
	ctx, span := tracer.Start(ctx, "computeRevisionRange")
	defer span.End()

	// .6f supports up to microsecond resolution window
	windowClause := fmt.Sprintf("%s >= TIMESTAMPADD(SECOND, %.6f, NOW(6))", colTimestamp, windowInverted.Seconds())
	rangeBuilder := mds.GetRevisionRange.Where(windowClause).Limit(1)

	sqlText, sqlArgs, err := rangeBuilder.ToSql()
	if err != nil {
		return 0, 0, err
	}

	var minTx, maxTx sql.NullInt64
	row := mds.db.QueryRowContext(datastore.SeparateContextWithTracing(ctx), sqlText, sqlArgs...)
	if err := row.Scan(&minTx, &maxTx); err != nil {
		return 0, 0, err
	}

	span.AddEvent("DB returned revision range")

	if minTx.Valid && maxTx.Valid {
		return uint64(minTx.Int64), uint64(maxTx.Int64), nil
	}

	// A NULL bound is surfaced as the standard "no rows" error.
	return 0, 0, sql.ErrNoRows
}

// createNewTransaction executes the datastore's createTxn statement inside the
// given SQL transaction and returns the ID generated for the inserted row, as
// reported by the driver's LastInsertId. Errors are wrapped with a
// "createNewTransaction" prefix.
func (mds *Datastore) createNewTransaction(ctx context.Context, tx *sql.Tx) (newTxnID uint64, err error) {
	ctx, span := tracer.Start(ctx, "createNewTransaction")
	defer span.End()

	// The statement text is prepared ahead of time, so there is no
	// query-building step (and no build error) to handle here.
	result, err := tx.ExecContext(ctx, mds.createTxn)
	if err != nil {
		return 0, fmt.Errorf("createNewTransaction: %w", err)
	}

	lastInsertID, err := result.LastInsertId()
	if err != nil {
		return 0, fmt.Errorf("createNewTransaction: failed to get last inserted id: %w", err)
	}

	return uint64(lastInsertID), nil
}

// revisionFromTransaction converts a transaction ID into a datastore.Revision
// holding the same integer value as a decimal.
//
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
func revisionFromTransaction(txID uint64) datastore.Revision {
	signedID := int64(txID)
	return decimal.NewFromInt(signedID)
}

// transactionFromRevision converts a datastore.Revision into a transaction ID
// by taking the revision's integer part.
//
// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
func transactionFromRevision(revision datastore.Revision) uint64 {
	integerPart := revision.IntPart()
	return uint64(integerPart)
}

// TODO (@vroldanbet) dupe from postgres datastore - need to refactor
func filterToLivingObjects(original sq.SelectBuilder, revision datastore.Revision) sq.SelectBuilder {
return original.Where(sq.LtOrEq{colCreatedTxn: transactionFromRevision(revision)}).
Expand Down
Loading