diff --git a/x-pack/filebeat/input/azureeventhub/v2_input.go b/x-pack/filebeat/input/azureeventhub/v2_input.go index 03145163a49..4f3645f513f 100644 --- a/x-pack/filebeat/input/azureeventhub/v2_input.go +++ b/x-pack/filebeat/input/azureeventhub/v2_input.go @@ -388,7 +388,7 @@ func (in *eventHubInputV2) workersLoop(ctx context.Context, processor *azeventhu go func() { in.log.Infow( "starting a partition worker", - "partition", partitionID, + "partition_id", partitionID, ) if err := in.processEventsForPartition(ctx, processorPartitionClient); err != nil { @@ -397,13 +397,13 @@ func (in *eventHubInputV2) workersLoop(ctx context.Context, processor *azeventhu in.log.Infow( "stopping processing events for partition", "reason", err, - "partition", partitionID, + "partition_id", partitionID, ) } in.log.Infow( "partition worker exited", - "partition", partitionID, + "partition_id", partitionID, ) }() } @@ -428,7 +428,10 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit // 3/3 [END] Do cleanup here, like shutting down database clients // or other resources used for processing this partition. shutdownPartitionResources(ctx, partitionClient, pipelineClient) - in.log.Debugw("partition resources cleaned up", "partition", partitionID) + in.log.Debugw( + "partition resources cleaned up", + "partition_id", partitionID, + ) }() // 2/3 [CONTINUOUS] Receive events, checkpointing as needed using UpdateCheckpoint. @@ -444,7 +447,7 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit if errors.As(err, &eventHubError) && eventHubError.Code == azeventhubs.ErrorCodeOwnershipLost { in.log.Infow( "ownership lost for partition, stopping processing", - "partition", partitionID, + "partition_id", partitionID, ) return nil @@ -454,6 +457,10 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit } if len(events) == 0 { + in.log.Debugw( + "no events received", + "partition_id", partitionID, + ) continue } @@ -467,30 +474,32 @@ func (in *eventHubInputV2) processEventsForPartition(ctx context.Context, partit // processReceivedEvents func (in *eventHubInputV2) processReceivedEvents(receivedEvents []*azeventhubs.ReceivedEventData, partitionID string, pipelineClient beat.Client) error { processingStartTime := time.Now() - eventHubMetadata := mapstr.M{ - "partition_id": partitionID, - "eventhub": in.config.EventHubName, - "consumer_group": in.config.ConsumerGroup, - } for _, receivedEventData := range receivedEvents { + eventHubMetadata := mapstr.M{ + "partition_id": partitionID, + "eventhub": in.config.EventHubName, + "consumer_group": in.config.ConsumerGroup, + } + // Update input metrics. in.metrics.receivedMessages.Inc() in.metrics.receivedBytes.Add(uint64(len(receivedEventData.Body))) + _, _ = eventHubMetadata.Put("offset", receivedEventData.Offset) + _, _ = eventHubMetadata.Put("sequence_number", receivedEventData.SequenceNumber) + _, _ = eventHubMetadata.Put("enqueued_time", receivedEventData.EnqueuedTime) + + // The partition key is optional. + if receivedEventData.PartitionKey != nil { + _, _ = eventHubMetadata.Put("partition_key", *receivedEventData.PartitionKey) + } + // A single event can contain multiple records. // We create a new event for each record. records := in.messageDecoder.Decode(receivedEventData.Body) for _, record := range records { - _, _ = eventHubMetadata.Put("offset", receivedEventData.Offset) - _, _ = eventHubMetadata.Put("sequence_number", receivedEventData.SequenceNumber) - _, _ = eventHubMetadata.Put("enqueued_time", receivedEventData.EnqueuedTime) - - // The partition key is optional. - if receivedEventData.PartitionKey != nil { - _, _ = eventHubMetadata.Put("partition_key", *receivedEventData.PartitionKey) - } event := beat.Event{ // this is the default value for the @timestamp field; usually the ingest @@ -537,7 +546,7 @@ func initializePartitionResources(ctx context.Context, partitionClient *azeventh if !ok { log.Errorw( "error updating checkpoint", - "partition", partitionClient.PartitionID(), + "partition_id", partitionClient.PartitionID(), "acked", acked, "error", "invalid data type", "type", fmt.Sprintf("%T", data), @@ -555,7 +564,7 @@ func initializePartitionResources(ctx context.Context, partitionClient *azeventh log.Debugw( "checkpoint updated", - "partition", partitionClient.PartitionID(), + "partition_id", partitionClient.PartitionID(), "acked", acked, "sequence_number", receivedEventData.SequenceNumber, "offset", receivedEventData.Offset,