Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Backport/1.8.x] Backport #10073 #10103

Merged
merged 2 commits into from
Apr 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .circleci/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ jobs:
- checkout
- add_ssh_keys: # needs a key to push updated static asset commit back to github
fingerprints:
- "3d:6b:98:55:78:4e:52:17:4e:17:ba:f3:bf:0b:96:2a"
- "fc:55:84:15:0a:1d:c8:e9:06:d0:e8:9c:7b:a9:b7:31"
- attach_workspace:
at: .
- run:
Expand All @@ -583,9 +583,9 @@ jobs:
# check if there are any changes in ui-v2/
# if there are, we commit the ui static asset file
# HEAD^! is shorthand for HEAD^..HEAD (parent of HEAD and HEAD)
if ! git diff --quiet --exit-code HEAD^! ui-v2/; then
git config --local user.email "hashicorp-ci@users.noreply.github.com"
git config --local user.name "hashicorp-ci"
if ! git diff --quiet --exit-code HEAD^! ui/; then
git config --local user.email "github-team-consul-core@hashicorp.com"
git config --local user.name "hc-github-team-consul-core"

short_sha=$(git rev-parse --short HEAD)
git add agent/bindata_assetfs.go
Expand Down Expand Up @@ -741,7 +741,7 @@ jobs:
- checkout
- add_ssh_keys: # needs a key to push cherry-picked commits back to github
fingerprints:
- "3d:6b:98:55:78:4e:52:17:4e:17:ba:f3:bf:0b:96:2a"
- "fc:55:84:15:0a:1d:c8:e9:06:d0:e8:9c:7b:a9:b7:31"
- run: .circleci/scripts/cherry-picker.sh

trigger-oss-merge:
Expand Down
4 changes: 2 additions & 2 deletions .circleci/scripts/cherry-picker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,8 @@ fi

# loop through all labels on the PR
for label in $labels; do
git config --local user.email "hashicorp-ci@users.noreply.github.com"
git config --local user.name "hashicorp-ci"
git config --local user.email "github-team-consul-core@hashicorp.com"
git config --local user.name "hc-github-team-consul-core"
status "checking label: $label"
# TODO: enable this when replatform is merged into stable-website
# if the label matches docs-cherrypick, it will attempt to cherry-pick to stable-website
Expand Down
25 changes: 22 additions & 3 deletions agent/consul/leader.go
Original file line number Diff line number Diff line change
Expand Up @@ -790,10 +790,19 @@ func (s *Server) runLegacyACLReplication(ctx context.Context) error {
}

if err != nil {
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
0,
)
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
legacyACLLogger.Warn("Legacy ACL replication error (will retry if still leader)", "error", err)
} else {
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "status"},
1,
)
metrics.SetGauge([]string{"leader", "replication", "acl-legacy", "index"},
float32(index),
)
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(structs.ACLReplicateLegacy, index)
legacyACLLogger.Debug("Legacy ACL replication completed through remote index", "index", index)
Expand Down Expand Up @@ -851,23 +860,23 @@ type replicateFunc func(ctx context.Context, logger hclog.Logger, lastRemoteInde
func (s *Server) runACLPolicyReplicator(ctx context.Context) error {
policyLogger := s.aclReplicationLogger(structs.ACLReplicatePolicies.SingularNoun())
policyLogger.Info("started ACL Policy replication")
return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies)
return s.runACLReplicator(ctx, policyLogger, structs.ACLReplicatePolicies, s.replicateACLPolicies, "acl-policies")
}

// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLRoleReplicator(ctx context.Context) error {
roleLogger := s.aclReplicationLogger(structs.ACLReplicateRoles.SingularNoun())
roleLogger.Info("started ACL Role replication")
return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles)
return s.runACLReplicator(ctx, roleLogger, structs.ACLReplicateRoles, s.replicateACLRoles, "acl-roles")
}

// This function is only intended to be run as a managed go routine, it will block until
// the context passed in indicates that it should exit.
func (s *Server) runACLTokenReplicator(ctx context.Context) error {
tokenLogger := s.aclReplicationLogger(structs.ACLReplicateTokens.SingularNoun())
tokenLogger.Info("started ACL Token replication")
return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens)
return s.runACLReplicator(ctx, tokenLogger, structs.ACLReplicateTokens, s.replicateACLTokens, "acl-tokens")
}

