Adding in slug updates. #69

Merged: 4 commits, Nov 20, 2023
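
This PR wires slug changes into the updated-document handling: types.py gains an UpdateTypes.SLUG entry mapped to the document_slug field, updated_document_actions.py adds an update_field_in_all_occurences handler that rewrites that field in every copy of the document JSON under the parser input, embeddings input and indexer input prefixes, the error collection in update_dont_parse and parse is reworked to append only non-None results, the cpr-data-access dependency moves from tag 0.4.0 to 0.4.1, and the integration-test setup script is reflowed for line length.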
12 changes: 9 additions & 3 deletions integration_tests/setup_execution_data_file.py
@@ -6,13 +6,19 @@
from cloudpathlib import S3Path


def create_execution_data_file(bucket_name: str, execution_data_key: str, new_and_updated_documents_file_name: str) -> None:
def create_execution_data_file(
bucket_name: str, execution_data_key: str, new_and_updated_documents_file_name: str
) -> None:
"""Create the test execution data file dynamically from environment variables in the test."""
data = {
"input_dir_path": str(S3Path(f"s3://{bucket_name}/{new_and_updated_documents_file_name}").parent),
"input_dir_path": str(
S3Path(f"s3://{bucket_name}/{new_and_updated_documents_file_name}").parent
),
}

local_output_path = os.path.join(os.getcwd(), f'integration_tests/data/pipeline_in/{execution_data_key}')
local_output_path = os.path.join(
os.getcwd(), f"integration_tests/data/pipeline_in/{execution_data_key}"
)

if Path(local_output_path).exists():
os.remove(local_output_path)
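
The change to setup_execution_data_file.py above is formatting only. For orientation, a sketch of how the helper might be invoked in a test; the import path follows this PR's file layout, and the bucket, key and file names are placeholders rather than values taken from the repo:

from integration_tests.setup_execution_data_file import create_execution_data_file

# Placeholder arguments; per the docstring, the real tests derive these from
# environment variables.
create_execution_data_file(
    bucket_name="test-pipeline-bucket",
    execution_data_key="execution_data.json",
    new_and_updated_documents_file_name="new_and_updated_documents.json",
)
# Based on the visible part of the function, this stores the S3 parent of the
# documents file as "input_dir_path" and targets
# integration_tests/data/pipeline_in/execution_data.json locally, removing any
# previous copy of that file first.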
6 changes: 3 additions & 3 deletions poetry.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion pyproject.toml
@@ -26,7 +26,7 @@ json-logging = "^1.3.0"
pytest = "^7.2.1"
moto = {extras = ["all", "ec2", "s3"], version = "^4.1.6"}
pytest-asyncio = "^0.21.0"
cpr-data-access = {git = "https://github.com/climatepolicyradar/data-access.git", tag = "0.4.0"}
cpr-data-access = {git = "https://github.com/climatepolicyradar/data-access.git", tag = "0.4.1"}

[tool.poetry.dev-dependencies]
black = "^22.1.0"
1 change: 1 addition & 0 deletions src/navigator_data_ingest/base/types.py
@@ -62,6 +62,7 @@ class Event(BaseModel):  # noqa: D101
UpdateTypes.DESCRIPTION: "document_description",
UpdateTypes.SOURCE_URL: "document_source_url",
UpdateTypes.METADATA: "document_metadata",
UpdateTypes.SLUG: "document_slug",
}


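
The dict above (its name sits outside the visible hunk) maps each update type to the document field it rewrites; this PR adds the entry for UpdateTypes.SLUG pointing at document_slug. A minimal sketch of the lookup, with an assumed name for the dict and an assumed import path:

from navigator_data_ingest.base.types import UpdateTypes  # assumed location

# "update_type_to_field_name" is an assumed name; only the entries are
# visible in the diff.
update_type_to_field_name = {
    UpdateTypes.DESCRIPTION: "document_description",
    UpdateTypes.SOURCE_URL: "document_source_url",
    UpdateTypes.METADATA: "document_metadata",
    UpdateTypes.SLUG: "document_slug",  # new in this PR
}

# A slug update therefore targets the "document_slug" key of the stored
# document JSON.
assert update_type_to_field_name[UpdateTypes.SLUG] == "document_slug"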
187 changes: 121 additions & 66 deletions src/navigator_data_ingest/base/updated_document_actions.py
@@ -161,63 +161,66 @@ def update_dont_parse(
prefix_path, document_id, suffix_filter="json"
)
for document_file in document_files:
errors.append(
update_file_field(
document_path=document_file,
field=str(document_update.type.value),
new_value=document_update.db_value,
existing_value=document_update.s3_value,
)
error = update_file_field(
document_path=document_file,
field=str(document_update.type.value),
new_value=document_update.db_value,
existing_value=document_update.s3_value,
)
if error:
errors.append(error)

timestamp = datetime.now().strftime("%Y-%m-%d-%H-%M-%S")
errors.extend(
[
# Archive npy file
rename(
existing_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.indexer_input,
f"{document_id}.npy",
)
),
rename_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.archive_prefix,
update_config.indexer_input,
document_id,
f"{timestamp}.npy",
)
),
),
# Archive json file
rename(
existing_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.indexer_input,
f"{document_id}.json",
)
),
rename_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.archive_prefix,
update_config.indexer_input,
document_id,
f"{timestamp}.json",
)
),
),
]

# Archive npy file
archive_npy_error = rename(
existing_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.indexer_input,
f"{document_id}.npy",
)
),
rename_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.archive_prefix,
update_config.indexer_input,
document_id,
f"{timestamp}.npy",
)
),
)
return [error for error in errors if error is not None]
if archive_npy_error:
errors.append(archive_npy_error)

# Archive json file
archive_json_error = rename(
existing_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.indexer_input,
f"{document_id}.json",
)
),
rename_path=S3Path(
os.path.join(
"s3://",
update_config.pipeline_bucket,
update_config.archive_prefix,
update_config.indexer_input,
document_id,
f"{timestamp}.json",
)
),
)
if archive_json_error:
errors.append(archive_json_error)

return errors


def parse(
@@ -256,26 +259,77 @@ def parse(
prefix_path, document_id, suffix_filter="json"
) + get_document_files(prefix_path, document_id, suffix_filter="npy")
for document_file in document_files:
errors.append(
rename(
existing_path=document_file,
rename_path=S3Path(
f"s3://{update_config.pipeline_bucket}"
f"/{update_config.archive_prefix}"
f"/{prefix}/{document_id}"
f"/{timestamp}{document_file.suffix} "
),
)
error = rename(
existing_path=document_file,
rename_path=S3Path(
f"s3://{update_config.pipeline_bucket}"
f"/{update_config.archive_prefix}"
f"/{prefix}/{document_id}"
f"/{timestamp}{document_file.suffix} "
),
)
if error:
errors.append(error)

return errors


def update_field_in_all_occurences(
update: Tuple[str, Update],
update_config: UpdateConfig,
) -> List[Union[str, None]]:
"""Update the document slug in all occurences of the document in s3."""
# TODO Do we need to archive on slug updates? The reason for this was expensive
# translation and text extraction costs not slug updates?
# TODO This can be made more generic.

return [error for error in errors if error is not None]
document_id, document_update = update
_LOGGER.info(
"Updating document field in all document occurences in s3.",
extra={
"props": {
"document_id": document_id,
}
},
)
errors = []
for prefix_path in [
S3Path(
os.path.join(
"s3://", update_config.pipeline_bucket, update_config.parser_input
)
),
S3Path(
os.path.join(
"s3://", update_config.pipeline_bucket, update_config.embeddings_input
)
),
S3Path(
os.path.join(
"s3://", update_config.pipeline_bucket, update_config.indexer_input
)
),
]:
document_files = get_document_files(
prefix_path, document_id, suffix_filter="json"
)
for document_file in document_files:
error = update_file_field(
document_path=document_file,
field=str(document_update.type.value),
new_value=document_update.db_value,
existing_value=document_update.s3_value,
)
if error:
errors.append(error)
return errors
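
A sketch of driving the new handler directly for a slug change; the import paths and concrete values are assumptions, while the attribute names (type, db_value and s3_value on Update; the bucket and prefix fields on UpdateConfig) follow the code in this diff:

# Import paths are assumptions based on this PR's file layout.
from navigator_data_ingest.base.types import Update, UpdateConfig, UpdateTypes
from navigator_data_ingest.base.updated_document_actions import (
    update_field_in_all_occurences,
)

# Placeholder values; UpdateConfig is shown here with only the fields this
# hunk references, and the real model may require more.
config = UpdateConfig(
    pipeline_bucket="test-pipeline-bucket",
    parser_input="parser_input",
    embeddings_input="embeddings_input",
    indexer_input="indexer_input",
    archive_prefix="archive",
)
slug_update = Update(
    type=UpdateTypes.SLUG,
    db_value="new-document-slug",    # value now held in the database
    s3_value="stale-document-slug",  # value currently sitting in s3
)

# "CCLW.executive.1.1" is a placeholder document ID.
errors = update_field_in_all_occurences(("CCLW.executive.1.1", slug_update), config)
# Any entries in `errors` describe JSON files whose document_slug field
# could not be updated.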


def update_file_field(
document_path: S3Path,
field: str,
new_value: Union[str, datetime, dict],
existing_value: Union[str, datetime, dict],
new_value: Union[str, datetime, dict, None],
existing_value: Union[str, datetime, dict, None],
) -> Union[str, None]:
"""Update the value of a field in a json object within s3 with the new value."""
if document_path.exists():
@@ -387,4 +441,5 @@ def rename(existing_path: S3Path, rename_path: S3Path) -> Union[str, None]:
UpdateTypes.NAME: update_dont_parse,
UpdateTypes.DESCRIPTION: update_dont_parse,
UpdateTypes.METADATA: update_dont_parse,
UpdateTypes.SLUG: update_field_in_all_occurences,
}
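
The dict at the bottom of the file (its name is above the visible hunk) routes each update type to a handler: slug changes now go through update_field_in_all_occurences, while name, description and metadata changes keep using update_dont_parse. A hypothetical dispatch helper, with an assumed name for that dict:

from typing import List, Union

# "update_type_actions" is an assumed name standing in for the dict whose
# entries are listed above.
def apply_update(
    document_id: str,
    document_update,  # an Update instance
    update_config,    # an UpdateConfig instance
) -> List[Union[str, None]]:
    handler = update_type_actions[document_update.type]
    return handler((document_id, document_update), update_config)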