Skip to content

Commit

Permalink
Fix bug in usage metrics that caused a negative count to occur
Browse files Browse the repository at this point in the history
There were a couple of instances were usage metrics would do the wrong
thing and result in incorrect counts, causing the count to attempt to
decrement below zero and return an error. The usage metrics did not
account for various places where a single transaction could
delete/update/add multiple service instances at once.

We also remove the error when attempting to decrement below zero, and
instead just make sure we do not accidentally underflow the unsigned
integer. This is a more graceful failure than returning an error and not
allowing a transaction to commit.
  • Loading branch information
crhino committed Jan 12, 2021
1 parent 3c3b672 commit 23cb383
Show file tree
Hide file tree
Showing 3 changed files with 138 additions and 63 deletions.
128 changes: 70 additions & 58 deletions agent/consul/state/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ const (
// usage metrics that we track.
func updateUsage(tx WriteTxn, changes Changes) error {
usageDeltas := make(map[string]int)
serviceNameChanges := make(map[structs.ServiceName]int)
for _, change := range changes.Changes {
var delta int
if change.Created() {
Expand All @@ -75,65 +76,27 @@ func updateUsage(tx WriteTxn, changes Changes) error {
case "services":
svc := changeObject(change).(*structs.ServiceNode)
usageDeltas[change.Table] += delta
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.ServiceName, &svc.EnterpriseMeta)
if err != nil {
return err
}
addEnterpriseServiceInstanceUsage(usageDeltas, change)

var serviceState uniqueServiceState
if serviceIter.Next() == nil {
// If no services exist, we know we deleted the last service
// instance.
serviceState = Deleted
usageDeltas[serviceNamesUsageTable] -= 1
} else if serviceIter.Next() == nil {
// If a second call to Next() returns nil, we know only a single
// instance exists. If, in addition, a new service name has been
// registered, either via creating a new service instance or via
// renaming an existing service, than we update our service count.
//
// We only care about two cases here:
// 1. A new service instance has been created with a unique name
// 2. An existing service instance has been updated with a new unique name
//
// These are the only ways a new unique service can be created. The
// other valid cases here: an update that does not change the service
// name, and a deletion, both do not impact the count of unique service
// names in the system.

if change.Created() {
// Given a single existing service instance of the service: If a
// service has just been created, then we know this is a new unique
// service.
serviceState = Created
usageDeltas[serviceNamesUsageTable] += 1
} else if serviceNameChanged(change) {
// Given a single existing service instance of the service: If a
// service has been updated with a new service name, then we know
// this is a new unique service.
serviceState = Created
usageDeltas[serviceNamesUsageTable] += 1

// Check whether the previous name was deleted in this rename, this
// is a special case of renaming a service which does not result in
// changing the count of unique service names.
before := change.Before.(*structs.ServiceNode)
beforeSvc, err := firstWithTxn(tx, servicesTableName, "service", before.ServiceName, &before.EnterpriseMeta)
if err != nil {
return err
}
if beforeSvc == nil {
usageDeltas[serviceNamesUsageTable] -= 1
// set serviceState to NoChange since we have both gained and lost a
// service, cancelling each other out
serviceState = NoChange
}
}
// Construct a mapping of all of the various service names that were
// changed, in order to compare it with the finished memdb state.
// Make sure to account for the fact that services can change their names.
if serviceNameChanged(change) {
serviceNameChanges[svc.CompoundServiceName()] += 1
before := change.Before.(*structs.ServiceNode)
serviceNameChanges[before.CompoundServiceName()] -= 1
} else {
serviceNameChanges[svc.CompoundServiceName()] += delta
}
addEnterpriseServiceUsage(usageDeltas, change, serviceState)
}
}

serviceStates, err := updateServiceNameUsage(tx, usageDeltas, serviceNameChanges)
if err != nil {
return err
}
addEnterpriseServiceUsage(usageDeltas, serviceStates)

idx := changes.Index
// This will happen when restoring from a snapshot, just take the max index
// of the tables we are tracking.
Expand All @@ -144,6 +107,46 @@ func updateUsage(tx WriteTxn, changes Changes) error {
return writeUsageDeltas(tx, idx, usageDeltas)
}

func updateServiceNameUsage(tx WriteTxn, usageDeltas map[string]int, serviceNameChanges map[structs.ServiceName]int) (map[structs.ServiceName]uniqueServiceState, error) {
serviceStates := make(map[structs.ServiceName]uniqueServiceState, len(serviceNameChanges))
for svc, delta := range serviceNameChanges {
serviceIter, err := getWithTxn(tx, servicesTableName, "service", svc.Name, &svc.EnterpriseMeta)
if err != nil {
return nil, err
}

// Count the number of service instances associated with the given service
// name at the end of this transaction, and compare that with how many were
// added/removed during the transaction. This allows us to handle a single
// transaction committing multiple changes related to a single service
// name.
var svcCount int
for service := serviceIter.Next(); service != nil; service = serviceIter.Next() {
svcCount += 1
}

var serviceState uniqueServiceState
switch {
case svcCount == 0:
// If no services exist, we know we deleted the last service
// instance.
serviceState = Deleted
usageDeltas[serviceNamesUsageTable] -= 1
case svcCount == delta:
// If the current number of service instances equals the number added,
// than we know we created a new service name.
serviceState = Created
usageDeltas[serviceNamesUsageTable] += 1
default:
serviceState = NoChange
}

serviceStates[svc] = serviceState
}

return serviceStates, nil
}

// serviceNameChanged returns a boolean that indicates whether the
// provided change resulted in an update to the service's service name.
func serviceNameChanged(change memdb.Change) bool {
Expand All @@ -168,7 +171,11 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error

if u == nil {
if delta < 0 {
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
// Don't return an error here, since we don't want to block updates
// from happening to the state store. But, set the delta to 0 so that
// we do not accidentally underflow the uint64 and begin reporting
// large numbers.
delta = 0
}
err := tx.Insert("usage", &UsageEntry{
ID: id,
Expand All @@ -179,12 +186,17 @@ func writeUsageDeltas(tx WriteTxn, idx uint64, usageDeltas map[string]int) error
return fmt.Errorf("failed to update usage entry: %s", err)
}
} else if cur, ok := u.(*UsageEntry); ok {
if cur.Count+delta < 0 {
return fmt.Errorf("failed to insert usage entry for %q: delta will cause a negative count", id)
updated := cur.Count + delta
if updated < 0 {
// Don't return an error here, since we don't want to block updates
// from happening to the state store. But, set the delta to 0 so that
// we do not accidentally underflow the uint64 and begin reporting
// large numbers.
updated = 0
}
err := tx.Insert("usage", &UsageEntry{
ID: id,
Count: cur.Count + delta,
Count: updated,
Index: idx,
})
if err != nil {
Expand Down
5 changes: 4 additions & 1 deletion agent/consul/state/usage_oss.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@
package state

import (
"github.com/hashicorp/consul/agent/structs"
memdb "github.com/hashicorp/go-memdb"
)

type EnterpriseServiceUsage struct{}

func addEnterpriseServiceUsage(map[string]int, memdb.Change, uniqueServiceState) {}
func addEnterpriseServiceInstanceUsage(map[string]int, memdb.Change) {}

func addEnterpriseServiceUsage(map[string]int, map[structs.ServiceName]uniqueServiceState) {}

func compileEnterpriseUsage(tx ReadTxn, usage ServiceUsage) (ServiceUsage, error) {
return usage, nil
Expand Down
68 changes: 64 additions & 4 deletions agent/consul/state/usage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,43 @@ func TestStateStore_Usage_ServiceUsageEmpty(t *testing.T) {
require.Equal(t, usage.ServiceInstances, 0)
}

func TestStateStore_Usage_ServiceUsage_DeleteNode(t *testing.T) {
s := testStateStore(t)
testRegisterNode(t, s, 1, "node1")

svc1 := &structs.NodeService{
ID: "service1",
Service: "test",
Address: "1.1.1.1",
Port: 1111,
}
svc2 := &structs.NodeService{
ID: "service2",
Service: "test",
Address: "1.1.1.1",
Port: 1111,
}

// Register multiple instances on a single node to test that we do not
// double count deletions within the same transaction.
require.NoError(t, s.EnsureService(1, "node1", svc1))
require.NoError(t, s.EnsureService(2, "node1", svc2))

idx, usage, err := s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(2))
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 2)

require.NoError(t, s.DeleteNode(3, "node1"))

idx, usage, err = s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(3))
require.Equal(t, usage.Services, 0)
require.Equal(t, usage.ServiceInstances, 0)
}

func TestStateStore_Usage_Restore(t *testing.T) {
s := testStateStore(t)
restore := s.Restore()
Expand All @@ -67,12 +104,27 @@ func TestStateStore_Usage_Restore(t *testing.T) {
Address: "198.18.0.2",
},
})
restore.Registration(9, &structs.RegisterRequest{
Node: "test-node",
Service: &structs.NodeService{
ID: "mysql1",
Service: "mysql",
Port: 8081,
Address: "198.18.0.2",
},
})
require.NoError(t, restore.Commit())

idx, count, err := s.NodeCount()
require.NoError(t, err)
require.Equal(t, idx, uint64(9))
require.Equal(t, count, 1)

idx, usage, err := s.ServiceUsage()
require.NoError(t, err)
require.Equal(t, idx, uint64(9))
require.Equal(t, usage.Services, 1)
require.Equal(t, usage.ServiceInstances, 2)
}

func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
Expand All @@ -92,8 +144,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
}

err := updateUsage(txn, changes)
require.Error(t, err)
require.Contains(t, err.Error(), "negative count")
require.NoError(t, err)

// Check that we do not underflow
u, err := txn.First("usage", "id", "nodes")
require.NoError(t, err)
require.Equal(t, 0, u.(*UsageEntry).Count)

// A insert a change to create a usage entry
changes = Changes{
Expand Down Expand Up @@ -128,8 +184,12 @@ func TestStateStore_Usage_updateUsage_Underflow(t *testing.T) {
}

err = updateUsage(txn, changes)
require.Error(t, err)
require.Contains(t, err.Error(), "negative count")
require.NoError(t, err)

// Check that we do not underflow
u, err = txn.First("usage", "id", "nodes")
require.NoError(t, err)
require.Equal(t, 0, u.(*UsageEntry).Count)
}

func TestStateStore_Usage_ServiceUsage_updatingServiceName(t *testing.T) {
Expand Down

0 comments on commit 23cb383

Please sign in to comment.