Skip to content

Commit

Permalink
Adding SegmentMetadataEvent and publishing them via KafkaEmitter (apa…
Browse files Browse the repository at this point in the history
…che#14281) (#139)

(cherry picked from commit 4ff6026)
  • Loading branch information
harinirajendran authored Jun 8, 2023
1 parent dc00aaf commit 12cc93c
Show file tree
Hide file tree
Showing 10 changed files with 86 additions and 43 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
import org.apache.druid.java.util.emitter.core.EventMap;
import org.joda.time.DateTime;

/**
* The event that gets generated whenever a segment is committed
*/
public class SegmentMetadataEvent implements Event
{
public static final String FEED = "feed";
Expand All @@ -34,11 +37,29 @@ public class SegmentMetadataEvent implements Event
public static final String VERSION = "version";
public static final String IS_COMPACTED = "isCompacted";

/**
* Time at which the segment metadata event is created
*/
private final DateTime createdTime;
/**
* Datasource for which the segment is committed
*/
private final String dataSource;
/**
* Start interval of the committed segment
*/
private final DateTime startTime;
/**
* End interval of the committed segment
*/
private final DateTime endTime;
/**
* Version of the committed segment
*/
private final String version;
/**
* Whether the committed segment is a compacted segment
*/
private final boolean isCompacted;

public SegmentMetadataEvent(
Expand Down
26 changes: 13 additions & 13 deletions docs/development/extensions-contrib/kafka-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,28 +36,28 @@ to monitor the status of your Druid cluster with this extension.

All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.

| property | description | required? | default |
|----------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`) | yes | none |
| `druid.emitter.kafka.event.types` | Comma-separated event types. <br/>Choices: alerts, metrics, requests, segmentMetadata | no | ["metrics", "alerts"] |
| `druid.emitter.kafka.metric.topic` | Kafka topic name for emitter's target to emit service metric. If `event.types` contains `metrics`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.alert.topic` | Kafka topic name for emitter's target to emit alert. If `event.types` contains `alerts`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.request.topic` | Kafka topic name for emitter's target to emit request logs. If `event.types` contains `requests`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segments related metadata. If `event.types` contains `segmentMetadata`, this field cannot be left empty | no | none |
| Property | Description | Required | Default |
|----------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
| `druid.emitter.kafka.bootstrap.servers` | Comma-separated list of Kafka brokers. (`[hostname:port],[hostname:port]...`) | yes | none |
| `druid.emitter.kafka.event.types` | Comma-separated event types. <br/>Supported types are `alerts`, `metrics`, `requests`, and `segment_metadata`. | no | `["metrics", "alerts"]` |
| `druid.emitter.kafka.metric.topic` | Kafka topic name for emitter's target to emit service metrics. If `event.types` contains `metrics`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.alert.topic` | Kafka topic name for emitter's target to emit alerts. If `event.types` contains `alerts`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.request.topic` | Kafka topic name for emitter's target to emit request logs. If `event.types` contains `requests`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segment metadata. If `event.types` contains `segment_metadata`, this field cannot be empty. | no | none |
| `druid.emitter.kafka.segmentMetadata.topic.format` | Format in which segment related metadata will be emitted. <br/>Choices: json, protobuf.<br/> If set to `protobuf`, then segment metadata is emitted in `DruidSegmentEvent.proto` format | no | json |
| `druid.emitter.kafka.producer.config` | JSON formatted configuration which user want to set additional properties to Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. | no | none |
| `druid.emitter.kafka.producer.config` | JSON configuration to set additional properties to Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional value to specify the name of your Druid cluster. It can help make groups in your monitoring environment. | no | none |

### Example

```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"]
druid.emitter.kafka.event.types=["metrics", "alerts", "requests", "segment_metadata"]
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.segmentMetadata.topic.format=protobuf
druid.emitter.kafka.segmentMetadata.topic.format=protobuf
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
Whenever the `druid.emitter.kafka.segmentMetadata.topic.format` field is updated, it is recommended to also update `druid.emitter.kafka.segmentMetadata.topic` to prevent the same topic from being polluted with different formats of segment metadata.

Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.hadoop.metrics2.sink.timeline.AbstractTimelineMetricsSink;
import org.apache.hadoop.metrics2.sink.timeline.TimelineMetric;
Expand Down Expand Up @@ -137,6 +138,8 @@ public void emit(Event event)
for (Emitter emitter : emitterList) {
emitter.emit(event);
}
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

import java.util.LinkedHashMap;
Expand Down Expand Up @@ -127,6 +128,8 @@ public void emit(Event event)
for (Emitter emitter : alertEmitters) {
emitter.emit(event);
}
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
throw new ISE("unknown event type [%s]", event.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.druid.java.util.emitter.core.Emitter;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.service.AlertEvent;
import org.apache.druid.java.util.emitter.service.SegmentMetadataEvent;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;
import org.apache.druid.server.log.RequestLogEvent;

Expand Down Expand Up @@ -139,6 +140,8 @@ public void emit(Event event)
"The following alert is dropped, description is [%s], severity is [%s]",
alertEvent.getDescription(), alertEvent.getSeverity()
);
} else if (event instanceof SegmentMetadataEvent) {
// do nothing. Ignore this event type
} else {
log.error("unknown event type [%s]", event.getClass());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,16 +138,16 @@ public void start()
if (eventTypes.contains(EventType.REQUESTS)) {
scheduler.schedule(this::sendRequestToKafka, sendInterval, TimeUnit.SECONDS);
}
if (eventTypes.contains(EventType.SEGMENTMETADATA)) {
if (eventTypes.contains(EventType.SEGMENT_METADATA)) {
scheduler.schedule(this::sendSegmentMetadataToKafka, sendInterval, TimeUnit.SECONDS);
}
scheduler.scheduleWithFixedDelay(() -> {
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], invalidLost=[%d] segmentMetadataLost=[%d]",
log.info("Message lost counter: metricLost=[%d], alertLost=[%d], requestLost=[%d], segmentMetadataLost=[%d], invalidLost=[%d]",
metricLost.get(),
alertLost.get(),
requestLost.get(),
invalidLost.get(),
segmentMetadataLost.get()
segmentMetadataLost.get(),
invalidLost.get()
);
}, DEFAULT_SEND_LOST_INTERVAL_MINUTES, DEFAULT_SEND_LOST_INTERVAL_MINUTES, TimeUnit.MINUTES);
log.info("Starting Kafka Emitter.");
Expand Down Expand Up @@ -204,6 +204,7 @@ public void emit(final Event event)
resultBytes,
resultBytes.length
);

Set<EventType> eventTypes = config.getEventTypes();
if (event instanceof ServiceMetricEvent) {
if (!eventTypes.contains(EventType.METRICS) || !metricQueue.offer(objectContainer)) {
Expand All @@ -218,7 +219,7 @@ public void emit(final Event event)
requestLost.incrementAndGet();
}
} else if (event instanceof SegmentMetadataEvent) {
if (!eventTypes.contains(EventType.SEGMENTMETADATA)) {
if (!eventTypes.contains(EventType.SEGMENT_METADATA)) {
segmentMetadataLost.incrementAndGet();
} else {
switch (config.getSegmentMetadataTopicFormat()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,7 @@ public enum EventType
METRICS,
ALERTS,
REQUESTS,
SEGMENTMETADATA {
@Override
public String toString()
{
return "segmentMetadata";
}
};
SEGMENT_METADATA;

@JsonValue
@Override
Expand All @@ -62,8 +56,6 @@ public static EventType fromString(String name)
}
}

public static final Set<EventType> DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS);

public enum SegmentMetadataTopicFormat
{
JSON,
Expand All @@ -83,6 +75,7 @@ public static SegmentMetadataTopicFormat fromString(String name)
}
}

public static final Set<EventType> DEFAULT_EVENT_TYPES = ImmutableSet.of(EventType.ALERTS, EventType.METRICS);
@JsonProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)
private final String bootstrapServers;
@Nullable @JsonProperty("event.types")
Expand Down Expand Up @@ -115,19 +108,18 @@ public KafkaEmitterConfig(
@JsonProperty("producer.config") @Nullable Map<String, String> kafkaProducerConfig
)
{
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "bootstrap.servers can not be null");
this.eventTypes = validateEventTypes(eventTypes, requestTopic);
this.bootstrapServers = Preconditions.checkNotNull(bootstrapServers, "druid.emitter.kafka.bootstrap.servers can not be null");
this.eventTypes = maybeUpdateEventTypes(eventTypes, requestTopic);
this.segmentMetadataTopicFormat = segmentMetadataTopicFormat == null ? SegmentMetadataTopicFormat.JSON : segmentMetadataTopicFormat;

this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? Preconditions.checkNotNull(metricTopic, "metric.topic can not be null") : null;
this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? Preconditions.checkNotNull(alertTopic, "alert.topic can not be null") : null;
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "request.topic can not be null") : null;
this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENTMETADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "segmentMetadata.topic can not be null") : null;
this.metricTopic = this.eventTypes.contains(EventType.METRICS) ? Preconditions.checkNotNull(metricTopic, "druid.emitter.kafka.metric.topic can not be null") : null;
this.alertTopic = this.eventTypes.contains(EventType.ALERTS) ? Preconditions.checkNotNull(alertTopic, "druid.emitter.kafka.alert.topic can not be null") : null;
this.requestTopic = this.eventTypes.contains(EventType.REQUESTS) ? Preconditions.checkNotNull(requestTopic, "druid.emitter.kafka.request.topic can not be null") : null;
this.segmentMetadataTopic = this.eventTypes.contains(EventType.SEGMENT_METADATA) ? Preconditions.checkNotNull(segmentMetadataTopic, "druid.emitter.kafka.segmentMetadata.topic can not be null") : null;
this.clusterName = clusterName;
this.kafkaProducerConfig = kafkaProducerConfig == null ? ImmutableMap.of() : kafkaProducerConfig;
}

