Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Automation logging updates #10626

Merged
merged 5 commits into from
May 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 21 additions & 0 deletions packages/backend-core/src/context/mainContext.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,22 @@ async function newContext(updates: ContextMap, task: any) {
return Context.run(context, task)
}

export async function doInAutomationContext(params: {
appId: string,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use an object as a parameter to make it more verbose in its usage?

export async function doInAutomationContext(params: {
  appId: string,
  automationId: string,
  task: any
})

automationId: string,
task: any
}): Promise<any> {
const tenantId = getTenantIDFromAppID(params.appId)
return newContext(
{
tenantId,
appId: params.appId,
automationId: params.automationId,
},
params.task
)
}

export async function doInContext(appId: string, task: any): Promise<any> {
const tenantId = getTenantIDFromAppID(appId)
return newContext(
Expand Down Expand Up @@ -187,6 +203,11 @@ export function getTenantId(): string {
return tenantId
}

export function getAutomationId(): string | undefined {
const context = Context.get()
return context?.automationId
}

export function getAppId(): string | undefined {
const context = Context.get()
const foundId = context?.appId
Expand Down
1 change: 1 addition & 0 deletions packages/backend-core/src/context/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,5 @@ export type ContextMap = {
identity?: IdentityContext
environmentVariables?: Record<string, string>
isScim?: boolean
automationId?: string
}
12 changes: 12 additions & 0 deletions packages/backend-core/src/logging/pino/logger.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ if (!env.DISABLE_PINO_LOGGER) {
objects?: any[]
tenantId?: string
appId?: string
automationId?: string
identityId?: string
identityType?: IdentityType
correlationId?: string
Expand Down Expand Up @@ -86,6 +87,7 @@ if (!env.DISABLE_PINO_LOGGER) {
contextObject = {
tenantId: getTenantId(),
appId: getAppId(),
automationId: getAutomationId(),
identityId: identity?._id,
identityType: identity?.type,
correlationId: correlation.getId(),
Expand Down Expand Up @@ -159,6 +161,16 @@ if (!env.DISABLE_PINO_LOGGER) {
return appId
}

const getAutomationId = () => {
let appId
try {
appId = context.getAutomationId()
} catch (e) {
// do nothing
jvcalderon marked this conversation as resolved.
Show resolved Hide resolved
}
return appId
}

const getIdentity = () => {
let identity
try {
Expand Down
1 change: 1 addition & 0 deletions packages/backend-core/src/queue/inMemoryQueue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class InMemoryQueue {

on() {
// do nothing
return this
}

async waitForCompletion() {
Expand Down
173 changes: 125 additions & 48 deletions packages/backend-core/src/queue/listeners.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Job, JobId, Queue } from "bull"
import { JobQueue } from "./constants"
import * as context from "../context"

export type StalledFn = (job: Job) => Promise<void>

Expand Down Expand Up @@ -31,77 +32,153 @@ function handleStalled(queue: Queue, removeStalledCb?: StalledFn) {
})
}

function getLogParams(
eventType: QueueEventType,
event: BullEvent,
opts: {
job?: Job
jobId?: JobId
error?: Error
} = {},
extra: any = {}
) {
const message = `[BULL] ${eventType}=${event}`
const err = opts.error

const data = {
eventType,
event,
job: opts.job,
jobId: opts.jobId || opts.job?.id,
...extra,
}

return [message, err, data]
}

enum BullEvent {
ERROR = "error",
WAITING = "waiting",
ACTIVE = "active",
STALLED = "stalled",
PROGRESS = "progress",
COMPLETED = "completed",
FAILED = "failed",
PAUSED = "paused",
RESUMED = "resumed",
CLEANED = "cleaned",
DRAINED = "drained",
REMOVED = "removed",
}

enum QueueEventType {
AUTOMATION_EVENT = "automation-event",
APP_BACKUP_EVENT = "app-backup-event",
AUDIT_LOG_EVENT = "audit-log-event",
SYSTEM_EVENT = "system-event",
}

const EventTypeMap: { [key in JobQueue]: QueueEventType } = {
[JobQueue.AUTOMATION]: QueueEventType.AUTOMATION_EVENT,
[JobQueue.APP_BACKUP]: QueueEventType.APP_BACKUP_EVENT,
[JobQueue.AUDIT_LOG]: QueueEventType.AUDIT_LOG_EVENT,
[JobQueue.SYSTEM_EVENT_QUEUE]: QueueEventType.SYSTEM_EVENT,
}

function logging(queue: Queue, jobQueue: JobQueue) {
let eventType: string
switch (jobQueue) {
case JobQueue.AUTOMATION:
eventType = "automation-event"
break
case JobQueue.APP_BACKUP:
eventType = "app-backup-event"
break
case JobQueue.AUDIT_LOG:
eventType = "audit-log-event"
break
case JobQueue.SYSTEM_EVENT_QUEUE:
eventType = "system-event"
break
const eventType = EventTypeMap[jobQueue]

function doInJobContext(job: Job, task: any) {
// if this is an automation job try to get the app id
const appId = job.data.event?.appId
if (appId) {
return context.doInContext(appId, task)
} else {
task()
}
}

queue
.on(BullEvent.STALLED, async (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop.
await doInJobContext(job, () => {
console.error(...getLogParams(eventType, BullEvent.STALLED, { job }))
})
})
.on(BullEvent.ERROR, (error: any) => {
// An error occurred.
console.error(...getLogParams(eventType, BullEvent.ERROR, { error }))
})

if (process.env.NODE_DEBUG?.includes("bull")) {
Copy link
Contributor Author

@Rory-Powell Rory-Powell May 17, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm planning on turning off bull debug logging by default (after investigation) to bring down logging costs, hence the stalled and error logs above being promoted to logging by default

queue
.on("error", (error: any) => {
// An error occurred.
console.error(`${eventType}=error error=${JSON.stringify(error)}`)
})
.on("waiting", (jobId: JobId) => {
.on(BullEvent.WAITING, (jobId: JobId) => {
// A Job is waiting to be processed as soon as a worker is idling.
console.log(`${eventType}=waiting jobId=${jobId}`)
console.info(...getLogParams(eventType, BullEvent.WAITING, { jobId }))
})
.on("active", (job: Job, jobPromise: any) => {
.on(BullEvent.ACTIVE, async (job: Job, jobPromise: any) => {
// A job has started. You can use `jobPromise.cancel()`` to abort it.
console.log(`${eventType}=active jobId=${job.id}`)
await doInJobContext(job, () => {
console.info(...getLogParams(eventType, BullEvent.ACTIVE, { job }))
})
})
.on("stalled", (job: Job) => {
// A job has been marked as stalled. This is useful for debugging job
// workers that crash or pause the event loop.
console.error(
`${eventType}=stalled jobId=${job.id} job=${JSON.stringify(job)}`
)
.on(BullEvent.PROGRESS, async (job: Job, progress: any) => {
// A job's progress was updated
await doInJobContext(job, () => {
console.info(
...getLogParams(
eventType,
BullEvent.PROGRESS,
{ job },
{ progress }
)
)
})
})
.on("progress", (job: Job, progress: any) => {
// A job's progress was updated!
console.log(
`${eventType}=progress jobId=${job.id} progress=${progress}`
)
})
.on("completed", (job: Job, result) => {
.on(BullEvent.COMPLETED, async (job: Job, result) => {
// A job successfully completed with a `result`.
console.log(`${eventType}=completed jobId=${job.id} result=${result}`)
await doInJobContext(job, () => {
console.info(
...getLogParams(eventType, BullEvent.COMPLETED, { job }, { result })
)
})
})
.on("failed", (job, err: any) => {
.on(BullEvent.FAILED, async (job: Job, error: any) => {
// A job failed with reason `err`!
console.log(`${eventType}=failed jobId=${job.id} error=${err}`)
await doInJobContext(job, () => {
console.error(
...getLogParams(eventType, BullEvent.FAILED, { job, error })
)
})
})
.on("paused", () => {
.on(BullEvent.PAUSED, () => {
// The queue has been paused.
console.log(`${eventType}=paused`)
console.info(...getLogParams(eventType, BullEvent.PAUSED))
})
.on("resumed", (job: Job) => {
.on(BullEvent.RESUMED, () => {
// The queue has been resumed.
console.log(`${eventType}=paused jobId=${job.id}`)
console.info(...getLogParams(eventType, BullEvent.RESUMED))
})
.on("cleaned", (jobs: Job[], type: string) => {
.on(BullEvent.CLEANED, (jobs: Job[], type: string) => {
// Old jobs have been cleaned from the queue. `jobs` is an array of cleaned
// jobs, and `type` is the type of jobs cleaned.
console.log(`${eventType}=cleaned length=${jobs.length} type=${type}`)
console.info(
...getLogParams(
eventType,
BullEvent.CLEANED,
{},
{ length: jobs.length, type }
)
)
})
.on("drained", () => {
.on(BullEvent.DRAINED, () => {
// Emitted every time the queue has processed all the waiting jobs (even if there can be some delayed jobs not yet processed)
console.log(`${eventType}=drained`)
console.info(...getLogParams(eventType, BullEvent.DRAINED))
})
.on("removed", (job: Job) => {
.on(BullEvent.REMOVED, (job: Job) => {
// A job successfully removed.
console.log(`${eventType}=removed jobId=${job.id}`)
console.info(...getLogParams(eventType, BullEvent.REMOVED, { job }))
})
}
}
Loading