// This function is only intended to be run as a managed go routine, it will block until
Expand All @@ -877,6 +886,7 @@ func (s *Server) runACLReplicator(
logger hclog.Logger,
replicationType structs.ACLReplicationType,
replicateFunc replicateFunc,
metricName string,
) error {
var failedAttempts uint
limiter := rate.NewLimiter(rate.Limit(s.config.ACLReplicationRate), s.config.ACLReplicationBurst)
Expand All @@ -897,6 +907,9 @@ func (s *Server) runACLReplicator(
}

if err != nil {
metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
0,
)
lastRemoteIndex = 0
s.updateACLReplicationStatusError()
logger.Warn("ACL replication error (will retry if still leader)",
Expand All @@ -913,6 +926,12 @@ func (s *Server) runACLReplicator(
// do nothing
}
} else {
metrics.SetGauge([]string{"leader", "replication", metricName, "status"},
1,
)
metrics.SetGauge([]string{"leader", "replication", metricName, "index"},
float32(index),
)
lastRemoteIndex = index
s.updateACLReplicationStatusIndex(replicationType, index)
logger.Debug("ACL replication completed through remote index",
Expand Down
19 changes: 19 additions & 0 deletions agent/consul/replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (

type ReplicatorDelegate interface {
Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (index uint64, exit bool, err error)
MetricName() string
}

type ReplicatorConfig struct {
Expand Down Expand Up @@ -104,6 +105,9 @@ func (r *Replicator) Run(ctx context.Context) error {
}

if err != nil {
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"},
0,
)
// reset the lastRemoteIndex when there is an RPC failure. This should cause a full sync to be done during
// the next round of replication
atomic.StoreUint64(&r.lastRemoteIndex, 0)
Expand All @@ -112,6 +116,12 @@ func (r *Replicator) Run(ctx context.Context) error {
r.logger.Warn("replication error (will retry if still leader)", "error", err)
}
} else {
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "status"},
1,
)
metrics.SetGauge([]string{"leader", "replication", r.delegate.MetricName(), "index"},
float32(index),
)
atomic.StoreUint64(&r.lastRemoteIndex, index)
r.logger.Debug("replication completed through remote index", "index", index)
}
Expand All @@ -134,6 +144,11 @@ type ReplicatorFunc func(ctx context.Context, lastRemoteIndex uint64, logger hcl

type FunctionReplicator struct {
ReplicateFn ReplicatorFunc
Name string
}

func (r *FunctionReplicator) MetricName() string {
return r.Name
}

func (r *FunctionReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
Expand Down Expand Up @@ -177,6 +192,10 @@ type IndexReplicator struct {
Logger hclog.Logger
}

func (r *IndexReplicator) MetricName() string {
return r.Delegate.MetricName()
}

func (r *IndexReplicator) Replicate(ctx context.Context, lastRemoteIndex uint64, _ hclog.Logger) (uint64, bool, error) {
fetchStart := time.Now()
lenRemote, remote, remoteIndex, err := r.Delegate.FetchRemote(lastRemoteIndex)
Expand Down
1 change: 1 addition & 0 deletions agent/consul/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ func TestReplicationRestart(t *testing.T) {
ReplicateFn: func(ctx context.Context, lastRemoteIndex uint64, logger hclog.Logger) (uint64, bool, error) {
return 1, false, nil
},
Name: "foo",
},

Rate: 1,
Expand Down
2 changes: 1 addition & 1 deletion agent/consul/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -412,7 +412,7 @@ func NewServer(config *Config, options ...ConsulOption) (*Server, error) {

configReplicatorConfig := ReplicatorConfig{
Name: logging.ConfigEntry,
Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig},
Delegate: &FunctionReplicator{ReplicateFn: s.replicateConfig, Name: "config-entries"},
Rate: s.config.ConfigReplicationRate,
Burst: s.config.ConfigReplicationBurst,
Logger: s.logger,
Expand Down