Skip to content

Commit

Permalink
Update RetrievalFactory implementations to utilize the StreamIdentifi…
Browse files Browse the repository at this point in the history
…er field of StreamConfig
  • Loading branch information
furq-aws committed Mar 28, 2024
1 parent 44beda5 commit 5a26958
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,22 @@
package software.amazon.kinesis.retrieval;

import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;

/**
*
*/
public interface RetrievalFactory {
@Deprecated
GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(ShardInfo shardInfo, MetricsFactory metricsFactory);

default GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(
ShardInfo shardInfo, StreamIdentifier streamIdentifier, MetricsFactory metricsFactory) {
return createGetRecordsRetrievalStrategy(shardInfo, metricsFactory);
}

@Deprecated
RecordsPublisher createGetRecordsCache(ShardInfo shardInfo, MetricsFactory metricsFactory);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import software.amazon.kinesis.retrieval.RecordsPublisher;
import software.amazon.kinesis.retrieval.RetrievalFactory;

import javax.annotation.Nullable;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand All @@ -51,18 +52,16 @@ public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(final Shard

@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
final StreamConfig streamConfig,
final MetricsFactory metricsFactory) {
@NonNull final StreamConfig streamConfig,
@Nullable final MetricsFactory metricsFactory) {
final Optional<String> streamIdentifierStr = shardInfo.streamIdentifierSerOpt();
if (streamIdentifierStr.isPresent()) {
final StreamIdentifier streamIdentifier = StreamIdentifier.multiStreamInstance(streamIdentifierStr.get());
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
getOrCreateConsumerArn(streamIdentifier, streamConfig.consumerArn()),
getOrCreateConsumerArn(streamConfig.streamIdentifier(), streamConfig.consumerArn()),
streamIdentifierStr.get());
} else {
final StreamIdentifier streamIdentifier = StreamIdentifier.singleStreamInstance(defaultStreamName);
return new FanOutRecordsPublisher(kinesisClient, shardInfo.shardId(),
getOrCreateConsumerArn(streamIdentifier, defaultConsumerArn));
getOrCreateConsumerArn(streamConfig.streamIdentifier(), defaultConsumerArn));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
Expand Down Expand Up @@ -92,13 +93,21 @@ private static Function<DataFetcherProviderConfig, DataFetcher> defaultDataFetch
return dataFetcherProviderConfig -> new KinesisDataFetcher(kinesisClient, dataFetcherProviderConfig);
}

@Deprecated
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);

return createGetRecordsRetrievalStrategy(shardInfo, streamIdentifier, metricsFactory);
}

@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final StreamIdentifier streamIdentifier,
@NonNull final MetricsFactory metricsFactory) {
final DataFetcherProviderConfig kinesisDataFetcherProviderConfig = new KinesisDataFetcherProviderConfig(
streamIdentifier,
shardInfo.shardId(),
Expand All @@ -111,10 +120,31 @@ public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull fi
return new SynchronousGetRecordsRetrievalStrategy(dataFetcher);
}

@Deprecated
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
return recordsFetcherFactory.createRecordsFetcher(createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
shardInfo.shardId(), metricsFactory, maxRecords);
return createGetRecordsCache(shardInfo,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
metricsFactory);
}

@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final StreamConfig streamConfig,
@NonNull final MetricsFactory metricsFactory) {
return createGetRecordsCache(shardInfo,
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
metricsFactory);
}

private RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final MetricsFactory metricsFactory) {
return recordsFetcherFactory.createRecordsFetcher(
getRecordsRetrievalStrategy,
shardInfo.shardId(),
metricsFactory,
maxRecords);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import lombok.NonNull;
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.annotations.KinesisClientInternalApi;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.metrics.MetricsFactory;
Expand All @@ -34,6 +35,8 @@
*/
@KinesisClientInternalApi
public class SynchronousPrefetchingRetrievalFactory implements RetrievalFactory {
private static final String PREFETCHING_OPERATION_NAME = "Prefetching";

@NonNull
private final String streamName;
@NonNull
Expand Down Expand Up @@ -66,12 +69,21 @@ public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncCli
this.maxFutureWait = maxFutureWait;
}

@Override public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@Deprecated
@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
final StreamIdentifier streamIdentifier = shardInfo.streamIdentifierSerOpt().isPresent() ?
StreamIdentifier.multiStreamInstance(shardInfo.streamIdentifierSerOpt().get()) :
StreamIdentifier.singleStreamInstance(streamName);

return createGetRecordsRetrievalStrategy(shardInfo, streamIdentifier, metricsFactory);
}

@Override
public GetRecordsRetrievalStrategy createGetRecordsRetrievalStrategy(@NonNull final ShardInfo shardInfo,
@NonNull final StreamIdentifier streamIdentifier,
@NonNull final MetricsFactory metricsFactory) {
return new SynchronousGetRecordsRetrievalStrategy(
new KinesisDataFetcher(kinesisClient, new KinesisDataFetcherProviderConfig(
streamIdentifier,
Expand All @@ -82,12 +94,36 @@ public SynchronousPrefetchingRetrievalFactory(String streamName, KinesisAsyncCli
)));
}

@Deprecated
@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final MetricsFactory metricsFactory) {
return createGetRecordsCache(shardInfo,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory),
metricsFactory);
}

@Override
public RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final StreamConfig streamConfig,
@NonNull final MetricsFactory metricsFactory) {
return createGetRecordsCache(shardInfo,
createGetRecordsRetrievalStrategy(shardInfo, streamConfig.streamIdentifier(), metricsFactory),
metricsFactory);
}

private RecordsPublisher createGetRecordsCache(@NonNull final ShardInfo shardInfo,
@NonNull final GetRecordsRetrievalStrategy getRecordsRetrievalStrategy,
@NonNull final MetricsFactory metricsFactory) {
return new PrefetchRecordsPublisher(recordsFetcherFactory.maxPendingProcessRecordsInput(),
recordsFetcherFactory.maxByteSize(), recordsFetcherFactory.maxRecordsCount(), maxRecords,
createGetRecordsRetrievalStrategy(shardInfo, metricsFactory), executorService, idleMillisBetweenCalls,
metricsFactory, "Prefetching", shardInfo.shardId());
recordsFetcherFactory.maxByteSize(),
recordsFetcherFactory.maxRecordsCount(),
maxRecords,
getRecordsRetrievalStrategy,
executorService,
idleMillisBetweenCalls,
metricsFactory,
PREFETCHING_OPERATION_NAME,
shardInfo.shardId());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
import software.amazon.kinesis.common.StreamConfig;
import software.amazon.kinesis.common.StreamIdentifier;
import software.amazon.kinesis.leases.ShardInfo;
import software.amazon.kinesis.leases.exceptions.DependencyException;
import software.amazon.kinesis.metrics.MetricsFactory;
Expand All @@ -58,6 +59,8 @@ public class FanOutConfigTest {
private KinesisAsyncClient kinesisClient;
@Mock
private StreamConfig streamConfig;
@Mock
private StreamIdentifier streamIdentifier;

private FanOutConfig config;

Expand All @@ -69,6 +72,8 @@ public void setup() {
.streamName(TEST_STREAM_NAME);
doReturn(consumerRegistration).when(config)
.createConsumerRegistration(eq(kinesisClient), anyString(), anyString());
when(streamConfig.streamIdentifier()).thenReturn(streamIdentifier);
when(streamIdentifier.streamName()).thenReturn(TEST_STREAM_NAME);
}

@Test
Expand Down

0 comments on commit 5a26958

Please sign in to comment.