add coverage for phase2 synchronizer #81

Merged 1 commit on Oct 7, 2024
1 change: 1 addition & 0 deletions .github/workflows/pr-created.yaml
@@ -33,6 +33,7 @@ jobs:
"TestSynchronizer_TC01_Backend",
"TestSynchronizer_TC02_InCluster",
"TestSynchronizer_TC02_Backend",
"TestSynchronizer_TC03",
"TestSynchronizer_TC04_InCluster",
"TestSynchronizer_TC04_Backend",
"TestSynchronizer_TC05_InCluster",
6 changes: 5 additions & 1 deletion adapters/incluster/v1/client.go
@@ -450,7 +450,11 @@ func (c *Client) PutObject(_ context.Context, id domain.KindName, object []byte)
}
// use apply to create or update object, we want to overwrite existing objects
// TODO for the moment we keep the dynamic client as we create fewer objects than we fetch
_, err = c.dynamicClient.Resource(c.res).Namespace(id.Namespace).Apply(context.Background(), id.Name, &obj, metav1.ApplyOptions{FieldManager: "application/apply-patch"})
options := metav1.ApplyOptions{
FieldManager: "application/apply-patch",
Force: true,
Collaborator (Author):

@amirmalka I think I ran into the issue that we saw last month - do you think it's OK to force here?

Contributor:

Eventually we discovered that the problem was trying to apply a namespaced object for a non-namespaced resource definition.
Why do we need to force? What is the issue you are seeing?

Collaborator (Author):

right... well in this case we really need it, since it's an existing object that was updated on both sides, but we want the backend to have the last word on it
I think mechanism 1 applies here: https://pkg.go.dev/k8s.io/client-go/applyconfigurations#hdr-Controller_Support

}
_, err = c.dynamicClient.Resource(c.res).Namespace(id.Namespace).Apply(context.Background(), id.Name, &obj, options)
if err != nil {
return fmt.Errorf("apply resource: %w", err)
}
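To illustrate the forced server-side apply discussed in the thread above: with Force set, the API server overwrites fields owned by another field manager instead of returning a 409 conflict, which is what lets the backend have the last word. The following is a minimal sketch, not code from the synchronizer; the GVR, helper name, and setup are illustrative assumptions.

// Hedged sketch (not from the synchronizer codebase): a forced server-side
// apply via the dynamic client. The GVR below is an example.
package example

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/client-go/dynamic"
)

func forceApply(ctx context.Context, dyn dynamic.Interface, obj *unstructured.Unstructured) error {
	gvr := schema.GroupVersionResource{Group: "", Version: "v1", Resource: "configmaps"}
	opts := metav1.ApplyOptions{FieldManager: "application/apply-patch", Force: true}
	// Server-side apply creates the object if missing, otherwise patches it;
	// because Force is set, it takes ownership of fields held by other field
	// managers instead of failing with a conflict.
	applied, err := dyn.Resource(gvr).Namespace(obj.GetNamespace()).Apply(ctx, obj.GetName(), obj, opts)
	if err != nil {
		return fmt.Errorf("apply resource: %w", err)
	}
	fmt.Println("applied resourceVersion:", applied.GetResourceVersion())
	return nil
}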
80 changes: 80 additions & 0 deletions core/common_test.go
@@ -0,0 +1,80 @@
package core

import (
"context"
"net"
"testing"
"time"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/domain"
"github.com/stretchr/testify/assert"
)

var (
kindDeployment = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "apps/v1/Deployment"),
Name: "name",
Namespace: "namespace",
}
kindKnownServers = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "spdx.softwarecomposition.kubescape.io/v1beta1/KnownServers"),
Name: "name",
Namespace: "namespace",
}
object = []byte(`{"kind":"kind","metadata":{"name":"name","resourceVersion":"1"}}`)
objectClientV2 = []byte(`{"kind":"kind","metadata":{"name":"client","resourceVersion":"2"}}`)
objectServerV2 = []byte(`{"kind":"kind","metadata":{"name":"server","resourceVersion":"2"}}`)
)

func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.MockAdapter) {
ctx := context.WithValue(context.TODO(), domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: "11111111-2222-3333-4444-555555555555",
Cluster: "cluster",
})
err := logger.L().SetLevel(helpers.DebugLevel.String())
assert.NoError(t, err)
clientAdapter := adapters.NewMockAdapter(true)
serverAdapter := adapters.NewMockAdapter(false)
clientConn, serverConn := net.Pipe()
newConn := func() (net.Conn, error) {
return clientConn, nil
}
client, err := NewSynchronizerClient(ctx, []adapters.Adapter{clientAdapter}, clientConn, newConn)
assert.NoError(t, err)
server, err := NewSynchronizerServer(ctx, []adapters.Adapter{serverAdapter}, serverConn)
assert.NoError(t, err)
go func() {
_ = client.Start(ctx)
}()
go func() {
_ = server.Start(ctx)
}()
return ctx, clientAdapter, serverAdapter
}

func TestSynchronizer_ObjectModifiedOnBothSides(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// pre: add object
clientAdapter.Resources[kindKnownServers.String()] = object
serverAdapter.Resources[kindKnownServers.String()] = object
// manually modify object on server (PutObject message will be sent later)
serverAdapter.Resources[kindKnownServers.String()] = objectServerV2
// we create a race condition here
// object is modified on client, but we don't know about server modification
err := clientAdapter.TestCallPutOrPatch(ctx, kindKnownServers, object, objectClientV2)
assert.NoError(t, err)
// server message arrives just now on client
err = clientAdapter.PutObject(ctx, kindKnownServers, objectServerV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check both sides have the one from the server
clientObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, clientObj)
serverObj, ok := serverAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, serverObj)
}
72 changes: 0 additions & 72 deletions core/synchronizer_test.go
@@ -1,60 +1,12 @@
package core

import (
"context"
"net"
"testing"
"time"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/domain"
"github.com/stretchr/testify/assert"
)

var (
kindDeployment = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "apps/v1/Deployment"),
Name: "name",
Namespace: "namespace",
}
kindKnownServers = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "spdx.softwarecomposition.kubescape.io/v1beta1/KnownServers"),
Name: "name",
Namespace: "namespace",
}
object = []byte(`{"kind":"kind","metadata":{"name":"name","resourceVersion":"1"}}`)
objectClientV2 = []byte(`{"kind":"kind","metadata":{"name":"client","resourceVersion":"2"}}`)
objectServerV2 = []byte(`{"kind":"kind","metadata":{"name":"server","resourceVersion":"2"}}`)
)

func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.MockAdapter) {
ctx := context.WithValue(context.TODO(), domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: "11111111-2222-3333-4444-555555555555",
Cluster: "cluster",
})
err := logger.L().SetLevel(helpers.DebugLevel.String())
assert.NoError(t, err)
clientAdapter := adapters.NewMockAdapter(true)
serverAdapter := adapters.NewMockAdapter(false)
clientConn, serverConn := net.Pipe()
newConn := func() (net.Conn, error) {
return clientConn, nil
}
client, err := NewSynchronizerClient(ctx, []adapters.Adapter{clientAdapter}, clientConn, newConn)
assert.NoError(t, err)
server, err := NewSynchronizerServer(ctx, []adapters.Adapter{serverAdapter}, serverConn)
assert.NoError(t, err)
go func() {
_ = client.Start(ctx)
}()
go func() {
_ = server.Start(ctx)
}()
return ctx, clientAdapter, serverAdapter
}

func TestSynchronizer_ObjectAdded(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// add object
@@ -95,27 +47,3 @@ func TestSynchronizer_ObjectModified(t *testing.T) {
assert.True(t, ok)
assert.Equal(t, objectClientV2, serverObj)
}

func TestSynchronizer_ObjectModifiedOnBothSides(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// pre: add object
clientAdapter.Resources[kindKnownServers.String()] = object
serverAdapter.Resources[kindKnownServers.String()] = object
// manually modify object on server (PutObject message will be sent later)
serverAdapter.Resources[kindKnownServers.String()] = objectServerV2
// we create a race condition here
// object is modified on client, but we don't know about server modification
err := clientAdapter.TestCallPutOrPatch(ctx, kindKnownServers, object, objectClientV2)
assert.NoError(t, err)
// server message arrives just now on client
err = clientAdapter.PutObject(ctx, kindKnownServers, objectServerV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check both sides have the one from the server
clientObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, clientObj)
serverObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, serverObj)
}
101 changes: 65 additions & 36 deletions tests/synchronizer_integration_test.go
@@ -113,7 +113,7 @@ type TestKubernetesCluster struct {
// synchronizer
syncClient *core.Synchronizer
syncClientAdapter *incluster.Adapter
syncHTTPAdpater *httpendpoint.Adapter
syncHTTPAdapter *httpendpoint.Adapter
syncClientContextCancelFn context.CancelFunc

clientConn net.Conn
@@ -128,11 +128,11 @@ func randomPorts(n int) []string {
port := strconv.Itoa(rand.Intn(highPort-lowPort+1) + lowPort)
isFreePort := true
address := fmt.Sprintf("localhost:%s", port)
// Trying to listen on the port - cause the port to be in use and it takes some time for the OS to release it,
// Trying to listen on the port - cause the port to be in use, and it takes some time for the OS to release it,
// So we need to check if the port is available by trying to connect to it
conn, err := net.DialTimeout("tcp", address, 1*time.Second)
if conn != nil {
conn.Close()
_ = conn.Close()
}
isFreePort = err != nil // port is available since we got no response
if isFreePort && !ports.Contains(port) {
@@ -145,6 +145,7 @@ func randomPorts(n int) []string {
}
return ports.ToSlice()
}

func createK8sCluster(t *testing.T, cluster, account string) *TestKubernetesCluster {
ctx := context.WithValue(context.TODO(), domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: account,
@@ -434,8 +435,8 @@ func createK8sCluster(t *testing.T, cluster, account string) *TestKubernetesClus
return kubernetesCluster
}

func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort string) (pulsarC testcontainers.Container, pulsarUrl, pulsarAdminUrl string) {
pulsarC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort string) (testcontainers.Container, string, string) {
pulsarC, _ := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "apachepulsar/pulsar:2.11.0",
Cmd: []string{"bin/pulsar", "standalone"},
Expand All @@ -451,17 +452,17 @@ func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort strin
},
Started: true,
})
require.NotNil(t, pulsarC)
pulsarUrl, err := pulsarC.PortEndpoint(ctx, "6650", "pulsar")
require.NoError(t, err)
pulsarUrl, err = pulsarC.PortEndpoint(ctx, "6650", "pulsar")
require.NoError(t, err)
pulsarAdminUrl, err = pulsarC.PortEndpoint(ctx, "8080", "http")
pulsarAdminUrl, err := pulsarC.PortEndpoint(ctx, "8080", "http")
require.NoError(t, err)
return pulsarC, pulsarUrl, pulsarAdminUrl
}

func createPostgres(t *testing.T, ctx context.Context, port string) (postgresC testcontainers.Container, pgHostPort string) {
func createPostgres(t *testing.T, ctx context.Context, port string) (testcontainers.Container, string) {
// postgres
postgresC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
postgresC, _ := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "postgres:16.1",
Env: map[string]string{
@@ -474,8 +475,8 @@ func createPostgres(t *testing.T, ctx context.Context, port string) (postgresC t
},
Started: true,
})
require.NoError(t, err)
pgHostPort, err = postgresC.PortEndpoint(ctx, "5432", "")
require.NotNil(t, postgresC)
pgHostPort, err := postgresC.PortEndpoint(ctx, "5432", "")
require.NoError(t, err)
return postgresC, pgHostPort
}
@@ -542,7 +543,7 @@ func createAndStartSynchronizerClient(t *testing.T, cluster *TestKubernetesClust
cluster.syncClient, err = core.NewSynchronizerClient(ctx, []adapters.Adapter{clientAdapter, httpAdapter}, clientConn, newConn)
require.NoError(t, err)
cluster.syncClientAdapter = clientAdapter
cluster.syncHTTPAdpater = httpAdapter
cluster.syncHTTPAdapter = httpAdapter
cluster.clientConn = clientConn
cluster.syncClientContextCancelFn = cancel

@@ -634,11 +635,11 @@ func initIntegrationTest(t *testing.T) *Test {
postgresConnector.WithUseDebugConnection(ingesterConf.Logger.Level == "debug"),
}
ingesterPgClient := postgresConnector.NewPostgresClient(*ingesterConf.Postgres, pgOpts...)
time.Sleep(5 * time.Second)
err = ingesterPgClient.Connect()
require.NoError(t, err)
// run migrations
err = migration.DbMigrations(ingesterPgClient.GetClient(), migration.HotMigrationsTargetDbVersion)
require.NoError(t, err)
_ = migration.DbMigrations(ingesterPgClient.GetClient(), migration.HotMigrationsTargetDbVersion)
pulsarClient, err := pulsarconnector.NewClient(
pulsarconnector.WithConfig(&ingesterConf.Pulsar),
pulsarconnector.WithRetryAttempts(20),
@@ -841,23 +842,50 @@ func TestSynchronizer_TC02_Backend(t *testing.T) {
tearDown(td)
}

// FIXME phase 2, bi-directional synchronization needed
// TestSynchronizer_TC03_InCluster: Conflict resolution for a single entity
//func TestSynchronizer_TC03_InCluster(t *testing.T) {
// td := initIntegrationTest(t)
//
// // tear down
// tearDown(td)
//}

// FIXME phase 2, bi-directional synchronization needed
// TestSynchronizer_TC03_Backend: Conflict resolution for a single entity
//func TestSynchronizer_TC03_Backend(t *testing.T) {
// td := initIntegrationTest(t)
//
// // tear down
// tearDown(td)
//}
// TestSynchronizer_TC03: Conflict resolution for a single entity
// This is similar to TC02, but we modify the object simultaneously on both sides right after a connection
// failure is simulated; since the versions are the same, the backend modification should win
func TestSynchronizer_TC03(t *testing.T) {
td := initIntegrationTest(t)
pulsarProducer, err := eventingester.NewPulsarProducer(td.pulsarClient, "synchronizer")
require.NoError(t, err)
// add configmap to k8s
_, err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Create(context.TODO(), td.clusters[0].cm, metav1.CreateOptions{})
require.NoError(t, err)

_ = waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name)

time.Sleep(10 * time.Second)

// kill network connection on client side
dead, _ := net.Pipe()
err = dead.Close()
require.NoError(t, err)
*td.clusters[0].syncClient.Conn = dead

// modify configmap in k8s
k8sCm, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
require.NoError(t, err)
k8sCm.Data["test"] = "cluster"
_, err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Update(context.TODO(), k8sCm, metav1.UpdateOptions{})
require.NoError(t, err)

// modify configmap in backend
cm2 := td.clusters[0].cm.DeepCopy()
cm2.Data["test"] = "test2"
b, err := json.Marshal(cm2)
require.NoError(t, err)
err = pulsarProducer.SendPutObjectMessage(td.ctx, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name, "", 0, b)
require.NoError(t, err)
time.Sleep(10 * time.Second)

// check object in k8s
k8sCm2, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, "test2", k8sCm2.Data["test"])
// tear down
tearDown(td)
}

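The TC03 test above encodes a tie-breaking policy: when both sides modified the same base version of an object, the backend copy wins. The sketch below is a hypothetical illustration of that rule only; the function, its signature, and the integer version parameters are not part of the synchronizer.

// Hypothetical sketch of the rule TC03 exercises; the synchronizer's actual
// conflict handling lives elsewhere and may differ.
package example

// resolveConflict returns the payload that should survive when both sides
// modified an object. Assumption: equal (or older) client versions defer to
// the backend, which has the last word; only a strictly newer client version
// is kept.
func resolveConflict(clientVersion, backendVersion int, clientObj, backendObj []byte) []byte {
	if clientVersion > backendVersion {
		return clientObj
	}
	// Tie or older client version: backend wins.
	return backendObj
}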
// TestSynchronizer_TC04_InCluster: Deletion of a single entity
func TestSynchronizer_TC04_InCluster(t *testing.T) {
@@ -1248,17 +1276,17 @@ func TestSynchronizer_TC13_HTTPEndpoint(t *testing.T) {
require.NoError(t, err)
// http.DefaultClient.Timeout = 10 * time.Second
for httpAdapterIdx := range td.clusters {
syncHTTPAdpaterPort := td.clusters[httpAdapterIdx].syncHTTPAdpater.GetConfig().HTTPEndpoint.ServerPort
syncHTTPAdapterPort := td.clusters[httpAdapterIdx].syncHTTPAdapter.GetConfig().HTTPEndpoint.ServerPort
// check that the endpoint is up
for i := 0; i < 10; i++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/readyz", syncHTTPAdpaterPort))
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/readyz", syncHTTPAdapterPort))
if err == nil && resp.StatusCode == http.StatusOK {
break
}
time.Sleep(1 * time.Second)
}

resp, err := http.Post(fmt.Sprintf("http://localhost:%s/apis/v1/test-ks/v1/alerts", syncHTTPAdpaterPort), "application/json", bytes.NewReader(alertBytes))
resp, err := http.Post(fmt.Sprintf("http://localhost:%s/apis/v1/test-ks/v1/alerts", syncHTTPAdapterPort), "application/json", bytes.NewReader(alertBytes))
require.NoError(t, err)
require.Equal(t, http.StatusAccepted, resp.StatusCode)
}
@@ -1330,7 +1358,7 @@ func waitForObjectInPostgresWithDifferentChecksum(t *testing.T, td *Test, accoun
break
}

if objMetadata.Checksum != checksum {
if objMetadata != nil && objMetadata.Checksum != checksum {
break
}
}
@@ -1340,6 +1368,7 @@
require.NoError(t, err)
require.NotNil(t, objMetadata)
if checksum != "" {
//goland:noinspection GoDfaErrorMayBeNotNil,GoDfaNilDereference
require.NotEqual(t, checksum, objMetadata.Checksum, "expected object with different checksum in postgres")
}
