Skip to content

Commit

Permalink
chore(release): Test v7.30.2 (#9719)
Browse files Browse the repository at this point in the history
Signed-off-by: dependabot[bot] <support@github.com>
Signed-off-by: Matt Krick <matt.krick@gmail.com>
Co-authored-by: Rafa <101704572+rafaelromcar-parabol@users.noreply.github.com>
Co-authored-by: Georg Bremer <github@dschoordsch.de>
Co-authored-by: Nick O'Ferrall <nickoferrall@gmail.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: parabol-release-bot[bot] <150284312+parabol-release-bot[bot]@users.noreply.github.com>
Co-authored-by: Matt Krick <matt.krick@gmail.com>
Co-authored-by: Jordan Husney <jordan.husney@gmail.com>
Co-authored-by: adaniels-parabol <71724289+adaniels-parabol@users.noreply.github.com>
Co-authored-by: snyk-bot <snyk-bot@snyk.io>
Co-authored-by: Dale Bumblis <135627447+dbumblis-parabol@users.noreply.github.com>
Co-authored-by: Bartosz Jarocki <jarocki.bartek@gmail.com>
Co-authored-by: Marcus Wermuth <hello@marcuswermuth.com>
Co-authored-by: Rafael Romero <rafael@parabol.co>
Co-authored-by: Bruce Tian <tianrunhe@gmail.com>
Co-authored-by: GitHub Action <action@github.com>
Co-authored-by: Mohd Muneeb <mr.mohdmuneeb123@gmail.com>
Co-authored-by: Muneeb-Ventures <mohd.muneeb@m0.ventures>
Co-authored-by: github-actions <github-actions@github.com>
  • Loading branch information
19 people authored May 7, 2024
1 parent 9867e76 commit 614333c
Show file tree
Hide file tree
Showing 24 changed files with 150 additions and 117 deletions.
2 changes: 1 addition & 1 deletion .release-please-manifest.json
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
{
".": "7.30.1"
".": "7.30.2"
}
12 changes: 12 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,18 @@ This project adheres to [Semantic Versioning](http://semver.org/).

This CHANGELOG follows conventions [outlined here](http://keepachangelog.com/).

## [7.30.2](https://github.com/ParabolInc/parabol/compare/v7.30.1...v7.30.2) (2024-05-07)


### Fixed

* remove logs from embedder ([#9718](https://github.com/ParabolInc/parabol/issues/9718)) ([93b26bb](https://github.com/ParabolInc/parabol/commit/93b26bb4fd94615d683f1f9fb69386f6104005fd))


### Changed

* Gracefully shutdown the embedder ([#9693](https://github.com/ParabolInc/parabol/issues/9693)) ([695ccad](https://github.com/ParabolInc/parabol/commit/695ccadd5f67e57786b65a036e9368c27c9b619c))

## [7.30.1](https://github.com/ParabolInc/parabol/compare/v7.30.0...v7.30.1) (2024-05-02)


Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "An open-source app for building smarter, more agile teams.",
"author": "Parabol Inc. <love@parabol.co> (http://github.com/ParabolInc)",
"license": "AGPL-3.0",
"version": "7.30.1",
"version": "7.30.2",
"repository": {
"type": "git",
"url": "https://github.com/ParabolInc/parabol"
Expand Down
4 changes: 2 additions & 2 deletions packages/chronos/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "chronos",
"version": "7.30.1",
"version": "7.30.2",
"description": "A cron job scheduler",
"author": "Matt Krick <matt.krick@gmail.com>",
"homepage": "https://github.com/ParabolInc/parabol/tree/master/packages/chronos#readme",
Expand All @@ -25,6 +25,6 @@
},
"dependencies": {
"cron": "^2.3.1",
"parabol-server": "7.30.1"
"parabol-server": "7.30.2"
}
}
2 changes: 1 addition & 1 deletion packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
"description": "An open-source app for building smarter, more agile teams.",
"author": "Parabol Inc. <love@parabol.co> (http://github.com/ParabolInc)",
"license": "AGPL-3.0",
"version": "7.30.1",
"version": "7.30.2",
"repository": {
"type": "git",
"url": "https://github.com/ParabolInc/parabol"
Expand Down
10 changes: 8 additions & 2 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {WorkflowOrchestrator} from './WorkflowOrchestrator'
import {DBJob} from './custom'

Expand All @@ -13,10 +12,16 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
}

orchestrator: WorkflowOrchestrator
done: boolean

constructor(orchestrator: WorkflowOrchestrator) {
this.orchestrator = orchestrator
this.done = false
}
async next(): Promise<IteratorResult<DBJob>> {
if (this.done) {
return {done: true as const, value: undefined}
}
const pg = getKysely()
const getJob = (isFailed: boolean) => {
return pg
Expand Down Expand Up @@ -47,7 +52,6 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
try {
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('10s'))
return this.next()
Expand All @@ -60,9 +64,11 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
}
}
return() {
this.done = true
return Promise.resolve({done: true as const, value: undefined})
}
throw(error: any) {
this.done = true
return Promise.resolve({done: true as const, value: error})
}
}
10 changes: 7 additions & 3 deletions packages/embedder/JobQueueError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ export class JobQueueError extends Error {
jobData?: Record<string, any>

constructor(
message: string,
message: string | Error,
retryDelay?: number,
maxRetries?: number,
jobData?: Record<string, any>
) {
super(message)
this.message = message
const strMessage = message instanceof Error ? message.message : message
super(strMessage)
this.message = strMessage
this.retryDelay = retryDelay
this.maxRetries = maxRetries
this.jobData = jobData
if (message instanceof Error) {
this.stack = message.stack
}
}
}
2 changes: 0 additions & 2 deletions packages/embedder/WorkflowOrchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {sql} from 'kysely'
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {EmbedderJobType} from './EmbedderJobType'
import {JobQueueError} from './JobQueueError'
import {DBJob, JobType, Workflow} from './custom'
Expand Down Expand Up @@ -37,7 +36,6 @@ export class WorkflowOrchestrator {
}

private failJob = async (jobId: number, retryCount: number, error: JobQueueError) => {
Logger.error(error)
const pg = getKysely()
const {message, retryDelay, jobData} = error
const maxRetries = error.maxRetries ?? 10
Expand Down
1 change: 1 addition & 0 deletions packages/embedder/ai_models/TextEmbeddingsInference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
this.client = client
}
async getTokens(content: string) {
if (!content) return []
const {data, error} = await this.client.POST('/tokenize', {
body: {add_special_tokens: true, inputs: content}
})
Expand Down
11 changes: 6 additions & 5 deletions packages/embedder/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ const run = async () => {
const streams = mergeAsyncIterators(jobQueueStreams)

const kill: NodeJS.SignalsListener = (signal) => {
Logger.log(`Kill signal received: ${signal}`)
primaryLock?.release()
Logger.log(
`Server ID: ${SERVER_ID}. Kill signal received: ${signal}, starting graceful shutdown.`
)
primaryLock?.release().catch(() => {})
streams.return?.()
process.exit()
}
process.on('SIGTERM', kill)
process.on('SIGINT', kill)
Expand All @@ -61,12 +62,12 @@ const run = async () => {
const counter = logPerformance(10, 6)

for await (const _ of streams) {
// Logger.log(`Worker ${idx} finished job ${message.id}`)
counter.i++
}

// On graceful shutdown
Logger.log('Streaming Complete. Goodbye!')
Logger.log(`Server ID: ${SERVER_ID}. Graceful shutdown complete, exiting.`)
process.exit()
}

run()
13 changes: 7 additions & 6 deletions packages/embedder/indexing/retrospectiveDiscussionTopic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async function formatThread(
? 'Anonymous'
: await getPreferredNameByUserId(comment.createdBy, dataLoader)
const how = depth === 0 ? 'wrote' : 'replied'
const content = comment.plaintextContent.slice(0, MAX_TEXT_LENGTH)
const content = comment.plaintextContent?.slice(0, MAX_TEXT_LENGTH)
if (!content) return ''
const formattedPost = `${indent}- ${author} ${how}, "${content}"\n`

// Recursively format child threads
Expand Down Expand Up @@ -98,14 +99,14 @@ export const createTextFromRetrospectiveDiscussionTopic = async (
if (prompt.description) markdown += `: ${prompt.description}`
markdown += `".\n`
}
for (const reflection of reflections.filter((r) => r.promptId === prompt.id)) {
const matchingReflections = reflections.filter((r) => r.promptId === prompt.id)
for (const reflection of matchingReflections) {
const content = reflection.plaintextContent?.slice(0, MAX_TEXT_LENGTH)
if (!content) continue
const author = newMeeting.disableAnonymity
? await getPreferredNameByUserId(reflection.creatorId, dataLoader)
: 'Anonymous'
markdown += ` - ${author} wrote, "${reflection.plaintextContent.slice(
0,
MAX_TEXT_LENGTH
)}"\n`
markdown += ` - ${author} wrote, "${content}"\n`
}
markdown += `\n`
}
Expand Down
5 changes: 5 additions & 0 deletions packages/embedder/lint-staged.config.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
module.exports = {
'*.{ts,tsx}': ['eslint --fix', 'prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'],
'*.graphql': ['prettier --config ../../.prettierrc --ignore-path ./.eslintignore --write'],
'**/*.{ts,tsx}': () => 'tsc --noEmit -p tsconfig.json'
}
2 changes: 1 addition & 1 deletion packages/embedder/logMemoryUse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,5 +6,5 @@ export const logMemoryUse = () => {
const {rss} = memoryUsage
const usedMB = Math.floor(rss / MB)
console.log('Memory use:', usedMB, 'MB')
}, 10000)
}, 10000).unref()
}
9 changes: 7 additions & 2 deletions packages/embedder/logPerformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,19 @@ export const logPerformance = (logEvery: number, resetEvery: number) => {
const counter = {i: 0}
let start = performance.now()
let logs = 0
let lastJobs = 0
setInterval(() => {
const duration = performance.now() - start
Logger.log(`Jobs per second: ${Math.round((counter.i / duration) * 1000)}`)
const jobs = Math.round((counter.i / duration) * 1000)
if (jobs !== lastJobs) {
Logger.log(`Jobs per second: ${jobs}`)
}
lastJobs = jobs
if (++logs >= resetEvery) {
counter.i = 0
logs = 0
start = performance.now()
}
}, logEvery * 1000)
}, logEvery * 1000).unref()
return counter
}
135 changes: 65 additions & 70 deletions packages/embedder/mergeAsyncIterators.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,85 +12,80 @@ type Result<T extends AsyncIterator<any>> = UnYield<Awaited<ReturnType<T['next']

// Promise.race has a memory leak
// To avoid: https://github.com/tc39/proposal-async-iterator-helpers/issues/15#issuecomment-1937011820
export function mergeAsyncIterators<T extends AsyncIterator<any>[] | []>(
iterators: T
): AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result<T[P]>]}[number]> {
return (async function* () {
type ResultThunk = () => [number, Result<T[number]>]
let count = iterators.length as number
let capability: PromiseCapability<ResultThunk | null> | undefined
const queuedResults: ResultThunk[] = []
const getNext = async (idx: number, iterator: T[number]) => {
try {
const next = await iterator.next()
if (next.done) {
if (--count === 0 && capability !== undefined) {
capability.resolve(null)
}
} else {
resolveResult(() => {
void getNext(idx, iterator)
return [idx, next.value]
})
export function mergeAsyncIterators<T extends AsyncIterator<any>[] | []>(iterators: T) {
type ResultThunk = () => [number, Result<T[number]>]
let count = iterators.length as number
let capability: PromiseCapability<ResultThunk | null> | undefined
const queuedResults: ResultThunk[] = []
const getNext = async (idx: number, iterator: T[number]) => {
try {
const next = await iterator.next()
if (next.done) {
if (--count === 0 && capability !== undefined) {
capability.resolve(null)
}
} catch (error) {
} else {
resolveResult(() => {
throw error
void getNext(idx, iterator)
return [idx, next.value]
})
}
} catch (error) {
resolveResult(() => {
throw error
})
}
const resolveResult = (resultThunk: ResultThunk) => {
if (capability === undefined) {
queuedResults.push(resultThunk)
} else {
capability.resolve(resultThunk)
}
}
const resolveResult = (resultThunk: ResultThunk) => {
if (capability === undefined) {
queuedResults.push(resultThunk)
} else {
capability.resolve(resultThunk)
}
}

try {
// Begin all iterators
for (const [idx, iterable] of iterators.entries()) {
void getNext(idx, iterable)
// Begin all iterators
for (const [idx, iterable] of iterators.entries()) {
void getNext(idx, iterable)
}

const it: AsyncIterableIterator<{[P in keyof T]: [ParseInt<`${P}`>, Result<T[P]>]}[number]> = {
[Symbol.asyncIterator]: () => it,
next: async () => {
const nextQueuedResult = queuedResults.shift()
if (nextQueuedResult !== undefined) {
return {done: false as const, value: nextQueuedResult()}
}
if (count === 0) {
return {done: true as const, value: undefined}
}

// Delegate to iterables as results complete
while (true) {
while (true) {
const nextQueuedResult = queuedResults.shift()
if (nextQueuedResult === undefined) {
break
} else {
yield nextQueuedResult()
}
}
if (count === 0) {
break
} else {
// Promise.withResolvers() is not yet implemented in node
capability = {
resolve: undefined as any,
reject: undefined as any,
promise: undefined as any
}
capability.promise = new Promise((res, rej) => {
capability!.resolve = res
capability!.reject = rej
})
const nextResult = await capability.promise
if (nextResult === null) {
break
} else {
capability = undefined
yield nextResult()
}
}
// Promise.withResolvers() is not yet implemented in node
capability = {
resolve: undefined as any,
reject: undefined as any,
promise: undefined as any
}
capability.promise = new Promise((res, rej) => {
capability!.resolve = res
capability!.reject = rej
})
const nextResult = await capability.promise
if (nextResult === null) {
return {done: true as const, value: undefined}
} else {
capability = undefined
return {done: false as const, value: nextResult()}
}
} catch (err) {
// Unwind remaining iterators on failure
try {
await Promise.all(iterators.map((iterator) => iterator.return?.()))
} catch {}
throw err
},
return: async () => {
await Promise.allSettled(iterators.map((iterator) => iterator.return?.()))
return {done: true as const, value: undefined}
},
throw: async () => {
await Promise.allSettled(iterators.map((iterator) => iterator.return?.()))
return {done: true as const, value: undefined}
}
})()
}
return it
}
Loading

0 comments on commit 614333c

Please sign in to comment.