private Set<EventType> validateEventTypes(Set<EventType> eventTypes, String requestTopic)
private Set<EventType> maybeUpdateEventTypes(Set<EventType> eventTypes, String requestTopic)
{
// Unless explicitly overridden, kafka emitter will always emit metrics and alerts
if (eventTypes == null) {
Expand Down Expand Up @@ -262,7 +254,7 @@ public String toString()
{
return "KafkaEmitterConfig{" +
"bootstrap.servers='" + bootstrapServers + '\'' +
", event.types='" + eventTypes.toString() + '\'' +
", event.types='" + eventTypes + '\'' +
", metric.topic='" + metricTopic + '\'' +
", alert.topic='" + alertTopic + '\'' +
", request.topic='" + requestTopic + '\'' +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.Set;

public class KafkaEmitterConfigTest
{
Expand Down Expand Up @@ -68,6 +70,22 @@ public void testSerDeserKafkaEmitterConfigNullRequestTopic() throws IOException
Assert.assertEquals(kafkaEmitterConfigExpected, kafkaEmitterConfig);
}

/**
 * Serializes a config whose only event type is SEGMENT_METADATA (built with several
 * null constructor arguments) to JSON, reads it back, and expects the deserialized
 * copy to equal the config it was produced from.
 */
@Test
public void testSerDeserKafkaEmitterConfigNullMetricsTopic() throws IOException
{
  final Set<KafkaEmitterConfig.EventType> segmentMetadataOnly = new HashSet<>();
  segmentMetadataOnly.add(KafkaEmitterConfig.EventType.SEGMENT_METADATA);

  final KafkaEmitterConfig original = new KafkaEmitterConfig(
      "hostname",
      segmentMetadataOnly,
      null,
      "alertTest",
      null,
      "metadataTest",
      null,
      "clusterNameTest",
      ImmutableMap.of("testKey", "testValue")
  );

  // Round trip: object -> JSON string -> object.
  final String json = mapper.writeValueAsString(original);
  final KafkaEmitterConfig roundTripped = mapper.readerFor(KafkaEmitterConfig.class).readValue(json);

  Assert.assertEquals(roundTripped, original);
}

@Test
public void testSerDeNotRequiredKafkaProducerConfig()
{
Expand All @@ -87,9 +105,9 @@ public void testSerDeNotRequiredKafkaProducerConfig()
@Test
public void testDeserializeEventTypesWithDifferentCase() throws JsonProcessingException
{
Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENTMETADATA, mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
Assert.assertEquals(KafkaEmitterConfig.EventType.SEGMENT_METADATA, mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
Assert.assertEquals(KafkaEmitterConfig.EventType.ALERTS, mapper.readValue("\"alerts\"", KafkaEmitterConfig.EventType.class));
Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segment_metadata\"", KafkaEmitterConfig.EventType.class));
Assert.assertThrows(ValueInstantiationException.class, () -> mapper.readValue("\"segmentMetadata\"", KafkaEmitterConfig.EventType.class));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,13 @@ public class KafkaEmitterTest
public static Object[] data()
{
return new Object[][] {
{new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENTMETADATA)), "requests"},
{new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENTMETADATA)), null}
{new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.REQUESTS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), "requests"},
{new HashSet<>(Arrays.asList(KafkaEmitterConfig.EventType.METRICS, KafkaEmitterConfig.EventType.ALERTS, KafkaEmitterConfig.EventType.SEGMENT_METADATA)), null}
};
}

// there is 1 seconds wait in kafka emitter before it starts sending events to broker, set a timeout for 5 seconds
@Test(timeout = 5_000)
// there is a 1 second wait in the kafka emitter before it starts sending events to the broker, so set a timeout of 10 seconds
@Test(timeout = 10_000)
public void testKafkaEmitter() throws InterruptedException
{
final List<ServiceMetricEvent> serviceMetricEvents = ImmutableList.of(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,9 @@ public SegmentPublishResult perform(Task task, TaskActionToolbox toolbox)
segment.getShardSpec() == null ? null : segment.getShardSpec().getType()
);
toolbox.getEmitter().emit(metricBuilder.build("segment/added/bytes", segment.getSize()));
// Emit the segment related metadata using the configured emitters
// Emit the segment related metadata using the configured emitters.
// There is a possibility that some segments' metadata events might get missed if the
// server crashes after committing the segment but before emitting the event.
this.emitSegmentMetadata(segment, toolbox);
}

Expand Down

0 comments on commit 12cc93c

Please sign in to comment.