Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): consistent gets for read replica #7923

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pkg/core/resources/manager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var ErrSkipUpsert = errors.New("don't do upsert")
func Upsert(ctx context.Context, manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error {
upsert := func(ctx context.Context) error {
create := false
err := manager.Get(ctx, resource, store.GetBy(key))
err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent())
jakubdyszkiewicz marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
if store.IsResourceNotFound(err) {
create = true
Expand Down
14 changes: 11 additions & 3 deletions pkg/core/resources/store/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ func NewDeleteAllOptions(fs ...DeleteAllOptionsFunc) *DeleteAllOptions {
}

type GetOptions struct {
Name string
Mesh string
Version string
Name string
Mesh string
Version string
Consistent bool
}

type GetOptionsFunc func(*GetOptions)
Expand Down Expand Up @@ -146,6 +147,13 @@ func GetByVersion(version string) GetOptionsFunc {
}
}

// GetConsistent forces consistency if storage provides eventual consistency like read replica for Postgres.
func GetConsistent() GetOptionsFunc {
return func(opts *GetOptions) {
opts.Consistent = true
}
}

func (g *GetOptions) HashCode() string {
return fmt.Sprintf("%s:%s", g.Name, g.Mesh)
}
Expand Down
6 changes: 5 additions & 1 deletion pkg/plugins/resources/postgres/pgx_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,11 @@ func (r *pgxResourceStore) Get(ctx context.Context, resource core_model.Resource
opts := store.NewGetOptions(fs...)

statement := `SELECT spec, version, creation_time, modification_time FROM resources WHERE name=$1 AND mesh=$2 AND type=$3;`
row := r.pickRoPool().QueryRow(ctx, statement, opts.Name, opts.Mesh, resource.Descriptor().Name)
pool := r.pickRoPool()
if opts.Consistent {
pool = r.pool
}
row := pool.QueryRow(ctx, statement, opts.Name, opts.Mesh, resource.Descriptor().Name)

var spec string
var version int
Expand Down
Loading