Skip to content

Commit cdf3a2e

Browse files
authored
Merge branch 'main' into issue-17659
Signed-off-by: Andriy Redko <andriy.redko@aiven.io>
2 parents 254ba24 + b52e63f commit cdf3a2e

File tree

152 files changed

+9411
-65
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

152 files changed

+9411
-65
lines changed

CHANGELOG.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,12 +12,13 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- Fix systemd integTest on deb regarding path ownership check ([#17641](https://github.com/opensearch-project/OpenSearch/pull/17641))
1313
- Add dfs transformation function in XContentMapValues ([#17612](https://github.com/opensearch-project/OpenSearch/pull/17612))
1414
- [Security Manager Replacement] Add support of Java policies ([#17663](https://github.com/opensearch-project/OpenSearch/pull/17663))
15+
- Added Kinesis support as a plugin for the pull-based ingestion ([#17615](https://github.com/opensearch-project/OpenSearch/pull/17615))
1516

1617
### Changed
1718
- Migrate BC libs to their FIPS counterparts ([#14912](https://github.com/opensearch-project/OpenSearch/pull/14912))
1819

1920
### Dependencies
20-
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607))
21+
- Bump `com.nimbusds:nimbus-jose-jwt` from 9.41.1 to 10.0.2 ([#17607](https://github.com/opensearch-project/OpenSearch/pull/17607), [#17669](https://github.com/opensearch-project/OpenSearch/pull/17669))
2122
- Bump `com.google.api:api-common` from 1.8.1 to 2.46.1 ([#17604](https://github.com/opensearch-project/OpenSearch/pull/17604))
2223
- Bump `ch.qos.logback:logback-core` from 1.5.16 to 1.5.17 ([#17609](https://github.com/opensearch-project/OpenSearch/pull/17609))
2324
- Bump `org.jruby.joni:joni` from 2.2.3 to 2.2.5 ([#17608](https://github.com/opensearch-project/OpenSearch/pull/17608))

plugins/ingestion-kafka/src/main/java/org/opensearch/plugin/kafka/KafkaPartitionConsumer.java

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -119,22 +119,34 @@ protected static Consumer<byte[], byte[]> createConsumer(String clientId, KafkaS
119119
}
120120

121121
@Override
122-
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(KafkaOffset offset, long maxMessages, int timeoutMillis)
123-
throws TimeoutException {
122+
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(
123+
KafkaOffset offset,
124+
boolean includeStart,
125+
long maxMessages,
126+
int timeoutMillis
127+
) throws TimeoutException {
124128
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
125-
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(offset.getOffset(), maxMessages, timeoutMillis)
129+
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
130+
offset.getOffset(),
131+
includeStart,
132+
maxMessages,
133+
timeoutMillis
134+
)
126135
);
127136
return records;
128137
}
129138

130139
@Override
131-
public KafkaOffset nextPointer() {
132-
return new KafkaOffset(lastFetchedOffset + 1);
133-
}
134-
135-
@Override
136-
public KafkaOffset nextPointer(KafkaOffset pointer) {
137-
return new KafkaOffset(pointer.getOffset() + 1);
140+
public List<ReadResult<KafkaOffset, KafkaMessage>> readNext(long maxMessages, int timeoutMillis) throws TimeoutException {
141+
List<ReadResult<KafkaOffset, KafkaMessage>> records = AccessController.doPrivileged(
142+
(PrivilegedAction<List<ReadResult<KafkaOffset, KafkaMessage>>>) () -> fetch(
143+
lastFetchedOffset,
144+
false,
145+
maxMessages,
146+
timeoutMillis
147+
)
148+
);
149+
return records;
138150
}
139151

140152
@Override
@@ -191,18 +203,28 @@ public IngestionShardPointer pointerFromOffset(String offset) {
191203
return new KafkaOffset(offsetValue);
192204
}
193205

194-
private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(long startOffset, long maxMessages, int timeoutMillis) {
195-
if (lastFetchedOffset < 0 || lastFetchedOffset != startOffset - 1) {
196-
logger.info("Seeking to offset {}", startOffset);
197-
consumer.seek(topicPartition, startOffset);
206+
private synchronized List<ReadResult<KafkaOffset, KafkaMessage>> fetch(
207+
long startOffset,
208+
boolean includeStart,
209+
long maxMessages,
210+
int timeoutMillis
211+
) {
212+
long kafkaStartOffset = startOffset;
213+
if (!includeStart) {
214+
kafkaStartOffset += 1;
215+
}
216+
217+
if (lastFetchedOffset < 0 || lastFetchedOffset != kafkaStartOffset - 1) {
218+
logger.info("Seeking to offset {}", kafkaStartOffset);
219+
consumer.seek(topicPartition, kafkaStartOffset);
198220
// update the last fetched offset so that we don't need to seek again if no more messages to fetch
199-
lastFetchedOffset = startOffset - 1;
221+
lastFetchedOffset = kafkaStartOffset - 1;
200222
}
201223

202224
ConsumerRecords<byte[], byte[]> consumerRecords = consumer.poll(Duration.ofMillis(timeoutMillis));
203225
List<ConsumerRecord<byte[], byte[]>> messageAndOffsets = consumerRecords.records(topicPartition);
204226

205-
long endOffset = startOffset + maxMessages;
227+
long endOffset = kafkaStartOffset + maxMessages;
206228
List<ReadResult<KafkaOffset, KafkaMessage>> results = new ArrayList<>();
207229

208230
for (ConsumerRecord<byte[], byte[]> messageAndOffset : messageAndOffsets) {

plugins/ingestion-kafka/src/test/java/org/opensearch/plugin/kafka/KafkaPartitionConsumerTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,11 +61,10 @@ public void testReadNext() throws Exception {
6161

6262
when(mockConsumer.poll(any(Duration.class))).thenReturn(records);
6363

64-
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> result = consumer.readNext(new KafkaOffset(0), 10, 1000);
64+
List<IngestionShardConsumer.ReadResult<KafkaOffset, KafkaMessage>> result = consumer.readNext(new KafkaOffset(0), true, 10, 1000);
6565

6666
assertEquals(1, result.size());
6767
assertEquals("message", new String(result.get(0).getMessage().getPayload(), StandardCharsets.UTF_8));
68-
assertEquals(1, consumer.nextPointer().getOffset());
6968
assertEquals(0, consumer.getShardId());
7069
assertEquals("client1", consumer.getClientId());
7170
}
Lines changed: 251 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,251 @@
1+
/*
2+
* SPDX-License-Identifier: Apache-2.0
3+
*
4+
* The OpenSearch Contributors require contributions made to
5+
* this file be licensed under the Apache-2.0 license or a
6+
* compatible open source license.
7+
*
8+
* Modifications Copyright OpenSearch Contributors. See
9+
* GitHub history for details.
10+
*/
11+
12+
apply plugin: 'opensearch.internal-cluster-test'
13+
14+
opensearchplugin {
15+
description = 'Pull-based ingestion plugin to consume from Kinesis'
16+
classname = 'org.opensearch.plugin.kinesis.KinesisPlugin'
17+
}
18+
19+
versions << [
20+
'docker': '3.3.6',
21+
'testcontainers': '1.19.7',
22+
'ducttape': '1.0.8',
23+
'snappy': '1.1.10.7',
24+
]
25+
26+
dependencies {
27+
// aws sdk
28+
api "software.amazon.awssdk:sdk-core:${versions.aws}"
29+
api "software.amazon.awssdk:annotations:${versions.aws}"
30+
api "software.amazon.awssdk:aws-core:${versions.aws}"
31+
api "software.amazon.awssdk:auth:${versions.aws}"
32+
api "software.amazon.awssdk:identity-spi:${versions.aws}"
33+
api "software.amazon.awssdk:checksums:${versions.aws}"
34+
api "software.amazon.awssdk:checksums-spi:${versions.aws}"
35+
api "software.amazon.awssdk.crt:aws-crt:${versions.awscrt}"
36+
api "software.amazon.awssdk:http-auth:${versions.aws}"
37+
api "software.amazon.awssdk:http-auth-aws:${versions.aws}"
38+
api "software.amazon.awssdk:http-auth-spi:${versions.aws}"
39+
api "software.amazon.awssdk:retries:${versions.aws}"
40+
api "software.amazon.awssdk:retries-spi:${versions.aws}"
41+
api "software.amazon.awssdk:endpoints-spi:${versions.aws}"
42+
api "software.amazon.awssdk:http-client-spi:${versions.aws}"
43+
api "software.amazon.awssdk:apache-client:${versions.aws}"
44+
api "software.amazon.awssdk:metrics-spi:${versions.aws}"
45+
api "software.amazon.awssdk:profiles:${versions.aws}"
46+
api "software.amazon.awssdk:regions:${versions.aws}"
47+
api "software.amazon.awssdk:utils:${versions.aws}"
48+
api "software.amazon.awssdk:aws-json-protocol:${versions.aws}"
49+
api "software.amazon.awssdk:protocol-core:${versions.aws}"
50+
api "software.amazon.awssdk:json-utils:${versions.aws}"
51+
api "software.amazon.awssdk:third-party-jackson-core:${versions.aws}"
52+
api "software.amazon.awssdk:aws-xml-protocol:${versions.aws}"
53+
api "software.amazon.awssdk:aws-json-protocol:${versions.aws}"
54+
api "software.amazon.awssdk:aws-query-protocol:${versions.aws}"
55+
api "software.amazon.awssdk:sts:${versions.aws}"
56+
api "software.amazon.awssdk:netty-nio-client:${versions.aws}"
57+
api "software.amazon.awssdk:kinesis:${versions.aws}"
58+
api "software.amazon.awssdk:aws-cbor-protocol:${versions.aws}"
59+
api "software.amazon.awssdk:third-party-jackson-dataformat-cbor:${versions.aws}"
60+
61+
api "org.apache.httpcomponents:httpclient:${versions.httpclient}"
62+
api "org.apache.httpcomponents:httpcore:${versions.httpcore}"
63+
api "commons-logging:commons-logging:${versions.commonslogging}"
64+
api "commons-codec:commons-codec:${versions.commonscodec}"
65+
api "com.fasterxml.jackson.core:jackson-core:${versions.jackson}"
66+
api "com.fasterxml.jackson.core:jackson-databind:${versions.jackson_databind}"
67+
api "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
68+
api "com.fasterxml.jackson.dataformat:jackson-dataformat-cbor:${versions.jackson}"
69+
api "joda-time:joda-time:${versions.joda}"
70+
api "org.slf4j:slf4j-api:${versions.slf4j}"
71+
72+
// network stack
73+
api "io.netty:netty-buffer:${versions.netty}"
74+
api "io.netty:netty-codec:${versions.netty}"
75+
api "io.netty:netty-codec-http:${versions.netty}"
76+
api "io.netty:netty-codec-http2:${versions.netty}"
77+
api "io.netty:netty-common:${versions.netty}"
78+
api "io.netty:netty-handler:${versions.netty}"
79+
api "io.netty:netty-resolver:${versions.netty}"
80+
api "io.netty:netty-transport:${versions.netty}"
81+
api "io.netty:netty-transport-native-unix-common:${versions.netty}"
82+
api "io.netty:netty-transport-classes-epoll:${versions.netty}"
83+
84+
85+
// test
86+
testImplementation "com.github.docker-java:docker-java-api:${versions.docker}"
87+
testImplementation "com.github.docker-java:docker-java-transport:${versions.docker}"
88+
testImplementation "com.github.docker-java:docker-java-transport-zerodep:${versions.docker}"
89+
testImplementation "com.fasterxml.jackson.core:jackson-annotations:${versions.jackson}"
90+
testImplementation "org.testcontainers:testcontainers:${versions.testcontainers}"
91+
testImplementation "org.testcontainers:localstack:${versions.testcontainers}"
92+
testImplementation "org.rnorth.duct-tape:duct-tape:${versions.ducttape}"
93+
testImplementation "org.apache.commons:commons-compress:${versions.commonscompress}"
94+
testImplementation "commons-io:commons-io:${versions.commonsio}"
95+
testImplementation 'org.awaitility:awaitility:4.2.0'
96+
}
97+
98+
internalClusterTest{
99+
environment 'TESTCONTAINERS_RYUK_DISABLED', 'true'
100+
// TODO: Adding the required permissions in plugin-security.policy doesn't seem to work, so the security manager is disabled for these tests.
101+
systemProperty 'tests.security.manager', 'false'
102+
}
103+
104+
tasks.named("dependencyLicenses").configure {
105+
mapping from: /jackson-.*/, to: 'jackson'
106+
mapping from: /netty-.*/, to: 'netty'
107+
mapping from: /log4j-.*/, to: 'log4j'
108+
}
109+
110+
thirdPartyAudit {
111+
ignoreMissingClasses(
112+
'com.aayushatharva.brotli4j.Brotli4jLoader',
113+
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Status',
114+
'com.aayushatharva.brotli4j.decoder.DecoderJNI$Wrapper',
115+
'com.aayushatharva.brotli4j.encoder.BrotliEncoderChannel',
116+
'com.aayushatharva.brotli4j.encoder.Encoder$Mode',
117+
'com.aayushatharva.brotli4j.encoder.Encoder$Parameters',
118+
119+
'com.google.protobuf.nano.CodedOutputByteBufferNano',
120+
'com.google.protobuf.nano.MessageNano',
121+
122+
'org.apache.avalon.framework.logger.Logger',
123+
'org.apache.log.Hierarchy',
124+
'org.apache.log.Logger',
125+
'org.apache.log4j.Level',
126+
'org.apache.log4j.Logger',
127+
'org.apache.log4j.Priority',
128+
129+
'org.slf4j.impl.StaticLoggerBinder',
130+
'org.slf4j.impl.StaticMDCBinder',
131+
'org.slf4j.impl.StaticMarkerBinder',
132+
133+
'org.graalvm.nativeimage.hosted.Feature',
134+
'org.graalvm.nativeimage.hosted.Feature$AfterImageWriteAccess',
135+
136+
'com.ning.compress.BufferRecycler',
137+
'com.ning.compress.lzf.ChunkDecoder',
138+
'com.ning.compress.lzf.ChunkEncoder',
139+
'com.ning.compress.lzf.LZFChunk',
140+
'com.ning.compress.lzf.LZFEncoder',
141+
'com.ning.compress.lzf.util.ChunkDecoderFactory',
142+
'com.ning.compress.lzf.util.ChunkEncoderFactory',
143+
144+
'javax.servlet.ServletContextEvent',
145+
'javax.servlet.ServletContextListener',
146+
147+
'io.netty.internal.tcnative.Buffer',
148+
'io.netty.internal.tcnative.CertificateCompressionAlgo',
149+
'io.netty.internal.tcnative.Library',
150+
'io.netty.internal.tcnative.SSLContext',
151+
'io.netty.internal.tcnative.SSLPrivateKeyMethod',
152+
153+
'io.netty.internal.tcnative.AsyncSSLPrivateKeyMethod',
154+
'io.netty.internal.tcnative.AsyncTask',
155+
'io.netty.internal.tcnative.CertificateCallback',
156+
'io.netty.internal.tcnative.CertificateVerifier',
157+
'io.netty.internal.tcnative.ResultCallback',
158+
'io.netty.internal.tcnative.SessionTicketKey',
159+
'io.netty.internal.tcnative.SniHostNameMatcher',
160+
'io.netty.internal.tcnative.SSL',
161+
'io.netty.internal.tcnative.SSLSession',
162+
'io.netty.internal.tcnative.SSLSessionCache',
163+
164+
'lzma.sdk.lzma.Encoder',
165+
'net.jpountz.lz4.LZ4Compressor',
166+
'net.jpountz.lz4.LZ4Factory',
167+
'net.jpountz.lz4.LZ4FastDecompressor',
168+
'net.jpountz.xxhash.XXHash32',
169+
'net.jpountz.xxhash.XXHashFactory',
170+
171+
// from io.netty.handler.ssl.util.BouncyCastleSelfSignedCertGenerator (netty)
172+
'org.bouncycastle.cert.X509v3CertificateBuilder',
173+
'org.bouncycastle.cert.jcajce.JcaX509CertificateConverter',
174+
'org.bouncycastle.operator.jcajce.JcaContentSignerBuilder',
175+
'org.bouncycastle.openssl.PEMEncryptedKeyPair',
176+
'org.bouncycastle.openssl.PEMParser',
177+
'org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter',
178+
'org.bouncycastle.openssl.jcajce.JceOpenSSLPKCS8DecryptorProviderBuilder',
179+
'org.bouncycastle.openssl.jcajce.JcePEMDecryptorProviderBuilder',
180+
'org.bouncycastle.pkcs.PKCS8EncryptedPrivateKeyInfo',
181+
182+
'org.conscrypt.AllocatedBuffer',
183+
'org.conscrypt.BufferAllocator',
184+
'org.conscrypt.Conscrypt',
185+
'org.conscrypt.HandshakeListener',
186+
187+
'org.eclipse.jetty.alpn.ALPN$ClientProvider',
188+
'org.eclipse.jetty.alpn.ALPN$ServerProvider',
189+
'org.eclipse.jetty.alpn.ALPN',
190+
191+
// from io.netty.handler.ssl.JettyNpnSslEngine (netty)
192+
'org.eclipse.jetty.npn.NextProtoNego$ClientProvider',
193+
'org.eclipse.jetty.npn.NextProtoNego$ServerProvider',
194+
'org.eclipse.jetty.npn.NextProtoNego',
195+
196+
// from io.netty.handler.codec.marshalling.ChannelBufferByteInput (netty)
197+
'org.jboss.marshalling.ByteInput',
198+
199+
// from io.netty.handler.codec.marshalling.ChannelBufferByteOutput (netty)
200+
'org.jboss.marshalling.ByteOutput',
201+
202+
// from io.netty.handler.codec.marshalling.CompatibleMarshallingEncoder (netty)
203+
'org.jboss.marshalling.Marshaller',
204+
205+
// from io.netty.handler.codec.marshalling.ContextBoundUnmarshallerProvider (netty)
206+
'org.jboss.marshalling.MarshallerFactory',
207+
'org.jboss.marshalling.MarshallingConfiguration',
208+
'org.jboss.marshalling.Unmarshaller',
209+
210+
'reactor.blockhound.BlockHound$Builder',
211+
'reactor.blockhound.integration.BlockHoundIntegration',
212+
213+
'software.amazon.eventstream.HeaderValue',
214+
'software.amazon.eventstream.Message',
215+
'software.amazon.eventstream.MessageDecoder'
216+
)
217+
218+
ignoreViolations (
219+
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator',
220+
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$1',
221+
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$2',
222+
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$3',
223+
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$4',
224+
'io.netty.handler.ssl.util.OpenJdkSelfSignedCertGenerator$5',
225+
226+
'io.netty.util.internal.PlatformDependent0',
227+
'io.netty.util.internal.PlatformDependent0$1',
228+
'io.netty.util.internal.PlatformDependent0$2',
229+
'io.netty.util.internal.PlatformDependent0$3',
230+
'io.netty.util.internal.PlatformDependent0$4',
231+
'io.netty.util.internal.PlatformDependent0$6',
232+
233+
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueConsumerNodeRef',
234+
'io.netty.util.internal.shaded.org.jctools.queues.BaseLinkedQueueProducerNodeRef',
235+
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueColdProducerFields',
236+
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueConsumerFields',
237+
'io.netty.util.internal.shaded.org.jctools.queues.BaseMpscLinkedArrayQueueProducerFields',
238+
'io.netty.util.internal.shaded.org.jctools.queues.LinkedQueueNode',
239+
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueConsumerIndexField',
240+
'io.netty.util.internal.shaded.org.jctools.queues.MpmcArrayQueueProducerIndexField',
241+
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueConsumerIndexField',
242+
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerIndexField',
243+
'io.netty.util.internal.shaded.org.jctools.queues.MpscArrayQueueProducerLimitField',
244+
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueConsumerIndexField',
245+
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerIndexField',
246+
'io.netty.util.internal.shaded.org.jctools.queues.unpadded.MpscUnpaddedArrayQueueProducerLimitField',
247+
'io.netty.util.internal.shaded.org.jctools.util.UnsafeAccess',
248+
'io.netty.util.internal.shaded.org.jctools.util.UnsafeRefArrayAccess',
249+
'io.netty.util.internal.shaded.org.jctools.util.UnsafeLongArrayAccess',
250+
)
251+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
c5acc1da9567290302d80ffa1633785afa4ce630

0 commit comments

Comments
 (0)