Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fixing concurrent map access panic in scaler #415

Merged
merged 3 commits into from
Mar 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions scaler/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,9 @@ func (e *impl) IsActive(
)
if !ok {
err := fmt.Errorf("host '%s' not found in counts", host)
allCounts := mergeCountsWithRoutingTable(e.pinger.counts(), e.routingTable)
allCounts := e.pinger.mergeCountsWithRoutingTable(
e.routingTable,
)
lggr.Error(err, "Given host was not found in queue count map", "host", host, "allCounts", allCounts)
return nil, err
}
Expand Down Expand Up @@ -173,7 +175,7 @@ func (e *impl) GetMetrics(
hostCount = e.pinger.aggregate()
} else {
err := fmt.Errorf("host '%s' not found in counts", host)
allCounts := mergeCountsWithRoutingTable(e.pinger.counts(), e.routingTable)
allCounts := e.pinger.mergeCountsWithRoutingTable(e.routingTable)
lggr.Error(err, "allCounts", allCounts)
return nil, err
}
Expand Down
16 changes: 0 additions & 16 deletions scaler/host_counts.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,22 +4,6 @@ import (
"github.com/kedacore/http-add-on/pkg/routing"
)

// mergeCountsWithRoutingTable ensures that all hosts in routing table
// are present in combined counts, if count is not present value is set to 0
func mergeCountsWithRoutingTable(
counts map[string]int,
table routing.TableReader,
) map[string]int {
mergedCounts := make(map[string]int)
for _, host := range table.Hosts() {
mergedCounts[host] = 0
}
for key, value := range counts {
mergedCounts[key] = value
}
return mergedCounts
}

// getHostCount gets proper count for given host regardless whether
// host is in counts or only in routerTable
func getHostCount(
Expand Down
113 changes: 50 additions & 63 deletions scaler/host_counts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,79 +14,66 @@ type testCase struct {
retCounts map[string]int
}

var cases = []testCase{
{
name: "empty queue",
table: newRoutingTable([]hostAndTarget{
{
host: "www.example.com",
target: routing.Target{},
func cases() []testCase {
return []testCase{
{
name: "empty queue",
table: newRoutingTable([]hostAndTarget{
{
host: "www.example.com",
target: routing.Target{},
},
{
host: "www.example2.com",
target: routing.Target{},
},
}),
counts: make(map[string]int),
retCounts: map[string]int{
"www.example.com": 0,
"www.example2.com": 0,
},
{
host: "www.example2.com",
target: routing.Target{},
},
}),
counts: make(map[string]int),
retCounts: map[string]int{
"www.example.com": 0,
"www.example2.com": 0,
},
},
{
name: "one entry in queue, same entry in routing table",
table: newRoutingTable([]hostAndTarget{
{
host: "example.com",
target: routing.Target{},
{
name: "one entry in queue, same entry in routing table",
table: newRoutingTable([]hostAndTarget{
{
host: "example.com",
target: routing.Target{},
},
}),
counts: map[string]int{
"example.com": 1,
},
retCounts: map[string]int{
"example.com": 1,
},
}),
counts: map[string]int{
"example.com": 1,
},
retCounts: map[string]int{
"example.com": 1,
},
},
{
name: "one entry in queue, two in routing table",
table: newRoutingTable([]hostAndTarget{
{
host: "example.com",
target: routing.Target{},
{
name: "one entry in queue, two in routing table",
table: newRoutingTable([]hostAndTarget{
{
host: "example.com",
target: routing.Target{},
},
{
host: "example2.com",
target: routing.Target{},
},
}),
counts: map[string]int{
"example.com": 1,
},
{
host: "example2.com",
target: routing.Target{},
retCounts: map[string]int{
"example.com": 1,
"example2.com": 0,
},
}),
counts: map[string]int{
"example.com": 1,
},
retCounts: map[string]int{
"example.com": 1,
"example2.com": 0,
},
},
}

func TestMergeCountsWithRoutingTable(t *testing.T) {

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
r := require.New(t)
ret := mergeCountsWithRoutingTable(
tc.counts,
tc.table,
)
r.Equal(tc.retCounts, ret)
})
}
}

}
func TestGetHostCount(t *testing.T) {

for _, tc := range cases {
for _, tc := range cases() {
for host, retCount := range tc.retCounts {
t.Run(tc.name, func(t *testing.T) {
r := require.New(t)
Expand Down
18 changes: 18 additions & 0 deletions scaler/queue_pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/go-logr/logr"
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/kedacore/http-add-on/pkg/routing"
"github.com/pkg/errors"
"golang.org/x/sync/errgroup"
)
Expand Down Expand Up @@ -124,6 +125,23 @@ func (q *queuePinger) counts() map[string]int {
return q.allCounts
}

// mergeCountsWithRoutingTable ensures that all hosts in routing table
// are present in combined counts, if count is not present value is set to 0
func (q *queuePinger) mergeCountsWithRoutingTable(
table routing.TableReader,
) map[string]int {
q.pingMut.RLock()
defer q.pingMut.RUnlock()
mergedCounts := make(map[string]int)
for _, host := range table.Hosts() {
mergedCounts[host] = 0
}
for key, value := range q.allCounts {
mergedCounts[key] = value
}
return mergedCounts
}

func (q *queuePinger) aggregate() int {
q.pingMut.RLock()
defer q.pingMut.RUnlock()
Expand Down
46 changes: 46 additions & 0 deletions scaler/queue_pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/kedacore/http-add-on/pkg/k8s"
"github.com/kedacore/http-add-on/pkg/queue"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
v1 "k8s.io/api/core/v1"
)

Expand Down Expand Up @@ -215,3 +216,48 @@ func TestFetchCounts(t *testing.T) {
}
r.Equal(expectedCounts, cts)
}

func TestMergeCountsWithRoutingTable(t *testing.T) {
for _, tc := range cases() {
t.Run(tc.name, func(t *testing.T) {
ctx := context.Background()
grp, ctx := errgroup.WithContext(ctx)
r := require.New(t)
const C = 100
tickr, q, err := newFakeQueuePinger(
ctx,
logr.Discard(),
)
r.NoError(err)
defer tickr.Stop()
q.allCounts = tc.counts

retCh := make(chan map[string]int)
for i := 0; i < C; i++ {
grp.Go(func() error {
retCh <- q.mergeCountsWithRoutingTable(tc.table)
return nil
})
}

// ensure we receive from retCh C times
allRets := map[int]map[string]int{}
for i := 0; i < C; i++ {
allRets[i] = <-retCh
}

r.NoError(grp.Wait())

// ensure that all returned maps are the
// same
prev := allRets[0]
for i := 1; i < C; i++ {
r.Equal(prev, allRets[i])
prev = allRets[i]
}
// ensure that all the returned maps are
// equal to what we expected
r.Equal(tc.retCounts, prev)
})
}
}