Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: remove logs from embedder #9718

Merged
merged 1 commit into from
May 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions packages/embedder/EmbeddingsJobQueueStream.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ import ms from 'ms'
import sleep from 'parabol-client/utils/sleep'
import 'parabol-server/initSentry'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {WorkflowOrchestrator} from './WorkflowOrchestrator'
import {DBJob} from './custom'

Expand Down Expand Up @@ -47,7 +46,6 @@ export class EmbeddingsJobQueueStream implements AsyncIterableIterator<DBJob> {
try {
const job = (await getJob(false)) || (await getJob(true))
if (!job) {
Logger.log('JobQueueStream: no jobs found')
// queue is empty, so sleep for a while
await sleep(ms('10s'))
return this.next()
Expand Down
10 changes: 7 additions & 3 deletions packages/embedder/JobQueueError.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,19 @@ export class JobQueueError extends Error {
jobData?: Record<string, any>

constructor(
message: string,
message: string | Error,
retryDelay?: number,
maxRetries?: number,
jobData?: Record<string, any>
) {
super(message)
this.message = message
const strMessage = message instanceof Error ? message.message : message
super(strMessage)
this.message = strMessage
this.retryDelay = retryDelay
this.maxRetries = maxRetries
this.jobData = jobData
if (message instanceof Error) {
this.stack = message.stack
}
}
}
2 changes: 0 additions & 2 deletions packages/embedder/WorkflowOrchestrator.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import {sql} from 'kysely'
import RootDataLoader from 'parabol-server/dataloader/RootDataLoader'
import getKysely from 'parabol-server/postgres/getKysely'
import {Logger} from '../server/utils/Logger'
import {EmbedderJobType} from './EmbedderJobType'
import {JobQueueError} from './JobQueueError'
import {DBJob, JobType, Workflow} from './custom'
Expand Down Expand Up @@ -37,7 +36,6 @@ export class WorkflowOrchestrator {
}

private failJob = async (jobId: number, retryCount: number, error: JobQueueError) => {
Logger.error(error)
const pg = getKysely()
const {message, retryDelay, jobData} = error
const maxRetries = error.maxRetries ?? 10
Expand Down
1 change: 1 addition & 0 deletions packages/embedder/ai_models/TextEmbeddingsInference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ export class TextEmbeddingsInference extends AbstractEmbeddingsModel {
this.client = client
}
async getTokens(content: string) {
if (!content) return []
const {data, error} = await this.client.POST('/tokenize', {
body: {add_special_tokens: true, inputs: content}
})
Expand Down
1 change: 0 additions & 1 deletion packages/embedder/embedder.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ const run = async () => {
const counter = logPerformance(10, 6)

for await (const _ of streams) {
// Logger.log(`Worker ${idx} finished job ${message.id}`)
counter.i++
}

Expand Down
13 changes: 7 additions & 6 deletions packages/embedder/indexing/retrospectiveDiscussionTopic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ async function formatThread(
? 'Anonymous'
: await getPreferredNameByUserId(comment.createdBy, dataLoader)
const how = depth === 0 ? 'wrote' : 'replied'
const content = comment.plaintextContent.slice(0, MAX_TEXT_LENGTH)
const content = comment.plaintextContent?.slice(0, MAX_TEXT_LENGTH)
if (!content) return ''
const formattedPost = `${indent}- ${author} ${how}, "${content}"\n`

// Recursively format child threads
Expand Down Expand Up @@ -98,14 +99,14 @@ export const createTextFromRetrospectiveDiscussionTopic = async (
if (prompt.description) markdown += `: ${prompt.description}`
markdown += `".\n`
}
for (const reflection of reflections.filter((r) => r.promptId === prompt.id)) {
const matchingReflections = reflections.filter((r) => r.promptId === prompt.id)
for (const reflection of matchingReflections) {
const content = reflection.plaintextContent?.slice(0, MAX_TEXT_LENGTH)
if (!content) continue
const author = newMeeting.disableAnonymity
? await getPreferredNameByUserId(reflection.creatorId, dataLoader)
: 'Anonymous'
markdown += ` - ${author} wrote, "${reflection.plaintextContent.slice(
0,
MAX_TEXT_LENGTH
)}"\n`
markdown += ` - ${author} wrote, "${content}"\n`
}
markdown += `\n`
}
Expand Down
7 changes: 6 additions & 1 deletion packages/embedder/logPerformance.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ export const logPerformance = (logEvery: number, resetEvery: number) => {
const counter = {i: 0}
let start = performance.now()
let logs = 0
let lastJobs = 0
setInterval(() => {
const duration = performance.now() - start
Logger.log(`Jobs per second: ${Math.round((counter.i / duration) * 1000)}`)
const jobs = Math.round((counter.i / duration) * 1000)
if (jobs !== lastJobs) {
Logger.log(`Jobs per second: ${jobs}`)
}
lastJobs = jobs
if (++logs >= resetEvery) {
counter.i = 0
logs = 0
Expand Down
4 changes: 1 addition & 3 deletions packages/embedder/workflows/embedMetadata.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,7 @@ export const embedMetadata: JobQueueStepRun<
.where('id', '=', embeddingsMetadataId)
.execute()
} catch (e) {
// get the trace since the error message may be unobvious
console.trace(e)
return new JobQueueError(`unable to create embedding text: ${e}`, undefined, 0, {
return new JobQueueError(e as Error, undefined, 0, {
forceBuildText: true
})
}
Expand Down
4 changes: 1 addition & 3 deletions packages/embedder/workflows/rerankRetroTopics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,7 @@ export const rerankRetroTopics: JobQueueStepRun<{
const {body} = await createEmbeddingTextFrom(metadata, dataLoader, true)
rerankText = body
} catch (e) {
// get the trace since the error message may be unobvious
console.trace(e)
return new JobQueueError(`unable to create embedding text: ${e}`)
return new JobQueueError(e as Error)
}

const modelManager = getModelManager()
Expand Down
Loading