Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(run): Create a describe run endpoint for fetching aspects created by the ingestion run #4964

Merged
merged 6 commits into from
May 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 16 additions & 13 deletions metadata-ingestion/src/datahub/cli/cli_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,21 @@ def parse_run_restli_response(response: requests.Response) -> dict:
return summary


def format_aspect_summaries(summaries: list) -> typing.List[typing.List[str]]:
local_timezone = datetime.now().astimezone().tzinfo
return [
[
row.get("urn"),
row.get("aspectName"),
datetime.fromtimestamp(row.get("timestamp") / 1000).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ f" ({local_timezone})",
]
for row in summaries
]


def post_rollback_endpoint(
payload_obj: dict,
path: str,
Expand All @@ -266,19 +281,7 @@ def post_rollback_endpoint(
if len(rows) == 0:
click.secho(f"No entities found. Payload used: {payload}", fg="yellow")

local_timezone = datetime.now().astimezone().tzinfo
structured_rolled_back_results = [
[
row.get("urn"),
row.get("aspectName"),
datetime.fromtimestamp(row.get("timestamp") / 1000).strftime(
"%Y-%m-%d %H:%M:%S"
)
+ f" ({local_timezone})",
]
for row in rolled_back_aspects
]

structured_rolled_back_results = format_aspect_summaries(rolled_back_aspects)
return (
structured_rolled_back_results,
entities_affected,
Expand Down
55 changes: 32 additions & 23 deletions metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
from datahub.cli import cli_utils
from datahub.cli.cli_utils import (
CONDENSED_DATAHUB_CONFIG_PATH,
format_aspect_summaries,
get_session_and_host,
post_rollback_endpoint,
)
Expand Down Expand Up @@ -203,36 +204,44 @@ def list_runs(page_offset: int, page_size: int, include_soft_deletes: bool) -> N

@ingest.command()
@click.option("--run-id", required=True, type=str)
@click.option("--start", type=int, default=0)
@click.option("--count", type=int, default=100)
@click.option(
"--include-soft-deletes",
is_flag=True,
default=False,
help="If enabled, will include aspects that have been soft deleted",
)
@click.option("-a", "--show-aspect", required=False, is_flag=True)
@telemetry.with_telemetry
def show(run_id: str) -> None:
def show(
run_id: str, start: int, count: int, include_soft_deletes: bool, show_aspect: bool
) -> None:
"""Describe a provided ingestion run to datahub"""
session, gms_host = get_session_and_host()

payload_obj = {"runId": run_id, "dryRun": True, "hardDelete": True}
(
structured_rows,
entities_affected,
aspects_modified,
aspects_affected,
unsafe_entity_count,
unsafe_entities,
) = post_rollback_endpoint(payload_obj, "/runs?action=rollback")
url = f"{gms_host}/runs?action=describe"

payload_obj = {
"runId": run_id,
"start": start,
"count": count,
"includeSoft": include_soft_deletes,
"includeAspect": show_aspect,
}

payload = json.dumps(payload_obj)

response = session.post(url, data=payload)

if aspects_modified >= ELASTIC_MAX_PAGE_SIZE:
rows = parse_restli_response(response)
if not show_aspect:
click.echo(
f"this run created at least {entities_affected} new entities and updated at least {aspects_modified} aspects"
tabulate(format_aspect_summaries(rows), RUN_TABLE_COLUMNS, tablefmt="grid")
)
else:
click.echo(
f"this run created {entities_affected} new entities and updated {aspects_modified} aspects"
)
click.echo(
"rolling back will delete the entities created and revert the updated aspects"
)
click.echo()
click.echo(
f"showing first {len(structured_rows)} of {aspects_modified} aspects touched by this run"
)
click.echo(tabulate(structured_rows, RUN_TABLE_COLUMNS, tablefmt="grid"))
for row in rows:
click.echo(json.dumps(row, indent=4))


@ingest.command()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,10 +90,7 @@ public BulkByScrollResponse deleteByUrn(@Nonnull final String urn) {
return null;
}

public BulkByScrollResponse deleteByUrnAspect(
@Nonnull final String urn,
@Nonnull final String aspect
) {
public BulkByScrollResponse deleteByUrnAspect(@Nonnull final String urn, @Nonnull final String aspect) {
BoolQueryBuilder finalQuery = QueryBuilders.boolQuery();
finalQuery.must(QueryBuilders.termQuery("urn", urn));
finalQuery.must(QueryBuilders.termQuery("aspect", aspect));
Expand All @@ -114,7 +111,7 @@ public BulkByScrollResponse deleteByUrnAspect(
return null;
}

public SearchResponse findByParams(Map<String, String> searchParams, boolean includeSoftDeleted) {
public SearchResponse findByParams(Map<String, String> searchParams, boolean includeSoftDeleted, int from, int size) {
SearchRequest searchRequest = new SearchRequest();

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
Expand All @@ -131,8 +128,8 @@ public SearchResponse findByParams(Map<String, String> searchParams, boolean inc

searchSourceBuilder.query(finalQuery);

// this is the max page size elastic will return
searchSourceBuilder.size(10000);
searchSourceBuilder.from(from);
searchSourceBuilder.size(size);

searchRequest.source(searchSourceBuilder);

Expand All @@ -147,15 +144,16 @@ public SearchResponse findByParams(Map<String, String> searchParams, boolean inc
return null;
}

public SearchResponse findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted) {
public SearchResponse findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted,
int from, int size) {
Map<String, String> params = new HashMap<>();
params.put("registryName", registryName);
params.put("registryVersion", registryVersion);
return findByParams(params, includeSoftDeleted);
return findByParams(params, includeSoftDeleted, from, size);
}

public SearchResponse findByRunId(String runId, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap("runId", runId), includeSoftDeleted);
public SearchResponse findByRunId(String runId, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap("runId", runId), includeSoftDeleted, from, size);
}

public SearchResponse findRuns(Integer pageOffset, Integer pageSize) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.indexbuilder.ESIndexBuilder;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.mxe.SystemMetadata;
import java.io.IOException;
Expand Down Expand Up @@ -100,7 +101,8 @@ public void setDocStatus(String urn, boolean removed) {
// searchBy findByParams
// If status.removed -> false (from removed to not removed) --> get soft deleted entities.
// If status.removed -> true (from not removed to removed) --> do not get soft deleted entities.
final List<AspectRowSummary> aspectList = findByParams(ImmutableMap.of("urn", urn), !removed);
final List<AspectRowSummary> aspectList =
findByParams(ImmutableMap.of("urn", urn), !removed, 0, ESUtils.MAX_RESULT_SIZE);
// for each -> toDocId and set removed to true for all
aspectList.forEach(aspect -> {
final String docId = toDocId(aspect.getUrn(), aspect.getAspectName());
Expand All @@ -123,18 +125,19 @@ public void insert(@Nullable SystemMetadata systemMetadata, String urn, String a
}

@Override
public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted);
public List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap(FIELD_RUNID, runId), includeSoftDeleted, from, size);
}

@Override
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted);
public List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted, int from, int size) {
return findByParams(Collections.singletonMap(FIELD_URN, urn), includeSoftDeleted, from, size);
}

@Override
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted);
public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted, int from,
int size) {
SearchResponse searchResponse = _esDAO.findByParams(systemMetaParams, includeSoftDeleted, from, size);
if (searchResponse != null) {
SearchHits hits = searchResponse.getHits();
List<AspectRowSummary> summaries = Arrays.stream(hits.getHits()).map(hit -> {
Expand All @@ -159,11 +162,12 @@ public List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams,
}

@Override
public List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted) {
public List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted,
int from, int size) {
Map<String, String> registryParams = new HashMap<>();
registryParams.put(FIELD_REGISTRY_NAME, registryName);
registryParams.put(FIELD_REGISTRY_VERSION, registryVersion);
return findByParams(registryParams, includeSoftDeleted);
return findByParams(registryParams, includeSoftDeleted, from, size);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,13 @@ public interface SystemMetadataService {

void insert(@Nullable SystemMetadata systemMetadata, String urn, String aspect);

List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted);
List<AspectRowSummary> findByRunId(String runId, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted);
List<AspectRowSummary> findByUrn(String urn, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted);
List<AspectRowSummary> findByParams(Map<String, String> systemMetaParams, boolean includeSoftDeleted, int from, int size);

List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted);
List<AspectRowSummary> findByRegistry(String registryName, String registryVersion, boolean includeSoftDeleted, int from, int size);

List<IngestionRunSummary> listRuns(Integer pageOffset, Integer pageSize, boolean includeSoftDeleted);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.linkedin.metadata.run.AspectRowSummary;
import com.linkedin.metadata.run.IngestionRunSummary;
import com.linkedin.metadata.search.elasticsearch.ElasticSearchServiceTest;
import com.linkedin.metadata.search.utils.ESUtils;
import com.linkedin.metadata.utils.elasticsearch.IndexConvention;
import com.linkedin.metadata.utils.elasticsearch.IndexConventionImpl;
import com.linkedin.mxe.SystemMetadata;
Expand Down Expand Up @@ -137,7 +138,7 @@ public void testFindByRunId() throws Exception {

syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456", false);
List<AspectRowSummary> rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE);

assertEquals(rows.size(), 4);
rows.forEach(row -> assertEquals(row.getRunId(), "abc-456"));
Expand Down Expand Up @@ -169,7 +170,7 @@ public void testDelete() throws Exception {

syncAfterWrite(_searchClient, _indexName);

List<AspectRowSummary> rows = _client.findByRunId("abc-456", false);
List<AspectRowSummary> rows = _client.findByRunId("abc-456", false, 0, ESUtils.MAX_RESULT_SIZE);

assertEquals(rows.size(), 2);
rows.forEach(row -> assertEquals(row.getRunId(), "abc-456"));
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
namespace com.linkedin.metadata.run

import com.linkedin.entity.Aspect

record AspectRowSummary {
runId: string
aspectName: string
Expand All @@ -8,4 +10,5 @@ record AspectRowSummary {
metadata: string
version: long
keyAspect: boolean
aspect: optional Aspect
}
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,27 @@
},
"supports" : [ ],
"actions" : [ {
"name" : "describe",
"parameters" : [ {
"name" : "runId",
"type" : "string"
}, {
"name" : "start",
"type" : "int"
}, {
"name" : "count",
"type" : "int"
}, {
"name" : "includeSoft",
"type" : "boolean",
"optional" : true
}, {
"name" : "includeAspect",
"type" : "boolean",
"optional" : true
} ],
"returns" : "{ \"type\" : \"array\", \"items\" : \"com.linkedin.metadata.run.AspectRowSummary\" }"
}, {
"name" : "list",
"doc" : "Retrieves the value for an entity that is made up of latest versions of specified aspects.",
"parameters" : [ {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,8 +702,10 @@
"doc" : "Instance of the data platform (e.g. db instance)",
"optional" : true,
"Searchable" : {
"addToFilters" : true,
"fieldName" : "platformInstance",
"fieldType" : "URN"
"fieldType" : "URN",
"filterNameOverride" : "Platform Instance"
}
} ],
"Aspect" : {
Expand Down Expand Up @@ -804,6 +806,10 @@
}
},
"doc" : "Urn of the applied tag",
"Relationship" : {
"entityTypes" : [ "tag" ],
"name" : "TaggedWith"
},
"Searchable" : {
"addToFilters" : true,
"fieldName" : "tags",
Expand Down Expand Up @@ -881,6 +887,10 @@
}
},
"doc" : "Urn of the applied glossary term",
"Relationship" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "TermedWith"
},
"Searchable" : {
"addToFilters" : true,
"fieldName" : "glossaryTerms",
Expand Down Expand Up @@ -2483,7 +2493,7 @@
"Relationship" : {
"/tags/*/tag" : {
"entityTypes" : [ "tag" ],
"name" : "FieldTaggedWith"
"name" : "SchemaFieldTaggedWith"
}
},
"Searchable" : {
Expand All @@ -2501,7 +2511,7 @@
"Relationship" : {
"/terms/*/urn" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "FieldWithGlossaryTerm"
"name" : "SchemaFieldWithGlossaryTerm"
}
},
"Searchable" : {
Expand Down Expand Up @@ -2670,7 +2680,7 @@
"Relationship" : {
"/tags/*/tag" : {
"entityTypes" : [ "tag" ],
"name" : "EditableFieldTaggedWith"
"name" : "EditableSchemaFieldTaggedWith"
}
},
"Searchable" : {
Expand All @@ -2688,7 +2698,7 @@
"Relationship" : {
"/terms/*/urn" : {
"entityTypes" : [ "glossaryTerm" ],
"name" : "EditableFieldWithGlossaryTerm"
"name" : "EditableSchemaFieldWithGlossaryTerm"
}
},
"Searchable" : {
Expand Down
Loading