dekaf: Track and report additional metrics, and improve connection stability/freshness #1655

Merged 1 commit on Sep 30, 2024

@jshearer (Contributor) commented Sep 25, 2024

I observed some concerning behavior when switching dekaf.estuary.dev to advertise dekaf.estuary-data.com: every API request to the upstream MSK broker returned an error code. Restarting Dekaf "fixed" the problem, and I eventually realized that those connections had probably been sitting open and idle for days.

The theory is that even though we set TCP keepalive, the connections had gotten into a bad state. So I introduced three mitigations:

  • All idle connections (that is, connections that were opened but have since been returned to the pool) are closed after a short period of time (60s in this case).
  • All idle connections that were opened over 30m ago are closed. Because connections are used statelessly, this should mean that no connection lasts longer than roughly 30m.
  • A connection is "exercised" every time it's returned to the pool, to check whether it's still open and able to communicate with the broker.
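
The first two rules above can be sketched as a pure decision function. This is a minimal illustration using only the standard library, not Dekaf's actual code: `ConnMeta`, `should_reap`, and `reap` are hypothetical names, and only the 60s and 30m limits come from the description. The third rule (exercising a connection on return) needs a real broker round trip and is not modeled here.

```rust
use std::time::{Duration, Instant};

/// Hypothetical bookkeeping for one idle pooled connection (not Dekaf's real type).
#[derive(Debug, Clone, Copy)]
pub struct ConnMeta {
    /// When the connection was first opened.
    pub created_at: Instant,
    /// When the connection was last returned to the pool.
    pub last_returned_at: Instant,
}

/// Limits taken from the PR description: 60s idle, 30m total age.
pub const MAX_IDLE: Duration = Duration::from_secs(60);
pub const MAX_AGE: Duration = Duration::from_secs(60 * 30);

/// Returns true if an idle connection has exceeded either limit.
pub fn should_reap(conn: &ConnMeta, now: Instant) -> bool {
    let idle = now.saturating_duration_since(conn.last_returned_at);
    let age = now.saturating_duration_since(conn.created_at);
    idle > MAX_IDLE || age > MAX_AGE
}

/// Drops every idle connection that should be reaped and returns how many were dropped.
pub fn reap(idle_conns: &mut Vec<ConnMeta>, now: Instant) -> usize {
    let before = idle_conns.len();
    idle_conns.retain(|c| !should_reap(c, now));
    before - idle_conns.len()
}
```

Because only idle connections are inspected, a connection that is checked out when it crosses the 30m mark would be closed on a later pass after it is returned, which is consistent with the "roughly 30m" wording.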

New metrics:

  • pool_size (by broker): The number of connections open to this broker
  • pool_available (by broker): The number of open, free connections available for use
  • pool_waiting (by broker): The number of requests blocked waiting for a connection to become available
  • pool_connection_avg_age (by broker): The average age of a pool connection to this broker
  • pool_connection_avg_idle (by broker): The average time a connection to this broker has been idle
  • api_call_time (by api method): A histogram of the time it takes to serve a Kafka protocol request
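
As a rough illustration of how the per-broker gauges might be derived from a snapshot of the pool, here is a standard-library-only sketch. `ConnSnapshot`, `PoolGauges`, and `gauges` are hypothetical names, and averaging idle time over idle connections only is an assumption; the PR does not say how the averages are defined. `pool_waiting` and `api_call_time` depend on the pool's wait queue and on request timing, so they are left out.

```rust
use std::time::{Duration, Instant};

/// Hypothetical snapshot of one connection to a broker.
pub struct ConnSnapshot {
    pub created_at: Instant,
    /// `Some(t)` if the connection has been sitting idle in the pool since `t`,
    /// `None` if it is currently checked out.
    pub idle_since: Option<Instant>,
}

/// Hypothetical container for the gauge values of a single broker.
#[derive(Debug, PartialEq)]
pub struct PoolGauges {
    pub pool_size: usize,
    pub pool_available: usize,
    pub avg_age: Duration,
    pub avg_idle: Duration,
}

/// Computes the gauges for one broker at time `now`.
pub fn gauges(conns: &[ConnSnapshot], now: Instant) -> PoolGauges {
    // Mean of a total duration over `n` samples, or zero when there are none.
    let mean = |total: Duration, n: usize| {
        if n == 0 { Duration::ZERO } else { total / n as u32 }
    };
    let total_age: Duration = conns
        .iter()
        .map(|c| now.saturating_duration_since(c.created_at))
        .sum();
    let idle_times: Vec<Duration> = conns
        .iter()
        .filter_map(|c| c.idle_since)
        .map(|t| now.saturating_duration_since(t))
        .collect();
    let total_idle: Duration = idle_times.iter().sum();
    PoolGauges {
        pool_size: conns.len(),
        pool_available: idle_times.len(),
        avg_age: mean(total_age, conns.len()),
        // Assumption: averaged over idle connections only.
        avg_idle: mean(total_idle, idle_times.len()),
    }
}
```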

@jshearer jshearer requested review from jgraettinger and a team and removed request for jgraettinger September 25, 2024 17:58
@jgraettinger (Member) left a comment

LGTM

let reap_interval = Duration::from_secs(30);
let max_age = Duration::from_secs(60 * 30);
let max_idle = Duration::from_secs(60);
let reaper = tokio_util::task::AbortOnDropHandle::new(tokio::spawn({

❤️ thanks for the explicit syntax

@jshearer jshearer merged commit 42e173f into master Sep 30, 2024
6 checks passed