diff --git a/flink-cyber/flink-indexing/flink-indexing-elastic/pom.xml b/flink-cyber/flink-indexing/flink-indexing-elastic/pom.xml deleted file mode 100644 index 6e65f0b6a..000000000 --- a/flink-cyber/flink-indexing/flink-indexing-elastic/pom.xml +++ /dev/null @@ -1,162 +0,0 @@ - - - - - - flink-indexing - com.cloudera.cyber - 2.4.0 - - 4.0.0 - - flink-indexing-elastic - - - com.cloudera.cyber.indexing.ElasticJobKafka - - - - - com.cloudera.cyber - flink-cyber-api - ${project.parent.version} - compile - - - - com.cloudera.cyber - flink-common - ${project.parent.version} - compile - - - - com.cloudera.cyber - flink-indexing-search - ${project.parent.version} - compile - - - - com.cloudera.cyber - flink-logging - ${project.parent.version} - provided - - - - org.apache.flink - flink-streaming-java - - - - - org.apache.httpcomponents - httpclient - ${global.httpclient.version} - compile - - - - org.apache.flink - flink-connector-elasticsearch7 - - - org.apache.logging.log4j - log4j-to-slf4j - - - - - - org.apache.avro - avro - - - - junit - junit - test - - - - org.testcontainers - elasticsearch - 1.14.3 - test - - - - org.apache.flink - flink-test-utils - test - - - - - - - - - org.apache.maven.plugins - maven-shade-plugin - ${maven-shade-plugin.version} - - - - package - - shade - - - - - org.apache.flink:force-shading - com.google.code.findbugs:jsr305 - org.slf4j:* - log4j:* - org.apache.logging.log4j:* - - - - - - *:* - - META-INF/*.SF - META-INF/*.DSA - META-INF/*.RSA - - - - - - ${main.class} - - - - - - - - - org.apache.maven.plugins - maven-compiler-plugin - - - - \ No newline at end of file diff --git a/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticJob.java b/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticJob.java deleted file mode 100644 index 8ba63ed9e..000000000 --- a/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticJob.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.indexing.elastic; - -import com.cloudera.cyber.indexing.IndexEntry; -import com.cloudera.cyber.indexing.SearchIndexJob; -import org.apache.flink.api.common.functions.RuntimeContext; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; -import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; -import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink; -import org.apache.http.HttpHost; -import org.elasticsearch.action.index.IndexRequest; -import org.elasticsearch.client.Requests; -import org.elasticsearch.client.RestHighLevelClient; - -import java.io.IOException; -import java.util.Arrays; -import java.util.List; - -import static java.util.stream.Collectors.toList; - -public abstract class ElasticJob extends SearchIndexJob { - private static final String PARAMS_ES_HOSTS = "es.hosts"; - protected RestHighLevelClient client; - - @Override - protected final void writeResults(DataStream results, ParameterTool params) throws IOException { - List httpHosts = Arrays.stream(params.getRequired(PARAMS_ES_HOSTS).split(",")) - .map(HttpHost::create).collect(toList()); - ElasticsearchSink.Builder esSinkBuilder = new ElasticsearchSink.Builder<>( - httpHosts, - new ElasticsearchSinkFunction() { - public IndexRequest createIndexRequest(IndexEntry element) { - return Requests.indexRequest() - .index(element.getIndex()) - .id(element.getId()) - .source(element.getFields()); - - } - @Override - public void process(IndexEntry element, RuntimeContext ctx, RequestIndexer indexer) { - indexer.add(createIndexRequest(element)); - } - } - ); - esSinkBuilder.setBulkFlushMaxActions(10000); - esSinkBuilder.setBulkFlushMaxSizeMb(10); - esSinkBuilder.setBulkFlushInterval(1000); - - results.addSink(esSinkBuilder.build()).name("Elastic Sink").uid("elastic-sink"); - } -} diff --git a/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticJobKafka.java b/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticJobKafka.java deleted file mode 100644 index 7c44fa91b..000000000 --- a/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticJobKafka.java +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.indexing.elastic; - -import com.cloudera.cyber.Message; -import com.cloudera.cyber.flink.FlinkUtils; -import com.cloudera.cyber.flink.Utils; -import com.cloudera.cyber.indexing.CollectionField; -import com.fasterxml.jackson.databind.ObjectMapper; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.connector.base.DeliveryGuarantee; -import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema; -import org.apache.flink.connector.kafka.sink.KafkaSink; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.datastream.DataStreamSource; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.util.Preconditions; -import org.apache.kafka.clients.producer.ProducerRecord; - -import java.io.IOException; -import java.time.Instant; -import java.util.Properties; - -import static com.cloudera.cyber.flink.ConfigConstants.PARAMS_TOPIC_INPUT; -import static com.cloudera.cyber.flink.Utils.readKafkaProperties; - -@Slf4j -public class ElasticJobKafka extends ElasticJob { - - private static final String DEFAULT_TOPIC_CONFIG_LOG = "es.config.log"; - public static final String INDEXER_ELASTIC_GROUP_ID = "indexer-elastic"; - - public static void main(String[] args) throws Exception { - Preconditions.checkArgument(args.length >= 1, "Arguments must consist of a properties files"); - ParameterTool params = Utils.getParamToolsFromProperties(args); - new ElasticJobKafka().createPipeline(params).execute("Indexing - Elastic"); - } - - @Override - protected DataStream createSource(StreamExecutionEnvironment env, ParameterTool params) { - return env.fromSource( - FlinkUtils.createKafkaSource(params.getRequired(PARAMS_TOPIC_INPUT), params, INDEXER_ELASTIC_GROUP_ID), - WatermarkStrategy.noWatermarks(), "Kafka Source").uid("kafka-source"); - } - - @Override - protected DataStream createConfigSource(StreamExecutionEnvironment env, ParameterTool params) { - DataStreamSource dataStreamSource = env.addSource( - new ElasticTemplateFieldsSource(client, - params.getLong(PARAMS_SCHEMA_REFRESH_INTERVAL, DEFAULT_SCHEMA_REFRESH_INTERVAL)) - ); - return dataStreamSource - .name("Schema Stream").uid("schema-stream") - .setMaxParallelism(1) - .setParallelism(1); - } - - @Override - protected void logConfig(DataStream configSource, ParameterTool params) { - String topic = params.get(PARAMS_TOPIC_CONFIG_LOG, DEFAULT_TOPIC_CONFIG_LOG); - Properties kafkaProperties = readKafkaProperties(params, INDEXER_ELASTIC_GROUP_ID,false); - log.info("Creating Kafka Sink for {}, using {}", topic, kafkaProperties); - KafkaSink kafkaSink = KafkaSink.builder().setRecordSerializer( - (KafkaRecordSerializationSchema) (collectionField, kafkaSinkContext, timestamp) -> { - ObjectMapper om = new ObjectMapper(); - try { - return new ProducerRecord<>(topic, null, Instant.now().toEpochMilli(), collectionField.getKey().getBytes(), om.writeValueAsBytes(collectionField)); - } catch (IOException e) { - return new ProducerRecord<>(topic, null, Instant.now().toEpochMilli(), collectionField.getKey().getBytes(), e.getMessage().getBytes()); - } - }).setKafkaProducerConfig(kafkaProperties).setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE).build(); - - configSource.sinkTo(kafkaSink).name("Schema Log").uid("schema-log") - .setParallelism(1); - } - -} diff --git a/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticTemplateFieldsSource.java b/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticTemplateFieldsSource.java deleted file mode 100644 index 5f998e89e..000000000 --- a/flink-cyber/flink-indexing/flink-indexing-elastic/src/main/java/com/cloudera/cyber/indexing/elastic/ElasticTemplateFieldsSource.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.indexing.elastic; - -import com.cloudera.cyber.indexing.CollectionField; -import lombok.NonNull; -import lombok.RequiredArgsConstructor; -import lombok.extern.slf4j.Slf4j; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.http.HttpHost; -import org.elasticsearch.client.RequestOptions; -import org.elasticsearch.client.RestClient; -import org.elasticsearch.client.RestHighLevelClient; -import org.elasticsearch.client.indices.GetIndexTemplatesRequest; -import org.elasticsearch.client.indices.GetIndexTemplatesResponse; - -import java.time.Instant; -import java.util.ArrayList; -import java.util.Arrays; - -@Slf4j -@RequiredArgsConstructor -public class ElasticTemplateFieldsSource extends RichParallelSourceFunction { - - @NonNull - private RestHighLevelClient client; - @NonNull - private final long delay; - private volatile boolean isRunning = true; - - @Override - public void open(Configuration parameters) throws Exception { - super.open(parameters); - HttpHost[] hosts = (HttpHost[]) Arrays.stream(parameters.getString("es.host", "", false).split(",")) - .map(HttpHost::create).toArray(); - this.client = new RestHighLevelClient(RestClient.builder(hosts)); - } - - @Override - public void run(SourceContext sourceContext) throws Exception { - while (isRunning) { - try { - GetIndexTemplatesResponse response = client.indices() - .getIndexTemplate(new GetIndexTemplatesRequest(), RequestOptions.DEFAULT); - response.getIndexTemplates().stream() - .map( - template -> CollectionField.builder() - .key(template.name()) - .values(new ArrayList<>(template.mappings().getSourceAsMap().keySet())) - .build()) - .forEach(c -> { - long now = Instant.now().toEpochMilli(); - sourceContext.collectWithTimestamp(c, now); - sourceContext.emitWatermark(new Watermark(now)); - }); - Thread.sleep(delay); - } catch (Exception e) { - log.error("Solr Collection updater failed", e); - throw (e); - } - } - } - - @Override - public void cancel() { - isRunning = false; - } -} diff --git a/flink-cyber/flink-indexing/flink-indexing-elastic/src/test/java/com/cloudera/cyber/indexing/elastic/ElasticJobTest.java b/flink-cyber/flink-indexing/flink-indexing-elastic/src/test/java/com/cloudera/cyber/indexing/elastic/ElasticJobTest.java deleted file mode 100644 index ab28c8892..000000000 --- a/flink-cyber/flink-indexing/flink-indexing-elastic/src/test/java/com/cloudera/cyber/indexing/elastic/ElasticJobTest.java +++ /dev/null @@ -1,85 +0,0 @@ -/* - * Copyright 2020 - 2022 Cloudera. All Rights Reserved. - * - * This file is licensed under the Apache License Version 2.0 (the "License"). You may not use this file - * except in compliance with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0. - * - * This file is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, - * either express or implied. Refer to the License for the specific permissions and - * limitations governing your use of the file. - */ - -package com.cloudera.cyber.indexing.elastic; - -import com.cloudera.cyber.Message; -import com.cloudera.cyber.indexing.CollectionField; -import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.api.java.utils.ParameterTool; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.test.util.JobTester; -import org.apache.flink.test.util.ManualSource; -import org.junit.*; -import org.testcontainers.elasticsearch.ElasticsearchContainer; - -import static org.junit.Assert.fail; - -public class ElasticJobTest extends ElasticJob { - - private ManualSource source; - private ManualSource configSource; - - @Rule - public ElasticsearchContainer container = new ElasticsearchContainer(); - - @Before - public void before() { - container.start(); - } - - @After - public void after() { - container.stop(); - } - - @Override - public DataStream createSource(StreamExecutionEnvironment env, ParameterTool params) { - source = JobTester.createManualSource(env, TypeInformation.of(Message.class)); - return source.getDataStream(); - } - - @Override - protected DataStream createConfigSource(StreamExecutionEnvironment env, ParameterTool params) { - configSource = JobTester.createManualSource(env, TypeInformation.of(CollectionField.class)); - return configSource.getDataStream(); - } - - @Override - protected void logConfig(DataStream configSource, ParameterTool params) { - } - - @Test - @Ignore - public void testSingleDestination() { - fail("Not implemented"); - } - - @Test - @Ignore - public void testMultipleDestination() { - fail("Not implemented"); - } - - @Test - @Ignore - public void testFieldFiltering() { - fail("Not implemented"); - } - - @Test - @Ignore - public void testEventFiltering() { - fail("Not implemented"); - } -}