From b9b043c51e95b8ff446708671c1895a2a1d3ce05 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=A1chym=20Tou=C5=A1ek?= Date: Tue, 12 Nov 2024 10:06:23 +0100 Subject: [PATCH] fix: Stream events schema --- .../sink/type/tablesink/keboola/bridge/event.go | 10 ++++++---- .../type/tablesink/keboola/bridge/event_test.go | 16 ++++++++-------- 2 files changed, 14 insertions(+), 12 deletions(-) diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go index a34fc9b8ad..73567d28a9 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go @@ -174,6 +174,7 @@ func SendEvent( "streamId": params.SourceKey.String(), "sinkId": params.SinkID, }, + Results: map[string]any{}, } if params.SourceName != "" { event.Params["sourceName"] = params.SourceName @@ -181,8 +182,9 @@ func SendEvent( var sErr error defer func() { - // BC compatibility, should be removed later. - event.Results = event.Params + if len(event.Results) == 0 { + event.Results = nil + } event, sErr = api.CreateEventRequest(event).Send(ctx) if sErr == nil { logger.Debugf(ctx, "Sent eventID: %v", event.ID) @@ -191,12 +193,12 @@ func SendEvent( if err != nil { event.Type = "error" - event.Params["error"] = fmt.Sprintf("%s", err) + event.Results["error"] = fmt.Sprintf("%s", err) return sErr } if params.Stats.RecordsCount > 0 { - event.Params["statistics"] = map[string]any{ + event.Results["statistics"] = map[string]any{ "firstRecordAt": params.Stats.FirstRecordAt.String(), "lastRecordAt": params.Stats.LastRecordAt.String(), "recordsCount": params.Stats.RecordsCount, diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go index f616eaac99..dc10603964 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go @@ -51,8 +51,8 @@ func TestBridge_SendSliceUploadEvent_OkEvent(t *testing.T) { "component": "keboola.stream.sliceUpload", "duration": 3, "message": "Slice upload done.", - "params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", - "results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", + "params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", + "results": "{\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},}", "type": "info" }`, body) } @@ -83,8 +83,8 @@ func TestBridge_SendSliceUploadEvent_ErrorEvent(t *testing.T) { "component": "keboola.stream.sliceUpload", "duration": 3, "message": "Slice upload failed.", - "params": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", - "results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", + "params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", + "results": "{\"error\":\"some error\"}", "type": "error" }`, body) } @@ -138,8 +138,8 @@ func TestBridge_SendFileImportEvent_OkEvent(t *testing.T) { "component": "keboola.stream.fileImport", "duration": 3, "message": "File import done.", - "params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", - "results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", + "params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", + "results": "{\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}", "type": "info" }`, body) } @@ -170,8 +170,8 @@ func TestBridge_SendFileImportEvent_ErrorEvent(t *testing.T) { "component": "keboola.stream.fileImport", "duration": 3, "message": "File import failed.", - "params": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", - "results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", + "params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", + "results": "{\"error\":\"some error\"}", "type": "error" }`, body) }