Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

AmpTask1786_Integrate_20240803 #1109

Merged
merged 1 commit into from
Aug 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 58 additions & 5 deletions helpers/haws.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
"""

import logging
from typing import Optional
from typing import Dict, List, Optional

import boto3
from boto3.resources.base import ServiceResource
Expand Down Expand Up @@ -51,7 +51,6 @@ def get_session(
# then passing everything.
if "aws_s3_bucket" in credentials:
del credentials["aws_s3_bucket"]
_LOG.debug(hprint.to_str("credentials"))
session = boto3.session.Session(**credentials)
return session

Expand Down Expand Up @@ -127,9 +126,8 @@ def update_task_definition(
"""
Create the new revision of specified ECS task definition.

If region is different then the default one, it is assumed
that ECR replication is enabled from the default region to the
target region.
If region is different then the default one, it is assumed that ECR
replication is enabled from the default region to the target region.

:param task_definition_name: the name of the ECS task definition for
which an update to container image URL is made, e.g., cmamp-test
Expand Down Expand Up @@ -176,3 +174,58 @@ def update_task_definition(
task_definition_name,
updated_image_url,
)


def list_all_objects(
s3_client: BaseClient, bucket_name: str, prefix: str
) -> List[Dict]:
"""
List all objects in the specified S3 bucket under the given prefix,
handling pagination.

:param s3_client: instance of boto3 S3 client
:param bucket_name: the name of the S3 bucket e.g., `cryptokaizen-data-test`
:param prefix: prefix to filter the S3 objects e.g., `binance/historical_bid_ask/`
:return: A list of dictionaries containing metadata about each object. E.g.,
```
[
{
'Key': 'binance/historical_bid_ask/S_DEPTH/1000BONK_USDT/2023-05-27/data.tar.gz',
'LastModified': datetime.datetime(2024, 5, 30, 17, 12, 12, tzinfo=tzlocal()),
'ETag': '"d41d8cd98f00b204e9800998ecf8427e"',
'Size': 0,
'StorageClass': 'STANDARD'
},
{
'Key': 'binance/historical_bid_ask/S_DEPTH/1000BONK_USDT/2023-05-28/data.tar.gz',
'LastModified': datetime.datetime(2024, 5, 30, 17, 12, 12, tzinfo=tzlocal()),
'ETag': '"d41d8cd98f00b204e9800998ecf8427e"',
'Size': 0,
'StorageClass': 'STANDARD'
}
]
```
"""
objects = []
continuation_token = None
while True:
# If there's a continuation token, include it in the request to fetch
# the next page of results.
if continuation_token:
response = s3_client.list_objects_v2(
Bucket=bucket_name,
Prefix=prefix,
ContinuationToken=continuation_token,
)
else:
response = s3_client.list_objects_v2(
Bucket=bucket_name, Prefix=prefix
)
# Extend the objects list with the contents of the current page.
objects.extend(response.get("Contents", []))
# Check if there are more pages.
if response.get("IsTruncated"):
continuation_token = response.get("NextContinuationToken")
else:
break
return objects
9 changes: 6 additions & 3 deletions helpers/hdatetime.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,13 +889,16 @@ def timestamp_to_str(

:param timestamp: timestamp to convert
:param include_msec: whether to include milliseconds e.g.
`20230727-11105712`
:return: timestamp in string format e.g. `20230727-111057`.
`20230727_111057_123`
:return: timestamp in string format e.g. `20230727_111057`.
"""
hdbg.dassert_isinstance(timestamp, pd.Timestamp)
# Convert timestamp to string.
if include_msec:
timestamp_str = timestamp.strftime("%Y%m%d_%H%M%S%f")[:-4]
# %f is the format code for microseconds. We truncate the last 3 digits
# to get milliseconds.
# This results in a string like "20230426_153042_123".
timestamp_str = timestamp.strftime("%Y%m%d_%H%M%S_%f")[:-3]
else:
timestamp_str = timestamp.strftime("%Y%m%d_%H%M%S")
return timestamp_str
21 changes: 18 additions & 3 deletions helpers/hdocker.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,9 @@
import helpers.hdocker as hdocker
"""

import copy
import logging
from typing import Optional

import helpers.hdbg as hdbg
import helpers.henv as henv
Expand Down Expand Up @@ -37,22 +39,35 @@ def volume_rm(volume_name: str) -> None:
_LOG.debug("docker volume '%s' deleted", volume_name)


def replace_shared_root_path(path: str) -> str:
def replace_shared_root_path(
path: str, *, replace_ecs_tokyo: Optional[bool] = False
) -> str:
"""
Replace root path of the shared directory based on the mapping.

:param path: path to replace, e.g., `/data/shared`
:return: replaced shared data dir root path, e.g., `/shared_data`
:param replace_ecs_tokyo: if True replace `ecs_tokyo` to `ecs` in the path
:return: replaced shared data dir root path, e.g.,
- `/data/shared/ecs_tokyo/test/system_reconciliation/C11a/prod/20240522_173000.20240522_182500/` ->
`/shared_data/ecs/test/system_reconciliation/C11a/prod/20240522_173000.20240522_182500/`
- `/data/shared/ecs/test/system_reconciliation/C11a/prod/20240522_173000.20240522_182500` ->
`/shared_data/ecs/test/system_reconciliation/C11a/prod/20240522_173000.20240522_182500`
"""
# Inside ECS we keep the original shared data path and replace it only when
# running inside Docker on the dev server.
if hserver.is_inside_docker() and not hserver.is_inside_ecs_container():
shared_data_dirs = henv.execute_repo_config_code("get_shared_data_dirs()")
if replace_ecs_tokyo:
# Make a copy to avoid modifying the original one.
shared_data_dirs = copy.deepcopy(shared_data_dirs)
shared_data_dirs["ecs_tokyo"] = "ecs"
for shared_dir, docker_shared_dir in shared_data_dirs.items():
path = path.replace(shared_dir, docker_shared_dir)
_LOG.debug(
"Running inside Docker on the dev server, thus replacing %s "
"with %s", shared_dir, docker_shared_dir,
"with %s",
shared_dir,
docker_shared_dir,
)
else:
_LOG.debug("No replacement found, returning path as-is: %s", path)
Expand Down
Loading
Loading