Conversation

@owenowenisme (Member) commented Nov 13, 2025

Description

Added Kafka as a data source; we can now use read_kafka to read from Kafka as a bounded data source.
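
A minimal usage sketch (the parameter names topics and bootstrap_servers are assumptions for illustration; start_offset and end_offset are the parameters discussed in this review):

import ray

# Read everything currently in the topic as a bounded dataset.
ds = ray.data.read_kafka(
    topics=["events"],                     # assumed parameter name
    bootstrap_servers=["localhost:9092"],  # assumed parameter name
    start_offset="earliest",
    end_offset="latest",
)
ds.show(5)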

Related issues

Closes #58653

Additional information

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
@owenowenisme added the go (add ONLY when ready to merge, run all tests) label Nov 14, 2025
owenowenisme and others added 11 commits November 14, 2025 21:52
Comment on lines 4431 to 4432
start_offset: Optional[Union[int, str]] = "earliest",
end_offset: Optional[Union[int, str]] = "latest",
Member

Are earliest and latest the only valid string values? Will we need to add more values later?

Member Author

Changed from str to Literal["earliest"] and Literal["latest"].

trigger: Literal["once"] = "once",
start_offset: Optional[Union[int, str]] = "earliest",
end_offset: Optional[Union[int, str]] = "latest",
authentication: Optional[Dict[str, Any]] = None,
Member

What happens if this is None? If it's implicitly required, I think it'd be clearer if this was a required argument.

Member

Also, in the interest of being explicit over implicit, I think we should use a typed dataclass rather than a dict (what are the valid fields and values? You'd only know by looking at the docstring)

Member Author

Made authentication a dataclass

What happens if this is None? If it's implicitly required, I think it'd be clearer if this was a required argument.

This is not required.
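
For illustration, a dataclass shape along these lines (the class name KafkaAuthConfig shows up later in this review; the fields below are assumptions that mirror standard kafka-python consumer settings):

from dataclasses import dataclass
from typing import Optional

@dataclass
class KafkaAuthConfig:
    # Assumed fields; each maps onto a kafka-python KafkaConsumer kwarg.
    security_protocol: str = "PLAINTEXT"
    sasl_mechanism: Optional[str] = None
    sasl_plain_username: Optional[str] = None
    sasl_plain_password: Optional[str] = None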

By default, the number of output blocks is dynamically decided based on
input data size and available resources. You shouldn't manually set this
value in most cases.
timeout_ms: Timeout in milliseconds to poll until reaching end_offset (default 10000ms/10s).
Member

What happens if we don't expose this?

Member Author

I think users should set their own timeout?


# Create metadata for this task
metadata = BlockMetadata(
num_rows=max_records_per_task,
Member

What happens if we don't set this? This could be inaccurate, right?

@owenowenisme (Member Author) commented Nov 16, 2025

I removed max_records_per_task since we currently run a task per partition.
Also set num_rows to None.
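
A sketch of the resulting metadata construction (BlockMetadata's exact field set varies across Ray versions, so the fields here are illustrative):

# The row count isn't known before polling Kafka, so report None and
# let Ray Data compute accurate stats once the blocks are produced.
metadata = BlockMetadata(
    num_rows=None,
    size_bytes=None,
    input_files=None,
    exec_stats=None,
)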

in the remote task without requiring 'self' to be serialized.
"""

def kafka_read_fn() -> Iterable[Block]:
Member

The implementation of this function is pretty long. Wonder if there's a way we can make it shorter (and easier to read) by abstracting some of the logic as functions.

If you feel like it'd hurt readability though, okay to keep as-is.
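
One possible factoring, sketched with hypothetical helper names (the real kafka_read_fn is a closure, so it would capture these values rather than take parameters):

def _make_consumer(servers, topics, auth):
    ...  # hypothetical: build and subscribe a KafkaConsumer

def _snapshot_end_offsets(consumer):
    ...  # hypothetical: record end offsets for the assigned partitions

def _poll_blocks(consumer, end_offsets, timeout_ms):
    yield from ()  # hypothetical: yield blocks until end offsets or timeout

def kafka_read_fn(servers, topics, auth, timeout_ms):
    consumer = _make_consumer(servers, topics, auth)
    try:
        end_offsets = _snapshot_end_offsets(consumer)
        yield from _poll_blocks(consumer, end_offsets, timeout_ms)
    finally:
        consumer.close()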

--hash=sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc \
--hash=sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab
# via uvicorn
python-dotenv==1.2.1 \
Member

For my own understanding, what's the deal with all the lock files? Is this what REEf recommended?

@owenowenisme (Member Author) commented Nov 16, 2025

I asked the REEf team how to resolve the deps error, and they told me to run this command:
bazel run //ci/raydepsets:raydepsets -- build --all-configs

Pasting the message

I added some test deps and ran bash ci/ci.sh compile_pip_dependencies to update requirements_compiled.txt
And still getting this error, do you have any idea how we should resolve this?
https://buildkite.com/ray-project/premerge/builds/53872/steps/canvas?sid=019a84cf-060a-43b7-b6ef-39fe56484670

Collaborator

Is this what REEf recommended?

yes.

you can split the python-dotenv version upgrade PR into a separate one.

owenowenisme and others added 13 commits November 16, 2025 14:06
@gemini-code-assist bot (Contributor) left a comment

Code Review

This pull request introduces a new Kafka datasource for Ray Data, allowing bounded reads from Kafka topics. The implementation is well-structured, using kafka-python and providing a clear API via ray.data.read_kafka. The addition of KafkaAuthConfig makes authentication flexible. The integration tests are comprehensive, covering various scenarios including different offset configurations, multiple partitions/topics, and timeout behavior, which is excellent. I've found a few areas for improvement, mainly around documentation clarity and a couple of small code refinements. Overall, this is a great contribution.

owenowenisme and others added 4 commits November 16, 2025 22:23
trigger: Literal["once"] = "once",
start_offset: Union[int, Literal["earliest"]] = "earliest",
end_offset: Union[int, Literal["latest"]] = "latest",
kafka_auth_config: KafkaAuthConfig = KafkaAuthConfig(),

Bug: Mutable Defaults Break Configuration Isolation

Mutable default argument KafkaAuthConfig() creates a shared instance across all calls to read_kafka. Since KafkaAuthConfig is a non-frozen dataclass, any mutation to this shared instance would affect all subsequent function calls that don't provide kafka_auth_config. The default should be None instead, with instantiation inside the function if needed.
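
A minimal sketch of that fix (assuming the KafkaAuthConfig dataclass discussed above; other parameters omitted):

from typing import Optional

def read_kafka(topics, kafka_auth_config: Optional[KafkaAuthConfig] = None):
    # Instantiate per call, so callers that omit the argument never share
    # (or accidentally mutate) a single module-level default instance.
    if kafka_auth_config is None:
        kafka_auth_config = KafkaAuthConfig()
    ...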


@aslonnie (Collaborator) left a comment

fwiw, @elliot-barn is giving a talk about depset next tue.

--hash=sha256:31f23644fe2602f88ff55e1f5c79ba497e01224ee7737937930c448e4d0e24dc \
--hash=sha256:a8a6399716257f45be6a007360200409fce5cda2661e3dec71d23dc15f6189ab
# via uvicorn
python-dotenv==1.2.1 \
Collaborator

@elliot-barn could you change the codeowners so that these lock files are not owned by ray-ci? We do not need to review every single dependency change for release tests.

@aslonnie (Collaborator) left a comment

approval for release test lock file changes.

By default, the number of output blocks is dynamically decided based on
input data size and available resources. You shouldn't manually set this
value in most cases.
timeout_ms: Timeout in milliseconds to poll until reaching end_offset (default 10000ms).
Member

Can you elaborate on this? For example, is this the timeout per task, or for the read as a whole? What happens when the timeout is reached? Does the job fail?

Member Author

Added
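
A hedged sketch of the kind of clarification requested (wording is illustrative, not the merged docstring; the partial-read-on-timeout behavior is what the thread below describes):

timeout_ms: Per read task: the maximum time in milliseconds a task polls
    for records before giving up on reaching end_offset. On timeout the
    task stops and emits whatever it has read so far (a partial read)
    instead of failing the job.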

Comment on lines 438 to 439
if elapsed_time >= timeout_seconds:
break
Member

If a read task times out and performs a partial read, I don't think the user would know that from the logs.

Maybe let's add a logging.warning or logging.info so the users aren't surprised (and it's easier to debug)?

Member Author

Added logging.warning
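
A minimal sketch of that warning (the poll-loop wrapper and message text are assumptions; only the timeout check itself is quoted above):

import logging
import time

logger = logging.getLogger(__name__)

def poll_with_timeout(consumer, timeout_seconds, reached_end):
    # reached_end: callable reporting whether end_offset has been reached.
    start = time.monotonic()
    while not reached_end():
        elapsed_time = time.monotonic() - start
        if elapsed_time >= timeout_seconds:
            logger.warning(
                "Kafka read timed out after %.1fs before reaching "
                "end_offset; returning a partial read.",
                elapsed_time,
            )
            break
        consumer.poll(timeout_ms=1000)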

@bveeramani enabled auto-merge (squash) November 19, 2025 01:46
@bveeramani merged commit 946e35c into ray-project:master Nov 19, 2025
6 of 7 checks passed
Aydin-ab pushed a commit to Aydin-ab/ray-aydin that referenced this pull request Nov 19, 2025
pavitrabhalla pushed a commit to rayai-labs/ray that referenced this pull request Nov 20, 2025
400Ping pushed a commit to 400Ping/ray that referenced this pull request Nov 21, 2025
ykdojo pushed a commit to ykdojo/ray that referenced this pull request Nov 27, 2025
SheldonTsen pushed a commit to SheldonTsen/ray that referenced this pull request Dec 1, 2025

Labels

data (Ray Data-related issues), go (add ONLY when ready to merge, run all tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[Data] Add Bounded Kafka Datasource

3 participants