Skip to content

Commit

Permalink
[backend] Add feed queue control. Only add jobs if current queue is e…
Browse files Browse the repository at this point in the history
…mpty
  • Loading branch information
richard-julien committed Oct 4, 2024
1 parent c255a58 commit 6b5b292
Showing 1 changed file with 64 additions and 34 deletions.
98 changes: 64 additions & 34 deletions opencti-platform/opencti-graphql/src/manager/ingestionManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { parseStringPromise as xmlParse } from 'xml2js';
import TurndownService from 'turndown';
import * as R from 'ramda';
import { v4 as uuidv4 } from 'uuid';
import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async/dynamic';
import { clearIntervalAsync, setIntervalAsync } from 'set-interval-async/fixed';
import type { SetIntervalAsyncTimer } from 'set-interval-async/fixed';
import type { Moment } from 'moment';
import { lockResource } from '../database/redis';
Expand Down Expand Up @@ -33,12 +33,12 @@ import { compareHashSHA256, hashSHA256 } from '../utils/hash';
import type { StixBundle, StixObject } from '../types/stix-common';
import { patchAttribute } from '../database/middleware';
import { ENTITY_TYPE_CONNECTOR } from '../schema/internalObject';
import { connectorIdFromIngestId } from '../domain/connector';
import { connectorIdFromIngestId, queueDetails } from '../domain/connector';

// Ingestion manager responsible to cleanup old data
// Each API will start is ingestion manager.
// If the lock is free, every API as the right to take it.
const SCHEDULE_TIME = conf.get('ingestion_manager:interval') || 300000;
const SCHEDULE_TIME = conf.get('ingestion_manager:interval') || 60000;
const INGESTION_MANAGER_KEY = conf.get('ingestion_manager:lock_key') || 'ingestion_manager_lock';

let running = false;
Expand All @@ -54,7 +54,12 @@ const asArray = (data: unknown) => {
return [];
};

