Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions flink-connector-elasticsearch8/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,27 @@ under the License.
<type>test-jar</type>
<scope>test</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-runtime</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-loader</artifactId>
<version>${flink.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>${jackson.version}</version>
</dependency>

</dependencies>

<dependencyManagement>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
package org.apache.flink.connector.elasticsearch.sink;

import org.apache.flink.api.connector.sink2.SinkWriter;
import org.apache.flink.api.connector.sink2.WriterInitContext;
import org.apache.flink.connector.base.sink.AsyncSinkBaseBuilder;
import org.apache.flink.connector.base.sink.writer.ElementConverter;
import org.apache.flink.util.function.SerializableSupplier;
Expand Down Expand Up @@ -80,6 +81,15 @@ public class Elasticsearch8AsyncSinkBuilder<InputT>
*/
private ElementConverter<InputT, BulkOperationVariant> elementConverter;

/** the path's prefix for every request. */
private String connectionPathPrefix;

private Integer connectionTimeout;

private Integer connectionRequestTimeout;

private Integer socketTimeout;

private SerializableSupplier<SSLContext> sslContextSupplier;

private SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
Expand All @@ -97,6 +107,28 @@ public Elasticsearch8AsyncSinkBuilder<InputT> setHosts(HttpHost... hosts) {
return this;
}

public Elasticsearch8AsyncSinkBuilder<InputT> setConnectionPathPrefix(
String connectionPathPrefix) {
this.connectionPathPrefix = connectionPathPrefix;
return this;
}

public Elasticsearch8AsyncSinkBuilder<InputT> setConnectionTimeout(Integer connectionTimeout) {
this.connectionTimeout = connectionTimeout;
return this;
}

public Elasticsearch8AsyncSinkBuilder<InputT> setConnectionRequestTimeout(
Integer connectionRequestTimeout) {
this.connectionRequestTimeout = connectionRequestTimeout;
return this;
}

public Elasticsearch8AsyncSinkBuilder<InputT> setSocketTimeout(Integer socketTimeout) {
this.socketTimeout = socketTimeout;
return this;
}

/**
* setHeaders set the headers to be sent with the requests made to Elasticsearch cluster..
*
Expand Down Expand Up @@ -239,7 +271,16 @@ private OperationConverter<InputT> buildOperationConverter(
private NetworkConfig buildNetworkConfig() {
checkArgument(!hosts.isEmpty(), "Hosts cannot be empty.");
return new NetworkConfig(
hosts, username, password, headers, sslContextSupplier, sslHostnameVerifier);
hosts,
username,
password,
headers,
connectionPathPrefix,
connectionRequestTimeout,
connectionTimeout,
socketTimeout,
sslContextSupplier,
sslHostnameVerifier);
}

/** A wrapper that evolves the Operation, since a BulkOperationVariant is not Serializable. */
Expand All @@ -250,6 +291,12 @@ public OperationConverter(ElementConverter<T, BulkOperationVariant> converter) {
this.converter = converter;
}

@Override
public void open(WriterInitContext context) {
// call converter.open() before calling converter.apply()
converter.open(context);
}

@Override
public Operation apply(T element, SinkWriter.Context context) {
return new Operation(converter.apply(element, context));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@
import co.elastic.clients.elasticsearch.ElasticsearchAsyncClient;
import co.elastic.clients.json.jackson.JacksonJsonpMapper;
import co.elastic.clients.transport.rest_client.RestClientTransport;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.json.JsonMapper;
import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalTimeSerializer;
import org.apache.http.Header;
import org.apache.http.HttpHost;
import org.apache.http.auth.AuthScope;
Expand All @@ -40,43 +46,66 @@
import javax.net.ssl.SSLContext;

import java.io.Serializable;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;
import java.util.List;

import static org.apache.flink.util.Preconditions.checkState;

/** A factory that creates valid ElasticsearchClient instances. */
public class NetworkConfig implements Serializable {
private final List<HttpHost> hosts;

private final List<Header> headers;

private final String username;

private final String password;

@Nullable private final String connectionPathPrefix;
@Nullable Integer connectionRequestTimeout;
@Nullable Integer connectionTimeout;
@Nullable Integer socketTimeout;
@Nullable private final SerializableSupplier<SSLContext> sslContextSupplier;

@Nullable private final SerializableSupplier<HostnameVerifier> sslHostnameVerifier;
private static final DateTimeFormatter DATE_TIME_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
private static final DateTimeFormatter DATE_FORMATTER =
DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss");

public NetworkConfig(
List<HttpHost> hosts,
String username,
String password,
List<Header> headers,
SerializableSupplier<SSLContext> sslContextSupplier,
SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
@Nullable String connectionPathPrefix,
@Nullable Integer connectionRequestTimeout,
@Nullable Integer connectionTimeout,
@Nullable Integer socketTimeout,
@Nullable SerializableSupplier<SSLContext> sslContextSupplier,
@Nullable SerializableSupplier<HostnameVerifier> sslHostnameVerifier) {
checkState(!hosts.isEmpty(), "Hosts must not be empty");
this.hosts = hosts;
this.username = username;
this.password = password;
this.headers = headers;
this.connectionRequestTimeout = connectionRequestTimeout;
this.connectionTimeout = connectionTimeout;
this.socketTimeout = socketTimeout;
this.connectionPathPrefix = connectionPathPrefix;
this.sslContextSupplier = sslContextSupplier;
this.sslHostnameVerifier = sslHostnameVerifier;
}

public ElasticsearchAsyncClient createEsClient() {
// the JavaTimeModule is added to provide support for java 8 Time classes.
JavaTimeModule javaTimeModule = new JavaTimeModule();
javaTimeModule.addSerializer(
LocalDateTime.class, new LocalDateTimeSerializer(DATE_TIME_FORMATTER));
javaTimeModule.addSerializer(LocalDate.class, new LocalDateSerializer(DATE_FORMATTER));
javaTimeModule.addSerializer(LocalTime.class, new LocalTimeSerializer(TIME_FORMATTER));
ObjectMapper mapper = JsonMapper.builder().addModule(javaTimeModule).build();
return new ElasticsearchAsyncClient(
new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper()));
new RestClientTransport(this.getRestClient(), new JacksonJsonpMapper(mapper)));
}

private RestClient getRestClient() {
Expand Down Expand Up @@ -105,6 +134,29 @@ private RestClient getRestClient() {
restClientBuilder.setDefaultHeaders(headers.toArray(new Header[0]));
}

if (connectionPathPrefix != null) {
restClientBuilder.setPathPrefix(connectionPathPrefix);
}

if (connectionRequestTimeout != null
|| connectionTimeout != null
|| socketTimeout != null) {
restClientBuilder.setRequestConfigCallback(
requestConfigBuilder -> {
if (connectionRequestTimeout != null) {
requestConfigBuilder.setConnectionRequestTimeout(
connectionRequestTimeout);
}
if (connectionTimeout != null) {
requestConfigBuilder.setConnectTimeout(connectionTimeout);
}
if (socketTimeout != null) {
requestConfigBuilder.setSocketTimeout(socketTimeout);
}
return requestConfigBuilder;
});
}

return restClientBuilder.build();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.apache.flink.connector.elasticsearch.table;

import org.apache.flink.annotation.Internal;

import java.time.format.DateTimeFormatter;

/** Abstract class for time related {@link IndexGenerator}. */
@Internal
abstract class AbstractTimeIndexGenerator extends IndexGeneratorBase {

private final String dateTimeFormat;
protected transient DateTimeFormatter dateTimeFormatter;

public AbstractTimeIndexGenerator(String index, String dateTimeFormat) {
super(index);
this.dateTimeFormat = dateTimeFormat;
}

@Override
public void open() {
this.dateTimeFormatter = DateTimeFormatter.ofPattern(dateTimeFormat);
}
}
Loading