Skip to content

Commit

Permalink
refactor(backend): ノートのエクスポート処理でStreams APIを使うように (#13465)
Browse files Browse the repository at this point in the history
* refactor(backend): ノートのエクスポート処理でStreams APIを使うように

* fixup! refactor(backend): ノートのエクスポート処理でStreams APIを使うように

`await`忘れにより、ジョブがすぐに完了したことになり削除されてしまっていた。
それによって、`NoteStream`内での`updateProgress`メソッドの呼び出しで、`Missing key for job`のエラーが発生することがあった。

---------

Co-authored-by: syuilo <4439005+syuilo@users.noreply.github.com>
  • Loading branch information
okayurisotto and syuilo authored Feb 28, 2024
1 parent 0d47877 commit b7d9d16
Show file tree
Hide file tree
Showing 3 changed files with 146 additions and 79 deletions.
31 changes: 31 additions & 0 deletions packages/backend/src/misc/FileWriterStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import * as fs from 'node:fs/promises';
import type { PathLike } from 'node:fs';

/**
 * A Web-standard `WritableStream` that appends every chunk it receives to a file,
 * i.e. roughly what `fs.createWriteStream(path, { flags: 'a' })` does for Node streams.
 *
 * The file is opened in append mode when the stream starts. The handle is released
 * when the stream is closed or aborted, and also as soon as a write fails.
 */
export class FileWriterStream extends WritableStream<Uint8Array> {
	/**
	 * @param path - File to append to; opened with the `'a'` flag.
	 */
	constructor(path: PathLike) {
		let file: fs.FileHandle | null = null;

		/** Closes the file handle at most once, no matter how many code paths ask for it. */
		const release = async (): Promise<void> => {
			const handle = file;
			file = null;
			await handle?.close();
		};

		super({
			start: async () => {
				file = await fs.open(path, 'a');
			},
			write: async (chunk, controller) => {
				const handle = file;
				if (handle === null) {
					const error = new Error('FileWriterStream: the file is not open');
					controller.error(error);
					throw error;
				}

				try {
					// `appendFile()` on a handle writes the whole buffer, whereas a single
					// `write()` call only reports `bytesWritten` and may stop short.
					await handle.appendFile(chunk);
				} catch (err) {
					// The sink's close()/abort() are not guaranteed to run after a failed
					// write, so release the handle here to avoid leaking it.
					// A failure to close must not mask the original write error.
					await release().catch(() => {});
					throw err;
				}
			},
			close: release,
			abort: release,
		});
	}
}
30 changes: 30 additions & 0 deletions packages/backend/src/misc/JsonArrayStream.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import { TransformStream } from 'node:stream/web';

/**
 * Applies `JSON.stringify()` to every chunk flowing through the stream and joins the
 * results into the text of a single JSON array.
 *
 * Emits `[` on start, `]` on flush, and `,\n` before every element except the first.
 * A chunk that `JSON.stringify()` cannot represent (it returns `undefined` for
 * `undefined`, functions and symbols) is emitted as `null`, which is how
 * `JSON.stringify()` itself treats such values inside an array.
 */
export class JsonArrayStream extends TransformStream<unknown, string> {
	constructor() {
		/** Whether the next chunk is the first element of the array. */
		let isFirst = true;

		super({
			start(controller) {
				controller.enqueue('[');
			},
			flush(controller) {
				controller.enqueue(']');
			},
			transform(chunk, controller) {
				if (isFirst) {
					isFirst = false;
				} else {
					// To produce a valid JSON array, every element but the first must be preceded by `,`.
					controller.enqueue(',\n');
				}

				// The lib typings claim `string`, but the runtime result can be `undefined`.
				const json: string | undefined = JSON.stringify(chunk);
				controller.enqueue(json ?? 'null');
			},
		});
	}
}
164 changes: 85 additions & 79 deletions packages/backend/src/queue/processors/ExportNotesProcessorService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
* SPDX-License-Identifier: AGPL-3.0-only
*/

import * as fs from 'node:fs';
import { ReadableStream, TextEncoderStream } from 'node:stream/web';
import { Inject, Injectable } from '@nestjs/common';
import { MoreThan } from 'typeorm';
import { format as dateFormat } from 'date-fns';
Expand All @@ -18,10 +18,82 @@ import { bindThis } from '@/decorators.js';
import { DriveFileEntityService } from '@/core/entities/DriveFileEntityService.js';
import { Packed } from '@/misc/json-schema.js';
import { IdService } from '@/core/IdService.js';
import { JsonArrayStream } from '@/misc/JsonArrayStream.js';
import { FileWriterStream } from '@/misc/FileWriterStream.js';
import { QueueLoggerService } from '../QueueLoggerService.js';
import type * as Bull from 'bullmq';
import type { DbJobDataWithUser } from '../types.js';

/**
 * A Web-standard `ReadableStream` that emits the notes of one user as plain,
 * JSON-serializable objects.
 *
 * Notes are fetched in batches of 100, using the `id` of the last note of the
 * previous batch as a cursor. After each batch the job progress is updated; when a
 * batch comes back empty the progress is set to `100` and the stream is closed.
 */
class NoteStream extends ReadableStream<Record<string, unknown>> {
	/**
	 * @param job - Queue job whose progress is updated while the export runs.
	 * @param notesRepository - Used to page through and count the user's notes.
	 * @param pollsRepository - Used to load the poll of notes that have one.
	 * @param driveFileEntityService - Used to pack the files attached to each note.
	 * @param idService - Used to derive `createdAt` from the note id.
	 * @param userId - Id of the user whose notes are exported.
	 */
	constructor(
		job: Bull.Job,
		notesRepository: NotesRepository,
		pollsRepository: PollsRepository,
		driveFileEntityService: DriveFileEntityService,
		idService: IdService,
		userId: string,
	) {
		let exportedNotesCount = 0;
		// Id of the last note emitted so far; `null` until the first batch has been read.
		let cursor: MiNote['id'] | null = null;

		/** Builds the exported representation of a single note. */
		const serialize = (
			note: MiNote,
			poll: MiPoll | null,
			files: Packed<'DriveFile'>[],
		): Record<string, unknown> => {
			return {
				id: note.id,
				text: note.text,
				createdAt: idService.parse(note.id).date.toISOString(),
				fileIds: note.fileIds,
				files: files,
				replyId: note.replyId,
				renoteId: note.renoteId,
				poll: poll,
				cw: note.cw,
				visibility: note.visibility,
				visibleUserIds: note.visibleUserIds,
				localOnly: note.localOnly,
				reactionAcceptance: note.reactionAcceptance,
			};
		};

		super({
			async pull(controller): Promise<void> {
				const notes = await notesRepository.find({
					where: {
						userId,
						...(cursor !== null ? { id: MoreThan(cursor) } : {}),
					},
					take: 100, // fetch 100 notes per batch
					order: { id: 1 },
				});

				if (notes.length === 0) {
					// Await the update so that a failure errors the stream instead of
					// becoming an unhandled rejection.
					await job.updateProgress(100);
					controller.close();
					// Nothing left to do: returning here avoids a needless count query and
					// keeps the final progress from being overwritten by a ratio below.
					return;
				}

				cursor = notes.at(-1)?.id ?? null;

				for (const note of notes) {
					const poll = note.hasPoll
						? await pollsRepository.findOneByOrFail({ noteId: note.id }) // N+1
						: null;
					const files = await driveFileEntityService.packManyByIds(note.fileIds); // N+1
					const content = serialize(note, poll, files);

					controller.enqueue(content);
					exportedNotesCount++;
				}

				const total = await notesRepository.countBy({ userId });
				// NOTE(review): intermediate progress is a 0-1 ratio while completion
				// reports 100; the scale is kept as it was. Skip the update when the count
				// is 0 (e.g. notes deleted meanwhile) rather than report NaN/Infinity.
				if (total > 0) {
					await job.updateProgress(exportedNotesCount / total);
				}
			},
		});
	}
}

@Injectable()
export class ExportNotesProcessorService {
private logger: Logger;
Expand Down Expand Up @@ -59,67 +131,19 @@ export class ExportNotesProcessorService {
this.logger.info(`Temp file is ${path}`);

try {
const stream = fs.createWriteStream(path, { flags: 'a' });

const write = (text: string): Promise<void> => {
return new Promise<void>((res, rej) => {
stream.write(text, err => {
if (err) {
this.logger.error(err);
rej(err);
} else {
res();
}
});
});
};

await write('[');

let exportedNotesCount = 0;
let cursor: MiNote['id'] | null = null;

while (true) {
const notes = await this.notesRepository.find({
where: {
userId: user.id,
...(cursor ? { id: MoreThan(cursor) } : {}),
},
take: 100,
order: {
id: 1,
},
}) as MiNote[];

if (notes.length === 0) {
job.updateProgress(100);
break;
}

cursor = notes.at(-1)?.id ?? null;
// メモリが足りなくならないようにストリームで処理する
await new NoteStream(
job,
this.notesRepository,
this.pollsRepository,
this.driveFileEntityService,
this.idService,
user.id,
)
.pipeThrough(new JsonArrayStream())
.pipeThrough(new TextEncoderStream())
.pipeTo(new FileWriterStream(path));

for (const note of notes) {
let poll: MiPoll | undefined;
if (note.hasPoll) {
poll = await this.pollsRepository.findOneByOrFail({ noteId: note.id });
}
const files = await this.driveFileEntityService.packManyByIds(note.fileIds);
const content = JSON.stringify(this.serialize(note, poll, files));
const isFirst = exportedNotesCount === 0;
await write(isFirst ? content : ',\n' + content);
exportedNotesCount++;
}

const total = await this.notesRepository.countBy({
userId: user.id,
});

job.updateProgress(exportedNotesCount / total);
}

await write(']');

stream.end();
this.logger.succ(`Exported to: ${path}`);

const fileName = 'notes-' + dateFormat(new Date(), 'yyyy-MM-dd-HH-mm-ss') + '.json';
Expand All @@ -130,22 +154,4 @@ export class ExportNotesProcessorService {
cleanup();
}
}

/**
 * Builds the plain object written to the export file for a single note.
 *
 * @param note - The note to export.
 * @param poll - The note's poll; defaults to `null`.
 * @param files - Packed drive files attached to the note.
 * @returns The exported fields, with `createdAt` derived from the note id via `idService`.
 */
private serialize(note: MiNote, poll: MiPoll | null = null, files: Packed<'DriveFile'>[]): Record<string, unknown> {
	const {
		id,
		text,
		fileIds,
		replyId,
		renoteId,
		cw,
		visibility,
		visibleUserIds,
		localOnly,
		reactionAcceptance,
	} = note;
	const createdAt = this.idService.parse(id).date.toISOString();

	// Property order is kept as before so the serialized JSON is laid out identically.
	return {
		id,
		text,
		createdAt,
		fileIds,
		files,
		replyId,
		renoteId,
		poll,
		cw,
		visibility,
		visibleUserIds,
		localOnly,
		reactionAcceptance,
	};
}
}

0 comments on commit b7d9d16

Please sign in to comment.