Skip to content

Commit

Permalink
Reduce Authentication Requests to Keycloak (#281)
Browse files Browse the repository at this point in the history
* Create RestClient early to facilitate reuse
  • Loading branch information
blinkdog authored Jul 10, 2024
1 parent fd55f11 commit 8ac153c
Show file tree
Hide file tree
Showing 34 changed files with 459 additions and 481 deletions.
2 changes: 1 addition & 1 deletion lta/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
# is zero for an official release, positive for a development branch,
# or negative for a release candidate or beta (after the base version
# number has been incremented)
__version__ = "0.41.34"
__version__ = "0.42.0"
version_info = (
int(__version__.split(".")[0]),
int(__version__.split(".")[1]),
Expand Down
23 changes: 8 additions & 15 deletions lta/bundler.py
Original file line number Diff line number Diff line change
Expand Up @@ -80,36 +80,24 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on Bundles.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='bundler', level='bundle', type='work').set(load_level)
self.logger.info("Ending work on Bundles.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a bundle and perform work on it."""
# 1. Ask the LTA DB for the next Bundle to be built
# configure a RestClient to talk to the File Catalog
fc_rc = ClientCredentialsAuth(address=self.file_catalog_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.file_catalog_client_id,
client_secret=self.file_catalog_client_secret)
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
self.logger.info("Asking the LTA DB for a Bundle to build.")
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
Expand All @@ -120,6 +108,11 @@ async def _do_work_claim(self) -> bool:
if not bundle:
self.logger.info("LTA DB did not provide a Bundle to build. Going on vacation.")
return False
# configure a RestClient to talk to the File Catalog
fc_rc = ClientCredentialsAuth(address=self.file_catalog_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.file_catalog_client_id,
client_secret=self.file_catalog_client_secret)
# process the Bundle that we were given
try:
await self._do_work_bundle(fc_rc, lta_rc, bundle)
Expand Down
16 changes: 14 additions & 2 deletions lta/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from typing import Any, Dict, Optional
from uuid import uuid4

from rest_tools.client import ClientCredentialsAuth, RestClient
import wipac_telemetry.tracing_tools as wtt

from .lta_const import drain_semaphore_filename
Expand All @@ -29,7 +30,9 @@
"RUN_ONCE_AND_DIE": "False",
"RUN_UNTIL_NO_WORK": "False",
"SOURCE_SITE": None,
"WORK_RETRIES": "3",
"WORK_SLEEP_DURATION_SECONDS": "60",
"WORK_TIMEOUT_SECONDS": "30",
}

LOGGING_DENY_LIST = ["CLIENT_SECRET", "FILE_CATALOG_CLIENT_SECRET"]
Expand Down Expand Up @@ -85,7 +88,9 @@ def __init__(self,
self.run_once_and_die = boolify(config["RUN_ONCE_AND_DIE"])
self.run_until_no_work = boolify(config["RUN_UNTIL_NO_WORK"])
self.source_site = config["SOURCE_SITE"]
self.work_retries = int(config["WORK_RETRIES"])
self.work_sleep_duration_seconds = float(config["WORK_SLEEP_DURATION_SECONDS"])
self.work_timeout_seconds = float(config["WORK_TIMEOUT_SECONDS"])
# record some default state
timestamp = datetime.utcnow().isoformat()
self.last_work_begin_timestamp = timestamp
Expand All @@ -102,11 +107,18 @@ def __init__(self,
async def run(self) -> None:
"""Perform the Component's work cycle."""
self.logger.info(f"Starting {self.type} work cycle")
# obtain a RestClient to talk to the LTA REST service (LTA DB)
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
# start the work cycle stopwatch
self.last_work_begin_timestamp = datetime.utcnow().isoformat()
# perform the work
try:
await self._do_work()
await self._do_work(lta_rc)
except Exception as e:
# uh oh, something went wrong; log about it
self.logger.error(f"Error occurred during the {self.type} work cycle")
Expand Down Expand Up @@ -142,7 +154,7 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
"""Override this to return expected configuration."""
raise NotImplementedError()

async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Override this to provide work cycle behavior."""
raise NotImplementedError()

Expand Down
15 changes: 4 additions & 11 deletions lta/deleter.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any, Dict, Optional

from prometheus_client import Counter, Gauge, start_http_server
from rest_tools.client import ClientCredentialsAuth, RestClient
from rest_tools.client import RestClient
import wipac_telemetry.tracing_tools as wtt

from .component import COMMON_CONFIG, Component, now, work_loop
Expand Down Expand Up @@ -63,31 +63,24 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on Bundles.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='deleter', level='bundle', type='work').set(load_level)
self.logger.info("Ending work on Bundles.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a bundle and perform work on it."""
# 1. Ask the LTA DB for the next Bundle to be deleted
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
self.logger.info("Asking the LTA DB for a Bundle to delete.")
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
Expand Down
15 changes: 4 additions & 11 deletions lta/desy_move_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
from typing import Any, Dict, Optional

from prometheus_client import Counter, Gauge, start_http_server
from rest_tools.client import ClientCredentialsAuth, RestClient
from rest_tools.client import RestClient
import wipac_telemetry.tracing_tools as wtt

from .component import COMMON_CONFIG, Component, now, work_loop
Expand Down Expand Up @@ -70,31 +70,24 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on Bundles.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='desy_move_verifier', level='bundle', type='work').set(load_level)
self.logger.info("Ending work on Bundles.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a bundle and perform work on it."""
# 1. Ask the LTA DB for the next Bundle to be verified
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
self.logger.info("Asking the LTA DB for a Bundle to verify.")
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
Expand Down
13 changes: 3 additions & 10 deletions lta/desy_verifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -73,32 +73,25 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on Bundles.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='desy_verifier', level='bundle', type='work').set(load_level)
self.logger.info("Ending work on Bundles.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a bundle and perform work on it."""
# 1. Ask the LTA DB for the next Bundle to be verified
self.logger.info("Asking the LTA DB for a Bundle to record as verified at DESY.")
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
}
Expand Down
15 changes: 4 additions & 11 deletions lta/gridftp_replicator.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Any, Dict, Optional

from prometheus_client import Counter, Gauge, start_http_server
from rest_tools.client import ClientCredentialsAuth, RestClient
from rest_tools.client import RestClient
import wipac_telemetry.tracing_tools as wtt

from .component import COMMON_CONFIG, Component, now, work_loop
Expand Down Expand Up @@ -82,31 +82,24 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on Bundles.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='gridftp_replicator', level='bundle', type='work').set(load_level)
self.logger.info("Ending work on Bundles.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a bundle and perform work on it."""
# 1. Ask the LTA DB for the next Bundle to be transferred
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
self.logger.info("Asking the LTA DB for a Bundle to transfer.")
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
Expand Down
13 changes: 3 additions & 10 deletions lta/locator.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,31 +93,24 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on TransferRequests.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='locator', level='transfer_request', type='work').set(load_level)
self.logger.info("Ending work on TransferRequests.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a transfer request and perform work on it."""
# 1. Ask the LTA DB for the next TransferRequest to be picked
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
self.logger.info("Asking the LTA DB for a TransferRequest to work on.")
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
Expand Down
2 changes: 1 addition & 1 deletion lta/lta_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -784,7 +784,7 @@ async def request_new(args: Namespace) -> ExitCode:
dest = args.dest
path = normalize_path(args.path)
# if the request contains nothing at all, don't try to archive it
if (size == 0) or (len(disk_files) == 0):
if ((size == 0) or (len(disk_files) == 0)) and (args.force is False):
raise Exception(f"TransferRequest for {path}\n{size:,} bytes ({hurry.filesize.size(size)}) in {len(disk_files):,} files.\nWill NOT attempt to archive 0 bytes.")
# if it doesn't meet our minimum size requirement
if size < MINIMUM_REQUEST_SIZE:
Expand Down
15 changes: 4 additions & 11 deletions lta/nersc_mover.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from typing import Any, Dict, List, Optional

from prometheus_client import Counter, Gauge, start_http_server
from rest_tools.client import ClientCredentialsAuth, RestClient
from rest_tools.client import RestClient
import wipac_telemetry.tracing_tools as wtt

from .component import COMMON_CONFIG, Component, now, work_loop
Expand Down Expand Up @@ -80,22 +80,22 @@ def _expected_config(self) -> Dict[str, Optional[str]]:
return EXPECTED_CONFIG

@wtt.spanned()
async def _do_work(self) -> None:
async def _do_work(self, lta_rc: RestClient) -> None:
"""Perform a work cycle for this component."""
self.logger.info("Starting work on Bundles.")
load_level = -1
work_claimed = True
while work_claimed:
load_level += 1
work_claimed = await self._do_work_claim()
work_claimed = await self._do_work_claim(lta_rc)
# if we are configured to run once and die, then die
if self.run_once_and_die:
sys.exit()
load_gauge.labels(component='nersc_mover', level='bundle', type='work').set(load_level)
self.logger.info("Ending work on Bundles.")

@wtt.spanned()
async def _do_work_claim(self) -> bool:
async def _do_work_claim(self, lta_rc: RestClient) -> bool:
"""Claim a bundle and perform work on it."""
# 0. Do some pre-flight checks to ensure that we can do work
# if the HPSS system is not available
Expand All @@ -107,13 +107,6 @@ async def _do_work_claim(self) -> bool:
return False
# 1. Ask the LTA DB for the next Bundle to be taped
self.logger.info("Asking the LTA DB for a Bundle to tape at NERSC with HPSS.")
# configure a RestClient to talk to the LTA DB
lta_rc = ClientCredentialsAuth(address=self.lta_rest_url,
token_url=self.lta_auth_openid_url,
client_id=self.client_id,
client_secret=self.client_secret,
timeout=self.work_timeout_seconds,
retries=self.work_retries)
pop_body = {
"claimant": f"{self.name}-{self.instance_uuid}"
}
Expand Down
Loading

0 comments on commit 8ac153c

Please sign in to comment.