Skip to content

Commit

Permalink
feat(store): consistent gets for read replica (#7923)
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Dyszkiewicz <jakub.dyszkiewicz@gmail.com>
  • Loading branch information
jakubdyszkiewicz authored Oct 2, 2023
1 parent 5eb4547 commit 5071c89
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pkg/core/resources/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ var ErrSkipUpsert = errors.New("don't do upsert")
func Upsert(ctx context.Context, manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error {
upsert := func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key))
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
if err != nil {
if store.IsResourceNotFound(err) {
create = true
Expand Down
14 changes: 11 additions & 3 deletions pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ func NewDeleteAllOptions(fs ...DeleteAllOptionsFunc) *DeleteAllOptions {
}

type GetOptions struct {
Name string
Mesh string
Version string
Name string
Mesh string
Version string
Consistent bool
}

type GetOptionsFunc func(*GetOptions)
Expand Down Expand Up @@ -146,6 +147,13 @@ func GetByVersion(version string) GetOptionsFunc {
}
}

// GetConsistent forces consistency if storage provides eventual consistency like read replica for Postgres.
func GetConsistent() GetOptionsFunc {
return func(opts *GetOptions) {
opts.Consistent = true
}
}

func (g *GetOptions) HashCode() string {
return fmt.Sprintf("%s:%s", g.Name, g.Mesh)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/plugins/resources/postgres/pgx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ func (r *pgxResourceStore) Get(ctx context.Context, resource core_model.Resource
opts := store.NewGetOptions(fs...)

statement := `SELECT spec, version, creation_time, modification_time FROM resources WHERE name=$1 AND mesh=$2 AND type=$3;`
row := r.pickRoPool().QueryRow(ctx, statement, opts.Name, opts.Mesh, resource.Descriptor().Name)
pool := r.pickRoPool()
if opts.Consistent {
pool = r.pool
}
row := pool.QueryRow(ctx, statement, opts.Name, opts.Mesh, resource.Descriptor().Name)

var spec string
var version int
Expand Down

0 comments on commit 5071c89

Please sign in to comment.