[dask] make random port search more resilient to random collisions (fixes #4057) #4133

Merged 7 commits on Mar 31, 2021
Changes from 2 commits
24 changes: 24 additions & 0 deletions python-package/lightgbm/dask.py
@@ -170,6 +170,18 @@ def _machines_to_worker_map(machines: str, worker_addresses: List[str]) -> Dict[
    return out


def _worker_map_has_duplicates(worker_map: Dict[str, int]) -> bool:
    """Check if there are any duplicate IP-port pairs in a ``worker_map``."""
    host_to_port = defaultdict(set)
    for worker, port in worker_map.items():
        host = urlparse(worker).hostname
        if port in host_to_port[host]:
            return True
        else:
            host_to_port[host].add(port)
    return False


def _train(
    client: Client,
    data: _DaskMatrixLike,
@@ -371,6 +383,18 @@ def _train(
        _find_random_open_port,
        workers=list(worker_addresses)
    )
    # handle the case where _find_random_open_port() produces duplicates
    retries_left = 10
    while _worker_map_has_duplicates(worker_address_to_port) and retries_left > 0:
Collaborator:

I'm afraid that multiple re-runs of the same function still have a high probability of producing the same values, especially on the same physical machine.

  If a is omitted or None, the current system time is used. If randomness sources are provided by the operating system, they are used instead of the system time (see the os.urandom() function for details on availability).
  https://docs.python.org/3/library/random.html#random.seed

I believe that a more reliable way to handle the case of duplicate ports would be to resolve conflicts manually, by simply incrementing ports until no conflicts remain.
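
For illustration only, a minimal sketch of that incrementing idea (the helper name and standalone imports are mine, not part of this PR):

  from collections import defaultdict
  from typing import Dict
  from urllib.parse import urlparse

  def _resolve_port_conflicts_by_incrementing(worker_map: Dict[str, int]) -> Dict[str, int]:
      # keep the first assignment for each (host, port) pair and bump any
      # later conflicting port by 1 until it is unique on that host
      # (a real implementation would also need to check that the
      # incremented port is actually open on the machine)
      taken = defaultdict(set)
      resolved = {}
      for worker, port in worker_map.items():
          host = urlparse(worker).hostname
          while port in taken[host]:
              port += 1
          taken[host].add(port)
          resolved[worker] = port
      return resolved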

Collaborator Author:

ok, I'll change this to something like that. I was worried about making this a lot more complicated, but maybe it's unavoidable.

Collaborator:

I'm sorry for this, I'll try to come up with a solution as well. FWIW, the "randomness" isn't related to Python's random.seed, since we're asking the OS for an open port and it decides which one to give us. I believe the collisions happen when a worker has completed the function and freed the port just as another worker is asking for one, and the OS returns the same port (kinda troll). I'll see if we can keep the port held for a bit or something like that.
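
For reference, a minimal sketch of the usual way of asking the OS for an open port (an assumption about how _find_random_open_port works, not a copy of it):

  import socket

  def _find_open_port_sketch() -> int:
      # binding to port 0 asks the OS to pick a free ephemeral port;
      # as soon as the socket is closed that port is released, so the OS
      # may hand the very same port to the next caller -- which is the
      # collision scenario described above
      with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
          s.bind(('', 0))
          return s.getsockname()[1]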

Collaborator Author:

alright, I tried a different approach in 05303c8. I think this will be more reliable. Instead of re-running _find_random_open_port() all at once for all workers again, the process will be like:

  1. run _find_random_open_port() for every worker
  2. if any duplicate IP-port pairs were found, run _find_random_open_port() again, but only for the workers whose ports need to change to eliminate duplicates

I think that pattern (only re-running for the workers that need to be changed to resolve duplicates) should give us confidence that duplicates will be resolved, because the system time will change each time that function is run.
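
A rough sketch of that pattern (illustrative name and structure; not the exact code in 05303c8, and it reuses the _worker_map_has_duplicates and _find_random_open_port helpers from dask.py):

  from collections import defaultdict
  from urllib.parse import urlparse

  def _retry_ports_for_duplicated_workers(client, worker_map):
      # keep the first worker seen for each (host, port) pair and re-run
      # the random port search only for the workers that collided
      retries_left = 10
      while _worker_map_has_duplicates(worker_map) and retries_left > 0:
          retries_left -= 1
          seen = defaultdict(set)
          workers_to_retry = []
          for worker, port in worker_map.items():
              host = urlparse(worker).hostname
              if port in seen[host]:
                  workers_to_retry.append(worker)
              else:
                  seen[host].add(port)
          worker_map.update(client.run(_find_random_open_port, workers=workers_to_retry))
      return worker_map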

I think this approach, which still relies on _find_random_open_port(), is preferable to just incrementing and checking whether the new port is open, because it should (on average) find a new open port more quickly than the incrementing approach. Consider the case where, for example, LightGBM tries to put multiple workers in a LocalCluster on port 8887 (1 less than the default port for Jupyter). Jupyter uses exactly that "increment by one until you find an open port" approach, so if someone has multiple Jupyter sessions running it's possible that they might have ports 8888, 8889, 8890, and 8891 all occupied (for example), which would mean LightGBM would need 5 attempts to find a new open port (if 8892 is open).

I think the existence of systems like this is why Dask also searches randomly (instead of incrementing) when the default port it prefers for its scheduler is occupied when you run dask-scheduler. You can see that in the logs for LightGBM's Dask tests that use distributed.LocalCluster, for example from this build:

  /opt/conda/envs/test-env/lib/python3.9/site-packages/distributed/node.py:151: UserWarning: Port 8787 is already in use.
  Perhaps you already have a cluster running?
  Hosting the HTTP server on port 37211 instead
    warnings.warn(

  /opt/conda/envs/test-env/lib/python3.9/site-packages/distributed/node.py:151: UserWarning: Port 8787 is already in use.
  Perhaps you already have a cluster running?
  Hosting the HTTP server on port 35045 instead
    warnings.warn(

  /opt/conda/envs/test-env/lib/python3.9/site-packages/distributed/node.py:151: UserWarning: Port 8787 is already in use.
  Perhaps you already have a cluster running?
  Hosting the HTTP server on port 35051 instead
    warnings.warn(

  /opt/conda/envs/test-env/lib/python3.9/site-packages/distributed/node.py:151: UserWarning: Port 8787 is already in use.
  Perhaps you already have a cluster running?
  Hosting the HTTP server on port 38031 instead

  /opt/conda/envs/test-env/lib/python3.9/site-packages/distributed/node.py:151: UserWarning: Port 8787 is already in use.
  Perhaps you already have a cluster running?
  Hosting the HTTP server on port 41941 instead

Collaborator Author:

@jmoralez I don't think you need to do any new work. Your review on the change I just pushed is welcome if you see anything wrong with it, but I'm fairly confident it will get the job done.

        retries_left -= 1
        _log_warning(
            "Searching for random ports generated duplicates. Trying again (will try %i more times after this)." % retries_left
        )
        worker_address_to_port = client.run(
            _find_random_open_port,
            workers=list(worker_addresses)
        )

    machines = ','.join([
        '%s:%d' % (urlparse(worker_address).hostname, port)
        for worker_address, port
21 changes: 21 additions & 0 deletions tests/python_package_test/test_dask.py
@@ -377,6 +377,27 @@ def test_find_random_open_port(client):
    client.close(timeout=CLIENT_CLOSE_TIMEOUT)


def test_worker_map_has_duplicates():
    map_with_duplicates = {
        'tcp://127.0.0.1:8786': 123,
        'tcp://127.0.0.1:8788': 123,
        'tcp://10.1.1.2:15001': 123
    }
    assert lgb.dask._worker_map_has_duplicates(map_with_duplicates)

    map_without_duplicates = {
        'tcp://127.0.0.1:8786': 12405,
        'tcp://10.1.1.2:15001': 12405
    }
    assert lgb.dask._worker_map_has_duplicates(map_without_duplicates) is False

    localcluster_map_without_duplicates = {
        'tcp://127.0.0.1:708': 12405,
        'tcp://127.0.0.1:312': 12406,
    }
    assert lgb.dask._worker_map_has_duplicates(localcluster_map_without_duplicates) is False

def test_training_does_not_fail_on_port_conflicts(client):
    _, _, _, _, dX, dy, dw, _ = _create_data('binary-classification', output='array')
