Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support dump workload bpf map #853

Merged
merged 5 commits into from
Sep 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 10 additions & 5 deletions pkg/controller/workload/bpfcache/auth_policy.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,25 +20,30 @@
"github.com/cilium/ebpf"
)

type WorkloadPolicy_key struct {
type WorkloadPolicyKey struct {
WorklodId uint32 // workloadIp to uint32
}

type WorkloadPolicy_value struct {
type WorkloadPolicyValue struct {
PolicyIds [4]uint32 // name length is [MAX_MEMBER_NUM_PER_POLICY]
}

func (c *Cache) WorkloadPolicyUpdate(key *WorkloadPolicy_key, value *WorkloadPolicy_value) error {
func (c *Cache) WorkloadPolicyUpdate(key *WorkloadPolicyKey, value *WorkloadPolicyValue) error {

Check warning on line 31 in pkg/controller/workload/bpfcache/auth_policy.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/bpfcache/auth_policy.go#L31

Added line #L31 was not covered by tests
log.Debugf("workload policy update: [%#v], [%#v]", *key, *value)
return c.bpfMap.MapOfWlPolicy.Update(key, value, ebpf.UpdateAny)
}

func (c *Cache) WorkloadPolicyDelete(key *WorkloadPolicy_key) error {
func (c *Cache) WorkloadPolicyDelete(key *WorkloadPolicyKey) error {

Check warning on line 36 in pkg/controller/workload/bpfcache/auth_policy.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/bpfcache/auth_policy.go#L36

Added line #L36 was not covered by tests
log.Debugf("workload policy delete: [%#v]", *key)
return c.bpfMap.MapOfWlPolicy.Delete(key)
}

func (c *Cache) WorkloadPolicyLookup(key *WorkloadPolicy_key, value *WorkloadPolicy_value) error {
func (c *Cache) WorkloadPolicyLookup(key *WorkloadPolicyKey, value *WorkloadPolicyValue) error {
log.Debugf("workload policy lookup: [%#v]", *key)
return c.bpfMap.MapOfWlPolicy.Lookup(key, value)
}

func (c *Cache) WorkloadPolicyLookupAll() []WorkloadPolicyValue {
log.Debugf("WorkloadPolicyLookupAll")
return LookupAll[WorkloadPolicyKey, WorkloadPolicyValue](c.bpfMap.MapOfWlPolicy)
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,15 +56,10 @@ func (c *Cache) BackendLookup(key *BackendKey, value *BackendValue) error {
// BackendCount returns the length of backend map
// Note only used for testing
func (c *Cache) BackendCount() int {
var (
key = BackendKey{}
value = BackendValue{}
)
return len(c.BackendLookupAll())
}

res := 0
iter := c.bpfMap.KmeshBackend.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) BackendLookupAll() []BackendValue {
log.Debugf("BackendLookupAll")
return LookupAll[BackendKey, BackendValue](c.bpfMap.KmeshBackend)
}
35 changes: 35 additions & 0 deletions pkg/controller/workload/bpfcache/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright The Kmesh Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at:
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package bpfcache
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

copyright is missing


import (
"github.com/cilium/ebpf"
)

func LookupAll[K any, V any](bpfMap *ebpf.Map) []V {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good generic function

var (
key K
value V
ret []V
)

iter := bpfMap.Iterate()
for iter.Next(&key, &value) {
ret = append(ret, value)
}
return ret
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/endpoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,10 @@ func (c *Cache) GetAllEndpointsForService(serviceId uint32) []EndpointValue {
// EndpointCount returns the length of endpoint map
// Note only used for testing
func (c *Cache) EndpointCount() int {
var (
key = EndpointKey{}
value = EndpointValue{}
)
return len(c.EndpointLookupAll())
}

res := 0
iter := c.bpfMap.KmeshEndpoint.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) EndpointLookupAll() []EndpointValue {
log.Debugf("EndpointLookupAll")
return LookupAll[EndpointKey, EndpointValue](c.bpfMap.KmeshEndpoint)
}
4 changes: 2 additions & 2 deletions pkg/controller/workload/bpfcache/fake_map.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,8 +75,8 @@ func NewFakeWorkloadMap(t *testing.T) bpf2go.KmeshCgroupSockWorkloadMaps {
wlPolicyMap, err := ebpf.NewMap(&ebpf.MapSpec{
Name: "workload_policy",
Type: ebpf.Hash,
KeySize: uint32(unsafe.Sizeof(WorkloadPolicy_key{})),
ValueSize: uint32(unsafe.Sizeof(WorkloadPolicy_value{})),
KeySize: uint32(unsafe.Sizeof(WorkloadPolicyKey{})),
ValueSize: uint32(unsafe.Sizeof(WorkloadPolicyValue{})),
MaxEntries: 512,
})
if err != nil {
Expand Down
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/frontend.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,15 +68,10 @@ func (c *Cache) FrontendIterFindKey(upstreamId uint32) []FrontendKey {
// FrontendCount returns the length of frontend map
// Note only used for testing
func (c *Cache) FrontendCount() int {
var (
key = FrontendKey{}
value = FrontendValue{}
)
return len(c.FrontendLookupAll())
}

res := 0
iter := c.bpfMap.KmeshFrontend.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) FrontendLookupAll() []FrontendValue {
log.Debugf("FrontendLookupAll")
return LookupAll[FrontendKey, FrontendValue](c.bpfMap.KmeshFrontend)
}
15 changes: 5 additions & 10 deletions pkg/controller/workload/bpfcache/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,15 +58,10 @@ func (c *Cache) ServiceLookup(key *ServiceKey, value *ServiceValue) error {
// ServiceCount returns the length of service map
// Note only used for testing
func (c *Cache) ServiceCount() int {
var (
key = ServiceKey{}
value = ServiceValue{}
)
return len(c.ServiceLookupAll())
}

res := 0
iter := c.bpfMap.KmeshService.Iterate()
for iter.Next(&key, &value) {
res++
}
return res
func (c *Cache) ServiceLookupAll() []ServiceValue {
log.Debugf("ServiceLookupAll")
return LookupAll[ServiceKey, ServiceValue](c.bpfMap.KmeshService)
}
2 changes: 1 addition & 1 deletion pkg/controller/workload/workload_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@

func NewController(bpfWorkload *bpf.BpfKmeshWorkload) *Controller {
c := &Controller{
Processor: newProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),
Processor: NewProcessor(bpfWorkload.SockConn.KmeshCgroupSockWorkloadObjects.KmeshCgroupSockWorkloadMaps),

Check warning on line 48 in pkg/controller/workload/workload_controller.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_controller.go#L48

Added line #L48 was not covered by tests
bpfWorkloadObj: bpfWorkload,
}
// do some initialization when restart
Expand Down
4 changes: 2 additions & 2 deletions pkg/controller/workload/workload_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
beforeFunc: func(t *testing.T) {
patches1.ApplyMethodReturn(fakeClient.Client, "DeltaAggregatedResources", fakeClient.DeltaClient, nil)

workloadController.Processor = newProcessor(workloadMap)
workloadController.Processor = NewProcessor(workloadMap)
workload := createFakeWorkload("10.10.10.1", workloadapi.NetworkMode_STANDARD)
workloadController.Processor.WorkloadCache.AddOrUpdateWorkload(workload)
patches2.ApplyMethodFunc(fakeClient.DeltaClient, "Send",
Expand Down Expand Up @@ -152,7 +152,7 @@ func TestWorkloadStreamCreateAndSend(t *testing.T) {
beforeFunc: func(t *testing.T) {
patches0.ApplyFuncReturn(maps_v2.AuthorizationUpdate, nil)
patches1.ApplyMethodReturn(fakeClient.Client, "DeltaAggregatedResources", fakeClient.DeltaClient, nil)
workloadController.Processor = newProcessor(workloadMap)
workloadController.Processor = NewProcessor(workloadMap)
workloadController.Rbac = auth.NewRbac(nil)
workloadController.Rbac.UpdatePolicy(&security.Authorization{
Name: "p1",
Expand Down
14 changes: 9 additions & 5 deletions pkg/controller/workload/workload_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@
authzOnce sync.Once
}

func newProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
func NewProcessor(workloadMap bpf2go.KmeshCgroupSockWorkloadMaps) *Processor {
return &Processor{
hashName: NewHashName(),
bpf: bpf.NewCache(workloadMap),
Expand Down Expand Up @@ -93,6 +93,10 @@
}
}

func (p *Processor) GetBpfCache() *bpf.Cache {
return p.bpf

Check warning on line 97 in pkg/controller/workload/workload_processor.go

View check run for this annotation

Codecov / codecov/patch

pkg/controller/workload/workload_processor.go#L96-L97

Added lines #L96 - L97 were not covered by tests
}

func (p *Processor) processWorkloadResponse(rsp *service_discovery_v3.DeltaDiscoveryResponse, rbac *auth.Rbac) {
var err error

Expand Down Expand Up @@ -172,7 +176,7 @@
var (
err error
bkDelete = bpf.BackendKey{}
wpkDelete = bpf.WorkloadPolicy_key{}
wpkDelete = bpf.WorkloadPolicyKey{}
)

backendUid := p.hashName.Hash(uid)
Expand All @@ -198,7 +202,7 @@
}

// 4. delete auth policy of workload
wpkValue := bpf.WorkloadPolicy_value{}
wpkValue := bpf.WorkloadPolicyValue{}
wpkDelete.WorklodId = backendUid
if err = p.bpf.WorkloadPolicyLookup(&wpkDelete, &wpkValue); err == nil {
if err = p.bpf.WorkloadPolicyDelete(&wpkDelete); err != nil {
Expand Down Expand Up @@ -746,8 +750,8 @@

func (p *Processor) storeWorkloadPolicies(uid string, polices []string) {
var (
key = bpf.WorkloadPolicy_key{}
value = bpf.WorkloadPolicy_value{}
key = bpf.WorkloadPolicyKey{}
value = bpf.WorkloadPolicyValue{}
)
if len(polices) == 0 {
return
Expand Down
10 changes: 5 additions & 5 deletions pkg/controller/workload/workload_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ func Test_handleWorkload(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)

p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)

var (
ek bpfcache.EndpointKey
Expand Down Expand Up @@ -175,7 +175,7 @@ func Test_handleServiceWithWaypoint(t *testing.T) {
// Mainly used to test whether processor can correctly handle
// different types of waypoint address without panic.
workloadMap := bpfcache.NewFakeWorkloadMap(t)
p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)

// Waypoint with network address.
svc1 := createFakeService("svc1", "10.240.10.1", "10.240.10.200")
Expand All @@ -188,7 +188,7 @@ func Test_handleServiceWithWaypoint(t *testing.T) {

func Test_hostnameNetworkMode(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)
workload := createFakeWorkload("1.2.3.4", workloadapi.NetworkMode_STANDARD)
workloadWithoutService := createFakeWorkload("1.2.3.5", workloadapi.NetworkMode_STANDARD)
workloadWithoutService.Services = nil
Expand Down Expand Up @@ -485,7 +485,7 @@ func TestRestart(t *testing.T) {
workloadMap := bpfcache.NewFakeWorkloadMap(t)
defer bpfcache.CleanupFakeWorkloadMap(workloadMap)

p := newProcessor(workloadMap)
p := NewProcessor(workloadMap)

res := &service_discovery_v3.DeltaDiscoveryResponse{}

Expand Down Expand Up @@ -545,7 +545,7 @@ func TestRestart(t *testing.T) {
// Set a restart label and simulate missing data in the cache
bpf.SetStartType(bpf.Restart)
// reconstruct a new processor
p = newProcessor(workloadMap)
p = NewProcessor(workloadMap)
p.bpf.RestoreEndpointKeys()
// 2.1 simulate workload add/delete during restart
// simulate workload update during restart
Expand Down
Loading
Loading