
[Dask] Race condition in finding ports #5865

Open
adfea9c0 opened this issue May 2, 2023 · 11 comments

@adfea9c0

adfea9c0 commented May 2, 2023

Description

Dask LightGBM will sometimes try to bind to ports that were previously free, but are now used by a different program.

Specifically, it seems that the Python code in DaskLGBMRegressor [1] finds open ports, saves the port numbers, and then immediately closes the sockets. After that, the C++ layer tries to reopen each port [2]. This can go wrong when another program binds to the port between those two steps.

Most of my runs succeed, but I've run into the 'LightGBMError: Binding port blah failed' error a handful of times now, and I'm fairly confident the above race condition is the cause.

[1]

def _find_n_open_ports(n: int) -> List[int]:

[2]
if (listener_->Bind(port)) {
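
For illustration, here is a minimal sketch of the find-then-close pattern described above (hypothetical code, not the actual lightgbm.dask implementation); the race window opens as soon as the sockets are closed:

import socket
from typing import List

def _find_n_open_ports_sketch(n: int) -> List[int]:
    # bind each socket to port 0 so the OS picks a currently-free port,
    # record the assigned port numbers, then close the sockets again
    sockets = []
    for _ in range(n):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('', 0))
        sockets.append(s)
    ports = [s.getsockname()[1] for s in sockets]
    for s in sockets:
        s.close()  # from here until the C++ Bind() call, any other program can take the port
    return ports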

Reproducible example

It's hard to reproduce this reliably since it's effectively a race condition. I hope my description of the issue suffices; let me know if I can do more to help.

Environment info

I'm using LightGBM 3.3.2 on Dask.

@jameslamb jameslamb added the dask label May 2, 2023
@jmoralez
Collaborator

jmoralez commented May 2, 2023

Hi @adfea9c0, thanks for raising this. Indeed, that can happen for the reasons you describe. The optimal solution IMO would be to allow passing 0 as the port to the C++ side and have it find a random open port and acquire it immediately. I'll take a look soon to see whether that's possible.
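
For reference, "port 0" here is the usual OS facility for requesting an ephemeral free port; the key difference from the current behavior is that the socket would stay open rather than be closed. A Python illustration of the idea (the actual change would live in LightGBM's C++ socket code):

import socket

# illustration only: bind to port 0 so the OS assigns a free port,
# then keep the socket open so no other program can grab it
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.bind(('', 0))
assigned_port = s.getsockname()[1]
# the acquired port would then need to be shared with the other workers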

@jameslamb
Collaborator

jameslamb commented May 3, 2023

+1, thanks for reporting this so we have a specific issue to track. Linking some other related things:

@jmoralez I have one idea to consider that might be a quick way to make these issues less likely (inspired by @adfea9c0's comment that _find_n_open_ports() immediately frees the ports it finds).

Maybe we could change the contract for _find_n_open_ports() to something like _claim_n_open_ports(), where it opens a socket to claim each port but does not close it, and then have _train_part() free that port again right before calling .fit() here?

All the time _train_part() spends between its start and that line, collecting and concatenating the local data, is time in which another process can come in and claim the port. If we instead knew that the port was held by LightGBM until right before .fit() is called, the window for such conflicts would shrink significantly.
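
A rough sketch of the shape of that idea (hypothetical names, not existing lightgbm.dask code):

import socket
from typing import Dict

def _claim_n_open_ports(n: int) -> Dict[int, socket.socket]:
    # hypothetical helper: bind each socket to an OS-assigned free port and
    # keep it open, returning {port number: open socket}
    claimed = {}
    for _ in range(n):
        s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        s.bind(('', 0))
        claimed[s.getsockname()[1]] = s
    return claimed

# _train_part() would then close its worker's claimed socket right before
# calling .fit(), so LightGBM's C++ side can bind to the just-released port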


I mention a Python-side approach just because I think the C++ side's use of collective communications (where every worker process can talk directly to every other worker process, and therefore they are all initialized with a machines configuration listing all the IP addresses + ports) would make it pretty challenging for each worker process to find its own local free port. Not sure how the workers would find each other, you know?

@adfea9c0
Author

adfea9c0 commented May 4, 2023

Would it be possible to have _find_n_open_ports call the C++ layer directly and just immediately and permanently claim the port right there? Or does that not make sense?

@jameslamb
Collaborator

jameslamb commented May 4, 2023

Maybe, but the exact mechanism of "permanently claim" could be difficult.

Right now, LightGBM's distributed training code in C++ assumes the following:

  • worker processes are the only LightGBM distributed training process happening on that machine

And on the Dask side, it's important to remember that:

  • whatever we do needs to work with both worker processes on the same physical machine (LocalCluster) and worker processes on physically different machines

So what would it mean for lightgbm.dask to invoke some C++ "claim a port on every machine" code prior to beginning training? Would it mean opening a TcpSocket object and returning its handle (a pointer to it), then somehow sharing that pointer with the LightGBM training process that later starts up on the same machine?

That might be possible, but I think it'd be difficult to get right in a way that doesn't leak sockets or processes.

I don't say all this to discourage an attempt on the C++ side. I'm just saying that the suggestion I gave above, reducing the time between determining a set of ports and actually starting training, is where I'd probably start if I were working on this problem: it's easier to reason about and requires less-invasive changes in LightGBM (at the expense of only benefiting lightgbm.dask and not other LightGBM interfaces).

@jmoralez
Collaborator

jmoralez commented May 5, 2023

Yeah, I agree that's a good thing to try first. We just need to figure out a way to do it; I don't think sockets are serializable, so we may need to use actors or something similar. I can work on that and open a PR.

@jameslamb
Collaborator

I don't think sockets are serializable so we may need to use actors or something similar

I have a much worse, Dask-specific half-idea, but maybe it could work... could we run a function on each worker that claims a port and never releases it? Something like this:

import socket
import time

def _claim_a_port_until_killed(port: int):
    # hold the port open until this task gets cancelled
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('', port))
        while True:
            time.sleep(10)

# sketch: run on every worker; note that client.run() waits for results by default,
# so a real implementation would need a non-blocking variant of this
futures = client.run(_claim_a_port_until_killed, port)

And then somehow have each _train_part() cancel that function right before calling .fit(), like this:

from distributed import worker_client

def _train_part(*args, **kwargs):  # pseudocode sketch of lightgbm.dask._train_part
    # ... all that data-collecting code ... #
    with worker_client() as client:
        # placeholder name for the future returned when the port was claimed
        client.cancel(the_future_holding_the_port_for_this_worker)
    model.fit(...)
    # ...

@jameslamb
Collaborator

(I won't be offended if you say "absolutely not that's way too weird")

@adfea9c0
Author

adfea9c0 commented May 5, 2023

So what would it mean for lightgbm.dask to invoke some C++ "claim a port on every machine" code prior to beginning training? Would it mean opening a TcpSocket object and returning its handle (a pointer to it), then somehow sharing that pointer with the LightGBM training process that later starts up on the same machine?

Oh I see, so when you claim a port in Python, the C++ process is not yet running, and you want to claim a port first because the C++ process expects the ports to be known at initialization?

worker processes are the only LightGBM distributed training process happening on that machine

I didn't know this -- does "machine" here really mean physical machine? I think I've previously had some Dask workers running on the same physical box and I don't think I ran into issues, but maybe I misremember. If that is the case, an intermediate solution for me personally would be to a) enforce a single worker per physical machine (*) and b) reserve a port for training across our network and pass that in with local_listen_port?

*) I've personally noticed LightGBM benefits from few workers with many threads rather than many workers with few threads anyway.

@jameslamb
Collaborator

you want to claim a port first since the C++ process expects ports to be known upon initialization

Correct.

does machine here really mean physical machine?

Sorry about that! I misspoke and just edited that comment. You can run multiple distributed training processes on the same physical machine, but not within the same process.

This is more of a problem in a single-physical-machine setup using LocalCluster. See the comment I linked for why: #5510 (comment).

just locally reserve a port for training across our network and pass that in with local_listen_port

Reserving a port ahead of time in your cluster should totally eliminate the risk of hitting this race condition.

But to be clear, it isn't required that it be via the local_listen_port mechanism. local_listen_port is a convenience for the case where you want every LightGBM worker process to use the same port. All that's required is that every worker process have a complete list of ip:port pairs describing how to talk to all the other workers.

To have the most control, you can pass such an explicit list with the machines parameter: https://lightgbm.readthedocs.io/en/v3.3.2/Parallel-Learning-Guide.html#using-specific-ports
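
For example, assuming ports have been reserved ahead of time on each worker host, that could look roughly like this (placeholder addresses, and assuming the Dask estimators accept machines as described in the linked guide):

import lightgbm as lgb

# placeholder addresses: one ip:port entry per Dask worker, using ports that
# were reserved for LightGBM ahead of time
machines = "10.0.0.1:13000,10.0.0.2:13000"

model = lgb.DaskLGBMRegressor(machines=machines)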

@adfea9c0
Author

What is the reason ports need to be selected before distributing the data? Is it because you want the params and training data to be in a single client.submit call? Would it be possible to include Dask futures in the params, i.e. set params["machines"] equal to some future that resolves to a call to _find_n_open_ports, and only materialize the future in the _train_part call?

@jameslamb
Collaborator

Ports need to be decided and broadcast to all workers before distributed training starts, because workers use worker-to-worker communication, as I explained in #5865 (comment).

Because they use worker-to-worker direct communication, not some central "driver" process, finding a port for a given worker process can't be done by that worker process itself... it'd have no way to communicate that information to the other workers.
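
In other words, a single coordinating step has to gather one (ip, port) pair per worker and hand every worker the same full list before training begins, roughly like:

# rough illustration of the constraint (hypothetical variable names): the full
# address list has to exist before any worker starts training
worker_ports = {"10.0.0.1": 13000, "10.0.0.2": 13001}
machines = ",".join(f"{ip}:{port}" for ip, port in worker_ports.items())
# every worker then receives this same "machines" string in its training parameters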
