Skip to content

Commit

Permalink
fix: Stream events schema
Browse files Browse the repository at this point in the history
  • Loading branch information
jachym-tousek-keboola committed Nov 12, 2024
1 parent 8a3bcdb commit b9b043c
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,15 +174,17 @@ func SendEvent(
"streamId": params.SourceKey.String(),
"sinkId": params.SinkID,
},
Results: map[string]any{},
}
if params.SourceName != "" {
event.Params["sourceName"] = params.SourceName
}

var sErr error
defer func() {
// BC compatibility, should be removed later.
event.Results = event.Params
if len(event.Results) == 0 {
event.Results = nil
}
event, sErr = api.CreateEventRequest(event).Send(ctx)
if sErr == nil {
logger.Debugf(ctx, "Sent eventID: %v", event.ID)
Expand All @@ -191,12 +193,12 @@ func SendEvent(

if err != nil {
event.Type = "error"
event.Params["error"] = fmt.Sprintf("%s", err)
event.Results["error"] = fmt.Sprintf("%s", err)
return sErr
}

if params.Stats.RecordsCount > 0 {
event.Params["statistics"] = map[string]any{
event.Results["statistics"] = map[string]any{
"firstRecordAt": params.Stats.FirstRecordAt.String(),
"lastRecordAt": params.Stats.LastRecordAt.String(),
"recordsCount": params.Stats.RecordsCount,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,8 @@ func TestBridge_SendSliceUploadEvent_OkEvent(t *testing.T) {
"component": "keboola.stream.sliceUpload",
"duration": 3,
"message": "Slice upload done.",
"params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}",
"results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}",
"params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"results": "{\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},}",
"type": "info"
}`, body)
}
Expand Down Expand Up @@ -83,8 +83,8 @@ func TestBridge_SendSliceUploadEvent_ErrorEvent(t *testing.T) {
"component": "keboola.stream.sliceUpload",
"duration": 3,
"message": "Slice upload failed.",
"params": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"results": "{\"error\":\"some error\"}",
"type": "error"
}`, body)
}
Expand Down Expand Up @@ -138,8 +138,8 @@ func TestBridge_SendFileImportEvent_OkEvent(t *testing.T) {
"component": "keboola.stream.fileImport",
"duration": 3,
"message": "File import done.",
"params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}",
"results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}",
"params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"results": "{\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}",
"type": "info"
}`, body)
}
Expand Down Expand Up @@ -170,8 +170,8 @@ func TestBridge_SendFileImportEvent_ErrorEvent(t *testing.T) {
"component": "keboola.stream.fileImport",
"duration": 3,
"message": "File import failed.",
"params": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"params": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"results": "{\"error\":\"some error\"}",
"type": "error"
}`, body)
}
Expand Down

0 comments on commit b9b043c

Please sign in to comment.