From 008b001972a6c69bbbc0feabaa217b6ff7bcc27a Mon Sep 17 00:00:00 2001 From: Ales Pour Date: Tue, 19 Mar 2024 10:52:04 +0100 Subject: [PATCH] fix: do not migrate events with empty ID --- server/migrate_topic_store_test.go | 65 +++++++++++++++++++++++---- services/alert/migrate_topic_store.go | 6 +++ 2 files changed, 62 insertions(+), 9 deletions(-) diff --git a/server/migrate_topic_store_test.go b/server/migrate_topic_store_test.go index 591674280..963478a29 100644 --- a/server/migrate_topic_store_test.go +++ b/server/migrate_topic_store_test.go @@ -8,15 +8,19 @@ import ( "io" "os" "testing" + "time" + alertcore "github.com/influxdata/kapacitor/alert" "github.com/influxdata/kapacitor/services/alert" "github.com/influxdata/kapacitor/services/alert/alerttest" "github.com/influxdata/kapacitor/services/storage" ) type testData struct { - name string - topicEventStatesMap map[string]map[string]alert.EventState + name string + topicEventStatesMap map[string]map[string]alert.EventState + v2IsTheSame bool + topicEventStatesV2Map map[string]map[string]alert.EventState } var tests []testData = []testData{ @@ -25,6 +29,7 @@ var tests []testData = []testData{ topicEventStatesMap: map[string]map[string]alert.EventState{ "t1": alerttest.MakeEventStates(alerttest.EventStateSpec{N: 300, Mwc: 6, Dwc: 20}), }, + v2IsTheSame: true, }, { name: "three topics", @@ -33,6 +38,7 @@ var tests []testData = []testData{ "t2": alerttest.MakeEventStates(alerttest.EventStateSpec{N: 130, Mwc: 6, Dwc: 12}), "t3": alerttest.MakeEventStates(alerttest.EventStateSpec{N: 50, Mwc: 6, Dwc: 17}), }, + v2IsTheSame: true, }, { name: "two topics", @@ -40,6 +46,40 @@ var tests []testData = []testData{ "t1": alerttest.MakeEventStates(alerttest.EventStateSpec{N: 100, Mwc: 5, Dwc: 15}), "t2": alerttest.MakeEventStates(alerttest.EventStateSpec{N: 130, Mwc: 6, Dwc: 12}), }, + v2IsTheSame: true, + }, + { + name: "EAR issue 4984", // fails with "cannot store event \"\" in topic \"main:batch:alert3\": key required" + topicEventStatesMap: map[string]map[string]alert.EventState{ + "main:batch:alert3": { + "": { + Message: "event state no ID", + Details: "event state no ID details", + Time: time.Unix(1710838800, 0), + Duration: time.Duration(15 * time.Second), + Level: alertcore.OK, + }, + "event_state_id_1": { + Message: "event state 1", + Details: "event state 1 details", + Time: time.Unix(1710838801, 0), + Duration: time.Duration(15 * time.Second), + Level: alertcore.Warning, + }, + }, + }, + v2IsTheSame: false, + topicEventStatesV2Map: map[string]map[string]alert.EventState{ + "main:batch:alert3": { // event state with empty ID is not migrated + "event_state_id_1": { + Message: "event state 1", + Details: "event state 1 details", + Time: time.Unix(1710838801, 0), + Duration: time.Duration(15 * time.Second), + Level: alertcore.Warning, + }, + }, + }, }, } @@ -88,6 +128,13 @@ func Test_Migrate_TopicStore(t *testing.T) { t.Fatalf("topic store version: expected: %q, got: %q", alert.TopicStoreVersion2, version) } + var topicEventStatesMap map[string]map[string]alert.EventState + if tt.v2IsTheSame { + topicEventStatesMap = tt.topicEventStatesMap + } else { + topicEventStatesMap = tt.topicEventStatesV2Map + } + count := 0 err = alert.WalkTopicBuckets(s.AlertService.StorageService.Store(alert.TopicStatesNameSpace), func(tx storage.ReadOnlyTx, topic string) error { esStoredV2, err := alert.LoadTopicBucket(tx, []byte(topic)) @@ -95,7 +142,7 @@ func Test_Migrate_TopicStore(t *testing.T) { return err } count++ - if esOriginal, ok := tt.topicEventStatesMap[topic]; !ok { + if esOriginal, ok := topicEventStatesMap[topic]; !ok { return fmt.Errorf("topic %q not found in version two store: %w", topic, alert.ErrNoTopicStateExists) } else if ok, msg := eventStateMapCompare(esOriginal, esStoredV2); !ok { return fmt.Errorf("event states for topic %q differ between original and V2 storage: %s", topic, msg) @@ -104,8 +151,8 @@ func Test_Migrate_TopicStore(t *testing.T) { }) if err != nil { t.Fatalf("migration V1 to V2 error: %v", err) - } else if count != len(tt.topicEventStatesMap) { - t.Fatalf("wrong number of store topics. Expected %d, got %d", len(tt.topicEventStatesMap), count) + } else if count != len(topicEventStatesMap) { + t.Fatalf("wrong number of store topics. Expected %d, got %d", len(topicEventStatesMap), count) } err = alert.MigrateTopicStoreV2V1(s.StorageService) @@ -113,13 +160,13 @@ func Test_Migrate_TopicStore(t *testing.T) { t.Fatalf("migration V2 to V1 error: %v", err) } // Load all the saved topic states (plus one in case of error or duplicates in saving). - topicStates, err := TopicStatesDAO.List("", 0, len(tt.topicEventStatesMap)+1) + topicStates, err := TopicStatesDAO.List("", 0, len(topicEventStatesMap)+1) if err != nil { t.Fatalf("failed to load saved topic states: %v", err) } count = 0 for _, ts := range topicStates { - if esOriginal, ok := tt.topicEventStatesMap[ts.Topic]; !ok { + if esOriginal, ok := topicEventStatesMap[ts.Topic]; !ok { t.Fatalf("topic %q not found in version one store: %v", ts.Topic, alert.ErrNoTopicStateExists) } else if ok, msg := eventStateMapCompare(esOriginal, ts.EventStates); !ok { t.Fatalf("event states for topic %q differ between V2 storage and original: %s", ts.Topic, msg) @@ -127,8 +174,8 @@ func Test_Migrate_TopicStore(t *testing.T) { count++ } } - if count != len(tt.topicEventStatesMap) { - t.Fatalf("wrong number of store topics. Expected %d, got %d", len(tt.topicEventStatesMap), count) + if count != len(topicEventStatesMap) { + t.Fatalf("wrong number of store topics. Expected %d, got %d", len(topicEventStatesMap), count) } }) } diff --git a/services/alert/migrate_topic_store.go b/services/alert/migrate_topic_store.go index 5351da3d0..654dbafe9 100644 --- a/services/alert/migrate_topic_store.go +++ b/services/alert/migrate_topic_store.go @@ -70,6 +70,12 @@ func (s *Service) MigrateTopicStoreV1V2() (rErr error) { if err != nil { return fmt.Errorf("error converting event %q in topic %q to JSON: %w", id, ts.Topic, err) } + if id == "" { + s.diag.Info("event with empty ID not migrated", + keyvalue.T{Key: "topic", Value: ts.Topic}, + keyvalue.T{Key: "event", Value: string(data)}) + continue + } if err = txBucket.Put(id, data); err != nil { return fmt.Errorf("cannot store event %q in topic %q: %w", id, ts.Topic, err) }