diff --git a/index.js b/index.js index 1596982..0ede6f5 100755 --- a/index.js +++ b/index.js @@ -2,39 +2,65 @@ import { logger, getSymbol } from './lib/logger.js' import { options, configValid } from './lib/args.js' if (!configValid) { - logger.error({ component: 'main', message: 'invalid configuration... Exiting'}) + logger.error({ component, message: 'invalid configuration... Exiting'}) logger.end() process.exit(1) } import startFsEventWatcher from './lib/events.js' -import { getOpenIDConfiguration, getToken } from './lib/auth.js' +import * as auth from './lib/auth.js' import * as api from './lib/api.js' import { serializeError } from 'serialize-error' import { initScanner } from './lib/scan.js' import semverGte from 'semver/functions/gte.js' +import Alarm from './lib/alarm.js' +import * as CONSTANTS from './lib/consts.js' const minApiVersion = '1.2.7' +const component = 'index' process.on('SIGINT', () => { logger.info({ - component: 'main', + component, message: 'received SIGINT, exiting' }) process.exit(0) -}) +}) -run() +Alarm.on('shutdown', (exitCode) => { + logger.error({ + component, + message: `received shutdown event with code ${exitCode}, exiting` + }) + process.exit(exitCode) +}) + +Alarm.on('alarmRaised', (alarmType) => { + logger.error({ + component, + message: `Alarm raised: ${alarmType}` + }) +}) + +Alarm.on('alarmLowered', (alarmType) => { + logger.info({ + component, + message: `Alarm lowered: ${alarmType}` + }) +}) + +await run() async function run() { try { logger.info({ - component: 'main', + component, message: 'running', pid: process.pid, options: getObfuscatedConfig(options) }) await preflightServices() + setupAlarmHandlers() if (options.mode === 'events') { startFsEventWatcher() } @@ -45,12 +71,25 @@ async function run() { catch (e) { logError(e) logger.end() + process.exitCode = CONSTANTS.ERR_FAILINIT } } +function setupAlarmHandlers() { + const alarmHandlers = { + apiOffline: api.offlineRetryHandler, + authOffline: auth.offlineRetryHandler, + noGrant: () => Alarm.shutdown(CONSTANTS.ERR_NOGRANT), + noToken: () => Alarm.shutdown(CONSTANTS.ERR_NOTOKEN) + } + Alarm.on('alarmRaised', (alarmType) => { + alarmHandlers[alarmType]?.() + }) +} + function logError(e) { const errorObj = { - component: e.component || 'main', + component: e.component || 'index', message: e.message, } if (e.request) { @@ -74,7 +113,7 @@ function logError(e) { async function hasMinApiVersion () { const [remoteApiVersion] = await api.getDefinition('$.info.version') - logger.info({ component: 'main', message: `preflight API version`, minApiVersion, remoteApiVersion}) + logger.info({ component, message: `preflight API version`, minApiVersion, remoteApiVersion}) if (semverGte(remoteApiVersion, minApiVersion)) { return true } @@ -85,9 +124,9 @@ async function hasMinApiVersion () { async function preflightServices () { await hasMinApiVersion() - await getOpenIDConfiguration() - await getToken() - logger.info({ component: 'main', message: `preflight token request suceeded`}) + await auth.getOpenIDConfiguration() + await auth.getToken() + logger.info({ component, message: `preflight token request suceeded`}) const promises = [ api.getCollection(options.collectionId), api.getCollectionAssets(options.collectionId), @@ -104,9 +143,10 @@ async function preflightServices () { setInterval(refreshUser, 10 * 60000) } catch (e) { - logger.warn({ component: 'main', message: `preflight user request failed; token may be missing scope 'stig-manager:user:read'? Watcher will not set {"status": "accepted"}`}) + logger.warn({ component, message: `preflight user request failed; token may be missing scope 'stig-manager:user:read'? Watcher will not set {"status": "accepted"}`}) + Alarm.noGrant(false) } - logger.info({ component: 'main', message: `prefilght api requests suceeded`}) + logger.info({ component, message: `prefilght api requests suceeded`}) } function getObfuscatedConfig (options) { @@ -119,6 +159,11 @@ function getObfuscatedConfig (options) { async function refreshUser() { try { + if (Alarm.isAlarmed()) return + logger.info({ + component, + message: 'refreshing user cache' + }) await api.getUser() } catch (e) { @@ -128,6 +173,11 @@ async function refreshUser() { async function refreshCollection() { try { + if (Alarm.isAlarmed()) return + logger.info({ + component, + message: 'refreshing collection cache' + }) await api.getCollection(options.collectionId) } catch (e) { diff --git a/lib/alarm.js b/lib/alarm.js new file mode 100644 index 0000000..46feadc --- /dev/null +++ b/lib/alarm.js @@ -0,0 +1,122 @@ +import { EventEmitter } from "node:events" + +/** + * Represents the state of each alarm type. + * @typedef {Object} Alarms + * @property {boolean} apiOffline - the STIG manager API is unreachable. + * @property {boolean} authOffline - the OIDC IdP is unreachable. + * @property {boolean} noToken - the OIDC IdP did not issue the client a token. + * @property {boolean} noGrant - the client has an insufficient grant on the configured Collection. + */ + +/** + * @typedef {'apiOffline' | 'authOffline' | 'noToken' | 'noGrant'} AlarmType + */ + +class Alarm extends EventEmitter { + /** @type {Alarms} */ + #alarms + + constructor () { + super() + this.#alarms = { + apiOffline: false, + authOffline: false, + noToken: false, + noGrant: false, + } + } + + /** + * Emits 'alarmRaised' or 'alarmLowered' based on 'state', passing the alarmType + * @param {AlarmType} event + * @param {boolean} state + */ + #emitAlarmEvent (alarmType, state) { + if (alarmType === 'shutdown') { + this.emit('shutdown', state) + return + } + if (state) { + this.emit('alarmRaised', alarmType) + } + else { + this.emit('alarmLowered', alarmType) + } + } + + /** + * Sets the state of the apiOffline alarm + * and emits an alarmRaised or alarmLowered event + * @param {boolean} state + */ + apiOffline (state) { + if (this.#alarms.apiOffline === state) return + this.#alarms.apiOffline = state + this.#emitAlarmEvent( 'apiOffline', state) + } + + /** + * Sets the state of the authOffline alarm + * and emits an alarmRaised or alarmLowered event + * @param {boolean} state + */ + authOffline (state) { + if (this.#alarms.authOffline === state) return + this.#alarms.authOffline = state + this.#emitAlarmEvent( 'authOffline', state) + } + + /** + * Sets the state of the noToken alarm + * and emits an alarmRaised or alarmLowered event + * @param {boolean} state + */ + noToken (state) { + if (this.#alarms.noToken === state) return + this.#alarms.noToken = state + this.#emitAlarmEvent( 'noToken', state) + } + + /** + * Sets the state of the noGrant alarm + * and emits an alarmRaised or alarmLowered event + * @param {boolean} state + */ + noGrant (state) { + if (this.#alarms.noGrant === state) return + this.#alarms.noGrant = state + this.#emitAlarmEvent( 'noGrant', state) + } + + /** + * Returns an array of the raised alarm types + * @returns {string[]} + */ + raisedAlarms () { + return Object.keys(this.#alarms).filter(key=>this.#alarms[key]) + } + + /** + * Returns true if any alarm is raised + * @returns {boolean} + */ + isAlarmed () { + return Object.values(this.#alarms).some(value=>value) + } + + /** + * Emits a shutdown event with the provied exitCode + * @param {number} exitCode + */ + shutdown (exitCode) { + this.#emitAlarmEvent('shutdown', exitCode) + } + + /** @type {Alarms} */ + get alarms() { + return this.#alarms + } +} + +export default new Alarm() \ No newline at end of file diff --git a/lib/api.js b/lib/api.js index 00fd696..f663bdc 100644 --- a/lib/api.js +++ b/lib/api.js @@ -2,6 +2,8 @@ import got from 'got' import { options } from './args.js' import { getToken, tokens } from './auth.js' import { logger, getSymbol } from './logger.js' +import Alarm from './alarm.js' +import * as CONSTANTS from './consts.js' const cache = { collection: null, @@ -11,120 +13,130 @@ const cache = { scapBenchmarkMap: null, stigs: null } -const _cache = cache -export { _cache as cache } +export {cache} -async function apiGet(endpoint, authenticate = true) { - try { - const requestOptions = { - responseType: 'json' - } - if (authenticate) { +/** + * Sends requests to the STIG Manager API and returns the response + * @async + * @function apiRequest + * @param {Object} options + * @property {'GET'|'POST'|'PATCH'} [options.method='GET'] optional, HTTP method + * @property {string} options.endpoint required, API endpoint + * @property {Object=} options.json optional, object to be stringified into request body + * @property {boolean} [options.authorize=true] optional, whether the request should be authorized with a JWT + * @property {boolean} [options.fullResponse=false] optional, whether to return the response body or the got response object + * @returns {Promise} the response body as a JS object or the got response object + * @throws {Error} If there was an error making the request + */ +async function apiRequest({method = 'GET', endpoint, json, authorize = true, fullResponse = false}) { + const requestOptions = { + method, + url: `${options.api}${endpoint}`, + responseType: 'json', + timeout: { + request: CONSTANTS.REQUEST_TIMEOUT + } + } + + if (authorize) { + try { await getToken() - requestOptions.headers = { - Authorization: `Bearer ${tokens.access_token}` - } } - const response = await got.get(`${options.api}${endpoint}`, requestOptions) + catch (e) { + e.component = 'api' + logError(e) + throw(e) + } + requestOptions.headers = { + Authorization: `Bearer ${tokens.access_token}` + } + } + + if (json) requestOptions.json = json + + try { + const response = await got(requestOptions) logResponse (response ) - return response.body + return fullResponse ? response : response.body } catch (e) { + // accept a client error for POST /assets if it reports a duplicate name + if (e.response?.statusCode === 400 && e.response?.body?.message === 'Duplicate name') { + logResponse(e.response) + return fullResponse ? e?.response : e?.response?.body + } e.component = 'api' logError(e) + // grant problem + if (e.response?.statusCode === 403) { + Alarm.noGrant(true) + } + else { + Alarm.apiOffline(true) + } throw (e) } -} +} export async function getScapBenchmarkMap() { - const response = await apiGet('/stigs/scap-maps') - cache.scapBenchmarkMap = new Map(response.map(apiScapMap => [apiScapMap.scapBenchmarkId, apiScapMap.benchmarkId])) + const body = await apiRequest({endpoint: '/stigs/scap-maps'}) + cache.scapBenchmarkMap = new Map(body.map(apiScapMap => [apiScapMap.scapBenchmarkId, apiScapMap.benchmarkId])) return cache.scapBenchmarkMap } -export async function getDefinition(jsonPath) { - cache.definition = await apiGet(`/op/definition${jsonPath ? '?jsonpath=' + encodeURIComponent(jsonPath) : ''}`, false) +async function getDefinition(jsonPath) { + cache.definition = await apiRequest({ + endpoint: `/op/definition${jsonPath ? '?jsonpath=' + encodeURIComponent(jsonPath) : ''}`, + authorize: false + }) return cache.definition } +export {getDefinition} export async function getCollection(collectionId) { - cache.collection = await apiGet(`/collections/${collectionId}`) + cache.collection = await apiRequest({endpoint: `/collections/${collectionId}`}) return cache.collection } export async function getCollectionAssets(collectionId) { - cache.assets = await apiGet(`/assets?collectionId=${collectionId}&projection=stigs`) + cache.assets = await apiRequest({endpoint: `/assets?collectionId=${collectionId}&projection=stigs`}) return cache.assets } export async function getInstalledStigs() { - cache.stigs = await apiGet('/stigs') + cache.stigs = await apiRequest({endpoint: '/stigs'}) return cache.stigs } export async function createOrGetAsset(asset) { - try { - await getToken() - const response = await got.post(`${options.api}/assets?projection=stigs`, { - headers: { - Authorization: `Bearer ${tokens.access_token}` - }, - json: asset, - responseType: 'json' - }) - logResponse(response) - return { created: true, apiAsset: response.body } - } - catch (e) { - if (e.response.statusCode === 400 && e.response.body.message === 'Duplicate name') { - logResponse(e.response) - return { created: false, apiAsset: e.response.body.data } - } - e.component = 'api' - throw (e) - } + const response = await apiRequest({ + method: 'POST', + endpoint: '/assets?projection=stigs', + json: asset, + fullResponse: true + }) + const created = response.statusCode === 201 + return { created, apiAsset: response.body } } -export async function patchAsset(assetId, body) { - try { - await getToken() - const response = await got.patch(`${options.api}/assets/${assetId}?projection=stigs`, { - headers: { - Authorization: `Bearer ${tokens.access_token}` - }, - json: body, - responseType: 'json' - }) - logResponse(response) - return response.body - } - catch (e) { - e.component = 'api' - throw (e) - } +export function patchAsset(assetId, assetProperties) { + return apiRequest({ + method: 'PATCH', + endpoint: `/assets/${assetId}?projection=stigs`, + json: assetProperties + }) } -export async function postReviews(collectionId, assetId, reviews) { - try { - await getToken() - const response = await got.post(`${options.api}/collections/${collectionId}/reviews/${assetId}`, { - headers: { - Authorization: `Bearer ${tokens.access_token}` - }, - json: reviews, - responseType: 'json' - }) - logResponse(response) - return response.body - } - catch (e) { - e.component = 'api' - throw (e) - } +export function postReviews(collectionId, assetId, reviews) { + return apiRequest({ + method: 'POST', + endpoint: `/collections/${collectionId}/reviews/${assetId}`, + json: reviews + }) } export async function getUser() { - cache.user = await apiGet('/user') + cache.user = await apiRequest({endpoint: '/user'}) return cache.user } @@ -166,7 +178,7 @@ function logResponse (response) { function logError (e) { logger.error({ component: 'api', - message: 'query error', + message: e.message, request: { method: e.request?.options?.method, url: e.request?.requestUrl @@ -178,3 +190,51 @@ function logError (e) { }) } +/** + * interval between API connectivity tests when in alarm condition + * @type {number} + */ +const alarmRetryDelay = 5000 +/** + * max number of API connectivity tests when in alarm condition + * @type {number} + */ +const alarmRetryLimit = 5 + +/** + * count of API connectivity tests when in alarm condition + * @type {number} + */ +let alarmRetryCount = 0 + +/** + * Handler for when 'apiOffline' alarm is raised. + * Tests for API connectivity by calling the /op/defintion endpoint + * and increments alarmRetryCount until reaching the alarmRetryLimit + */ +function offlineRetryHandler() { + logger.info({ + component: 'api', + message: 'Testing if API is online' + }) + alarmRetryCount++ + getDefinition('$.info.version') + .then(() => { + alarmRetryCount = 0 + Alarm.apiOffline(false) + }) + .catch(() => { + if (alarmRetryCount >= alarmRetryLimit) { + logger.info({ + component: 'api', + message: 'API connectivity maximum tries reached, requesting shutdown' + }) + Alarm.shutdown(CONSTANTS.ERR_APIOFFLINE) + } + else { + setTimeout(offlineRetryHandler, alarmRetryDelay) + } + }) +} +export {offlineRetryHandler} + diff --git a/lib/args.js b/lib/args.js index 5da7fbb..9ef150c 100644 --- a/lib/args.js +++ b/lib/args.js @@ -103,8 +103,8 @@ program .option('--ignore-glob [glob...]', 'File or directory glob(s) to ignore. Can be invoked multiple times.(`WATCHER_IGNORE_GLOBS=`)', pe.WATCHER_IGNORE_GLOBS?.split(',')) .option('--event-polling', 'Use polling with `--mode events`, necessary for watching network files (`WATCHER_EVENT_POLLING=1`). Ignored if `--mode scan`, negate with `--no-event-polling`.', getBoolean('WATCHER_EVENT_POLLING', true)) .option('--no-event-polling', 'Don\'t use polling with `--mode events`, reduces CPU usage (`WATCHER_EVENT_POLLING=0`).') -.option('--stability-threshold ', 'If `--mode events`, milliseconds to wait for file size to stabilize. May be helpful when watching network shares. (`WATCHER_STABILITY_THRESHOLD`). Ignored with `--mode scan`', parseIntegerArg, parseIntegerEnv(pe.WATCHER_STABILITY_THRESHOLD) ?? 0) -.option('--one-shot', 'Process existing files in the path and exit. Sets `--add-existing`.', false) +.option('--stability-threshold ', 'If `--mode events`, milliseconds to wait for file size to stabilize. May be helpful when watching network shares. (`WATCHER_STABILITY_THRESHOLD`). Igonred with `--mode scan`', parseIntegerArg, parseIntegerEnv(pe.WATCHER_STABILITY_THRESHOLD) ?? 0) +.option('--one-shot', 'Process existing files in the path and exit. Sets `--add-existing`.', getBoolean('WATCHER_ONE_SHOT', false)) .option('--log-color', 'Colorize the console log output. Might confound downstream piped processes.', false) .option('-d, --debug', 'Shortcut for `--log-level debug --log-file-level debug`', false) .option('--scan-interval ', 'If `--mode scan`, the interval between scans. Ignored if `--mode events`.', parseIntegerArg, parseIntegerEnv(pe.WATCHER_SCAN_INTERVAL) ?? 300000) diff --git a/lib/auth.js b/lib/auth.js index 22859d6..f3c039d 100644 --- a/lib/auth.js +++ b/lib/auth.js @@ -4,6 +4,8 @@ import atob from 'atob' import {options} from './args.js' import jwt from 'jsonwebtoken' import { randomBytes } from 'crypto' +import Alarm from './alarm.js' +import * as CONSTANTS from './consts.js' const self = {} @@ -28,6 +30,7 @@ if (options.extraScopes) { self.scope = scopeArray.join(" ") let tokens, tokenDecoded +const component = 'auth' /** * Fetches OpenID configuration from the specified authority URL. * @async @@ -36,27 +39,37 @@ let tokens, tokenDecoded * @throws {Error} - If there's an error fetching the OpenID configuration. */ async function getOpenIDConfiguration () { + const wellKnownUrl = `${options.authority}/.well-known/openid-configuration` + logger.debug({ + component, + message: `sending openId configuration request`, + request: { + method: 'GET', + url: wellKnownUrl + } + }) + const requestOptions = { + responseType: 'json', + timeout: { + request: CONSTANTS.REQUEST_TIMEOUT + } + } + let response try { - const wellKnownUrl = `${options.authority}/.well-known/openid-configuration` - logger.debug({ - component: 'auth', - message: `sending openId configuration request`, - request: { - method: 'GET', - url: wellKnownUrl - } - }) - const response = await got.get(wellKnownUrl, { responseType: 'json' }) - logResponse(response) - self.url = response.body.token_endpoint - return response.body + response = await got.get(wellKnownUrl, requestOptions) } catch (e) { if (e.response) { logResponse(e.response) } + else { + Alarm.authOffline(true) + } throw e } + logResponse(response) + self.url = response.body.token_endpoint + return response.body } /** @@ -67,32 +80,29 @@ async function getOpenIDConfiguration () { * @throws {Error} If there was an error retrieving the token. */ async function getToken () { + if (tokenDecoded?.exp - Math.ceil(new Date().getTime() / 1000) >= self.threshold) + return tokenDecoded try { - if (tokenDecoded) { - let expiresIn = - tokenDecoded.exp - Math.ceil(new Date().getTime() / 1000) - expiresIn -= self.threshold - if (expiresIn > self.threshold) { - return tokenDecoded - } - } - // getting new token tokens = await self.authenticateFn() - tokenDecoded = decodeToken(tokens.access_token) - logger.debug({ - component: 'auth', - message: `received token response`, - tokens: tokens, - tokenDecoded: tokenDecoded - }) - return tokenDecoded } catch (e) { if (e.response) { logResponse(e.response) + Alarm.noToken(true) + } + else { + Alarm.authOffline(true) } throw e } + tokenDecoded = decodeToken(tokens.access_token) + logger.debug({ + component, + message: `received token response`, + tokens, + tokenDecoded + }) + return tokenDecoded } /** @@ -103,27 +113,30 @@ async function getToken () { * @returns {Promise} The response from auth provider. */ async function authenticateClientSecret () { - const parameters = { + const requestOptions = { form: { grant_type: 'client_credentials', scope: self.scope }, username: options.clientId, password: options.clientSecret, - responseType: 'json' + responseType: 'json', + timeout: { + request: CONSTANTS.REQUEST_TIMEOUT + } } logger.debug({ - component: 'auth', + component, message: 'sending client secret authentication request', request: { method: 'POST', url: self.url, - parameters + requestOptions } }) - const response = await got.post(self.url, parameters) + const response = await got.post(self.url, requestOptions) logResponse(response) return response.body } @@ -157,7 +170,7 @@ async function authenticateSignedJwt () { signedJwt }) - const parameters = { + const requestOptions = { form: { grant_type: 'client_credentials', client_assertion_type: @@ -165,7 +178,10 @@ async function authenticateSignedJwt () { client_assertion: signedJwt, scope: self.scope }, - responseType: 'json' + responseType: 'json', + timeout: { + request: CONSTANTS.REQUEST_TIMEOUT + } } logger.debug({ message: 'sending signed JWT authentication request', @@ -173,11 +189,11 @@ async function authenticateSignedJwt () { request: { method: 'POST', url: self.url, - parameters + requestOptions } }) - const response = await got.post(self.url, parameters) + const response = await got.post(self.url, requestOptions) logResponse(response) return response.body } @@ -223,7 +239,7 @@ function decodeToken (str) { */ function logResponse (response) { logger.http({ - component: 'auth', + component, message: 'http response', request: { method: response.request.options?.method, @@ -232,9 +248,54 @@ function logResponse (response) { }, response: { status: response.statusCode, - body: response.body + body: {...response.body, access_token: true, id_token: true} } }) } -export { getToken, getOpenIDConfiguration, tokens } +/** + * interval between IdP connectivity tests when in alarm condition + * @type {number} + */ +const alarmRetryDelay = 5000 +/** + * max number of IdP connectivity tests when in alarm condition + * @type {number} + */ +const alarmRetryLimit = 5 + +/** + * count of IdP connectivity tests when in alarm condition + * @type {number} + */ +let alarmRetryCount = 1 + +/** + * Handler for when 'authiOffline' alarm is raised. + * Tests for IdP connectivity by calling the OIDC metadata endpoint + * and increments alarmRetryCount until reaching the alarmRetryLimit + */ +function offlineRetryHandler() { + if (alarmRetryCount >= alarmRetryLimit) { + logger.info({ + component, + message: 'OIDC Provider connectivity maximum tries reached, requesting shutdown' + }) + Alarm.shutdown(CONSTANTS.ERR_AUTHOFFLINE) + } + logger.info({ + component, + message: 'Testing if OIDC Provider is online' + }) + getOpenIDConfiguration() + .then(() => { + alarmRetryCount = 1 + Alarm.authOffline(false) + }) + .catch(() => { + alarmRetryCount++ + setTimeout(offlineRetryHandler, alarmRetryDelay) + }) +} + +export { getToken, getOpenIDConfiguration, offlineRetryHandler, tokens } diff --git a/lib/consts.js b/lib/consts.js new file mode 100644 index 0000000..a584ca2 --- /dev/null +++ b/lib/consts.js @@ -0,0 +1,7 @@ +export const ERR_APIOFFLINE = 1 +export const ERR_AUTHOFFLINE = 2 +export const ERR_NOTOKEN = 3 +export const ERR_NOGRANT = 4 +export const ERR_UNKNOWN = 5 +export const ERR_FAILINIT = 6 +export const REQUEST_TIMEOUT = 5000 diff --git a/lib/events.js b/lib/events.js index 08cdcb8..8508565 100644 --- a/lib/events.js +++ b/lib/events.js @@ -3,45 +3,72 @@ import { logger } from './logger.js' import { queue } from './parse.js' import { serializeError } from 'serialize-error' import { watch } from 'chokidar' +import Alarm from './alarm.js' const component = 'events' -export default function startFsEventWatcher () { - const awaitWriteFinishVal = options.stabilityThreshold ? { stabilityThreshold: options.stabilityThreshold } : false + +function startFsEventWatcher () { + const awaitWriteFinish = options.stabilityThreshold ? { stabilityThreshold: options.stabilityThreshold } : false const ignored = options.ignoreGlob ?? [] - if (options.ignoreDot) ignored.push(/(^|[\/\\])\../) + if (options.ignoreDot) ignored.push(/(^|[/\\])\../) + const watcher = watch(options.path, { ignored, ignoreInitial: !options.addExisting, persistent: true, usePolling: options.usePolling, - awaitWriteFinish: awaitWriteFinishVal + awaitWriteFinish }) - logger.info({component: component, message: `watching`, path: options.path}) + logger.info({component, message: `watching`, path: options.path}) - watcher.on('ready', e => { + watcher.on('ready', () => { if (options.oneShot) { watcher.close() } }) - - watcher.on('error', e => { - logger.error({ - component: component, - error: serializeError(e) + watcher.on('error', onError ) + watcher.on('add', onAdd ) + Alarm.on('alarmRaised', onAlarmRaised) + Alarm.on('alarmLowered', onAlarmLowered) +} + +function onAdd (file) { + // chokidar glob argument doesn't work for UNC Windows, so we check file extension here + const extension = file.substring(file.lastIndexOf(".") + 1) + if (extension.toLowerCase() === 'ckl' || extension.toLowerCase() === 'xml' || extension.toLowerCase() === 'cklb') { + logger.info({ + component, + message: 'file system event', + event: 'add', + file }) + queue.push( file ) + } +} + +function onError (e) { + logger.error({ + component, + error: serializeError(e) }) - - watcher.on('add', file => { - // chokidar glob argument doesn't work for UNC Windows, so we check file extension here - const extension = file.substring(file.lastIndexOf(".") + 1) - if (extension.toLowerCase() === 'ckl' || extension.toLowerCase() === 'xml' || extension.toLowerCase() === 'cklb') { - logger.info({ - component: component, - message: 'file system event', - event: 'add', - file: file - }) - queue.push( file ) - } +} + +function onAlarmRaised (alarmType) { + logger.info({ + component, + message: `pausing parse queue on alarm raised`, + alarmType }) + queue.pause() } + +function onAlarmLowered (alarmType) { + logger.info({ + component, + message: `resuming parse queue on alarm lowered`, + alarmType + }) + queue.resume() +} + +export default startFsEventWatcher \ No newline at end of file diff --git a/lib/scan.js b/lib/scan.js index bde0814..0b96eea 100644 --- a/lib/scan.js +++ b/lib/scan.js @@ -5,10 +5,12 @@ import { serializeError } from 'serialize-error' import fg from 'fast-glob' import lineByLine from 'n-readlines' import fs from 'node:fs' +import Alarm from './alarm.js' const component = 'scan' const historySet = new Set() // in memory history set let isWriteScheduled = false // flag to indicate if there is pending files to write to the history file +let timeoutId // id of the active setTimeout /** * Utility function that calls initHistory() and startScanner() @@ -16,6 +18,8 @@ let isWriteScheduled = false // flag to indicate if there is pending files to wr function initScanner() { initHistory() startScanner() + Alarm.on('alarmRaised', onAlarmRaised) + Alarm.on('alarmLowered', onAlarmLowered) } /** @@ -79,7 +83,7 @@ function removeStaleFiles(currentFilesSet){ * References options properties {path, scanInterval}. */ function scheduleNextScan() { - setTimeout(() => { + timeoutId = setTimeout(() => { startScanner().catch(e => { logger.error({ component, error: serializeError(e) }) }) @@ -93,6 +97,19 @@ function scheduleNextScan() { }) } +/** + * Cancels the next scan and logs. + * References options properties {path}. + */ +function cancelNextScan() { + clearTimeout(timeoutId) + logger.info({ + component, + message: `scan cancelled`, + path: options.path + }) +} + /** * Returns immediately if options.historyFile is falsy. * Initializes the history Set by reading it from a file and adding each line to the history set. @@ -131,8 +148,9 @@ function initHistory() { } if (isHistoryFileWriteable()) { - // Handle the interrupt signal + // Handle the interrupt signal and shutdown event process.prependListener('SIGINT', interruptHandler) + Alarm.prependListener('shutdown', interruptHandler) // Set the write interval handler setInterval(writeIntervalHandler, options.historyWriteInterval) logger.verbose({ @@ -158,7 +176,7 @@ function initHistory() { function interruptHandler() { logger.info({ component, - message: `received SIGINT, try writing history to file` + message: `received SIGINT or shutdown event, try writing history to file` }) writeHistoryToFile() } @@ -297,6 +315,39 @@ function isHistoryFileWriteable() { } } +/** + * @typedef {import('./alarm.js').AlarmType} AlarmType + */ + +/** + * Handles raised alarms + * @param {AlarmType} alarmType - The type of alarm. + * Intended to be a callback function of Alarm.on('alarmRaised') + */ +function onAlarmRaised(alarmType) { + logger.verbose({ + component, + message: `handling raised alarm`, + alarmType + }) + cancelNextScan() +} + +/** + * Handles lowered alarms + * @param {AlarmType} alarmType - The type of alarm. + * Intended to be a callback function of Alarm.on('alarmRaised') + */ +function onAlarmLowered(alarmType) { + logger.verbose({ + component, + message: `handling lowered alarm`, + alarmType + }) + startScanner() +} + + export { startScanner, initHistory,