From 9637ef0efd936ca0e1b4e16ee7de56600cc68c14 Mon Sep 17 00:00:00 2001 From: Amir Malka Date: Thu, 22 Feb 2024 16:20:36 +0200 Subject: [PATCH] added version check before starting reconciliation Signed-off-by: Amir Malka --- adapters/backend/v1/adapter.go | 8 +++++++ utils/utils.go | 19 +++++++++++++++- utils/utils_test.go | 41 ++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/adapters/backend/v1/adapter.go b/adapters/backend/v1/adapter.go index d0cf46c..8020aaa 100644 --- a/adapters/backend/v1/adapter.go +++ b/adapters/backend/v1/adapter.go @@ -219,6 +219,14 @@ func (a *Adapter) startReconciliationPeriodicTask(mainCtx context.Context, cfg * logger.L().Error("expected to find client for reconciliation in clients map", helpers.String("clientId", clientId.String())) continue } + + if !utils.IsBatchMessageSupported(clientId.Version) { + logger.L().Info("skipping reconciliation request for client because it does not support batch messages", + helpers.String("version", clientId.Version), + helpers.Interface("clientId", clientId.String())) + continue + } + clientCtx := utils.ContextFromIdentifiers(mainCtx, clientId) err := client.SendReconciliationRequestMessage(clientCtx) if err != nil { diff --git a/utils/utils.go b/utils/utils.go index fb7c17c..b6b24fd 100644 --- a/utils/utils.go +++ b/utils/utils.go @@ -7,7 +7,6 @@ import ( "errors" "flag" "fmt" - "github.com/cenkalti/backoff/v4" "net/http" "net/http/pprof" "path/filepath" @@ -15,6 +14,9 @@ import ( "strconv" "time" + "github.com/cenkalti/backoff/v4" + "golang.org/x/mod/semver" + "github.com/SergJa/jsonhash" "github.com/apache/pulsar-client-go/pulsar" "github.com/davecgh/go-spew/spew" @@ -272,3 +274,18 @@ func NewBackOff() backoff.BackOff { b.MaxElapsedTime = 0 return b } + +// GreaterOrEqualVersion returns true if a version is greater or equal to b +func GreaterOrEqualVersion(a string, b string) bool { + return semver.Compare(a, b) >= 0 +} + +func IsBatchMessageSupported(version string) bool { + const minimumSupportedVersion = "v0.0.57" + + if version == "" { + return false + } + + return GreaterOrEqualVersion(version, minimumSupportedVersion) +} diff --git a/utils/utils_test.go b/utils/utils_test.go index bff714a..91e19b0 100644 --- a/utils/utils_test.go +++ b/utils/utils_test.go @@ -134,3 +134,44 @@ func TestRemoveManagedFields(t *testing.T) { }) } } + +func TestGreaterOrEqualVersion(t *testing.T) { + testCases := []struct { + a string + b string + expected bool + }{ + {"v0.0.2", "v0.0.1", true}, + {"v0.0.1", "v0.0.2", false}, + {"v0.0.1", "v0.0.1", true}, + } + + for _, tc := range testCases { + result := GreaterOrEqualVersion(tc.a, tc.b) + if result != tc.expected { + t.Errorf("For version %s >= %s, expected %v but got %v", tc.a, tc.b, tc.expected, result) + } + } +} + +func TestIsBatchMessageSupported(t *testing.T) { + testCases := []struct { + version string + expected bool + }{ + {"", false}, // Empty version should return false + {"v0.0.56", false}, // Version less than the minimum supported version should return false + {"v0.0.57", true}, // Minimum supported version should return true + {"v0.0.58", true}, // Version greater than the minimum supported version should return true + {"v1.0.0", true}, // Version with a major version greater than 0 should return true + {"v1.2.3", true}, // Version with a major version greater than 0 should return true + {"invalid_version", false}, // Invalid version should return false + } + + for _, tc := range testCases { + result := IsBatchMessageSupported(tc.version) + if result != tc.expected { + t.Errorf("For version %s, expected %v but got %v", tc.version, tc.expected, result) + } + } +}