Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(run): Create a describe run endpoint for fetching aspects created by the ingestion run #4964

Merged
merged 6 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,20 @@ def parse_run_restli_response(response: requests.Response) -> dict:
return summary


def format_aspect_summaries(summaries: list) -> typing.List[list]:
    """Turn aspect row summaries into printable table rows.

    Each summary is read as a mapping with the keys ``urn``, ``aspectName``
    and ``timestamp``. The timestamp is divided by 1000 before conversion,
    i.e. it is treated as epoch milliseconds, and rendered in the machine's
    local time followed by the local timezone in parentheses.

    Args:
        summaries: Iterable of dict-like rows supporting ``.get``.

    Returns:
        One ``[urn, aspect_name, formatted_timestamp]`` list per input row,
        in input order. A field missing from a row is returned as ``None``.
    """
    # Resolved once: the same timezone label is appended to every row.
    local_timezone = datetime.now().astimezone().tzinfo

    def _format_timestamp(millis: typing.Optional[float]) -> typing.Optional[str]:
        # A row without a timestamp must not abort formatting of the whole
        # table; urn and aspectName are already read tolerantly via .get().
        if millis is None:
            return None
        formatted = datetime.fromtimestamp(millis / 1000).strftime(
            "%Y-%m-%d %H:%M:%S"
        )
        return formatted + f" ({local_timezone})"

    return [
        [
            row.get("urn"),
            row.get("aspectName"),
            _format_timestamp(row.get("timestamp")),
        ]
        for row in summaries
    ]

