Skip to content

Commit

Permalink
Merge branch 'master' into cus2192-report-lineage-issue
Browse files Browse the repository at this point in the history
  • Loading branch information
sid-acryl authored Jul 18, 2024
2 parents 6ebed8c + 8266b02 commit 48bd34b
Show file tree
Hide file tree
Showing 13 changed files with 368 additions and 230 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -255,4 +255,19 @@ public static Filter viewFilter(
Filter result = SearchUtils.combineFilters(null, viewInfo.getDefinition().getFilter());
return result;
}

/**
* Simply resolves the end time filter for the search across lineage query. If the start time is
* provided, but end time is not provided, we will default to the current time.
*/
public static Long getLineageEndTimeMillis(
@Nullable Long startTimeMillis, @Nullable Long endTimeMillis) {
if (endTimeMillis != null) {
return endTimeMillis;
}
if (startTimeMillis != null) {
return System.currentTimeMillis();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.linkedin.datahub.graphql.generated.LineageInput;
import com.linkedin.datahub.graphql.generated.LineageRelationship;
import com.linkedin.datahub.graphql.generated.Restricted;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.common.mappers.UrnToEntityMapper;
import com.linkedin.metadata.graph.SiblingGraphService;
import graphql.schema.DataFetcher;
Expand Down Expand Up @@ -63,7 +64,10 @@ public CompletableFuture<EntityLineageResult> get(DataFetchingEnvironment enviro
@Nullable final Integer count = input.getCount(); // Optional!
@Nullable final Boolean separateSiblings = input.getSeparateSiblings(); // Optional!
@Nullable final Long startTimeMillis = input.getStartTimeMillis(); // Optional!
@Nullable final Long endTimeMillis = input.getEndTimeMillis(); // Optional!
@Nullable
final Long endTimeMillis =
ResolverUtils.getLineageEndTimeMillis(
input.getStartTimeMillis(), input.getEndTimeMillis()); // Optional!

com.linkedin.metadata.graph.LineageDirection resolvedDirection =
com.linkedin.metadata.graph.LineageDirection.valueOf(lineageDirection.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ public CompletableFuture<ScrollAcrossLineageResults> get(DataFetchingEnvironment
@Nullable
Long startTimeMillis = input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
@Nullable
Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();
Long endTimeMillis =
ResolverUtils.getLineageEndTimeMillis(input.getStartTimeMillis(), input.getEndTimeMillis());

final LineageFlags lineageFlags = LineageFlagsInputMapper.map(context, input.getLineageFlags());
if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,8 @@ public CompletableFuture<SearchAcrossLineageResults> get(DataFetchingEnvironment
@Nullable
Long startTimeMillis = input.getStartTimeMillis() == null ? null : input.getStartTimeMillis();
@Nullable
Long endTimeMillis = input.getEndTimeMillis() == null ? null : input.getEndTimeMillis();
Long endTimeMillis =
ResolverUtils.getLineageEndTimeMillis(input.getStartTimeMillis(), input.getEndTimeMillis());

final LineageFlags lineageFlags = LineageFlagsInputMapper.map(context, input.getLineageFlags());
if (lineageFlags.getStartTimeMillis() == null && startTimeMillis != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
import com.linkedin.datahub.graphql.QueryContext;
import com.linkedin.datahub.graphql.generated.EntityTypeToPlatforms;
import com.linkedin.datahub.graphql.generated.LineageFlags;
import com.linkedin.datahub.graphql.resolvers.ResolverUtils;
import com.linkedin.datahub.graphql.types.entitytype.EntityTypeMapper;
import com.linkedin.datahub.graphql.types.mappers.ModelMapper;
import java.util.Collections;
Expand Down Expand Up @@ -42,12 +43,16 @@ public com.linkedin.metadata.query.LineageFlags apply(
if (lineageFlags.getIgnoreAsHops() != null) {
result.setIgnoreAsHops(mapIgnoreAsHops(lineageFlags.getIgnoreAsHops()));
}
if (lineageFlags.getEndTimeMillis() != null) {
result.setEndTimeMillis(lineageFlags.getEndTimeMillis());
}
if (lineageFlags.getStartTimeMillis() != null) {
result.setStartTimeMillis(lineageFlags.getStartTimeMillis());
}
// Default to "now" if no end time is provided, but start time is provided.
Long endTimeMillis =
ResolverUtils.getLineageEndTimeMillis(
lineageFlags.getStartTimeMillis(), lineageFlags.getEndTimeMillis());
if (endTimeMillis != null) {
result.setEndTimeMillis(endTimeMillis);
}
if (lineageFlags.getEntitiesExploredPerHopLimit() != null) {
result.setEntitiesExploredPerHopLimit(lineageFlags.getEntitiesExploredPerHopLimit());
}
Expand Down
15 changes: 15 additions & 0 deletions datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import {
INGESTION_REFRESH_SOURCES_ID,
} from '../../onboarding/config/IngestionOnboardingConfig';
import { ONE_SECOND_IN_MS } from '../../entity/shared/tabs/Dataset/Queries/utils/constants';
import { useCommandS } from './hooks';

const PLACEHOLDER_URN = 'placeholder-urn';

Expand All @@ -51,6 +52,8 @@ const FilterWrapper = styled.div`
display: flex;
`;

const SYSTEM_INTERNAL_SOURCE_TYPE = 'SYSTEM';

export enum IngestionSourceType {
ALL,
UI,
Expand Down Expand Up @@ -102,6 +105,17 @@ export const IngestionSourceList = () => {
// Set of removed urns used to account for eventual consistency
const [removedUrns, setRemovedUrns] = useState<string[]>([]);
const [sourceFilter, setSourceFilter] = useState(IngestionSourceType.ALL);
const [hideSystemSources, setHideSystemSources] = useState(true);

/**
* Show or hide system ingestion sources using a hidden command S command.
*/
useCommandS(() => setHideSystemSources(!hideSystemSources));

// Ingestion Source Default Filters
const filters = hideSystemSources
? [{ field: 'sourceType', values: [SYSTEM_INTERNAL_SOURCE_TYPE], negated: true }]
: undefined;

// Ingestion Source Queries
const { loading, error, data, client, refetch } = useListIngestionSourcesQuery({
Expand All @@ -110,6 +124,7 @@ export const IngestionSourceList = () => {
start,
count: pageSize,
query: (query?.length && query) || undefined,
filters,
},
},
fetchPolicy: (query?.length || 0) > 0 ? 'no-cache' : 'cache-first',
Expand Down
16 changes: 16 additions & 0 deletions datahub-web-react/src/app/ingest/source/hooks.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
import { useEffect } from 'react';

export const useCommandS = (onPress: () => void) => {
useEffect(() => {
const handleKeyDown = (event: KeyboardEvent) => {
if (event.metaKey && event.key === 's') {
event.preventDefault();
onPress();
}
};
window.addEventListener('keydown', handleKeyDown);
return () => {
window.removeEventListener('keydown', handleKeyDown);
};
}, [onPress]);
};
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,24 @@ record DataHubIngestionSourceInfo {
*/
extraArgs: optional map[string, string]
}

/**
* The source or origin of the Ingestion Source
*
* Currently CLI and UI do not provide an explicit source.
*/
source: optional record DataHubIngestionSourceSource {
/**
* The source type of the ingestion source
*/
@Searchable = {
"fieldName": "sourceType"
}
type: enum DataHubIngestionSourceSourceType {
/**
* A system internal source, e.g. for running search indexing operations, feature computation, etc.
*/
SYSTEM
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,15 @@
import io.swagger.v3.oas.annotations.OpenAPIDefinition;
import io.swagger.v3.oas.annotations.info.Info;
import io.swagger.v3.oas.annotations.servers.Server;
import io.swagger.v3.oas.models.Components;
import io.swagger.v3.oas.models.OpenAPI;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.springdoc.core.models.GroupedOpenApi;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -38,8 +43,6 @@ public class SpringWebConfig implements WebMvcConfigurer {
private static final Set<String> V1_PACKAGES = Set.of("io.datahubproject.openapi.v1");
private static final Set<String> V2_PACKAGES = Set.of("io.datahubproject.openapi.v2");
private static final Set<String> V3_PACKAGES = Set.of("io.datahubproject.openapi.v3");
private static final Set<String> SCHEMA_REGISTRY_PACKAGES =
Set.of("io.datahubproject.openapi.schema.registry");

private static final Set<String> OPENLINEAGE_PACKAGES =
Set.of("io.datahubproject.openapi.openlineage");
Expand Down Expand Up @@ -74,14 +77,31 @@ public void addFormatters(FormatterRegistry registry) {
public GroupedOpenApi v3OpenApiGroup(final EntityRegistry entityRegistry) {
return GroupedOpenApi.builder()
.group("10-openapi-v3")
.displayName("DataHub Entities v3 (OpenAPI)")
.displayName("DataHub v3 (OpenAPI)")
.addOpenApiCustomizer(
openApi -> {
OpenAPI v3OpenApi = OpenAPIV3Generator.generateOpenApiSpec(entityRegistry);
openApi.setInfo(v3OpenApi.getInfo());
openApi.setTags(Collections.emptyList());
openApi.setPaths(v3OpenApi.getPaths());
openApi.setComponents(v3OpenApi.getComponents());
openApi.getPaths().putAll(v3OpenApi.getPaths());
// Merge components. Swagger does not provide append method to add components.
final Components components = new Components();
final Components oComponents = openApi.getComponents();
final Components v3Components = v3OpenApi.getComponents();
components
.callbacks(concat(oComponents::getCallbacks, v3Components::getCallbacks))
.examples(concat(oComponents::getExamples, v3Components::getExamples))
.extensions(concat(oComponents::getExtensions, v3Components::getExtensions))
.headers(concat(oComponents::getHeaders, v3Components::getHeaders))
.links(concat(oComponents::getLinks, v3Components::getLinks))
.parameters(concat(oComponents::getParameters, v3Components::getParameters))
.requestBodies(
concat(oComponents::getRequestBodies, v3Components::getRequestBodies))
.responses(concat(oComponents::getResponses, v3Components::getResponses))
.schemas(concat(oComponents::getSchemas, v3Components::getSchemas))
.securitySchemes(
concat(oComponents::getSecuritySchemes, v3Components::getSecuritySchemes));
openApi.setComponents(components);
})
.packagesToScan(V3_PACKAGES.toArray(String[]::new))
.build();
Expand Down Expand Up @@ -122,4 +142,14 @@ public GroupedOpenApi openlineageOpenApiGroup() {
.packagesToScan(OPENLINEAGE_PACKAGES.toArray(String[]::new))
.build();
}

/** Concatenates two maps. */
private <K, V> Map<K, V> concat(Supplier<Map<K, V>> a, Supplier<Map<K, V>> b) {
return a.get() == null
? b.get()
: b.get() == null
? a.get()
: Stream.concat(a.get().entrySet().stream(), b.get().entrySet().stream())
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}
}
Loading

0 comments on commit 48bd34b

Please sign in to comment.