This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

add produce/fetch timeout #490

Merged


dockerzhang
Contributor

KoP uses the requestTimeoutMs config to time out an asynchronous responseFuture, but FetchRequest and ProduceRequest have their own timeout parameters, which are supplied by the client. For Produce and Fetch, we should use those parameters in place of the default request timeout.
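As a rough illustration of the idea, here is a minimal Java sketch of picking the timeout per request type. `RequestKind`, `TimeoutChooser`, and `effectiveTimeoutMs` are hypothetical names invented for this example and are not KoP's actual code; `defaultMs` stands in for the value of requestTimeoutMs.

```java
/** Hypothetical request kinds for this sketch; not KoP's actual types. */
enum RequestKind { PRODUCE, FETCH, OTHER }

final class TimeoutChooser {
    private TimeoutChooser() {}

    /**
     * Picks the timeout to apply to a pending response.
     *
     * @param kind            kind of Kafka request
     * @param clientTimeoutMs timeout carried in the request by the client;
     *                        only consulted for PRODUCE and FETCH
     * @param defaultMs       broker-side default, standing in for requestTimeoutMs
     */
    static int effectiveTimeoutMs(RequestKind kind, int clientTimeoutMs, int defaultMs) {
        switch (kind) {
            case PRODUCE:
            case FETCH:
                // Produce and Fetch bring their own timeout from the client.
                return clientTimeoutMs;
            default:
                // Every other request keeps the configured default.
                return defaultMs;
        }
    }
}
```

With a default of 60000 ms, a PRODUCE request carrying a 30000 ms timeout would expire after 30000 ms, while a request of any other kind would still use 60000 ms.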

@dockerzhang dockerzhang force-pushed the add-produce-fetch-timeout branch 2 times, most recently from bf2dfc3 to 0d364ed Compare May 11, 2021 07:43
@BewareMyPower
Collaborator

I think it's better to use a DelayedOperation for the produce/fetch timeout, which gives precise timeout control, like Kafka does in https://github.com/apache/kafka/blob/0fc30b5e4320c1cac0024ac2095b8c26aa88a815/core/src/main/scala/kafka/server/ReplicaManager.scala#L658

        val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)

        // create a list of (topic, partition) pairs to use as keys for this delayed produce operation
        val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq

        // try to complete the request immediately, otherwise put it into the purgatory
        // this is because while the delayed produce operation is being created, new
        // requests may arrive and hence make this operation completable.
        delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
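To make the pattern concrete on the Java side, here is a simplified sketch of "try to complete now, otherwise arm a timer". It is not Kafka's DelayedOperation or its purgatory: `SketchDelayedOperation` and `tryCompleteElseSchedule` are hypothetical names, and the key-based watching that lets later events re-trigger completion is left out (a caller would have to call `tryComplete()` again itself).

```java
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

/** Simplified stand-in for a delayed operation; not Kafka's DelayedOperation. */
final class SketchDelayedOperation {
    private final AtomicBoolean completed = new AtomicBoolean(false);
    private final BooleanSupplier condition; // true once the request can be answered
    private final Runnable onComplete;       // sends the normal response
    private final Runnable onExpire;         // sends the timeout response

    SketchDelayedOperation(BooleanSupplier condition, Runnable onComplete, Runnable onExpire) {
        this.condition = condition;
        this.onComplete = onComplete;
        this.onExpire = onExpire;
    }

    /** Completes the operation if its condition holds; onComplete runs at most once. */
    boolean tryComplete() {
        if (condition.getAsBoolean() && completed.compareAndSet(false, true)) {
            onComplete.run();
            return true;
        }
        return false;
    }

    /** Expires the operation unless it has already completed. */
    void expire() {
        if (completed.compareAndSet(false, true)) {
            onExpire.run();
        }
    }

    boolean isCompleted() {
        return completed.get();
    }

    /**
     * Tries to complete immediately; otherwise schedules expiration after
     * timeoutMs. Returns true if the operation completed right away.
     */
    boolean tryCompleteElseSchedule(ScheduledExecutorService timer, long timeoutMs) {
        if (tryComplete()) {
            return true;
        }
        timer.schedule(this::expire, timeoutMs, TimeUnit.MILLISECONDS);
        return false;
    }
}
```

The point of the design is that expiration is driven by its own timer, so it does not depend on some other response completing first.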

Since #465, the request timeout is only triggered when a response completes. Suppose a producer is sending messages continuously and the channel receives nothing but PRODUCE requests: the expiration check then happens only when one of those PRODUCE requests completes.

The change in #465 assumes that a PRODUCE request completes quickly and thereby triggers the expiration check for earlier requests, such as a METADATA request.
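The behavior described here can be modeled with a small Java sketch. This is a hypothetical model written for this comment thread, not the code from #465: `CompletionDrivenQueue`, `Pending`, and `onResponseCompleted` are made-up names, and time is passed in explicitly so the example is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

/** Hypothetical model: expiration is only checked when some response completes. */
final class CompletionDrivenQueue {
    static final class Pending {
        final String name;
        final long deadlineMs;
        boolean done;

        Pending(String name, long deadlineMs) {
            this.name = name;
            this.deadlineMs = deadlineMs;
        }
    }

    private final Deque<Pending> queue = new ArrayDeque<>();
    final List<String> sent = new ArrayList<>();

    Pending enqueue(String name, long nowMs, long timeoutMs) {
        Pending p = new Pending(name, nowMs + timeoutMs);
        queue.addLast(p);
        return p;
    }

    /** Called only when a response completes; the sole place expiration is checked. */
    void onResponseCompleted(Pending completed, long nowMs) {
        completed.done = true;
        // Responses go out in request order, so drain from the head.
        while (!queue.isEmpty()) {
            Pending head = queue.peekFirst();
            if (head.done) {
                sent.add(head.name + ":ok");
            } else if (nowMs >= head.deadlineMs) {
                sent.add(head.name + ":timeout");
            } else {
                break; // head is still pending and not yet expired
            }
            queue.removeFirst();
        }
    }
}
```

In this model, a METADATA request enqueued at t=0 with a 100 ms timeout is not expired at t=100. It is only reported as timed out once a later PRODUCE request completes, for example at t=600, which is why a slow PRODUCE delays the expiration of everything queued before it.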

@dockerzhang dockerzhang force-pushed the add-produce-fetch-timeout branch 3 times, most recently from 49a064b to 9c88929 Compare May 13, 2021 12:01
@BewareMyPower
Collaborator

I'll approve this PR for now.

In the future, maybe we can find a way to add tests for the produce/fetch timeout. So far, I haven't found any timeout tests in Kafka to use as a reference.

@BewareMyPower BewareMyPower merged commit 06e6530 into streamnative:master May 14, 2021
2 participants