def post_rollback_endpoint(
payload_obj: dict,
path: str,
Expand All @@ -266,19 +280,7 @@ def post_rollback_endpoint(
if len(rows) == 0:
click.secho(f"No entities found. Payload used: {payload}", fg="yellow")

local_timezone = datetime.now().astimezone().tzinfo
structured_rolled_back_results = [
[
row.get("urn"),
row.get("aspectName"),
datetime.fromtimestamp(row.get("timestamp") / 1000).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ f" ({local_timezone})",
]
for row in rolled_back_aspects
]

structured_rolled_back_results = format_aspect_summaries(rolled_back_aspects)
return (
structured_rolled_back_results,
entities_affected,
Expand Down
63 changes: 38 additions & 25 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
from datahub.cli.cli_utils import (
CONDENSED_DATAHUB_CONFIG_PATH,
get_session_and_host,
format_aspect_summaries,
post_rollback_endpoint,
)
from datahub.configuration import SensitiveError
Expand Down Expand Up @@ -203,36 +204,48 @@ def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> N

@ingest.command()
@click.option("--run-id", required=True, type=str)
@click.option(
"--start",
type=int,
default=0
)
@click.option(
"--count",
type=int,
default=100
)
@click.option(
"--include-soft-deletes",
is_flag=True,
default=False,
help="If enabled, will list ingestion runs which have been soft deleted",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this say "If enabled, will show soft deletions that occurred in the ingestion run"?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed!

)
@click.option("-a", "--show-aspect", required=False, is_flag=True)
@telemetry.with_telemetry
def show(run_id: str) -> None:
def show(run_id: str, start: int, count: int, include_soft_deletes: bool, show_aspect: bool) -> None:
"""Describe a provided ingestion run to datahub"""
session, gms_host = get_session_and_host()

url = f"{gms_host}/runs?action=describe"

payload_obj = {"runId": run_id, "dryRun": True, "hardDelete": True}
(
structured_rows,
entities_affected,
aspects_modified,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")
payload_obj = {
"runId": run_id,
"start": start,
"count": count,
"includeSoft": include_soft_deletes,
"includeAspect": show_aspect,
}

if aspects_modified >= ELASTIC_MAX_PAGE_SIZE:
click.echo(
f"this run created at least {entities_affected} new entities and updated at least {aspects_modified} aspects"
)
payload = json.dumps(payload_obj)

response = session.post(url, data=payload)

rows = parse_restli_response(response)
if not show_aspect:
click.echo(tabulate(format_aspect_summaries(rows), RUN_TABLE_COLUMNS, tablefmt="grid"))
else:
click.echo(
f"this run created {entities_affected} new entities and updated {aspects_modified} aspects"
)
click.echo(
"rolling back will delete the entities created and revert the updated aspects"
)
click.echo()
click.echo(
f"showing first {len(structured_rows)} of {aspects_modified} aspects touched by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))
for row in rows:
click.echo(json.dumps(row, indent=4))


@ingest.command()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ public BulkByScrollResponse deleteByUrn(@Nonnull final String urn) {
return null;
}

public BulkByScrollResponse deleteByUrnAspect(
@Nonnull final String urn,
@Nonnull final String aspect
) {
public BulkByScrollResponse deleteByUrnAspect(@Nonnull final String urn, @Nonnull final String aspect) {
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
finalQuery.must(QueryBuilders.termQuery("urn", urn));
finalQuery.must(QueryBuilders.termQuery("aspect", aspect));
Expand All @@ -114,7 +111,7 @@ public BulkByScrollResponse deleteByUrnAspect(
return null;
}

public SearchResponse findByParams(Map<String, String> searchParams, boolean includeSoftDeleted) {
public SearchResponse findByParams(Map<String, String> searchParams, boolean includeSoftDeleted, int from, int size) {
SearchRequest searchRequest = new SearchRequest();

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Expand All @@ -131,8 +128,8 @@ public SearchResponse findByParams(Map<String, String> searchParams, boolean inc

searchSourceBuilder.query(finalQuery);

// this is the max page size elastic will return
searchSourceBuilder.size(10000);
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);

searchRequest.source(searchSourceBuilder);

Expand All @@ -147,15 +144,16 @@ public SearchResponse findByParams(Map<String, String> searchParams, boolean inc
return null;
}

public SearchResponse findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted) {
public SearchResponse findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted,
int from, int size) {
Map<String, String> params = new HashMap<>();
params.put("registryName", registryName);
params.put("registryVersion", registryVersion);
return findByParams(params, includeSoftDeleted);
return findByParams(params, includeSoftDeleted, from, size);
}

public SearchResponse findByRunId(String runId, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap("runId", runId), includeSoftDeleted);
public SearchResponse findByRunId(String runId, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap("runId", runId), includeSoftDeleted, from, size);
}

public SearchResponse findRuns(Integer pageOffset, Integer pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.mxe.SystemMetadata;
import java.io.IOException;
Expand Down Expand Up @@ -100,7 +101,8 @@ public void setDocStatus(String urn, boolean removed) {
// searchBy findByParams
// If status.removed -> false (from removed to not removed) --> get soft deleted entities.
// If status.removed -> true (from not removed to removed) --> do not get soft deleted entities.
final List<AspectRowSummary> aspectList = findByParams(ImmutableMap.of("urn", urn), !removed);
final List<AspectRowSummary> aspectList =
findByParams(ImmutableMap.of("urn", urn), !removed, 0, ESUtils.MAX_RESULT_SIZE);
// for each -> toDocId and set removed to true for all
aspectList.forEach(aspect -> {
final String docId = toDocId(aspect.getUrn(), aspect.getAspectName());
Expand All @@ -123,18 +125,19 @@ public void insert(@Nullable SystemMetadata systemMetadata, String urn, String a
}

@Override
public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted);
public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted, from, size);
}

@Override
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted);
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted, from, size);
}

@Override
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted);
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted, int from,
int size) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted, from, size);
if (searchResponse != null) {
SearchHits hits = searchResponse.getHits();
List<AspectRowSummary> summaries = Arrays.stream(hits.getHits()).map(hit -> {
Expand All @@ -159,11 +162,12 @@ public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams,
}

@Override
public List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted) {
public List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted,
int from, int size) {
Map<String, String> registryParams = new HashMap<>();
registryParams.put(FIELD_REGISTRY_NAME, registryName);
registryParams.put(FIELD_REGISTRY_VERSION, registryVersion);
return findByParams(registryParams, includeSoftDeleted);
return findByParams(registryParams, includeSoftDeleted, from, size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public interface SystemMetadataService {

void insert(@Nullable SystemMetadata systemMetadata, String urn, String aspect);

List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted);
List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted);
List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted);
List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted);
List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted, int from, int size);

List<IngestionRunSummary> listRuns(Integer pageOffset, Integer pageSize, boolean includeSoftDeleted);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import com.linkedin.mxe.SystemMetadata;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void testFindByRunId() throws Exception {

syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456", false);
List<AspectRowSummary> rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE);

assertEquals(rows.size(), 4);
rows.forEach(row -> assertEquals(row.getRunId(), "abc-456"));
Expand Down Expand Up @@ -169,7 +170,7 @@ public void testDelete() throws Exception {

syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456", false);
List<AspectRowSummary> rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE);

assertEquals(rows.size(), 2);
rows.forEach(row -> assertEquals(row.getRunId(), "abc-456"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace com.linkedin.metadata.run

import com.linkedin.entity.Aspect

record AspectRowSummary {
runId: string
aspectName: string
Expand All @@ -8,4 +10,5 @@ record AspectRowSummary {
metadata: string
version: long
keyAspect: boolean
aspect: optional Aspect
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@
},
"supports" : [ ],
"actions" : [ {
"name" : "describe",
"parameters" : [ {
"name" : "runId",
"type" : "string"
}, {
"name" : "start",
"type" : "int"
}, {
"name" : "count",
"type" : "int"
}, {
"name" : "includeSoft",
"type" : "boolean",
"optional" : true
}, {
"name" : "includeAspect",
"type" : "boolean",
"optional" : true
} ],
"returns" : "{ \"type\" : \"array\", \"items\" : \"com.linkedin.metadata.run.AspectRowSummary\" }"
}, {
"name" : "list",
"doc" : "Retrieves the value for an entity that is made up of latest versions of specified aspects.",
"parameters" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,10 @@
"doc" : "Instance of the data platform (e.g. db instance)",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldName" : "platformInstance",
"fieldType" : "URN"
"fieldType" : "URN",
"filterNameOverride" : "Platform Instance"
}
} ],
"Aspect" : {
Expand Down Expand Up @@ -804,6 +806,10 @@
}
},
"doc" : "Urn of the applied tag",
"Relationship" : {
"entityTypes" : [ "tag" ],
"name" : "TaggedWith"
},
"Searchable" : {
"addToFilters" : true,
"fieldName" : "tags",
Expand Down Expand Up @@ -881,6 +887,10 @@
}
},
"doc" : "Urn of the applied glossary term",
"Relationship" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "TermedWith"
},
"Searchable" : {
"addToFilters" : true,
"fieldName" : "glossaryTerms",
Expand Down Expand Up @@ -2483,7 +2493,7 @@
"Relationship" : {
"/tags/*/tag" : {
"entityTypes" : [ "tag" ],
"name" : "FieldTaggedWith"
"name" : "SchemaFieldTaggedWith"
}
},
"Searchable" : {
Expand All @@ -2501,7 +2511,7 @@
"Relationship" : {
"/terms/*/urn" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "FieldWithGlossaryTerm"
"name" : "SchemaFieldWithGlossaryTerm"
}
},
"Searchable" : {
Expand Down Expand Up @@ -2670,7 +2680,7 @@
"Relationship" : {
"/tags/*/tag" : {
"entityTypes" : [ "tag" ],
"name" : "EditableFieldTaggedWith"
"name" : "EditableSchemaFieldTaggedWith"
}
},
"Searchable" : {
Expand All @@ -2688,7 +2698,7 @@
"Relationship" : {
"/terms/*/urn" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "EditableFieldWithGlossaryTerm"
"name" : "EditableSchemaFieldWithGlossaryTerm"
}
},
"Searchable" : {
Expand Down
Loading