feature/improve-missing-event-processing (#431)
* feature/improve-missing-event-processing

https://eaflood.atlassian.net/browse/CI-327

* Update version number

* Update Azure Functions extension bundle version

* Schedule message replay for partial task run data

* Improve code maintainability

* Revert "Improve code maintainability"

This reverts commit 3481b71.

* Improve code maintainability

* Pass minimum required to software under test

* Make delay based functions shared utilities

* Remove file extension

* Pause before potential message replay

* Prepare mock data for missing event detection

* Remove obsolete Jest mock

* Detect and handle missing events

* Improve code maintainability

* Improve comments

* Propagate message publication error after pausing
pwadmore-ea authored Jul 12, 2024
1 parent 90c88b8 commit ecd15f4
Showing 34 changed files with 1,453 additions and 374 deletions.
86 changes: 86 additions & 0 deletions ImportFromFews/helpers/prepare-fews-data-for-import.js
@@ -0,0 +1,86 @@
const doIfMaximumDelayForPiServerIndexingIsNotExceeded =
require('../../Shared/timeseries-functions/do-if-maximum-delay-for-pi-server-indexing-is-not-exceeded')
const PartialFewsDataError = require('../../Shared/message-replay/partial-fews-data-error')
const JSONStream = require('jsonstream-next')
const { pipeline, Transform } = require('stream')
const { createGzip } = require('zlib')
const { promisify } = require('util')
const pipe = promisify(pipeline)

module.exports = async function (context, taskRunData, jsonStream) {
const buffers = []
let buffersLength = 0

const preStringifyObjectTransform = new Transform({
// Object mode is required to manipulate JSON.
objectMode: true,
transform: async (object, encoding, done) => {
try {
await checkForMissingEventsIfNeeded(context, taskRunData, object)
done(null, [object.key, object.value])
} catch (err) {
done(err)
}
}
})

const byteArrayTransform = new Transform({
transform: (chunk, encoding, done) => {
buffers.push(chunk)
buffersLength += chunk.length
done()
}
})

const transforms = { preStringifyObjectTransform, byteArrayTransform }
await runPipe(jsonStream, transforms)
return Buffer.concat(buffers, buffersLength)
}

async function runPipe (jsonStream, transforms) {
// Check for missing events if required before applying minification and gzip compression.
// Minification is achieved using a stream compatible version of JSON.stringify(JSON.parse(jsonString)).
const gzip = createGzip()
await pipe(
jsonStream,
// Emit keys and values from the stream.
JSONStream.parse('$*'),
// Check for missing events and transform the keys and values into the form required by JSONStream.stringifyObject.
// NOTE - Missing event processing is performed using asynchronous functions because the utility function
// used during processing (doIfMaximumDelayForPiServerIndexingIsNotExceeded) is asynchronous.
await transforms.preStringifyObjectTransform,
// Minify the contents of the stream through the removal of new lines and use of
// JSON.stringify with no indentation.
JSONStream.stringifyObject('{', ',', '}', 0),
gzip,
transforms.byteArrayTransform
)
}

async function checkForMissingEventsIfNeeded (context, taskRunData, fewsData) {
const noActionTakenMessage = `Skipping missing event detection for ${taskRunData.sourceDetails}`

await doIfMaximumDelayForPiServerIndexingIsNotExceeded(
{ fn: checkForMissingEvents, context, taskRunData, noActionTakenMessage }, fewsData
)
}

async function checkForMissingEvents (context, taskRunData, fewsData) {
// NOTE - This function is asynchronous because it is called asynchronously by the
// utility function doIfMaximumDelayForPiServerIndexingIsNotExceeded. This function
// could be enhanced to include asynchronous processing (for example, to record missing
// event details in the staging database), but this is not required currently.
//
// Missing events occur when timeSeries header data has an empty array of associated
// events rather than no associated events. Missing events can be an indication that
// PI Server indexing for a task run has not completed yet, so prepare to schedule
// replay of the message.
if (fewsData.key === 'timeSeries' && fewsData.value.filter(data => data?.events?.length === 0).length > 0) {
throw new PartialFewsDataError(
context,
taskRunData.message,
`Missing events detected for ${taskRunData.sourceDetails} - preparing to schedule message replay`
)
} else if (fewsData.key === 'timeSeries') {
context.log(`No missing events detected for ${taskRunData.sourceDetails}`)
}
}
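
For reference, a sketch of the kind of parsed PI Server output that the check in checkForMissingEvents treats as partial data. JSONStream.parse('$*') emits each top-level key of the response as a { key, value } pair, so a timeSeries entry whose events array is empty triggers the PartialFewsDataError above. The header fields shown are illustrative assumptions; only the timeSeries key and the empty events array matter to the check.

const partialFewsData = {
  key: 'timeSeries',
  value: [
    // A complete entry: the header has associated events.
    { header: { parameterId: 'H.obs' }, events: [{ date: '2024-07-12', value: 1.2 }] },
    // A partial entry: the header has an empty array of associated events,
    // so replay of the message is prepared for via PartialFewsDataError.
    { header: { parameterId: 'Q.obs' }, events: [] }
  ]
}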
6 changes: 3 additions & 3 deletions ImportFromFews/helpers/retrieve-and-load-fews-data.js
@@ -3,7 +3,7 @@ const sql = require('mssql')
const deactivateTimeseriesStagingExceptionsForTaskRunPlotOrFilter = require('./deactivate-timeseries-staging-exceptions-for-task-run-plot-or-filter')
const { executePreparedStatementInTransaction } = require('../../Shared/transaction-helper')
const getPiServerErrorMessage = require('../../Shared/timeseries-functions/get-pi-server-error-message')
-const { minifyAndGzip } = require('../../Shared/utils')
+const prepareFewsDataForImport = require('./prepare-fews-data-for-import')
const processImportError = require('./process-import-error')

module.exports = async function (context, taskRunData) {
@@ -49,8 +49,8 @@ async function retrieveAndCompressFewsData (context, taskRunData) {
await logTaskRunProgress(context, taskRunData, 'Retrieving data')
const fewsResponse = await axios(axiosConfig)
await logTaskRunProgress(context, taskRunData, 'Retrieved data')
-  taskRunData.fewsData = await minifyAndGzip(fewsResponse.data)
-  await logTaskRunProgress(context, taskRunData, 'Compressed data')
+  taskRunData.fewsData = await prepareFewsDataForImport(context, taskRunData, fewsResponse.data)
+  await logTaskRunProgress(context, taskRunData, 'Prepared data for import')
}

async function processFewsDataRetrievalResults (context, taskRunData) {
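
The replaced minifyAndGzip helper from Shared/utils is not shown in this commit. Judging by its name and the pipeline comments in prepare-fews-data-for-import.js, a minimal in-memory equivalent would look something like the sketch below (an assumption, not the actual helper, which was awaited and so presumably wrapped zlib's asynchronous API). The new streaming pipeline performs the same minification and gzip compression without buffering the raw PI Server response, and adds the missing-event check along the way.

const { gzipSync } = require('zlib')

// JSON.parse followed by JSON.stringify with no indentation removes new lines
// and insignificant whitespace; the minified string is then gzip compressed.
function minifyAndGzipInMemory (jsonString) {
  const minified = JSON.stringify(JSON.parse(jsonString))
  return gzipSync(Buffer.from(minified))
}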
40 changes: 38 additions & 2 deletions ImportFromFews/index.js
@@ -13,17 +13,40 @@ const processTaskRunDataForNonForecastOrLatestTaskRunForWorkflowIfPossible = req
const isSpanWorkflow = require('../Shared/timeseries-functions/check-spanning-workflow')
const processImportError = require('./helpers/process-import-error')
const retrieveAndLoadFewsData = require('./helpers/retrieve-and-load-fews-data')
+const PartialFewsDataError = require('../Shared/message-replay/partial-fews-data-error')
+const processPartialFewsDataError = require('../Shared/message-replay/process-partial-fews-data-error')
+const publishScheduledMessagesIfNeeded = require('../Shared/timeseries-functions/publish-scheduled-messages-if-needed')

module.exports = async function (context, message) {
context.log(`Processing timeseries import message: ${JSON.stringify(message)}`)
const errorMessage = 'The FEWS data import function has failed with the following error:'
const isolationLevel = null

const taskRunData = Object.assign({}, message)
-  await doInTransaction({ fn: processMessage, context, errorMessage, isolationLevel }, message, taskRunData)
+  await doInTransaction({ fn: processMessageAndHandleMissingEvents, context, errorMessage, isolationLevel }, message, taskRunData)
// If all plots/filters for the task run have been processed, associated staging exceptions can be deactivated.
// This is performed in a new transaction to avoid deadlocks when plots/filters are processed concurrently.
await doInTransaction({ fn: deactivateStagingExceptionBySourceFunctionAndTaskRunIdIfPossible, context, errorMessage, isolationLevel }, taskRunData)

+  // If context.bindings.importFromFews exists, this indicates that the
+  // plot/filter being processed has missing events. This could be due
+  // to incomplete PI Server indexing for the task run or there could be
+  // genuine missing events. Without further information to determine
+  // whether the missing events are genuine, the message needs to be replayed.
+  // Message replay will be attempted until either of the following occurs:
+  // - There are no missing events for the plot/filter.
+  // - The maximum amount of time allowed for PI Server indexing to complete
+  //   is exceeded. If events are still missing at this time, available data
+  //   for the plot/filter will be loaded. This scenario will always occur
+  //   for genuine missing events and will result in delayed loading accordingly.
+  const scheduledMessageConfig = {
+    destinationName: 'fews-import-queue',
+    outputBinding: 'importFromFews'
+  }
+
+  // In common with messages published using context bindings, publish scheduled
+  // messages outside of the transactions used during message processing.
+  await publishScheduledMessagesIfNeeded(context, scheduledMessageConfig)
}

async function processMessageIfPossible (taskRunData, context, message) {
@@ -49,11 +72,24 @@ async function processMessage (transaction, context, message, taskRunData) {
await setSourceConfig(taskRunData)
await processMessageIfPossible(taskRunData, context, message)
} else {
-    taskRunData.errorMessage = 'Messages processed by the ImportFromFews endpoint require must contain taskRunId and either plotId or filterId attributes'
+    taskRunData.errorMessage =
+      'Messages processed by the ImportFromFews endpoint must contain taskRunId and either plotId or filterId attributes'
await createStagingException(context, taskRunData)
}
}

+async function processMessageAndHandleMissingEvents (transaction, context, message, taskRunData) {
+  try {
+    await processMessage(transaction, context, message, taskRunData)
+  } catch (err) {
+    if (err instanceof PartialFewsDataError) {
+      processPartialFewsDataError(err.context, err.incomingMessage, 'importFromFews')
+    } else {
+      throw err
+    }
+  }
+}

async function setSourceConfig (taskRunData) {
if (taskRunData.plotId) {
taskRunData.sourceId = taskRunData.plotId
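
PartialFewsDataError itself is not part of this diff, but its shape can be inferred from the call sites above: it is constructed with the invocation context, the incoming message and a description, and the catch handler in processMessageAndHandleMissingEvents reads err.context and err.incomingMessage back. A minimal sketch consistent with that usage (an assumption, not the committed Shared/message-replay implementation):

class PartialFewsDataError extends Error {
  constructor (context, incomingMessage, message) {
    super(message)
    this.name = 'PartialFewsDataError'
    // Carry the invocation context and the original message so that the catch
    // handler can schedule the same message for replay.
    this.context = context
    this.incomingMessage = incomingMessage
  }
}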
