Skip to content

Commit

Permalink
Merge remote-tracking branch 'parcelvoy/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
reaper committed Jan 6, 2025
2 parents f063f1d + e8d2cd9 commit c699033
Show file tree
Hide file tree
Showing 28 changed files with 637 additions and 330 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
exports.up = async function(knex) {
await knex.schema.alterTable('journey_user_step', function(table) {
table.index(['journey_id', 'type', 'delay_until'])
table.dropIndex(['type', 'delay_until'])
})
}

exports.down = async function(knex) {
await knex.schema.alterTable('journey_user_step', function(table) {
table.index(['type', 'delay_until'])
table.dropIndex(['journey_id', 'type', 'delay_until'])
})
}
2 changes: 1 addition & 1 deletion apps/platform/src/client/EventPostJob.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ export default class EventPostJob extends Job {

options = {
delay: 0,
attempts: 1,
attempts: 2,
}

static from(data: EventPostTrigger): EventPostJob {
Expand Down
2 changes: 2 additions & 0 deletions apps/platform/src/config/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,8 @@ export default (type?: EnvType): Env => {
redis: {
host: process.env.REDIS_HOST!,
port: parseInt(process.env.REDIS_PORT!),
username: process.env.REDIS_USERNAME,
password: process.env.REDIS_PASSWORD,
tls: process.env.REDIS_TLS === 'true',
},
queue: driver<QueueConfig>(process.env.QUEUE_DRIVER, {
Expand Down
12 changes: 8 additions & 4 deletions apps/platform/src/config/redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,18 @@ import IORedis, { Redis } from 'ioredis'
export interface RedisConfig {
host: string
port: number
username?: string
password?: string
tls: boolean
}

export const DefaultRedis = (config: RedisConfig, extraOptions = {}): Redis => {
export const DefaultRedis = ({ port, host, username, password, tls }: RedisConfig, extraOptions = {}): Redis => {
return new IORedis({
port: config.port,
host: config.host,
tls: config.tls
port,
host,
...username && { username },
...password && { password },
tls: tls
? { rejectUnauthorized: false }
: undefined,
...extraOptions,
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/config/stats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export class Stats {

const results = (await multi.exec()) ?? []
return results.map(([_, value]: any, index) => ({
date: new Date(keys[index] * 1000),
date: keys[index] * 1000,
count: parseInt(value ?? 0),
}))
}
Expand Down
7 changes: 0 additions & 7 deletions apps/platform/src/lists/ListEvaluateUserJob.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import App from '../app'
import { cacheIncr } from '../config/redis'
import { Job } from '../queue'
import { JobPriority } from '../queue/Job'
import { getUser } from '../users/UserRepository'
import { DynamicList } from './List'
import { CacheKeys, cleanupList, evaluateUserList, getList } from './ListService'
Expand All @@ -17,12 +16,6 @@ interface ListEvaluateUserParams {
export default class ListEvaluateUserJob extends Job {
static $name = 'list_evaluate_user_job'

options = {
delay: 0,
attempts: 3,
priority: JobPriority.low,
}

static from(params: ListEvaluateUserParams): ListEvaluateUserJob {
return new this(params)
}
Expand Down
32 changes: 31 additions & 1 deletion apps/platform/src/organizations/OrganizationController.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,37 @@ router.get('/performance/jobs', async ctx => {
})

router.get('/performance/jobs/:job', async ctx => {
ctx.body = await App.main.stats.list(ctx.params.job)
const jobName = ctx.params.job

const added = await App.main.stats.list(jobName)
const completed = await App.main.stats.list(`${jobName}:completed`)
const duration = await App.main.stats.list(`${jobName}:duration`)
const average = duration.map((item, index) => {
const count = completed[index]?.count
return {
date: item.date,
count: count ? item.count / count : 0,
}
})

ctx.body = {
throughput: [
{
label: 'added',
data: added,
},
{
label: 'completed',
data: completed,
},
],
timing: [
{
label: 'average',
data: average,
},
],
}
})

router.get('/performance/failed', async ctx => {
Expand Down
2 changes: 1 addition & 1 deletion apps/platform/src/providers/text/TelnyxTextProvider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ export default class TelnyxTextProvider extends TextProvider {
const response = await fetch('https://api.telnyx.com/v2/messages', {
method: 'POST',
headers: {
Authorization: `Basic ${this.api_key}`,
Authorization: `Bearer ${this.api_key}`,
'Content-Type': 'application/json',
'User-Agent': 'parcelvoy/v1 (+https://github.com/parcelvoy/platform)',
},
Expand Down
8 changes: 6 additions & 2 deletions apps/platform/src/queue/Queue.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,10 @@ export default class Queue {
App.main.error.notify(new Error(`No handler found for job: ${job.name}`))
}

const start = Date.now()
await this.started(job)
await handler(job.data, job)
await this.completed(job)
await this.completed(job, Date.now() - start)
return true
}

Expand Down Expand Up @@ -80,8 +81,11 @@ export default class Queue {
App.main.error.notify(error, job)
}

async completed(job: EncodedJob) {
async completed(job: EncodedJob, duration: number) {
logger.trace(job, 'queue:job:completed')

await App.main.stats.increment(`${job.name}:completed`)
await App.main.stats.increment(`${job.name}:duration`, duration)
}

async start() {
Expand Down
15 changes: 14 additions & 1 deletion apps/platform/src/utilities/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -224,9 +224,22 @@ export const chunk = async <T>(
modifier: (result: any) => T = (result) => result,
) => {
const chunker = new Chunker(callback, size)
const handler = async (result: any, retries = 3) => {
try {
await chunker.add(modifier(result))
} catch (error: any) {

// In the case of deadlocks, retry the operation
if (['ER_LOCK_WAIT_TIMEOUT', 'ER_LOCK_DEADLOCK'].includes(error.code) && retries > 0) {
setTimeout(() => handler(result, retries - 1), 250)
} else {
throw error
}
}
}
await query.stream(async function(stream) {
for await (const result of stream) {
await chunker.add(modifier(result))
await handler(result)
}
})
await chunker.flush()
Expand Down
Loading

0 comments on commit c699033

Please sign in to comment.