Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf(ui-ingestion): cache on creation or deletion of ingestion sources to reduce latency #6647

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 20 additions & 3 deletions datahub-web-react/src/app/ingest/source/IngestionSourceList.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import {
import { Message } from '../../shared/Message';
import TabToolbar from '../../entity/shared/components/styled/TabToolbar';
import { IngestionSourceBuilderModal } from './builder/IngestionSourceBuilderModal';
import { CLI_EXECUTOR_ID } from './utils';
import { addToListIngestionSourcesCache, CLI_EXECUTOR_ID, removeFromListIngestionSourcesCache } from './utils';
import { DEFAULT_EXECUTOR_ID, SourceBuilderState } from './builder/types';
import { IngestionSource, UpdateIngestionSourceInput } from '../../../types.generated';
import { SearchBar } from '../../search/SearchBar';
Expand All @@ -27,6 +27,8 @@ import useRefreshIngestionData from './executions/useRefreshIngestionData';
import { isExecutionRequestActive } from './executions/IngestionSourceExecutionList';
import analytics, { EventType } from '../../analytics';

const PLACEHOLDER_URN = 'placeholder-urn';

const SourceContainer = styled.div``;

const SourcePaginationContainer = styled.div`
Expand Down Expand Up @@ -96,14 +98,15 @@ export const IngestionSourceList = () => {
const [sourceFilter, setSourceFilter] = useState(IngestionSourceType.ALL);

// Ingestion Source Queries
const { loading, error, data, refetch } = useListIngestionSourcesQuery({
const { loading, error, data, client, refetch } = useListIngestionSourcesQuery({
variables: {
input: {
start,
count: pageSize,
query,
},
},
fetchPolicy: 'cache-first',
});
const [createIngestionSource] = useCreateIngestionSourceMutation();
const [updateIngestionSource] = useUpdateIngestionSourceMutation();
Expand Down Expand Up @@ -197,6 +200,19 @@ export const IngestionSourceList = () => {
});
} else {
// Create
const newSource = {
urn: PLACEHOLDER_URN,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So we still do the 3 second timeout? To refetch. I bet if we return the URN from the createSource call we wouldn't even need that followup

name: input.name,
type: input.type,
config: null,
schedule: {
interval: input.schedule?.interval || null,
timezone: input.schedule?.timezone || null,
},
platform: null,
executions: null,
};
addToListIngestionSourcesCache(client, newSource, pageSize, query);
createIngestionSource({ variables: { input } })
.then((result) => {
message.loading({ content: 'Loading...', duration: 2 });
Expand All @@ -218,14 +234,14 @@ export const IngestionSourceList = () => {
setIsBuildingSource(false);
setFocusSourceUrn(undefined);
resetState();
// onCreateOrUpdateIngestionSourceSuccess();
})
.catch((e) => {
message.destroy();
message.error({
content: `Failed to create ingestion source!: \n ${e.message || ''}`,
duration: 3,
});
removeFromListIngestionSourcesCache(client, PLACEHOLDER_URN, page, pageSize, query);
});
}
};
Expand All @@ -236,6 +252,7 @@ export const IngestionSourceList = () => {
};

const deleteIngestionSource = async (urn: string) => {
removeFromListIngestionSourcesCache(client, urn, page, pageSize, query);
removeIngestionSourceMutation({
variables: { urn },
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ function IngestionSourceTable({
source.executions?.total &&
source.executions?.total > 0 &&
source.executions?.executionRequests[0].result?.status,
cliIngestion: source.config.executorId === CLI_EXECUTOR_ID,
cliIngestion: source.config?.executorId === CLI_EXECUTOR_ID,
}));

return (
Expand Down
83 changes: 83 additions & 0 deletions datahub-web-react/src/app/ingest/source/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { EntityType, FacetMetadata } from '../../../types.generated';
import { capitalizeFirstLetterOnly, pluralize } from '../../shared/textUtil';
import EntityRegistry from '../../entity/EntityRegistry';
import { SourceConfig } from './builder/types';
import { ListIngestionSourcesDocument, ListIngestionSourcesQuery } from '../../../graphql/ingestion.generated';

export const getSourceConfigs = (ingestionSources: SourceConfig[], sourceType: string) => {
const sourceConfigs = ingestionSources.find((source) => source.name === sourceType);
Expand Down Expand Up @@ -171,3 +172,85 @@ export const extractEntityTypeCountsFromFacets = (

return finalCounts;
};

/**
* Add an entry to the ListIngestionSources cache.
*/
export const addToListIngestionSourcesCache = (client, newSource, pageSize, query) => {
// Read the data from our cache for this query.
const currData: ListIngestionSourcesQuery | null = client.readQuery({
query: ListIngestionSourcesDocument,
variables: {
input: {
start: 0,
count: pageSize,
query,
},
},
});

// Add our new source into the existing list.
const newSources = [newSource, ...(currData?.listIngestionSources?.ingestionSources || [])];

// Write our data back to the cache.
client.writeQuery({
query: ListIngestionSourcesDocument,
variables: {
input: {
start: 0,
count: pageSize,
query,
},
},
data: {
listIngestionSources: {
start: 0,
count: (currData?.listIngestionSources?.count || 0) + 1,
total: (currData?.listIngestionSources?.total || 0) + 1,
ingestionSources: newSources,
},
},
});
};

/**
* Remove an entry from the ListIngestionSources cache.
*/
export const removeFromListIngestionSourcesCache = (client, urn, page, pageSize, query) => {
// Read the data from our cache for this query.
const currData: ListIngestionSourcesQuery | null = client.readQuery({
query: ListIngestionSourcesDocument,
variables: {
input: {
start: (page - 1) * pageSize,
count: pageSize,
query,
},
},
});

// Remove the source from the existing sources set.
const newSources = [
...(currData?.listIngestionSources?.ingestionSources || []).filter((source) => source.urn !== urn),
];

// Write our data back to the cache.
client.writeQuery({
query: ListIngestionSourcesDocument,
variables: {
input: {
start: (page - 1) * pageSize,
count: pageSize,
query,
},
},
data: {
listIngestionSources: {
start: currData?.listIngestionSources?.start || 0,
count: (currData?.listIngestionSources?.count || 1) - 1,
total: (currData?.listIngestionSources?.total || 1) - 1,
ingestionSources: newSources,
},
},
});
};