const updateBuiltInConnectorInfo = async (context: AuthContext, user_id: string | undefined, id: string, state?: object) => {
interface UpdateInfo {
state?: any
buffering?: boolean
messages_number?: number
}
const updateBuiltInConnectorInfo = async (context: AuthContext, user_id: string | undefined, id: string, opts: UpdateInfo = {}) => {
// Patch the related connector
const csvNow = utcDate();
const connectorPatch: any = {
Expand All @@ -63,14 +68,14 @@ const updateBuiltInConnectorInfo = async (context: AuthContext, user_id: string
last_run_datetime: csvNow.toISOString(),
next_run_datetime: csvNow.add(SCHEDULE_TIME, 'milliseconds').toISOString(),
run_and_terminate: false,
buffering: false,
buffering: opts.buffering ?? false,
queue_threshold: 0,
queue_messages_size: 0
queue_messages_size: opts.messages_number ?? 0
},
connector_user_id: user_id,
};
if (state) {
connectorPatch.connector_state = JSON.stringify(state);
if (opts.state) {
connectorPatch.connector_state = JSON.stringify(opts.state);
}
const connectorId = connectorIdFromIngestId(id);
await patchAttribute(context, SYSTEM_USER, connectorId, ENTITY_TYPE_CONNECTOR, connectorPatch);
Expand Down Expand Up @@ -224,7 +229,8 @@ const rssDataHandler = async (context: AuthContext, httpRssGet: Getter, turndown
lastPubDate = R.last(items)?.pubDate;
await patchRssIngestion(context, SYSTEM_USER, ingestion.internal_id, { current_state_date: lastPubDate });
// Patch the related connector
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { current_state_date: lastPubDate });
const state = { current_state_date: lastPubDate };
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state });
} else {
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id);
}
Expand All @@ -242,12 +248,20 @@ const rssExecutor = async (context: AuthContext, turndownService: TurndownServic
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
const ingestionPromise = rssDataHandler(context, httpGet, turndownService, ingestion)
.catch((e) => {
logApp.error(`[OPENCTI-MODULE] INGESTION - Error with rss handler ${ingestion.name}`);
logApp.error(e, { name: ingestion.name, context: 'RSS ingestion execution' });
});
ingestionPromises.push(ingestionPromise);
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
const ingestionPromise = rssDataHandler(context, httpGet, turndownService, ingestion)
.catch((e) => {
logApp.error(`[OPENCTI-MODULE] INGESTION - Error with rss handler ${ingestion.name}`);
logApp.error(e, { name: ingestion.name, context: 'RSS ingestion execution' });
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_number });
ingestionPromises.push(ingestionPromise);
}
}
return Promise.all(ingestionPromises);
};
Expand Down Expand Up @@ -331,7 +345,7 @@ export const processTaxiiResponse = async (context: AuthContext, ingestion: Basi
const state = { current_state_cursor: data.next, last_execution_date: now() };
const ingestionUpdate = await patchTaxiiIngestion(context, SYSTEM_USER, ingestion.internal_id, state);
const connectorState = { current_state_cursor: ingestionUpdate.current_state_cursor, added_after_start: ingestionUpdate.added_after_start };
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, connectorState);
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state: connectorState });
} else {
// Reset the pagination cursor, and update date
const state = {
Expand All @@ -341,12 +355,12 @@ export const processTaxiiResponse = async (context: AuthContext, ingestion: Basi
};
const ingestionUpdate = await patchTaxiiIngestion(context, SYSTEM_USER, ingestion.internal_id, state);
const connectorState = { current_state_cursor: ingestionUpdate.current_state_cursor, added_after_start: ingestionUpdate.added_after_start };
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, connectorState);
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state: connectorState });
}
} else {
const ingestionUpdate = await patchTaxiiIngestion(context, SYSTEM_USER, ingestion.internal_id, { last_execution_date: now(), current_state_cursor: undefined });
const connectorState = { current_state_cursor: ingestionUpdate.current_state_cursor, added_after_start: ingestionUpdate.added_after_start };
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, connectorState);
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state: connectorState });
logApp.info('[OPENCTI-MODULE] Taxii ingestion - taxii server has not sent any object.', {
next: data.next,
more: data.more,
Expand Down Expand Up @@ -375,16 +389,24 @@ const taxiiExecutor = async (context: AuthContext) => {
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
const taxiiHandler = TAXII_HANDLERS[ingestion.version];
if (!taxiiHandler) {
throw UnsupportedError(`[OPENCTI-MODULE] Taxii version ${ingestion.version} is not yet supported`);
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
const taxiiHandler = TAXII_HANDLERS[ingestion.version];
if (!taxiiHandler) {
throw UnsupportedError(`[OPENCTI-MODULE] Taxii version ${ingestion.version} is not yet supported`);
}
const ingestionPromise = taxiiHandler(context, ingestion)
.catch((e) => {
logApp.error(`[OPENCTI-MODULE] INGESTION - Error with taxii handler ${ingestion.name}`);
logApp.error(e, { name: ingestion.name, context: 'Taxii ingestion execution' });
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_number });
ingestionPromises.push(ingestionPromise);
}
const ingestionPromise = taxiiHandler(context, ingestion)
.catch((e) => {
logApp.error(`[OPENCTI-MODULE] INGESTION - Error with taxii handler ${ingestion.name}`);
logApp.error(e, { name: ingestion.name, context: 'Taxii ingestion execution' });
});
ingestionPromises.push(ingestionPromise);
}
return Promise.all(ingestionPromises);
};
Expand Down Expand Up @@ -435,7 +457,7 @@ const csvDataHandler = async (context: AuthContext, ingestion: BasicStoreEntityI
// Update the state
const state = { current_state_hash: hashedIncomingData, added_after_start: utcDate(addedLast) };
await patchCsvIngestion(context, SYSTEM_USER, ingestion.internal_id, state);
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, state);
await updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { state });
}
};

Expand All @@ -450,12 +472,20 @@ const csvExecutor = async (context: AuthContext) => {
const ingestionPromises = [];
for (let i = 0; i < ingestions.length; i += 1) {
const ingestion = ingestions[i];
const ingestionPromise = csvDataHandler(context, ingestion)
.catch((e) => {
logApp.error(`[OPENCTI-MODULE] INGESTION - Error with csv handler ${ingestion.name}`);
logApp.error(e, { name: ingestion.name, context: 'CSV ingestion execution' });
});
ingestionPromises.push(ingestionPromise);
// If ingestion have remaining messages in the queue, dont fetch any new data
const { messages_number } = await queueDetails(connectorIdFromIngestId(ingestion.id));
if (messages_number === 0) {
const ingestionPromise = csvDataHandler(context, ingestion)
.catch((e) => {
logApp.error(`[OPENCTI-MODULE] INGESTION - Error with csv handler ${ingestion.name}`);
logApp.error(e, { name: ingestion.name, context: 'CSV ingestion execution' });
});
ingestionPromises.push(ingestionPromise);
} else {
// Update the state
const ingestionPromise = updateBuiltInConnectorInfo(context, ingestion.user_id, ingestion.id, { buffering: true, messages_number });
ingestionPromises.push(ingestionPromise);
}
}
return Promise.all(ingestionPromises);
};
Expand Down

0 comments on commit 6b5b292

Please sign in to comment.