Skip to content

Commit

Permalink
Sync changes to nginx coding to PR
Browse files Browse the repository at this point in the history
  • Loading branch information
realtyem committed Feb 5, 2023
1 parent e1251c2 commit 09d98ca
Showing 1 changed file with 27 additions and 18 deletions.
45 changes: 27 additions & 18 deletions docker/configure_workers_and_start.py
Original file line number Diff line number Diff line change
Expand Up @@ -293,20 +293,20 @@
"""

NGINX_UPSTREAM_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
upstream {upstream_worker_base_name} {{
{body}
}}
"""

NGINX_UPSTREAM_HASH_BY_CLIENT_IP_CONFIG_BLOCK = """
upstream {upstream_worker_type} {{
upstream {upstream_worker_base_name} {{
hash $proxy_add_x_forwarded_for;
{body}
}}
"""

NGINX_UPSTREAM_HASH_BY_AUTH_HEADER_BLOCK = """
upstream {upstream_worker_type} {{
upstream {upstream_worker_base_name} {{
hash $http_authorization consistent;
{body}
}}
Expand Down Expand Up @@ -706,10 +706,15 @@ def generate_worker_files(
# more than a single worker for cases where multiples would be bad(e.g. presence).
worker_type_shard_counter: Dict[str, int] = {}

# Similar to above, but for worker names. This is used to check that a worker name
# is not reused for a different worker type or combination of types, as that would
# error with 'Address in use' (e.g. "to_device, to_device=typing" would error).
worker_name_type_counter: Dict[str, str] = {}
# Similar to above, but for worker names. This has two uses:
# 1. Checking that a worker name is not reused for a different worker type or
# combination of types, as that would error with 'Address in use'
# (e.g. "to_device, to_device=typing" would not work).
# 2. A convenient way to get the combination of worker types from a worker_name
# after processing and merging.
# Follows the pattern:
# {"worker_name": "worker_type(s)"}
worker_name_type_list: Dict[str, str] = {}

# A list of internal endpoints to healthcheck, starting with the main process
# which exists even if no workers do.
Expand Down Expand Up @@ -866,11 +871,11 @@ def generate_worker_files(
# different worker_type. If it's not allowed, this will error and stop. If there are
# no issues, the name will be recorded in worker_name_type_list.
if is_name_allowed_for_worker(
worker_name_type_counter,
worker_name_type_list,
worker_base_name,
worker_type,
):
worker_name_type_counter.setdefault(worker_base_name, worker_type)
worker_name_type_list.setdefault(worker_base_name, worker_type)

else:
error(
Expand All @@ -880,7 +885,7 @@ def generate_worker_files(
+ worker_type
+ ". It is already in use by "
# This is cast as a str because mypy thinks it could be None
+ str(worker_name_type_counter.get(worker_base_name))
+ str(worker_name_type_list.get(worker_base_name))
)

# Replace placeholder names in the config template with the actual worker name.
Expand Down Expand Up @@ -925,7 +930,7 @@ def generate_worker_files(
# Create or add to a load-balanced upstream for this worker
nginx_upstreams.setdefault(worker_base_name, set()).add(worker_port)

# Upstreams are named after the worker_type
# Upstreams are named after the worker_base_name
upstream = "http://" + worker_base_name
else:
upstream = "http://localhost:%d" % (worker_port,)
Expand Down Expand Up @@ -962,11 +967,11 @@ def generate_worker_files(
worker_type_load_balance_header_list = ["synchrotron"]
worker_type_load_balance_ip_list = ["federation_inbound"]

for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
for upstream_worker_base_name, upstream_worker_ports in nginx_upstreams.items():
body = ""
for port in upstream_worker_ports:
body += " server localhost:%d;\n" % (port,)
log("upstream_worker_type: " + upstream_worker_type)
log("upstream_worker_base_name: " + upstream_worker_base_name)

# This presents a dilemma. Some endpoints are better load-balanced by
# Authorization header, and some by remote IP. What do you do if a combo
Expand Down Expand Up @@ -995,11 +1000,13 @@ def generate_worker_files(
# cache data. This works well for federation.
if any(
x in worker_type_load_balance_ip_list
for x in upstream_worker_type.split("+")
for x in str(worker_name_type_list.get(upstream_worker_base_name)).split(
"+"
)
):
nginx_upstream_config += (
NGINX_UPSTREAM_HASH_BY_CLIENT_IP_CONFIG_BLOCK.format(
upstream_worker_type=upstream_worker_type,
upstream_worker_base_name=upstream_worker_base_name,
body=body,
)
)
Expand All @@ -1008,16 +1015,18 @@ def generate_worker_files(
# upstream source, like a synchrotron worker, with smarter caching of data.
elif any(
x in worker_type_load_balance_header_list
for x in upstream_worker_type.split("+")
for x in str(worker_name_type_list.get(upstream_worker_base_name)).split(
"+"
)
):
nginx_upstream_config += NGINX_UPSTREAM_HASH_BY_AUTH_HEADER_BLOCK.format(
upstream_worker_type=upstream_worker_type,
upstream_worker_base_name=upstream_worker_base_name,
body=body,
)
# Everything else, just use the default basic round-robin scheme.
else:
nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
upstream_worker_type=upstream_worker_type,
upstream_worker_base_name=upstream_worker_base_name,
body=body,
)

Expand Down

0 comments on commit 09d98ca

Please sign in to comment.