add coverage for phase2 synchronizer

Signed-off-by: Matthias Bertschy <matthias.bertschy@gmail.com>
matthyx committed Oct 3, 2024
1 parent 565261b commit 2bef543
Showing 4 changed files with 146 additions and 108 deletions.
1 change: 1 addition & 0 deletions .github/workflows/pr-created.yaml
@@ -33,6 +33,7 @@ jobs:
"TestSynchronizer_TC01_Backend",
"TestSynchronizer_TC02_InCluster",
"TestSynchronizer_TC02_Backend",
# "TestSynchronizer_TC03",
"TestSynchronizer_TC04_InCluster",
"TestSynchronizer_TC04_Backend",
"TestSynchronizer_TC05_InCluster",
80 changes: 80 additions & 0 deletions core/common_test.go
@@ -0,0 +1,80 @@
package core

import (
"context"
"net"
"testing"
"time"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/domain"
"github.com/stretchr/testify/assert"
)

var (
kindDeployment = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "apps/v1/Deployment"),
Name: "name",
Namespace: "namespace",
}
kindKnownServers = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "spdx.softwarecomposition.kubescape.io/v1beta1/KnownServers"),
Name: "name",
Namespace: "namespace",
}
object = []byte(`{"kind":"kind","metadata":{"name":"name","resourceVersion":"1"}}`)
objectClientV2 = []byte(`{"kind":"kind","metadata":{"name":"client","resourceVersion":"2"}}`)
objectServerV2 = []byte(`{"kind":"kind","metadata":{"name":"server","resourceVersion":"2"}}`)
)

func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.MockAdapter) {
ctx := context.WithValue(context.TODO(), domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: "11111111-2222-3333-4444-555555555555",
Cluster: "cluster",
})
err := logger.L().SetLevel(helpers.DebugLevel.String())
assert.NoError(t, err)
clientAdapter := adapters.NewMockAdapter(true)
serverAdapter := adapters.NewMockAdapter(false)
clientConn, serverConn := net.Pipe()
newConn := func() (net.Conn, error) {
return clientConn, nil
}
client, err := NewSynchronizerClient(ctx, []adapters.Adapter{clientAdapter}, clientConn, newConn)
assert.NoError(t, err)
server, err := NewSynchronizerServer(ctx, []adapters.Adapter{serverAdapter}, serverConn)
assert.NoError(t, err)
go func() {
_ = client.Start(ctx)
}()
go func() {
_ = server.Start(ctx)
}()
return ctx, clientAdapter, serverAdapter
}
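
initTest wires the client and server over net.Pipe, an in-memory synchronous duplex transport, so messages flow without any real sockets. A standalone sketch of that mechanism (illustrative only, not part of this commit):

package main

import (
	"fmt"
	"net"
)

func main() {
	// net.Pipe returns two connected endpoints with no sockets involved:
	// bytes written to one end are read from the other.
	clientConn, serverConn := net.Pipe()
	go func() {
		// the pipe is synchronous and writes block until the peer reads,
		// so the write runs in its own goroutine
		_, _ = clientConn.Write([]byte("ping"))
		_ = clientConn.Close()
	}()
	buf := make([]byte, 4)
	n, _ := serverConn.Read(buf)
	fmt.Println(string(buf[:n])) // prints "ping"
}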

func TestSynchronizer_ObjectModifiedOnBothSides(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// pre: add object
clientAdapter.Resources[kindKnownServers.String()] = object
serverAdapter.Resources[kindKnownServers.String()] = object
// manually modify object on server (PutObject message will be sent later)
serverAdapter.Resources[kindKnownServers.String()] = objectServerV2
// we create a race condition here
// object is modified on client, but we don't know about server modification
err := clientAdapter.TestCallPutOrPatch(ctx, kindKnownServers, object, objectClientV2)
assert.NoError(t, err)
// server message arrives just now on client
err = clientAdapter.PutObject(ctx, kindKnownServers, objectServerV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check both sides have the one from the server
clientObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, clientObj)
serverObj, ok := serverAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, serverObj)
}
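
The final assertions pin down the conflict rule this test exercises: when both sides modified the object and the resourceVersions are equal, the server copy wins everywhere. A minimal sketch of such a rule (hypothetical resolve helper, not the synchronizer's actual implementation):

package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// objMeta mirrors the fragment of object metadata the test fixtures carry.
type objMeta struct {
	Name            string `json:"name"`
	ResourceVersion string `json:"resourceVersion"`
}

type obj struct {
	Kind     string  `json:"kind"`
	Metadata objMeta `json:"metadata"`
}

// resolve picks a winner: the higher resourceVersion prevails, and on a tie
// the server copy is preferred, matching the assertions above.
func resolve(client, server []byte) []byte {
	var c, s obj
	if json.Unmarshal(client, &c) != nil {
		return server
	}
	if json.Unmarshal(server, &s) != nil {
		return client
	}
	cv, _ := strconv.Atoi(c.Metadata.ResourceVersion)
	sv, _ := strconv.Atoi(s.Metadata.ResourceVersion)
	if cv > sv {
		return client
	}
	return server
}

func main() {
	client := []byte(`{"kind":"kind","metadata":{"name":"client","resourceVersion":"2"}}`)
	server := []byte(`{"kind":"kind","metadata":{"name":"server","resourceVersion":"2"}}`)
	fmt.Printf("winner: %s\n", resolve(client, server)) // the server copy
}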
72 changes: 0 additions & 72 deletions core/synchronizer_test.go
@@ -1,60 +1,12 @@
package core

import (
"context"
"net"
"testing"
"time"

"github.com/kubescape/go-logger"
"github.com/kubescape/go-logger/helpers"
"github.com/kubescape/synchronizer/adapters"
"github.com/kubescape/synchronizer/domain"
"github.com/stretchr/testify/assert"
)

var (
kindDeployment = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "apps/v1/Deployment"),
Name: "name",
Namespace: "namespace",
}
kindKnownServers = domain.KindName{
Kind: domain.KindFromString(context.TODO(), "spdx.softwarecomposition.kubescape.io/v1beta1/KnownServers"),
Name: "name",
Namespace: "namespace",
}
object = []byte(`{"kind":"kind","metadata":{"name":"name","resourceVersion":"1"}}`)
objectClientV2 = []byte(`{"kind":"kind","metadata":{"name":"client","resourceVersion":"2"}}`)
objectServerV2 = []byte(`{"kind":"kind","metadata":{"name":"server","resourceVersion":"2"}}`)
)

func initTest(t *testing.T) (context.Context, *adapters.MockAdapter, *adapters.MockAdapter) {
ctx := context.WithValue(context.TODO(), domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: "11111111-2222-3333-4444-555555555555",
Cluster: "cluster",
})
err := logger.L().SetLevel(helpers.DebugLevel.String())
assert.NoError(t, err)
clientAdapter := adapters.NewMockAdapter(true)
serverAdapter := adapters.NewMockAdapter(false)
clientConn, serverConn := net.Pipe()
newConn := func() (net.Conn, error) {
return clientConn, nil
}
client, err := NewSynchronizerClient(ctx, []adapters.Adapter{clientAdapter}, clientConn, newConn)
assert.NoError(t, err)
server, err := NewSynchronizerServer(ctx, []adapters.Adapter{serverAdapter}, serverConn)
assert.NoError(t, err)
go func() {
_ = client.Start(ctx)
}()
go func() {
_ = server.Start(ctx)
}()
return ctx, clientAdapter, serverAdapter
}

func TestSynchronizer_ObjectAdded(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// add object
@@ -95,27 +47,3 @@ func TestSynchronizer_ObjectModified(t *testing.T) {
assert.True(t, ok)
assert.Equal(t, objectClientV2, serverObj)
}

func TestSynchronizer_ObjectModifiedOnBothSides(t *testing.T) {
ctx, clientAdapter, serverAdapter := initTest(t)
// pre: add object
clientAdapter.Resources[kindKnownServers.String()] = object
serverAdapter.Resources[kindKnownServers.String()] = object
// manually modify object on server (PutObject message will be sent later)
serverAdapter.Resources[kindKnownServers.String()] = objectServerV2
// we create a race condition here
// object is modified on client, but we don't know about server modification
err := clientAdapter.TestCallPutOrPatch(ctx, kindKnownServers, object, objectClientV2)
assert.NoError(t, err)
// server message arrives just now on client
err = clientAdapter.PutObject(ctx, kindKnownServers, objectServerV2)
assert.NoError(t, err)
time.Sleep(1 * time.Second)
// check both sides have the one from the server
clientObj, ok := clientAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, clientObj)
serverObj, ok := serverAdapter.Resources[kindKnownServers.String()]
assert.True(t, ok)
assert.Equal(t, objectServerV2, serverObj)
}
101 changes: 65 additions & 36 deletions tests/synchronizer_integration_test.go
@@ -113,7 +113,7 @@ type TestKubernetesCluster struct {
// synchronizer
syncClient *core.Synchronizer
syncClientAdapter *incluster.Adapter
- syncHTTPAdpater *httpendpoint.Adapter
+ syncHTTPAdapter *httpendpoint.Adapter
syncClientContextCancelFn context.CancelFunc

clientConn net.Conn
@@ -128,11 +128,11 @@ func randomPorts(n int) []string {
port := strconv.Itoa(rand.Intn(highPort-lowPort+1) + lowPort)
isFreePort := true
address := fmt.Sprintf("localhost:%s", port)
- // Trying to listen on the port - cause the port to be in use and it takes some time for the OS to release it,
+ // Trying to listen on the port causes it to be in use, and it takes some time for the OS to release it,
// So we need to check if the port is available by trying to connect to it
conn, err := net.DialTimeout("tcp", address, 1*time.Second)
if conn != nil {
- conn.Close()
+ _ = conn.Close()
}
isFreePort = err != nil // port is available since we got no response
if isFreePort && !ports.Contains(port) {
@@ -145,6 +145,7 @@ func randomPorts(n int) []string {
}
return ports.ToSlice()
}
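
The dial-based probe above checks availability without binding the port itself, which would otherwise keep it busy until the OS releases it. The same logic isolated as a sketch (hypothetical helper, not part of this commit):

package main

import (
	"fmt"
	"net"
	"time"
)

// isPortFree reports whether nothing currently accepts TCP connections on addr.
func isPortFree(addr string) bool {
	conn, err := net.DialTimeout("tcp", addr, 1*time.Second)
	if conn != nil {
		_ = conn.Close()
	}
	return err != nil // no listener answered, so the port is presumed free
}

func main() {
	fmt.Println(isPortFree("localhost:6650"))
}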

func createK8sCluster(t *testing.T, cluster, account string) *TestKubernetesCluster {
ctx := context.WithValue(context.TODO(), domain.ContextKeyClientIdentifier, domain.ClientIdentifier{
Account: account,
@@ -434,8 +435,8 @@ func createK8sCluster(t *testing.T, cluster, account string) *TestKubernetesClus
return kubernetesCluster
}

- func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort string) (pulsarC testcontainers.Container, pulsarUrl, pulsarAdminUrl string) {
- pulsarC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
+ func createPulsar(t *testing.T, ctx context.Context, brokerPort, adminPort string) (testcontainers.Container, string, string) {
+ pulsarC, _ := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "apachepulsar/pulsar:2.11.0",
Cmd: []string{"bin/pulsar", "standalone"},
@@ -451,17 +452,17 @@
},
Started: true,
})
- require.NoError(t, err)
- pulsarUrl, err = pulsarC.PortEndpoint(ctx, "6650", "pulsar")
+ require.NotNil(t, pulsarC)
+ pulsarUrl, err := pulsarC.PortEndpoint(ctx, "6650", "pulsar")
require.NoError(t, err)
- pulsarAdminUrl, err = pulsarC.PortEndpoint(ctx, "8080", "http")
+ pulsarAdminUrl, err := pulsarC.PortEndpoint(ctx, "8080", "http")
require.NoError(t, err)
return pulsarC, pulsarUrl, pulsarAdminUrl
}

- func createPostgres(t *testing.T, ctx context.Context, port string) (postgresC testcontainers.Container, pgHostPort string) {
+ func createPostgres(t *testing.T, ctx context.Context, port string) (testcontainers.Container, string) {
// postgres
postgresC, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
postgresC, _ := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "postgres:16.1",
Env: map[string]string{
Expand All @@ -474,8 +475,8 @@ func createPostgres(t *testing.T, ctx context.Context, port string) (postgresC t
},
Started: true,
})
- require.NoError(t, err)
- pgHostPort, err = postgresC.PortEndpoint(ctx, "5432", "")
+ require.NotNil(t, postgresC)
+ pgHostPort, err := postgresC.PortEndpoint(ctx, "5432", "")
require.NoError(t, err)
return postgresC, pgHostPort
}
@@ -542,7 +543,7 @@ func createAndStartSynchronizerClient(t *testing.T, cluster *TestKubernetesClust
cluster.syncClient, err = core.NewSynchronizerClient(ctx, []adapters.Adapter{clientAdapter, httpAdapter}, clientConn, newConn)
require.NoError(t, err)
cluster.syncClientAdapter = clientAdapter
- cluster.syncHTTPAdpater = httpAdapter
+ cluster.syncHTTPAdapter = httpAdapter
cluster.clientConn = clientConn
cluster.syncClientContextCancelFn = cancel

@@ -634,11 +635,11 @@ func initIntegrationTest(t *testing.T) *Test {
postgresConnector.WithUseDebugConnection(ingesterConf.Logger.Level == "debug"),
}
ingesterPgClient := postgresConnector.NewPostgresClient(*ingesterConf.Postgres, pgOpts...)
time.Sleep(5 * time.Second)
err = ingesterPgClient.Connect()
require.NoError(t, err)
// run migrations
- err = migration.DbMigrations(ingesterPgClient.GetClient(), migration.HotMigrationsTargetDbVersion)
- require.NoError(t, err)
+ _ = migration.DbMigrations(ingesterPgClient.GetClient(), migration.HotMigrationsTargetDbVersion)
pulsarClient, err := pulsarconnector.NewClient(
pulsarconnector.WithConfig(&ingesterConf.Pulsar),
pulsarconnector.WithRetryAttempts(20),
@@ -841,23 +842,50 @@ func TestSynchronizer_TC02_Backend(t *testing.T) {
tearDown(td)
}

// FIXME phase 2, bi-directional synchronization needed
// TestSynchronizer_TC03_InCluster: Conflict resolution for a single entity
//func TestSynchronizer_TC03_InCluster(t *testing.T) {
// td := initIntegrationTest(t)
//
// // tear down
// tearDown(td)
//}

// FIXME phase 2, bi-directional synchronization needed
// TestSynchronizer_TC03_Backend: Conflict resolution for a single entity
//func TestSynchronizer_TC03_Backend(t *testing.T) {
// td := initIntegrationTest(t)
//
// // tear down
// tearDown(td)
//}
// TestSynchronizer_TC03: Conflict resolution for a single entity
// This is similar to TC02, but we modify the object simultaneously on both sides right after a
// connection failure is simulated; since the versions are the same, the backend modification should win.
func TestSynchronizer_TC03(t *testing.T) {
td := initIntegrationTest(t)
pulsarProducer, err := eventingester.NewPulsarProducer(td.pulsarClient, "synchronizer")
require.NoError(t, err)
// add configmap to k8s
_, err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Create(context.TODO(), td.clusters[0].cm, metav1.CreateOptions{})
require.NoError(t, err)

_ = waitForObjectInPostgres(t, td, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name)

time.Sleep(10 * time.Second)

// kill network connection on client side
dead, _ := net.Pipe()
err = dead.Close()
require.NoError(t, err)
*td.clusters[0].syncClient.Conn = dead

// modify configmap in k8s
k8sCm, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
require.NoError(t, err)
k8sCm.Data["test"] = "cluster"
_, err = td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Update(context.TODO(), k8sCm, metav1.UpdateOptions{})
require.NoError(t, err)

// modify configmap in backend
cm2 := td.clusters[0].cm.DeepCopy()
cm2.Data["test"] = "test2"
b, err := json.Marshal(cm2)
require.NoError(t, err)
err = pulsarProducer.SendPutObjectMessage(td.ctx, td.clusters[0].account, td.clusters[0].cluster, "/v1/configmaps", namespace, name, "", 0, b)
require.NoError(t, err)
time.Sleep(10 * time.Second)

// check object in k8s
k8sCm2, err := td.clusters[0].k8sclient.CoreV1().ConfigMaps(namespace).Get(context.TODO(), name, metav1.GetOptions{})
assert.NoError(t, err)
assert.Equal(t, "test2", k8sCm2.Data["test"])
// tear down
tearDown(td)
}
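
The fixed time.Sleep calls make TC03 timing-sensitive; one way to tighten such waits is a deadline-based poll. A sketch under the assumption that the test keeps a client-go clientset handy (hypothetical helper, not part of this commit):

package main

import (
	"context"
	"fmt"
	"time"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// waitForConfigMapValue polls until data[key] == want or the timeout expires,
// replacing a fixed sleep with a bounded retry loop.
func waitForConfigMapValue(ctx context.Context, c kubernetes.Interface, ns, name, key, want string, timeout time.Duration) error {
	deadline := time.Now().Add(timeout)
	for time.Now().Before(deadline) {
		cm, err := c.CoreV1().ConfigMaps(ns).Get(ctx, name, metav1.GetOptions{})
		if err == nil && cm.Data[key] == want {
			return nil
		}
		time.Sleep(500 * time.Millisecond)
	}
	return fmt.Errorf("configmap %s/%s: key %q never became %q", ns, name, key, want)
}

func main() {
	// wiring elided: in the test above this would be called with
	// td.clusters[0].k8sclient once the backend modification has been sent
	_ = waitForConfigMapValue
}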

// TestSynchronizer_TC04_InCluster: Deletion of a single entity
func TestSynchronizer_TC04_InCluster(t *testing.T) {
@@ -1248,17 +1276,17 @@ func TestSynchronizer_TC13_HTTPEndpoint(t *testing.T) {
require.NoError(t, err)
// http.DefaultClient.Timeout = 10 * time.Second
for httpAdapterIdx := range td.clusters {
- syncHTTPAdpaterPort := td.clusters[httpAdapterIdx].syncHTTPAdpater.GetConfig().HTTPEndpoint.ServerPort
+ syncHTTPAdapterPort := td.clusters[httpAdapterIdx].syncHTTPAdapter.GetConfig().HTTPEndpoint.ServerPort
// check that the endpoint is up
for i := 0; i < 10; i++ {
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/readyz", syncHTTPAdpaterPort))
resp, err := http.Get(fmt.Sprintf("http://localhost:%s/readyz", syncHTTPAdapterPort))
if err == nil && resp.StatusCode == http.StatusOK {
break
}
time.Sleep(1 * time.Second)
}

resp, err := http.Post(fmt.Sprintf("http://localhost:%s/apis/v1/test-ks/v1/alerts", syncHTTPAdpaterPort), "application/json", bytes.NewReader(alertBytes))
resp, err := http.Post(fmt.Sprintf("http://localhost:%s/apis/v1/test-ks/v1/alerts", syncHTTPAdapterPort), "application/json", bytes.NewReader(alertBytes))
require.NoError(t, err)
require.Equal(t, http.StatusAccepted, resp.StatusCode)
}
@@ -1330,7 +1358,7 @@ func waitForObjectInPostgresWithDifferentChecksum(t *testing.T, td *Test, accoun
break
}

- if objMetadata.Checksum != checksum {
+ if objMetadata != nil && objMetadata.Checksum != checksum {
break
}
}
@@ -1340,6 +1368,7 @@
require.NoError(t, err)
require.NotNil(t, objMetadata)
if checksum != "" {
//goland:noinspection GoDfaErrorMayBeNotNil,GoDfaNilDereference
require.NotEqual(t, checksum, objMetadata.Checksum, "expected object with different checksum in postgres")
}

