Skip to content

Commit

Permalink
OBSDATA-440 Adding SegmentMetadataEvent and publishing them via Kafka…
Browse files Browse the repository at this point in the history
…SegmentMetadataEmitter (#117)
  • Loading branch information
harinirajendran authored Nov 2, 2022
1 parent 041215a commit 7935ac0
Show file tree
Hide file tree
Showing 11 changed files with 563 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.service;

import com.fasterxml.jackson.annotation.JsonValue;
import org.apache.druid.java.util.emitter.core.Event;
import org.apache.druid.java.util.emitter.core.EventMap;
import org.joda.time.DateTime;

public class SegmentMetadataEvent implements Event
{
public static final String FEED = "feed";
public static final String DATASOURCE = "dataSource";
public static final String CREATED_TIME = "createdTime";
public static final String START_TIME = "startTime";
public static final String END_TIME = "endTime";
public static final String VERSION = "version";
public static final String IS_COMPACTED = "isCompacted";

private final DateTime createdTime;
private final String dataSource;
private final DateTime startTime;
private final DateTime endTime;
private final String version;
private final boolean isCompacted;

public SegmentMetadataEvent(
String dataSource,
DateTime createdTime,
DateTime startTime,
DateTime endTime,
String version,
boolean isCompacted
)
{
this.dataSource = dataSource;
this.createdTime = createdTime;
this.startTime = startTime;
this.endTime = endTime;
this.version = version;
this.isCompacted = isCompacted;
}

@Override
public String getFeed()
{
return "segment_metadata";
}

public DateTime getCreatedTime()
{
return createdTime;
}

public DateTime getStartTime()
{
return startTime;
}

public DateTime getEndTime()
{
return endTime;
}

public String getDataSource()
{
return dataSource;
}

public String getVersion()
{
return version;
}

public boolean isCompacted()
{
return isCompacted;
}

@Override
@JsonValue
public EventMap toMap()
{

return EventMap.builder()
.put(FEED, getFeed())
.put(DATASOURCE, dataSource)
.put(CREATED_TIME, createdTime)
.put(START_TIME, startTime)
.put(END_TIME, endTime)
.put(VERSION, version)
.put(IS_COMPACTED, isCompacted)
.build();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.druid.java.util.emitter.service;

import com.google.common.collect.ImmutableMap;
import org.apache.druid.java.util.common.DateTimes;
import org.junit.Assert;
import org.junit.Test;

public class SegmentMetadataEventTest
{
@Test
public void testBasicEvent()
{
SegmentMetadataEvent event = new SegmentMetadataEvent(
"dummy_datasource",
DateTimes.of("2001-01-01T00:00:00.000Z"),
DateTimes.of("2001-01-02T00:00:00.000Z"),
DateTimes.of("2001-01-03T00:00:00.000Z"),
"dummy_version",
true
);

Assert.assertEquals(
ImmutableMap.<String, Object>builder()
.put(SegmentMetadataEvent.FEED, "segment_metadata")
.put(SegmentMetadataEvent.DATASOURCE, "dummy_datasource")
.put(SegmentMetadataEvent.CREATED_TIME, DateTimes.of("2001-01-01T00:00:00.000Z"))
.put(SegmentMetadataEvent.START_TIME, DateTimes.of("2001-01-02T00:00:00.000Z"))
.put(SegmentMetadataEvent.END_TIME, DateTimes.of("2001-01-03T00:00:00.000Z"))
.put(SegmentMetadataEvent.VERSION, "dummy_version")
.put(SegmentMetadataEvent.IS_COMPACTED, true)
.build(),
event.toMap()
);
}
}
26 changes: 17 additions & 9 deletions docs/development/extensions-contrib/kafka-emitter.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,28 @@ to monitor the status of your Druid cluster with this extension.

All the configuration parameters for the Kafka emitter are under `druid.emitter.kafka`.

|property|description|required?|default|
|--------|-----------|---------|-------|
|`druid.emitter.kafka.bootstrap.servers`|Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`)|yes|none|
|`druid.emitter.kafka.metric.topic`|Kafka topic name for emitter's target to emit service metric.|yes|none|
|`druid.emitter.kafka.alert.topic`|Kafka topic name for emitter's target to emit alert.|yes|none|
|`druid.emitter.kafka.request.topic`|Kafka topic name for emitter's target to emit request logs. If left empty then request logs will not be sent to the Kafka topic.|no|none|
|`druid.emitter.kafka.producer.config`|JSON formatted configuration which user want to set additional properties to Kafka producer.|no|none|
|`druid.emitter.kafka.clusterName`|Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. |no|none|
| property | description | required? | default |
|----------------------------------------------------|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|-----------|-----------------------|
| `druid.emitter.kafka.bootstrap.servers` | Comma-separated Kafka broker. (`[hostname:port],[hostname:port]...`) | yes | none |
| `druid.emitter.kafka.event.types` | Comma-separated event types. <br/>Choices: alerts, metrics, requests, segmentMetadata | no | ["metrics", "alerts"] |
| `druid.emitter.kafka.metric.topic` | Kafka topic name for emitter's target to emit service metric. If `event.types` contains `metrics`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.alert.topic` | Kafka topic name for emitter's target to emit alert. If `event.types` contains `alerts`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.request.topic` | Kafka topic name for emitter's target to emit request logs. If `event.types` contains `requests`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.segmentMetadata.topic` | Kafka topic name for emitter's target to emit segments related metadata. If `event.types` contains `segmentMetadata`, this field cannot be left empty | no | none |
| `druid.emitter.kafka.segmentMetadata.topic.format` | Format in which segment related metadata will be emitted. <br/>Choices: json, protobuf.<br/> If set to `protobuf`, then segment metadata is emitted in `DruidSegmentEvent.proto` format | no | json |
| `druid.emitter.kafka.producer.config` | JSON formatted configuration which user want to set additional properties to Kafka producer. | no | none |
| `druid.emitter.kafka.clusterName` | Optional value to specify name of your druid cluster. It can help make groups in your monitoring environment. | no | none |

### Example

```
druid.emitter.kafka.bootstrap.servers=hostname1:9092,hostname2:9092
druid.emitter.kafka.metric.topic=druid-metric
druid.emitter.kafka.event.types=["alerts", "requests", "segmentMetadata"]
druid.emitter.kafka.alert.topic=druid-alert
druid.emitter.kafka.request.topic=druid-request-logs
druid.emitter.kafka.segmentMetadata.topic=druid-segment-metadata
druid.emitter.kafka.segmentMetadata.topic.format=protobuf
druid.emitter.kafka.producer.config={"max.block.ms":10000}
```
Whenever `druid.emitter.kafka.segmentMetadata.topic.format` field is updated, it is recommended to also update `druid.emitter.kafka.segmentMetadata.topic` to avoid the same topic from getting polluted with different formats of segment metadata.

46 changes: 45 additions & 1 deletion extensions-contrib/kafka-emitter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,11 @@
<artifactId>jackson-core</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>joda-time</groupId>
<artifactId>joda-time</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
Expand Down Expand Up @@ -112,5 +116,45 @@
<version>${project.parent.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version>
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>detect</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
</configuration>
</plugin>
</plugins>
</build>
</project>
Loading

0 comments on commit 7935ac0

Please sign in to comment.