diff --git a/pkg/core/resources/manager/manager.go b/pkg/core/resources/manager/manager.go index c3d4ec1f2315..8148248628f2 100644 --- a/pkg/core/resources/manager/manager.go +++ b/pkg/core/resources/manager/manager.go @@ -130,7 +130,7 @@ var ErrSkipUpsert = errors.New("don't do upsert") func Upsert(ctx context.Context, manager ResourceManager, key model.ResourceKey, resource model.Resource, fn func(resource model.Resource) error, fs ...UpsertFunc) error { upsert := func(ctx context.Context) error { create := false - err := manager.Get(ctx, resource, store.GetBy(key)) + err := manager.Get(ctx, resource, store.GetBy(key), store.GetConsistent()) if err != nil { if store.IsResourceNotFound(err) { create = true diff --git a/pkg/core/resources/store/options.go b/pkg/core/resources/store/options.go index 95154dd83102..573f8d48c7fe 100644 --- a/pkg/core/resources/store/options.go +++ b/pkg/core/resources/store/options.go @@ -114,9 +114,10 @@ func NewDeleteAllOptions(fs ...DeleteAllOptionsFunc) *DeleteAllOptions { } type GetOptions struct { - Name string - Mesh string - Version string + Name string + Mesh string + Version string + Consistent bool } type GetOptionsFunc func(*GetOptions) @@ -146,6 +147,13 @@ func GetByVersion(version string) GetOptionsFunc { } } +// GetConsistent forces consistency if storage provides eventual consistency like read replica for Postgres. +func GetConsistent() GetOptionsFunc { + return func(opts *GetOptions) { + opts.Consistent = true + } +} + func (g *GetOptions) HashCode() string { return fmt.Sprintf("%s:%s", g.Name, g.Mesh) } diff --git a/pkg/plugins/resources/postgres/pgx_store.go b/pkg/plugins/resources/postgres/pgx_store.go index f8fa0396d896..73937b120723 100644 --- a/pkg/plugins/resources/postgres/pgx_store.go +++ b/pkg/plugins/resources/postgres/pgx_store.go @@ -166,7 +166,11 @@ func (r *pgxResourceStore) Get(ctx context.Context, resource core_model.Resource opts := store.NewGetOptions(fs...) statement := `SELECT spec, version, creation_time, modification_time FROM resources WHERE name=$1 AND mesh=$2 AND type=$3;` - row := r.pickRoPool().QueryRow(ctx, statement, opts.Name, opts.Mesh, resource.Descriptor().Name) + pool := r.pickRoPool() + if opts.Consistent { + pool = r.pool + } + row := pool.QueryRow(ctx, statement, opts.Name, opts.Mesh, resource.Descriptor().Name) var spec string var version int