KCL Worker is not responding to maxRecords value #285

Open
muhufuk opened this issue Jan 22, 2018 · 12 comments

@muhufuk

muhufuk commented Jan 22, 2018

When starting the worker, I set maxRecords to 10 in KinesisClientLibConfiguration, but I still get more records than that (~13, ~15, etc.) in a single call to processRecords.

public KinesisClientLibConfiguration kinesisClientLibConfiguration(
        final ClientConfiguration clientConfiguration) {
    return new KinesisClientLibConfiguration(
            applicationName + dataStreamName,
            dataStreamName,
            new DefaultAWSCredentialsProviderChain(),
            "Worker-" + UUID.randomUUID().toString())
        .withRegionName(region)
        .withInitialPositionInStream(InitialPositionInStream.TRIM_HORIZON)
        .withCommonClientConfig(clientConfiguration)
        .withInitialLeaseTableReadCapacity(leaseTableReadCapacity)
        .withInitialLeaseTableWriteCapacity(leaseTableWriteCapacity)
        .withFailoverTimeMillis(failOverTime)
        .withMaxRecords(10);
}

@cgpassante

cgpassante commented Mar 2, 2018

I am seeing the same behavior: maxRecords is being ignored completely. In fact, the number of records returned is often larger than the default of 10,000. I could really use a workaround.

@xujiaxj

xujiaxj commented Mar 12, 2018

Same here. This is annoying when we want to slow down consumption because our downstream processing has slowed.

@shenavaa

Based on [1], maxRecords is the maximum number of Kinesis records per request. If the KPL is used on the producer side with aggregation, each Kinesis record can contain several user records, so the record processor will receive more records per batch.

[1] https://github.com/awslabs/amazon-kinesis-client/blob/master/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java
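
To make the effect described above concrete, here is a toy model of it. It does not use the KCL or KPL at all: `DeaggregationModel` and `fetchAndDeaggregate` are hypothetical names, and each Kinesis record is represented as a plain list of user payloads. It only illustrates the aggregation case, under the assumption that the limit is applied before deaggregation.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model only (hypothetical, not KCL code): a limit applied before deaggregation. */
final class DeaggregationModel {
    private DeaggregationModel() {}

    /**
     * Takes at most maxRecords Kinesis records, then flattens the user
     * records packed inside each of them into one batch.
     */
    static List<String> fetchAndDeaggregate(List<List<String>> kinesisRecords, int maxRecords) {
        List<String> userRecords = new ArrayList<>();
        int limit = Math.min(maxRecords, kinesisRecords.size());
        for (int i = 0; i < limit; i++) {
            userRecords.addAll(kinesisRecords.get(i));
        }
        return userRecords;
    }

    public static void main(String[] args) {
        // 20 Kinesis records, each carrying 3 aggregated user records.
        List<List<String>> shard = new ArrayList<>();
        for (int i = 0; i < 20; i++) {
            shard.add(List.of("a" + i, "b" + i, "c" + i));
        }
        // The limit of 10 applies to Kinesis records, so 30 user records come out.
        System.out.println("user records delivered: " + fetchAndDeaggregate(shard, 10).size());
    }
}
```

In this model a limit of 10 still yields 30 user records, because the limit counts Kinesis records rather than the user records inside them.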

@muhufuk
Author

muhufuk commented Mar 14, 2018

@shenavaa Yes, we suspected that too, but in some cases we got 3k+ records even though maxRecords was 1500.
Also, as you mentioned, we do use the KPL, but aggregation is set to false.

@shenavaa

@muhufuk Can you also share your KPL configuration properties and the number of shards in the stream so we can reproduce this?

@muhufuk
Author

muhufuk commented Mar 15, 2018

@shenavaa We only changed the region and the aggregation flag. Everything else is left at the default.
We have 60 shards.

@kakaday22

Hello, I am having the same issue! I am running version 1.9.0 and have a similar setup for my KinesisClientLibConfiguration. However, setting maxRecords does not limit the number of records I receive. It is worth mentioning that I have a custom record processor class.

@WineYe

WineYe commented Jul 1, 2019

I am using KCL 1.9.1 and have the same issue. I set maxRecords to around 300-500, but I keep getting around 1000 records at a time.

@cebbott

cebbott commented Aug 31, 2022

So how do you limit the KPL so we can get a consistent number of records?

@fommil

fommil commented Apr 23, 2024

My understanding from reading the KCL source code is that maxRecords is only used by the polling fetcher, not the fan-out one. I don't see a way to limit the max records when using fan-out, which is unfortunate because it can lead to extremely large per-shard caches; cf. awslabs/aws-eventstream-java#4

@wellingtonmacena

wellingtonmacena commented Jun 19, 2024

I'm using KCL V2. I used this and it worked:

// Polling retrieval: maxRecords caps the records requested per GetRecords call.
final var polling =
    new PollingConfig(getKinesisStreamName(), kinesisClient)
        .maxRecords(maximumRecordsPerCall)
        .idleTimeBetweenReadsInMillis(getKinesisTimeIntervalBetweenCalls());

// Replace the default retrieval-specific config with the polling config above.
var retrievalConfig = configsBuilder.retrievalConfig().retrievalSpecificConfig(polling);

this.scheduler =
    new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        retrievalConfig);

@jhmartin

@wellingtonmacena wrote: "I'm using KCL V2, I used this and it worked"
PollingConfig isn't EFO mode though, and as @fommil pointed out, maxRecords seems to be polling-only.
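
Since no setting discussed in this thread caps the batch size in every mode, one stopgap is to bound the work inside your own record processor by splitting whatever list arrives into fixed-size chunks. The sketch below is plain Java with no KCL dependency: `BatchSplitter` and `split` are hypothetical names, and the idea is that you would call `split` on the list your processor receives. It does not reduce how much the KCL fetches or caches. It only limits how many records your code hands downstream at a time.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper, not part of the KCL: caps the size of each unit of work. */
final class BatchSplitter {
    private BatchSplitter() {}

    /** Splits records into consecutive sub-lists of at most maxBatchSize elements. */
    static <T> List<List<T>> split(List<T> records, int maxBatchSize) {
        if (maxBatchSize <= 0) {
            throw new IllegalArgumentException("maxBatchSize must be positive");
        }
        List<List<T>> batches = new ArrayList<>();
        for (int from = 0; from < records.size(); from += maxBatchSize) {
            int to = Math.min(from + maxBatchSize, records.size());
            // Copy the view so each batch is independent of the source list.
            batches.add(new ArrayList<>(records.subList(from, to)));
        }
        return batches;
    }

    public static void main(String[] args) {
        // Stand-in for a delivery of 25 records when only 10 were wanted.
        List<Integer> delivered = new ArrayList<>();
        for (int i = 0; i < 25; i++) {
            delivered.add(i);
        }
        // Prints batch sizes 10, 10 and 5.
        for (List<Integer> batch : split(delivered, 10)) {
            System.out.println("handling " + batch.size() + " records");
        }
    }
}
```

If downstream is slow, a pause between chunks is a natural place to apply backpressure, though whether that also slows fetching depends on the retrieval mode and is not something this sketch verifies.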
