[fleet] fix bind failed with Address already in use #38174

Merged

@wangxicoding wangxicoding commented Dec 15, 2021

PR types

Bug fixes

PR changes

Others

Describe

Fix bind failing with Address already in use.
The causes of Address already in use were analyzed in #32892 and a fix was attempted there, but the error was later found to still occur.

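Cause

In addition to the causes analyzed previously, further analysis turned up the following one.
3. Paddle has a wait_server_ready function, used on card 0, that checks whether the services on the other cards have started.
The problem is that wait_server_ready on card 0 may run before the other cards call bind. wait_server_ready uses connect, which occupies a local port when it initiates a connection. That port can happen to be the one another card is about to use, which produces a TCP self-connection and makes the other card's bind fail.
See https://my.oschina.net/u/2310891/blog/652323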
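
The self-connection can be reproduced in isolation. The following is a minimal sketch, not Paddle code: try_self_connect is a hypothetical helper, and the explicit bind to an OS-chosen port stands in for the kernel happening to pick the destination port as the local port of connect. Whether the connect succeeds depends on the platform's TCP stack (it is expected to on Linux).

```python
import socket
from contextlib import closing


def try_self_connect(host="127.0.0.1"):
    """Bind a socket to a port, then connect it to that same port.

    Returns (rc, local_addr, peer_addr). The two addresses are None when
    the connect did not succeed.
    """
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.settimeout(2)
        # Port 0 lets the OS choose a free port. It stands in for the
        # local port that connect() would otherwise pick on its own.
        sock.bind((host, 0))
        port = sock.getsockname()[1]
        # Nobody is listening on `port`; the only socket using it is `sock`.
        rc = sock.connect_ex((host, port))
        if rc != 0:
            return rc, None, None
        return rc, sock.getsockname(), sock.getpeername()


if __name__ == "__main__":
    rc, local_addr, peer_addr = try_self_connect()
    print("connect_ex returned", rc)
    print("local:", local_addr, "peer:", peer_addr)
```

When connect_ex returns 0 here, the local and peer addresses are identical: the socket is connected to itself, and the port stays occupied until the socket is closed.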

Solution

Set reuse_port on the socket used in wait_server_ready.
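
As a rough illustration of the idea, the probe socket can have its reuse options set before it connects. This is a sketch under assumptions, not the code merged in this PR: make_probe_socket and endpoint_is_ready are hypothetical names, and the SO_REUSEPORT lookup is guarded because the constant is not defined on every platform.

```python
import socket
from contextlib import closing


def make_probe_socket(timeout=2.0):
    """Create a TCP client socket with address/port reuse enabled."""
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(timeout)
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    # SO_REUSEPORT does not exist on every platform, so guard the lookup.
    if hasattr(socket, "SO_REUSEPORT"):
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
    return sock


def endpoint_is_ready(endpoint):
    """Return True if a TCP connect to 'ip:port' succeeds."""
    ip, port = endpoint.rsplit(":", 1)
    with closing(make_probe_socket()) as sock:
        return sock.connect_ex((ip, int(port))) == 0
```

The options are set before connect so that they are already in effect when the kernel assigns the local port.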
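Of course, the problem noted earlier still remains: "there is a time gap between finding a free port and handing it to C++, during which another program may take the port; this has no solution for now."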
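
The time gap can be seen in a typical "find a free port" helper. The sketch below is illustrative only; find_free_port is a hypothetical name and is not claimed to match Paddle's own port-selection code.

```python
import socket
from contextlib import closing


def find_free_port(host="127.0.0.1"):
    """Ask the OS for a currently free port and return its number."""
    with closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind((host, 0))
        port = sock.getsockname()[1]
    # The socket is closed at this point, so the port is no longer
    # reserved. Another process, or an outgoing connect() that picks it as
    # a local port, can take it before the intended server binds it.
    return port


if __name__ == "__main__":
    port = find_free_port()
    print("port handed to the server:", port)
```

The returned number is only a hint that the port was free a moment ago; nothing holds it between the return and the later bind.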

Test code to reproduce

from contextlib import closing
import os
import socket
import struct
import sys
import time


wait_time = 2

def wait_server_ready(endpoints, reuse_port=True):
    now = time.time()
    flag = True
    while True:
        all_ok = True
        not_ready_endpoints = []
        for ep in endpoints:
            ip_port = ep.split(":")
            with closing(socket.socket(socket.AF_INET,
                                       socket.SOCK_STREAM)) as sock:
                sock.settimeout(2)
                if reuse_port:
                    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
                    if hasattr(socket, 'SO_REUSEPORT'):
                        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

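                if flag:
                    # Simulate the race: use the target endpoint's own port
                    # as the local port, so the connect below self-connects.
                    sock.bind((ip_port[0], int(ip_port[1])))
                    flag = False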

                result = sock.connect_ex((ip_port[0], int(ip_port[1])))
                if result != 0:
                    all_ok = False
                    not_ready_endpoints.append(ep)
        if not all_ok:
            if time.time() - now > wait_time:
                sys.stderr.write("server not ready, wait 1 sec to retry...\n")
                sys.stderr.write("not ready endpoints:" + str(not_ready_endpoints) +
                                 "\n")
                sys.stderr.flush()
                time.sleep(1)
        else:
            break

def bind_endpoint(endpoint):
    ip_port = endpoint.split(':')
    while True:
        try:
            s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            # SO_REUSEPORT is not defined on every platform.
            if hasattr(socket, 'SO_REUSEPORT'):
                s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
            s.bind((ip_port[0], int(ip_port[1])))
            s.listen()
            print('bind success with endpoint= ' + endpoint)
            break
        except OSError as msg:
            print('Bind failed. Error msg: {}'.format(msg))
            s.close()
            time.sleep(2)

    while True:
        time.sleep(3)


def test():
    rank = int(os.environ['PADDLE_TRAINER_ID'])
    nranks = int(os.environ['PADDLE_TRAINERS_NUM'])
    endpoint = str(os.environ['PADDLE_CURRENT_ENDPOINT'])
    endpoints = str(os.environ['PADDLE_TRAINER_ENDPOINTS']).split(',')
    print('endpoint={}, endpoints={}'.format(endpoint, endpoints))

    reuse_port = True
    if len(sys.argv) == 2:
        print(sys.argv)
        reuse_port = bool(int(sys.argv[1]))
        print("reuse_port={}".format(reuse_port))

    if rank == 0:
        wait_server_ready(endpoints[1:], reuse_port)
        print("====ok, exit(1)====")
        sys.exit(1)

    time.sleep(wait_time)
    bind_endpoint(endpoint)


test()
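  1. Here, wait_server_ready first simulates binding another card's port before it connects to the other cards. In the tests below, option 0 disables reuse_port (the code currently in develop) and option 1 enables reuse_port (what this PR does).
# Without reuse_port, address already in use shows up on almost every run
python -m paddle.distributed.launch pd_bind_test.py 0
# With reuse_port, address already in use essentially no longer shows up
python -m paddle.distributed.launch pd_bind_test.py 1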
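  2. Alternatively, mimic the current develop code exactly: set flag to False so that the other card's port is not bound first. To raise the probability that connect picks another card's port, narrow the local port range to the small window set below.
# Set flag=False in pd_bind_test.py
# Narrow the local port range
echo "61000    62000" > /proc/sys/net/ipv4/ip_local_port_range
# Without reuse_port, address already in use shows up very frequently
python -m paddle.distributed.launch pd_bind_test.py 0
# With reuse_port, address already in use essentially no longer shows up
python -m paddle.distributed.launch pd_bind_test.py 1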
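
To get a feel for why narrowing the range raises the collision rate, the arithmetic can be sketched as follows. The helper names are hypothetical, both bounds are counted as usable ports, and hit_probability assumes each connect picks its local port uniformly and independently, which is a simplification of what the kernel actually does.

```python
def parse_port_range(text):
    """Parse the 'low high' text used by ip_local_port_range."""
    low, high = (int(field) for field in text.split())
    if low > high:
        raise ValueError("low bound exceeds high bound")
    return low, high


def port_count(low, high):
    """Number of ports in the range, counting both bounds."""
    return high - low + 1


def hit_probability(num_ports, attempts):
    """Chance that at least one of `attempts` picks lands on one given port.

    Assumes uniform, independent picks (a simplifying assumption).
    """
    return 1.0 - (1.0 - 1.0 / num_ports) ** attempts


if __name__ == "__main__":
    low, high = parse_port_range("61000    62000")
    print(port_count(low, high))  # 1001
    print(hit_probability(port_count(low, high), attempts=1000))
```

Under these assumptions, a smaller range and a tight retry loop in wait_server_ready both push the chance of hitting another card's port up quickly.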

@paddle-bot-old

Thanks for your contribution!
Please wait for the result of CI firstly. See Paddle CI Manual for details.

@wangxicoding wangxicoding changed the title fix bind failed with Address already in use [fleet] fix bind failed with Address already in use Dec 16, 2021
@wangxicoding wangxicoding merged commit 446a62e into PaddlePaddle:develop Dec 17, 2021
@wangxicoding wangxicoding deleted the fix_address_already_in_use branch December 17, 2021 01:59