
Conversation

@antfin-oss

This Pull Request was created automatically to merge the latest changes from master into the main branch.

πŸ“… Created: 2025-11-19
πŸ”€ Merge direction: master β†’ main
πŸ€– Triggered by: Scheduled

Please review and merge if everything looks good.

edoakes and others added 30 commits October 30, 2025 22:15
…t Raylet logs (ray-project#58244)

`test_gcs_fault_tolerance.py::test_worker_raylet_resubscription` is
still flaky in CI despite bumping up the timeout. Making a few
improvements here:

- Increasing the timeout to `20s` just in case it's a timeout issue
(unlikely).
- Changing to scheduling an actor instead of using `internal_kv` for our
signal that the GCS is back up. This should better indicate that the
Raylet is resubscribed.
- Cleaning up some system logs.
- Modifying the `ObjectLostError` logs to avoid logging
likely-irrelevant plasma usage on owner death.

It's likely that the underlying issue here is that we don't actually
reliably resubscribe to all worker death notifications, as indicated in
the TODO in the PR.
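
A sketch of the new readiness signal (illustrative, not the exact test code):

```python
import ray

@ray.remote
class Probe:
    def ping(self):
        return "pong"

# Scheduling an actor and calling a method on it exercises the full
# GCS -> Raylet path, so success is a stronger signal that the Raylet
# has resubscribed than an internal_kv read.
probe = Probe.remote()
assert ray.get(probe.ping.remote(), timeout=20) == "pong"
```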

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
removing core_ prefix used on release tests for testing purposes

Original change:
https://github.com/ray-project/ray/pull/57049/files#diff-5879986113a0287dff865f81faf24a2294660b6c4767d5a71fc6281e78101ad6R1380

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
…ay-project#58255)

## Description
Currently, the `ray.util.state.list_actors(limit=N)` API returns
details for at most N actors. However, when N exceeds the default value
of `RAY_MAX_LIMIT_FROM_API_SERVER=10_000`, the call fails with a
misleading message (the error says that the dashboard or API server is
unavailable, even when it is available). The reason it fails this way is
that we don't handle `ValueError`s and throw a 500 error by default. My
solution is to handle that; I'm open to suggestions/alternatives.

NOTE: I noticed it still fails with an internal server error. I think this
should be a 4XX error, but it looks like that will require more code
changes since it goes through a very ubiquitous function: `do_reply`.
Gemini suggests returning `rest_response` directly; happy to go that
route too.

### Before
```python
>>> import ray
>>> import ray.util.state
>>> ray.init()
>>> ray.util.state.list_actors(limit=100000)
ray.util.state.exception.RayStateApiException: Failed to make request to http://127.0.0.1:8265/api/v0/actors. Failed to connect to API server. Please check the API server log for details. Make sure dependencies are installed with `pip install ray[default]`. Please also check dashboard is available, and included when starting ray cluster, i.e. `ray start --include-dashboard=True --head`. Response(url=http://127.0.0.1:8265/api/v0/actors?limit=100000&timeout=24&detail=False&exclude_driver=True&server_timeout_multiplier=0.8,status=500)
```

### After
```python
>>> import ray
>>> import ray.util.state
>>> ray.init()
>>> ray.util.state.list_actors(limit=100000)
ray.util.state.exception.RayStateApiException: API server internal error. See dashboard.log file for more details. Error: Given limit 100000 exceeds the supported limit 10000. Use a lower limit, or set the RAY_MAX_LIMIT_FROM_API_SERVER=limit
```
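
As the new error message suggests, the cap can be raised via the env var. A minimal sketch (the variable must be set before the dashboard/API server starts):

```python
import os

# Raise the server-side cap before starting Ray so the dashboard picks it up.
os.environ["RAY_MAX_LIMIT_FROM_API_SERVER"] = "100000"

import ray
import ray.util.state

ray.init()
print(ray.util.state.list_actors(limit=100000))  # now within the cap
```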

## Related issues
None
## Additional information
None

---------

Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
upgrading tune fault release test to 3.10

Successful release test run:
https://buildkite.com/ray-project/release/builds/65673#

---------

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
upgrading jobs tests to run on python 3.10

Successful release tests:
https://buildkite.com/ray-project/release/builds/65845

---------

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
removing byod compile jobs for release test images
Now using raydepsets to generate lock files

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
## Description
This PR introduces more information into the `explain` API. Before,
`explain` showed only the unoptimized logical plan and the optimized
physical plan. To make the `explain` API clearer, I introduce four types
of plans:
- Logical Plan
- Logical Plan (Optimized)
- Physical Plan
- Physical Plan (Optimized)

Example Output
```python
>>> import ray
>>> ray.data.range(1000).select_columns("id").explain()
-------- Logical Plan --------
Project[Project]
+- Read[ReadRange]

-------- Logical Plan (Optimized) --------
Project[Project]
+- Read[ReadRange]

-------- Physical Plan --------
TaskPoolMapOperator[Project]
+- TaskPoolMapOperator[ReadRange]
   +- InputDataBuffer[Input]

-------- Physical Plan (Optimized) --------
TaskPoolMapOperator[ReadRange->Project]
+- InputDataBuffer[Input]
```

## Related issues
None

## Additional information
None

---------

Signed-off-by: EkinKarabulut <ekarabulut@nvidia.com>
Signed-off-by: EkinKarabulut <82878945+EkinKarabulut@users.noreply.github.com>
Signed-off-by: Rueian <rueiancsie@gmail.com>
Signed-off-by: iamjustinhsu <jhsu@anyscale.com>
Co-authored-by: EkinKarabulut <82878945+EkinKarabulut@users.noreply.github.com>
Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com>
Co-authored-by: angelinalg <122562471+angelinalg@users.noreply.github.com>
Co-authored-by: fscnick <6858627+fscnick@users.noreply.github.com>
Co-authored-by: Jiajun Yao <jeromeyjj@gmail.com>
Co-authored-by: Rueian <rueiancsie@gmail.com>
…ses (ray-project#58265)

## Description
Using the iptables script created in ray-project#58241, we found a bug in
RequestWorkerLease where a RAY_CHECK was being triggered here:
https://github.com/ray-project/ray/blob/66c08b47a195bcfac6878a234dc804142e488fc2/src/ray/raylet/lease_dependency_manager.cc#L222-L223
The issue is that transient network errors can happen ANYTIME, including
while the server logic is executing and has not yet replied to the
client. Our original testing framework used an env variable to drop the
request or reply as it was being sent, hence this was missed. The issue
specifically is that RequestWorkerLease can be in the middle of pulling
the lease dependencies to its local plasma store when the retry arrives
and triggers this check. I created a C++ unit test that triggers this
RAY_CHECK without the change and passes with it. I decided to store the
callbacks instead of replacing the older one with the new one due to the
possibility of message reordering, where the new one could arrive before
the old one.
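
The storing-callbacks idea, sketched in Python pseudocode (the actual change is in the C++ raylet; names here are illustrative):

```python
# Keep every reply callback for a lease request instead of overwriting the
# old one with the retry's: with message reordering, the "new" callback can
# arrive before the "old" one, so replacement could drop a reply.
pending_callbacks: dict[str, list] = {}

def on_request_worker_lease(lease_id: str, reply_cb) -> None:
    pending_callbacks.setdefault(lease_id, []).append(reply_cb)

def on_lease_ready(lease_id: str, result) -> None:
    for cb in pending_callbacks.pop(lease_id, []):
        cb(result)
```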

---------

Signed-off-by: joshlee <joshlee@anyscale.com>
…yments (ray-project#58073)

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
…irements (ray-project#58323)

Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
…gression guard (ray-project#58289)

Signed-off-by: Nikhil Ghosh <nikhil@anyscale.com>
Signed-off-by: Nikhil G <nrghosh@users.noreply.github.com>
updating docgpu build to run on py3.10

Previous failed run on postmerge:
https://buildkite.com/ray-project/postmerge/builds/14082#019a2d51-e7f1-4eb9-8745-e8993908eb83

Serve event loop test:
https://buildkite.com/ray-project/postmerge/builds/14084

RLlib GPU tests:
https://buildkite.com/ray-project/postmerge/builds/14085

Serve: doc gpu tests:
https://buildkite.com/ray-project/postmerge/builds/14086

Serve: pydantic < 2.0 tests:
https://buildkite.com/ray-project/postmerge/builds/14093

Full postmerge run:
https://buildkite.com/ray-project/postmerge/builds/14091

---------

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Co-authored-by: Nikhil G <nrghosh@users.noreply.github.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
I was testing out `cython-lint` and figured I'd open a PR to see if
there was interest

This flags some issues here such as:
- unused imports
- unused variables
- "pointless string statements"
- f-string without placeholders

Signed-off-by: Marco Gorelli <33491632+MarcoGorelli@users.noreply.github.com>
)

Verify token presence when using the `ray start` CLI, plus tests

---------

Signed-off-by: sampan <sampan@anyscale.com>
Signed-off-by: Sampan S Nayak <sampansnayak2@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: sampan <sampan@anyscale.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
This PR adds token-based authentication support to the
PythonGcsSubscriber, which previously made direct gRPC calls via the
stub without auth. The rest of the pub-sub layer already uses the shared
gRPC infrastructure (GrpcServer, GrpcClient), which supports token
authentication.

---------

Signed-off-by: sampan <sampan@anyscale.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: sampan <sampan@anyscale.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…ime virtualenv (ray-project#58084)

If temp_dir is a subdirectory of the current virtualenv root, Ray will
recursively copy files and never finish when it tries to create the
runtime virtualenv.

## Description
If temp_dir is a subdirectory of the current virtualenv root and we
submit a task that specifies pip dependencies, Ray creates a runtime
virtualenv, recursively copies directories into it, and the runtime
virtualenv directory explodes in size.

---------

Signed-off-by: ideal <idealities@gmail.com>
This PR is a follow-up to
ray-project#56147 (comment)

The `find_free_port()` function was duplicated across multiple locations
in the codebase. This PR consolidates all implementations into a single
canonical location.
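
For reference, the canonical pattern behind this helper (a sketch; the consolidated implementation may differ in details):

```python
import socket

def find_free_port() -> int:
    # Binding to port 0 asks the OS for an unused ephemeral port.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(("", 0))
        return s.getsockname()[1]
```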

---------

Signed-off-by: Yicheng-Lu-llll <luyc58576@gmail.com>
## Description
Adds a PredicatePushdown rule to Ray Data.

Pushes predicates through Read (for supporting datasources) and Union, as
sketched below.
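
A toy illustration of the rewrite (hypothetical plan classes, not Ray Data's actual ones):

```python
from dataclasses import dataclass
from typing import Optional

# Toy plan nodes, purely for illustration.
@dataclass
class Read:
    predicate: Optional[str] = None

@dataclass
class Filter:
    child: object
    predicate: str

@dataclass
class Union:
    children: tuple

def push_predicate(plan):
    if isinstance(plan, Filter):
        child = plan.child
        if isinstance(child, Union):
            # Filter(Union(a, b)) => Union(Filter(a), Filter(b))
            return Union(tuple(push_predicate(Filter(c, plan.predicate))
                               for c in child.children))
        if isinstance(child, Read) and child.predicate is None:
            # Push the predicate into a supporting datasource's scan.
            return Read(predicate=plan.predicate)
    return plan

print(push_predicate(Filter(Union((Read(), Read())), "x > 5")))
# Union(children=(Read(predicate='x > 5'), Read(predicate='x > 5')))
```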


---------

Signed-off-by: Goutam <goutam@anyscale.com>
Signed-off-by: abrar <abrar@anyscale.com>
…ct#58281)

Supports token-based authentication in the dashboard head SDK. All
clients that build on top of the submission_client will now support token
auth out of the box, so this covers all CLI commands like job submit, the
state API, Serve-related CLI commands, etc.

---------

Signed-off-by: sampan <sampan@anyscale.com>
Signed-off-by: Sampan S Nayak <sampansnayak2@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: sampan <sampan@anyscale.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
this helps prevent an edge case when using file-based log exporters like
vector, which use fingerprinting
[ref](https://vector.dev/docs/reference/configuration/sources/file/#fingerprint)
to identify unique files.

example edge case that this fixes:
two jobs are submitted to a cluster and begin executing at the same
time; both contain an invalid entrypoint that references a nonexistent
file

before fix:
- both jobs have the identical "Runtime env is setting up" log with
identical timestamps
  - both jobs have identical entrypoint failure logs
  
as a result, the log files for these jobs are identical, so vector will
only export one.

after fix:
- both jobs have the identical "Runtime env is setting up" log with
identical timestamps
- each job has a **unique** entrypoint log containing its job_id
- both jobs have identical entrypoint failure logs

vector can differentiate between these two files, so both will be
exported
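
The gist of the fix, sketched (hypothetical function, not the exact code):

```python
import logging

logger = logging.getLogger(__name__)

def log_entrypoint(job_id: str, entrypoint: str) -> None:
    # Including the job_id makes this line unique per job, so two jobs'
    # otherwise-identical log files get distinct fingerprints.
    logger.info("Job %s started. Running entrypoint: %s", job_id, entrypoint)
```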

---------

Signed-off-by: Chris Fellowes <chrisfellowes@anyscale.com>
Signed-off-by: chrisfellowes <chrisfellowes@anyscale.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…8341)

When running on older machines with a less impressive CPU, and when under
a bit of load, we've seen the dashboard fail to start within the expected
200 iterations (`sleep(0.1)` for 20 seconds). This increases the timeout to 60 seconds.
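
The shape of the wait loop, for illustration (`dashboard_is_up` is a hypothetical check):

```python
import time

def wait_for_dashboard(dashboard_is_up, iterations: int = 600) -> None:
    # 600 iterations x sleep(0.1) ~= 60 seconds, up from 200 x 0.1 ~= 20s.
    for _ in range(iterations):
        if dashboard_is_up():
            return
        time.sleep(0.1)
    raise TimeoutError("Dashboard failed to start within the timeout.")
```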

---------

Signed-off-by: morotti <r.morotti@gmail.com>
Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
Co-authored-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…s-process support (ray-project#58332)

Signed-off-by: Kourosh Hakhamaneshi <kourosh@anyscale.com>
…oject#58344)

so that we can perform last-minute surgery if required.

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
add `--use-pep517` flag; otherwise some part of the wheel building logic
does not work on windows python 3.12

also removes the unnecessary "uninstall+reinstall" dance. the script
only builds the wheel, it does not (and should not) install the wheel.

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
observability release tests on py3.10

Successful release test run:
https://buildkite.com/ray-project/release/builds/65851
failing tests are set to manual (disabled):
aws_cluster_launcher_release_image
k8s_serve_ha_test

enabling agent_stress_test.gce which is now passing

---------

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
## Description
Replace `map_batches` and numpy invocations with `with_column` and arrow
kernels

Release test:
https://buildkite.com/ray-project/release/builds/66243#019a37da-4d9d-4f19-9180-e3f3dc3f8043
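
For illustration, the kind of replacement involved (a sketch assuming Ray Data's expression API with `col` and `with_column`; not the PR's exact code):

```python
import ray
from ray.data.expressions import col

ds = ray.data.range(1000)

# Before: a numpy UDF via map_batches.
before = ds.map_batches(lambda b: {"id2": b["id"] * 2}, batch_format="numpy")

# After: an expression evaluated with Arrow compute kernels.
after = ds.with_column("id2", col("id") * 2)
```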


---------

Signed-off-by: Goutam <goutam@anyscale.com>
…collate_fn (ray-project#58327)

Signed-off-by: Gang Zhao <gang@gang-JQ62HD2C37.local>
Co-authored-by: Gang Zhao <gang@gang-JQ62HD2C37.local>
owenowenisme and others added 22 commits November 17, 2025 14:09
…ect#58699)

## Description
Previously we would try to slice the block when `self._total_pending_rows >=
self._target_num_rows` or `flush_remaining` was True. But `flush_remaining`
does not imply `self._total_pending_rows >= self._target_num_rows`, so the
slicing could fail, because our slicing logic assumes there is at least one
full block.

This PR fixes the logic and adds a test for this case.
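
The corrected flush condition, sketched (hypothetical helper callables):

```python
def flush(total_pending_rows: int, target_num_rows: int, flush_remaining: bool,
          slice_full_blocks, emit_partial_block):
    # Only take the full-block slicing path when a full block actually
    # exists; `flush_remaining` alone does not guarantee that.
    if total_pending_rows >= target_num_rows:
        return slice_full_blocks()
    if flush_remaining and total_pending_rows > 0:
        return emit_partial_block()
    return []
```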

---------

Signed-off-by: You-Cheng Lin <mses010108@gmail.com>
…bles (ray-project#58241)

Signed-off-by: joshlee <joshlee@anyscale.com>
Co-authored-by: Dhyey Shah <dhyey2019@gmail.com>
Getting rid of the excessive `while True` loops & timeouts in the tests
(we already wait for the dashboard to be up).

Also just cleaned up some comments and naming while I was poking around.

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…enable distributed lightgbm training (ray-project#58709)

The default is `tree_learner="serial"`, which trains a separate model per
worker. Users should set `tree_learner` to configure LightGBM to train a
single model across all of the dataset shards. `pre_partition` should
also be set if using Ray Data to shard the dataset.
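
For example, the relevant LightGBM parameters look roughly like this (values illustrative):

```python
# LightGBM parameters for distributed training.
params = {
    # Default "serial" trains an independent model per worker; "data"
    # (or "voting") trains a single model across all dataset shards.
    "tree_learner": "data",
    # The data is already sharded (e.g. by Ray Data), so skip LightGBM's
    # own partitioning.
    "pre_partition": True,
}
```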

---------

Signed-off-by: Justin Yu <justinvyu@anyscale.com>
…sting (ray-project#57178)

This commit takes a step towards using real classes instead of mocks
during unit testing for components that are simple and local (e.g. the
reference counter). We remove the reference counter mock as promised in
ray-project#57177 (review).

By testing directly against the real reference counter, we were able to
catch a mismatch between AddNewActorHandle and its documented behavior.
Specifically, the EmplaceNewActorHandle doc states that the function
should return false instead of failing a RAY_CHECK, and the test checks
exactly that. However, due to calling AddOwnedObject,
EmplaceNewActorHandle actually fails a RAY_CHECK. The commit addresses
this issue by making AddNewActorHandle idempotent.
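
The documented contract, sketched in Python pseudocode (the real change is in the C++ core worker):

```python
def add_new_actor_handle(handles: dict, actor_id: str, handle: object) -> bool:
    # Idempotent: adding an existing handle returns False instead of
    # CHECK-failing, matching the documented contract.
    if actor_id in handles:
        return False
    handles[actor_id] = handle
    return True
```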

---------

Signed-off-by: davik <davik@anyscale.com>
Signed-off-by: Kunchen (David) Dai <54918178+Kunchd@users.noreply.github.com>
Co-authored-by: davik <davik@anyscale.com>
Co-authored-by: Ibrahim Rabbani <irabbani@anyscale.com>
Co-authored-by: Ibrahim Rabbani <israbbani@gmail.com>
…inal state (ray-project#58287)

1) Add traceback to all `ControllerError`s and log it when making a
failure decision so we can see where `Worker group is not active. Call
WorkerGroup.create() to create a new worker group.` is coming from. **I
also sanity checked that this does not cause
`UserExceptionWithTraceback` to double print the traceback because this
only applies to ControllerError**
2) `_poll_workers` has the only `asyncio.sleep` in the Ray Train
controller. After waking up, it exits from the foreground asyncio task
if its state is terminal, which can happen due to the issue mentioned in
5).

---------

Signed-off-by: Timothy Seah <tseah@anyscale.com>
Signed-off-by: ahao-anyscale <ahao@anyscale.com>
updating logic for checking valid subsets of expanded depsets

Ex. original depset -> expanded depset -> subsetted depset

Previously, if the subsetted depset contained requirement files that are
in the original depset, raydepsets would throw an error.

This PR fixes the logic to consider the original depset's requirement
files when determining whether the subsetted depset is a valid subset.

Updated unit tests
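
Conceptually, the fixed check (a sketch of the idea, not raydepsets' actual code):

```python
def is_valid_subset(subset_files: set, original_files: set) -> bool:
    # Validate against the ORIGINAL depset's requirement files, not just
    # the expanded depset's.
    return subset_files <= original_files
```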

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
…roject#58453)

rather than python 3.9. python 3.9 is end of life now.

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
Signed-off-by: dayshah <dhyey2019@gmail.com>
…-project#58650)

## Description
The Data Grafana dashboard should by default work well when viewing
across all operators, so use a percentile graph that is grouped by
operator instead.

For the histogram bar charts, hide them in the operator panels row. They
are useful in places that filter to a single operator by default (like in
the Data dashboard).

<img width="1163" height="379" alt="Screenshot 2025-11-14 at 2 33 16β€―PM"
src="https://github.com/user-attachments/assets/fd4a3d0c-4a60-4bb9-a803-859b3ed14a59"
/>
<img width="1158" height="430" alt="Screenshot 2025-11-14 at 2 33 08β€―PM"
src="https://github.com/user-attachments/assets/a78b4dde-8066-478b-86b4-b838761431f2"
/>

---------

Signed-off-by: Alan Guo <aguo@anyscale.com>
…ct#58733)

needs to keep this until all release tests are migrated.

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
merge everything into AnyscaleJobRunner

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
…oject#58231)

### Description
This PR will:
1- Introduce `GroupedData.with_column`, allowing grouped datasets to
evaluate Ray Data expressions per group while preserving existing
columns (usage sketched below).
2- Validate the supplied expression type (reject non-Expr and
DownloadExpr, since the expression evaluator can't visit downloads as far
as I understand) and reuse the projection engine so grouping flows stay
aligned with the dataset-level expression API.
3- Add tests for grouped expression usage through udf-based and
arithmetic expressions.
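
A hypothetical usage sketch, assuming the signature mirrors the dataset-level `with_column(name, expr)`:

```python
import ray
from ray.data.expressions import col

ds = ray.data.from_items([{"g": i % 2, "x": i} for i in range(6)])
# Evaluate the expression per group, keeping existing columns.
out = ds.groupby("g").with_column("x_plus_one", col("x") + 1)
```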

### Related issues
Closes ray-project#57907

---------

Signed-off-by: YoussefEssDS <oyoussefesseddiq@gmail.com>
Co-authored-by: Balaji Veeramani <balaji@anyscale.com>
…58718)

not being used anymore, and hasn't been for a long time.

Signed-off-by: Lonnie Liu <lonnie@anyscale.com>
generating lock files for base-deps image

Including setuptools in base-deps requirements due to
https://buildkite.com/ray-project/release/builds/67063#019a7066-7935-4eea-ac7b-3bf62705b81d/142-3586

Copying lock file to image and uv pip installing the deps

Hello world release test run:
https://buildkite.com/ray-project/release/builds/67552

---------

Signed-off-by: elliot-barn <elliot.barnwell@anyscale.com>
Co-authored-by: Lonnie Liu <95255098+aslonnie@users.noreply.github.com>
`add_command_alias` deepcopies the `Command` object, but it's no longer
deep-copyable:
pallets/click#3065 (comment). We
can remove it completely since it just creates some command aliases that
have been deprecated for 5+ years.

Closes ray-project#56747

---------

Signed-off-by: Jiajun Yao <jeromeyjj@gmail.com>
…BRT: "corrupted size vs. prev_size") (ray-project#58660)

## Summary

This PR fixes a heap corruption bug that causes the driver to crash with
SIGABRT. The issue is caused by a use-after-free when the `RayletClient`
object is destroyed while an asynchronous RPC callback is still pending.

## Problem Description

### Scenario

A Ray Data job (Ray 2.50.0) with the pipeline `read_parquet -> filter ->
map_batches -> write`, running for 4+ hours, where workers use elastic
resources with low job priority (causing frequent worker deaths due to
pod preemption), crashes the driver with SIGABRT:
```
corrupted size vs. prev_size
*** SIGABRT received at time=1761916578 on cpu 30 ***
PC: @ 0x7f073569d9fc (unknown) pthread_kill
Aborted (core dumped)
```



### Trigger Conditions

After reproducing with an ASan image, ASan reveals the actual
use-after-free at:
```
 #0 0x7ff282967361 in std::__atomic_base<long>::fetch_sub(long, std::memory_order) /usr/include/c++/11/bits/atomic_base.h:628
    #1 0x7ff282967361 in std::__atomic_base<long>::operator--(int) /usr/include/c++/11/bits/atomic_base.h:377
    #2 0x7ff282967361 in operator() src/ray/raylet_rpc_client/raylet_client.cc:338
    #3 0x7ff282967361 in __invoke_impl<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:61
    #4 0x7ff282967361 in __invoke_r<void, ray::rpc::RayletClient::PinObjectIDs(const ray::rpc::Address&, const std::vector<ray::ObjectID>&, const ray::ObjectID&, ray::rpc::ClientCallback<ray::rpc::PinObjectIDsReply>&)::<lambda(ray::Status, ray::rpc::PinObjectIDsReply&&)>&, const ray::Status&, ray::rpc::PinObjectIDsReply> /usr/include/c++/11/bits/invoke.h:111
    #5 0x7ff282967361 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #6 0x7ff2829fbadf in std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>::operator()(ray::Status const&, ray::rpc::PinObjectIDsReply&&) const /usr/include/c++/11/bits/std_function.h:590
    #7 0x7ff2829fbadf in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}::operator()(ray::Status const&) const bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:293
    #8 0x7ff2829fbadf in void std::__invoke_impl<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(std::__invoke_other, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:61
    #9 0x7ff2829fbadf in std::enable_if<is_invocable_r_v<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>, void>::type std::__invoke_r<void, ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status>(ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}&, ray::Status&&) /usr/include/c++/11/bits/invoke.h:111
    #10 0x7ff2829fbadf in std::_Function_handler<void (ray::Status), ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Create<ray::rpc::NodeManagerService, ray::rpc::PinObjectIDsRequest, ray::rpc::PinObjectIDsReply>(std::weak_ptr<ray::rpc::RetryableGrpcClient>, std::unique_ptr<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply>, std::default_delete<grpc::ClientAsyncResponseReader<ray::rpc::PinObjectIDsReply> > > (ray::rpc::NodeManagerService::Stub::*)(grpc::ClientContext*, ray::rpc::PinObjectIDsRequest const&, grpc::CompletionQueue*), std::shared_ptr<ray::rpc::GrpcClient<ray::rpc::NodeManagerService> >, std::__cxx11::basic_string<char, std::char_traits<char>, std::allocator<char> >, ray::rpc::PinObjectIDsRequest, std::function<void (ray::Status const&, ray::rpc::PinObjectIDsReply&&)>, long)::{lambda(ray::Status const&)#2}>::_M_invoke(std::_Any_data const&, ray::Status&&) /usr/include/c++/11/bits/std_function.h:290
    #11 0x7ff2834657e9 in std::function<void (ray::Status)>::operator()(ray::Status) const /usr/include/c++/11/bits/std_function.h:590
    #12 0x7ff2834657e9 in ray::rpc::RetryableGrpcClient::RetryableGrpcRequest::Fail(ray::Status const&) bazel-out/k8-dbg/bin/src/ray/rpc/_virtual_includes/retryable_grpc_client/ray/rpc/retryable_grpc_client.h:109
    #13 0x7ff2834657e9 in operator() src/ray/rpc/retryable_grpc_client.cc:30
    #14 0x7ff2834657e9 in __invoke_impl<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #15 0x7ff2834657e9 in __invoke_r<void, ray::rpc::RetryableGrpcClient::~RetryableGrpcClient()::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #16 0x7ff2834657e9 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #17 0x7ff2834e2407 in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #18 0x7ff2834e2407 in EventTracker::RecordExecution(std::function<void ()> const&, std::shared_ptr<StatsHandle>) src/ray/common/event_stats.cc:112
    #19 0x7ff2834bea54 in operator() src/ray/common/asio/instrumented_io_context.cc:110
    #20 0x7ff2834bea54 in __invoke_impl<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #21 0x7ff2834bea54 in __invoke_r<void, instrumented_io_context::post(std::function<void()>, std::string, int64_t)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #22 0x7ff2834bea54 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #23 0x7ff28242fb5b in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #24 0x7ff28242fb5b in boost::asio::detail::binder0<std::function<void ()> >::operator()() external/boost/boost/asio/detail/bind_handler.hpp:60
    #25 0x7ff28242fb5b in void boost::asio::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, ...) external/boost/boost/asio/handler_invoke_hook.hpp:88
    #26 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, std::function<void ()>&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #27 0x7ff28242fb5b in void boost::asio::detail::asio_handler_invoke<boost::asio::detail::binder0<std::function<void ()> >, std::function<void ()> >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >*) external/boost/boost/asio/detail/bind_handler.hpp:111
    #28 0x7ff28242fb5b in void boost_asio_handler_invoke_helpers::invoke<boost::asio::detail::binder0<std::function<void ()> >, boost::asio::detail::binder0<std::function<void ()> > >(boost::asio::detail::binder0<std::function<void ()> >&, boost::asio::detail::binder0<std::function<void ()> >&) external/boost/boost/asio/detail/handler_invoke_helpers.hpp:54
    #29 0x7ff28242fb5b in boost::asio::detail::executor_op<boost::asio::detail::binder0<std::function<void ()> >, std::allocator<void>, boost::asio::detail::scheduler_operation>::do_complete(void*, boost::asio::detail::scheduler_operation*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/executor_op.hpp:70
    #30 0x7ff2838607d6 in boost::asio::detail::scheduler_operation::complete(void*, boost::system::error_code const&, unsigned long) external/boost/boost/asio/detail/scheduler_operation.hpp:40
    #31 0x7ff2838607d6 in boost::asio::detail::scheduler::do_run_one(boost::asio::detail::conditionally_enabled_mutex::scoped_lock&, boost::asio::detail::scheduler_thread_info&, boost::system::error_code const&) external/boost/boost/asio/detail/impl/scheduler.ipp:492
    #32 0x7ff283892d35 in boost::asio::detail::scheduler::run(boost::system::error_code&) external/boost/boost/asio/detail/impl/scheduler.ipp:210
    #33 0x7ff2838981e0 in boost::asio::io_context::run() external/boost/boost/asio/impl/io_context.ipp:63
    #34 0x7ff281e9d0aa in operator() src/ray/core_worker/core_worker_process.cc:193
    #35 0x7ff281e9d247 in run external/boost/boost/thread/detail/thread.hpp:120
    #36 0x7ff282503c47 in thread_proxy external/boost/libs/thread/src/pthread/thread.cpp:179
    #37 0x7ff28b013ac2 in start_thread nptl/pthread_create.c:442
    #38 0x7ff28b0a58bf  (/lib/x86_64-linux-gnu/libc.so.6+0x1268bf)

0x50c003fd3d30 is located 112 bytes inside of 120-byte region [0x50c003fd3cc0,0x50c003fd3d38)
freed by thread T68 here:
    #0 0x7ff28b39924f in operator delete(void*, unsigned long) ../../../../src/libsanitizer/asan/asan_new_delete.cpp:172
    #1 0x7ff281eceb5f in __gnu_cxx::new_allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >::deallocate(std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/ext/new_allocator.h:145
    #2 0x7ff281eceb5f in std::allocator_traits<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::deallocate(std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> >&, std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>*, unsigned long) /usr/include/c++/11/bits/alloc_traits.h:496
    #3 0x7ff281eceb5f in std::__allocated_ptr<std::allocator<std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2> > >::~__allocated_ptr() /usr/include/c++/11/bits/allocated_ptr.h:74
    #4 0x7ff281eceb5f in std::_Sp_counted_ptr_inplace<ray::rpc::RayletClient, std::allocator<ray::rpc::RayletClient>, (__gnu_cxx::_Lock_policy)2>::_M_destroy() /usr/include/c++/11/bits/shared_ptr_base.h:538
    #5 0x7ff282a73f0a in std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() /usr/include/c++/11/bits/shared_ptr_base.h:184
    #6 0x7ff282a73f0a in std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() /usr/include/c++/11/bits/shared_ptr_base.h:705
    #7 0x7ff282a73f0a in std::__shared_ptr<ray::RayletClientInterface, (__gnu_cxx::_Lock_policy)2>::~__shared_ptr() /usr/include/c++/11/bits/shared_ptr_base.h:1154
    #8 0x7ff282a73f0a in std::shared_ptr<ray::RayletClientInterface>::~shared_ptr() /usr/include/c++/11/bits/shared_ptr.h:122
    #9 0x7ff282a73f0a in std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >::~pair() /usr/include/c++/11/bits/stl_pair.h:211
    #10 0x7ff282a73f0a in void __gnu_cxx::new_allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/ext/new_allocator.h:168
    #11 0x7ff282a73f0a in void std::allocator_traits<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::destroy<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >&, std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> >*) /usr/include/c++/11/bits/alloc_traits.h:535
    #12 0x7ff282a73f0a in void absl::lts_20230802::container_internal::map_slot_policy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/container_memory.h:421
    #13 0x7ff282a73f0a in void absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/flat_hash_map.h:578
    #14 0x7ff282a73f0a in void absl::lts_20230802::container_internal::common_policy_traits<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, void>::destroy<std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >(std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > >*, absl::lts_20230802::container_internal::map_slot_type<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >*) external/com_google_absl/absl/container/internal/common_policy_traits.h:50
    #15 0x7ff282a73f0a in absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::erase(absl::lts_20230802::container_internal::raw_hash_set<absl::lts_20230802::container_internal::FlatHashMapPolicy<ray::NodeID, std::shared_ptr<ray::RayletClientInterface> >, absl::lts_20230802::hash_internal::Hash<ray::NodeID>, std::equal_to<ray::NodeID>, std::allocator<std::pair<ray::NodeID const, std::shared_ptr<ray::RayletClientInterface> > > >::iterator) external/com_google_absl/absl/container/internal/raw_hash_set.h:2183
    #16 0x7ff282a73f0a in ray::rpc::RayletClientPool::Disconnect(ray::NodeID) src/ray/raylet_rpc_client/raylet_client_pool.cc:114
    #17 0x7ff282a7aa61 in operator() src/ray/raylet_rpc_client/raylet_client_pool.cc:69
    #18 0x7ff282a7ac66 in __invoke_impl<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:61
    #19 0x7ff282a7ac66 in __invoke_r<void, ray::rpc::RayletClientPool::GetDefaultUnavailableTimeoutCallback(ray::gcs::GcsClient*, ray::rpc::RayletClientPool*, const ray::rpc::Address&)::<lambda()>&> /usr/include/c++/11/bits/invoke.h:111
    #20 0x7ff282a7ac66 in _M_invoke /usr/include/c++/11/bits/std_function.h:290
    #21 0x7ff28346a1ac in std::function<void ()>::operator()() const /usr/include/c++/11/bits/std_function.h:590
    #22 0x7ff28346a1ac in ray::rpc::RetryableGrpcClient::CheckChannelStatus(bool) src/ray/rpc/retryable_grpc_client.cc:85
    #23 0x7ff28346c06a in operator() src/ray/rpc/retryable_grpc_client.cc:45
```

This is a **non-deterministic race condition** that occurs under the
following sequence:

1. Worker A's pod is preempted β†’ Worker A dies
2. Objects on Worker A are lost
3. Objects are found on Worker B β†’ `PinObjectIDs` RPC is initiated
4. Worker B dies or becomes unavailable β†’ `CheckChannelStatus` detects
this β†’ `Disconnect` is called
5. The `RayletClient` corresponding to Worker B on the driver is
destroyed
6. RPC callback executes and accesses the already-freed `RayletClient` β†’
use-after-free triggers crash

Whether the use-after-free occurs depends on the relative timing of
steps 5 and 6. In scenarios with frequent pod preemptions, object
recovery frequently triggers `PinObjectIDs`, making this race condition
more likely to occur.

### Root Cause

In `RayletClient::PinObjectIDs`, the RPC callback lambda directly
captured the raw `this` pointer:

```cpp
auto rpc_callback = [this, callback = std::move(callback)](...) {
    pins_in_flight_--;  // Accessing member via 'this' pointer
    ...
};
```

If the `RayletClient` object is destroyed before the async RPC callback
executes, the callback will access freed memory through the dangling
`this` pointer, leading to heap corruption and SIGABRT with the error
message "corrupted size vs. prev_size".

## Solution

The fix ensures that the `RayletClient` object remains alive during the
asynchronous callback execution by:

1. **Inheriting from `std::enable_shared_from_this<RayletClient>`**: The
class already inherits from this base class (line 43 in
`raylet_client.h`), which enables safe shared pointer management.

2. **Capturing `shared_from_this()` in the lambda**: Instead of
capturing the raw `this` pointer, the callback now captures a
`shared_ptr` to the object. The `shared_from_this()` is called before
incrementing `pins_in_flight_` to ensure proper lifetime management:

```cpp
// Capture shared_from_this() before incrementing to ensure object lifetime
// is extended for the async callback, preventing use-after-free.
auto self = shared_from_this();
pins_in_flight_++;
auto rpc_callback = [self, callback = std::move(callback)](
                        Status status, rpc::PinObjectIDsReply &&reply) {
  self->pins_in_flight_--;
  callback(status, std::move(reply));
};
```

This ensures that the `RayletClient` object's lifetime is extended until
the callback completes, preventing the use-after-free bug. By capturing
the shared pointer before incrementing the counter, we also ensure that
if `shared_from_this()` were to fail (though it shouldn't in normal
usage), we don't leave the counter in an inconsistent state.

## Code Changes

- **File**: `src/ray/raylet_rpc_client/raylet_client.cc`
- **Method**: `RayletClient::PinObjectIDs`
- **Change**: Replace `this` capture with `shared_from_this()` capture
in the RPC callback lambda

Signed-off-by: dragongu <andrewgu@vip.qq.com>
Co-authored-by: gulonglong <gulonglong@stepfun.com>
…h projects (ray-project#58688)

## Description
1. Use `rows_same` util for the tests in `test_projection_fusion`
2. Properly handle pushing predicates past projections


---------

Signed-off-by: Goutam <goutam@anyscale.com>
The GIL makes checking `self._serialize_cache is not None` atomic, so we
don't need a lock.
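
A minimal sketch of the pattern (hypothetical class, not the actual Ray code):

```python
class Serializer:
    def __init__(self):
        self._serialize_cache = None

    def serialized(self) -> bytes:
        # Reading the attribute is atomic under the GIL, so no lock is
        # needed; at worst two threads both compute and one result wins.
        cached = self._serialize_cache
        if cached is not None:
            return cached
        self._serialize_cache = self._do_serialize()
        return self._serialize_cache

    def _do_serialize(self) -> bytes:
        return b"..."  # stand-in for the real serialization work
```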

Signed-off-by: Xinyuan <43737116+xinyuangui2@users.noreply.github.com>
…_this` (ray-project#58744)

While writing the [follow
up](https://github.com/ray-project/ray/pull/58660/files#r2538761540) to
ban stack-allocated `RayletClient` instances, @dayshah proposed a
simpler solution: make the counter a shared_ptr instead of capturing the
entire class.

---------

Signed-off-by: Edward Oakes <ed.nmi.oakes@gmail.com>
…on. (ray-project#58748)

The test was flaky because the pid `123` could actually exist.

Signed-off-by: irabbani <irabbani@anyscale.com>

@sourcery-ai sourcery-ai bot left a comment


The pull request #681 has too many files changed.

The GitHub API will only let us fetch up to 300 changed files, and this pull request has 5405.

@gemini-code-assist

Summary of Changes

Hello @antfin-oss, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request introduces a comprehensive set of changes to the Ray project's build and CI infrastructure. It focuses on modernizing dependency management, improving code quality, and enhancing the efficiency and reliability of the CI pipelines. The changes span across Bazel configuration, Buildkite CI setup, Docker image builds, and code formatting/linting processes, aiming to create a more robust and maintainable development environment.

Highlights

  • Bazel Configuration: Updates the Bazel configuration to improve build environment determinism and dependency management.
  • Buildkite CI: Refactors Buildkite CI pipelines for better organization, efficiency, and test coverage, including adding new test suites and improving existing ones.
  • Dependency Management: Modernizes dependency management using uv and raydepsets for improved Python dependency resolution and caching.
  • Code Formatting and Linting: Enhances code formatting and linting processes, including updates to pre-commit hooks and improved checks for code style and documentation.
  • Docker Image Builds: Streamlines Docker image builds by optimizing base images, improving caching, and adding support for multiple architectures (x86_64 and aarch64).
Ignored Files
  • Ignored by pattern: .gemini/** (1)
    • .gemini/config.yaml
  • Ignored by pattern: .github/workflows/** (1)
    • .github/workflows/stale_pull_request.yaml


@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request is a massive and impressive refactoring of the entire CI/CD and build system. The changes are extensive, touching almost every aspect of the build process, from Bazel configurations and dependency management to CI pipeline definitions and Dockerfiles. Key themes include modularization of build steps, adoption of modern tooling like pre-commit and uv, dropping support for older platforms like macOS x86_64 and Python 3.9, and improving build performance. The introduction of a new dependency management system raydepsets is a significant addition. Overall, these changes represent a major step forward in improving the project's maintainability and build robustness. My review focuses on a couple of areas for further improvement.

trap cleanup EXIT
(which bazel && bazel clean) || true
if [[ "$(uname -m)" == "arm64" ]]; then
brew install pkg-config nvm node || true


medium

The || true will suppress errors if brew install fails. This could hide issues in the build environment setup. If pkg-config, nvm, or node are required for subsequent steps, their installation failure should ideally fail the build. Consider removing || true if these dependencies are critical.

Suggested change
brew install pkg-config nvm node || true
brew install pkg-config nvm node

Comment on lines +102 to +110
echo "--- Extract redis binaries"

mkdir -p python/ray/core/src/ray/thirdparty/redis/src
if [[ "${HOSTTYPE}" =~ ^aarch64 ]]; then
REDIS_BINARY_URL="https://github.com/ray-project/redis/releases/download/7.2.3/redis-linux-arm64.tar.gz"
else
REDIS_BINARY_URL="https://github.com/ray-project/redis/releases/download/7.2.3/redis-linux-x86_64.tar.gz"
fi
curl -sSL "${REDIS_BINARY_URL}" -o - | tar -xzf - -C python/ray/core/src/ray/thirdparty/redis/src


medium

The redis binaries are being downloaded from GitHub releases using curl. This can be fragile and dependent on network availability and GitHub's uptime. Since the redis binaries are already defined as http_archive dependencies in the WORKSPACE file, it would be more robust and hermetic to include them in the ray_pkg.zip artifact. This can be achieved by adding :redis_files to the srcs of the ray_pkg_zip target in the root BUILD.bazel file. This would eliminate the need to download them here.
