Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

13-6 [BE] [논문 상세 - 논문 정보] Request batch 작업 #82

Merged
merged 13 commits into from
Dec 2, 2022
7 changes: 7 additions & 0 deletions backend/src/batch/batch.config.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
/**
 * Interprets an environment-variable string as a boolean flag.
 *
 * Only `true` and `1` (case-insensitive, surrounding whitespace ignored)
 * enable the flag; anything else, including an unset variable, disables it.
 * The value is compared, never evaluated, so a hostile or mistyped value
 * cannot execute code or throw during module load.
 *
 * @param raw - Raw value read from `process.env`.
 * @returns Whether the flag is enabled.
 */
function parseBooleanEnv(raw: string | undefined): boolean {
  if (raw === undefined) return false;
  const normalized = raw.trim().toLowerCase();
  return normalized === 'true' || normalized === '1';
}

/** Period of the scheduled batch runs, in milliseconds. */
export const TIME_INTERVAL = 3 * 1000;
/** Retry limit checked by the batchers' rejection handlers. */
export const MAX_RETRY = 3;
/** References are only followed for items whose depth is below this value. */
export const MAX_DEPTH = 1;
/** Default number of queue entries popped per search batch run. */
export const SEARCH_BATCH_SIZE = 10;
/** Default number of queue entries popped per DOI batch run. */
export const DOI_BATCH_SIZE = 40;
/** Loose DOI shape check: two digits, a dot, one or more digits, a slash, then anything. */
export const DOI_REGEXP = /^\d{2}\.\d+\/.*/;
/** When true, fetched papers are indexed without first filtering out already-stored ones. */
export const ALLOW_UPDATE: boolean = parseBooleanEnv(process.env.ALLOW_UPDATE);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TIME_INTERVAL이나 MAX_DEPTH같이 추후에 변동될 수 있는 요소들을 변수로 관리할 수 있게 한 부분이 참 좋은 것 같습니다.

20 changes: 20 additions & 0 deletions backend/src/batch/batch.queue.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import Redis from 'ioredis';
Copy link
Collaborator

@JunYupK JunYupK Dec 1, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Redis를 이용한 queue군요. Redis 자체를 큐로 관리하는 것은 생각 못 해봤습니다.


/**
 * Small queue facade over one Redis list.
 *
 * Entries are normally appended on the right and consumed from the left;
 * `push` can also prepend so that an entry is consumed next.
 */
export class RedisQueue {
  constructor(private redis: Redis, public name: string) {}

  /**
   * Stores `value` in the list named `name`.
   *
   * @param value - Entry to store.
   * @param pushLeft - When true the entry is added with LPUSH, otherwise with RPUSH.
   * @returns The value that was pushed.
   */
  async push(value: string, pushLeft = false) {
    await (pushLeft ? this.redis.lpush(this.name, value) : this.redis.rpush(this.name, value));
    return value;
  }

  /**
   * Removes up to `count` entries from the left end of the list.
   *
   * @param count - Maximum number of entries to remove.
   * @returns Whatever LPOP with a count yields for this list.
   */
  async pop(count = 1) {
    const popped = await this.redis.lpop(this.name, count);
    return popped;
  }

  /** Reports the current list length via LLEN. */
  async size() {
    const length = await this.redis.llen(this.name);
    return length;
  }
}
60 changes: 60 additions & 0 deletions backend/src/batch/batch.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { HttpService } from '@nestjs/axios';
import { Interval } from '@nestjs/schedule';
import Redis from 'ioredis';
import { SearchService } from 'src/search/search.service';
import { TIME_INTERVAL, SEARCH_BATCH_SIZE, DOI_BATCH_SIZE } from './batch.config';
import { DoiBatcher } from './batcher.doi';
import { SearchBatcher } from './batcher.search';

/**
 * Owns the keyword-search and DOI batch queues and drains both on a fixed interval.
 *
 * Keywords are de-duplicated through a Redis marker key that expires after a day;
 * DOIs returned by either batcher are fed back into the DOI queue one depth deeper.
 */
export class BatchService {
  searchBatcher: SearchBatcher;
  doiBatcher: DoiBatcher;

  constructor(
    @InjectRedis() private readonly redis: Redis,
    private readonly httpService: HttpService,
    private readonly searchService: SearchService,
  ) {
    this.searchBatcher = new SearchBatcher(
      this.redis,
      this.httpService.axiosRef,
      this.searchService,
      'url batch queue',
    );
    this.doiBatcher = new DoiBatcher(this.redis, this.httpService.axiosRef, this.searchService, 'paper batch queue');
  }

  /** Builds the Redis key that marks `keyword` as already queued. */
  keywordToRedisKey(keyword: string) {
    return `s:${keyword}`;
  }

  /**
   * Tells whether a marker for `keyword` is present.
   *
   * @returns True when the marker key reports a non-negative TTL. A key without
   *          expiry (TTL -1) or a missing key (TTL -2) counts as absent.
   */
  async keywordExist(keyword: string) {
    const key = this.keywordToRedisKey(keyword);
    return (await this.redis.ttl(key)) >= 0;
  }

  /**
   * Queues a keyword search unless the same keyword was queued within the last day.
   *
   * @returns False when the keyword is already marked, true once it has been queued.
   */
  async setKeyword(keyword: string) {
    if (await this.keywordExist(keyword)) return false;
    const key = this.keywordToRedisKey(keyword);
    const markerTtlSeconds = 60 * 60 * 24;
    // Value and expiry go out in one awaited command. Two separate, un-awaited
    // SET and EXPIRE calls could leave a marker without a TTL (which keywordExist
    // treats as absent) and turned a Redis failure into an unhandled rejection.
    await this.redis.set(key, 1, 'EX', markerTtlSeconds);
    this.searchBatcher.pushToQueue(0, 0, -1, true, keyword);
    return true;
  }

  /** Puts a single DOI at the front of the DOI queue. */
  async setDoi(doi: string) {
    this.doiBatcher.pushToQueue(0, 0, -1, true, doi);
  }

  /**
   * Runs one search batch and queues every returned DOI one depth deeper.
   *
   * @param batchSize - Number of queue entries to process in this run.
   */
  @Interval(TIME_INTERVAL)
  async batchSearchQueue(batchSize = SEARCH_BATCH_SIZE) {
    const referencesDoiWithDepth = await this.searchBatcher.runBatch(batchSize);
    referencesDoiWithDepth?.forEach((v) => {
      this.doiBatcher.pushToQueue(0, v.depth + 1, -1, false, v.doi);
    });
  }

  /**
   * Runs one DOI batch and queues every returned reference DOI one depth deeper.
   *
   * @param batchSize - Number of queue entries to process in this run.
   */
  @Interval(TIME_INTERVAL)
  async batchDoiQueue(batchSize = DOI_BATCH_SIZE) {
    const referencesDoiWithDepth = await this.doiBatcher.runBatch(batchSize);
    referencesDoiWithDepth?.forEach((v) => {
      this.doiBatcher.pushToQueue(0, v.depth + 1, -1, false, v.doi);
    });
  }
}
171 changes: 171 additions & 0 deletions backend/src/batch/batcher.abstract.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
import { GetGetResult } from '@elastic/elasticsearch/lib/api/types';
import { Injectable } from '@nestjs/common';
import { AxiosError, AxiosInstance, AxiosResponse } from 'axios';
import Redis from 'ioredis';
import {
CrossRefItem,
CrossRefPaperResponse,
CrossRefResponse,
PaperInfoDetail,
ReferenceInfo,
} from 'src/search/entities/crossRef.entity';
import { SearchService } from 'src/search/search.service';
import { ALLOW_UPDATE, MAX_DEPTH } from './batch.config';
import { RedisQueue } from './batch.queue';

/** A queue entry after decoding its `retries:depth:pagesLeft:url` string form. */
export interface QueueItemParsed {
// Number of retries recorded for this request.
retries: number;
// Depth value carried with the item; compared against MAX_DEPTH when collecting references.
depth: number;
// Remaining page counter; entries are queued with -1 when no count is known.
pagesLeft: number;
// Request URL; may itself contain ':' characters.
url: string;
}
/** Parameters a batcher recovers from a queued URL; each batcher fills only the fields it uses. */
export interface UrlParams {
doi?: string;
keyword?: string;
cursor?: string;
}

/** Response body shapes the batchers handle: a search result list or a single paper. */
type CrossRefAble = CrossRefResponse | CrossRefPaperResponse;

@Injectable()
export abstract class Batcher {
// Redis-backed queue holding this batcher's pending requests.
queue: RedisQueue;
// failedQueue: RedisQueue;
/**
 * @param redis - Redis client backing the queue.
 * @param axios - HTTP client used for the outgoing requests.
 * @param searchService - Service used for the existence lookup and bulk insert.
 * @param name - Name of the Redis list used as the queue.
 */
constructor(
private readonly redis: Redis,
private readonly axios: AxiosInstance,
readonly searchService: SearchService,
readonly name: string,
) {
this.queue = new RedisQueue(this.redis, name);
// this.failedQueue = new RedisQueue(this.redis, name);
}
/** Builds the request URL from the batcher-specific parameters given to pushToQueue. */
abstract makeUrl(...params: string[]): string;
/** Recovers the batcher-specific parameters from a queued URL. */
abstract getParamsFromUrl(url: string): UrlParams;
/**
 * Handles one fulfilled response.
 *
 * @returns The papers parsed from the response and the DOIs to request next.
 */
abstract onFulfilled(
item: QueueItemParsed,
params: UrlParams,
res: AxiosResponse<CrossRefAble, any>,
i?: number,
): { papers: PaperInfoDetail[]; referenceDOIs: string[] }; // list of DOIs for the papers' references
/** Handles one rejected request; responsesParser does not call it for HTTP 404 responses. */
abstract onRejected(item: QueueItemParsed, params: UrlParams, res?: PromiseRejectedResult, i?: number): any;
/** Decides whether a popped item is requested; runBatch discards items for which this returns false. */
abstract validateBatchItem(item: QueueItemParsed): boolean;

/**
 * Decodes a queue entry written as `retries:depth:pagesLeft:url`.
 *
 * Only the first three colons act as separators, so colons inside the URL survive.
 *
 * @param value - Raw string popped from the queue.
 * @returns The decoded item; numeric fields are NaN when their segment is not a number.
 */
parseQueueItem(value: string) {
  const [retries, depth, pagesLeft, ...urlSegments] = value.split(':');
  const parsed: QueueItemParsed = {
    retries: parseInt(retries),
    depth: parseInt(depth),
    pagesLeft: parseInt(pagesLeft),
    url: urlSegments.join(':'),
  };
  return parsed;
}

/**
 * Encodes a request as `retries:depth:page:url` and adds it to the queue.
 *
 * @param retries - Retry count to record on the entry.
 * @param depth - Depth value to record on the entry.
 * @param page - Remaining-pages counter to record on the entry.
 * @param pushLeft - When true the entry goes to the front of the queue.
 * @param params - Batcher-specific values forwarded to `makeUrl`.
 * @returns A promise for the encoded entry, or for `undefined` when the push
 *          failed. The failure is logged rather than rejected because existing
 *          callers do not await this method, so a rejection would go unhandled.
 */
pushToQueue(retries = 0, depth = 0, page = -1, pushLeft = false, ...params: string[]) {
  const url = this.makeUrl(...params);
  const entry = `${retries}:${depth}:${page}:${url}`;
  return this.queue.push(entry, pushLeft).catch((error: unknown) => {
    console.error(`${this.queue.name} push failed`, entry, error);
    return undefined;
  });
}
/**
 * Prints the queue's current length and the size of the batch just popped.
 * Nothing is printed when both numbers are zero.
 *
 * @param queue - Queue whose length is read.
 * @param batched - Entries popped for the current run; a missing value counts as zero.
 */
async batchLog(queue: RedisQueue, batched: string[]) {
const urlQueueSize = await queue.size();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

queue가 redis 기반이다 보니 queue의 사이즈를 알아야 할때도 redis에 접근을 해야 하군요.
이러한 점은 단점이 될 수도 있겠네요

const batchedSize = batched?.length || 0;
(urlQueueSize || batchedSize) && console.log(`${queue.name} size`, urlQueueSize, ', batch size ', batchedSize);
}
/**
 * Sends a GET request for `url` through the injected axios instance.
 *
 * @returns The pending axios response promise.
 */
fetchCrossRef<T = CrossRefAble>(url: string) {
  const pendingResponse = this.axios.get<T>(url);
  return pendingResponse;
}

/**
 * Processes up to `batchSize` queue entries: requests them, indexes the papers
 * that come back, and reports the DOIs that should be requested next.
 *
 * @param batchSize - Maximum number of entries to pop from the queue.
 * @returns DOIs to follow, each paired with the depth of the item that produced
 *          it, or `undefined` when the queue had nothing to pop.
 */
async runBatch(batchSize: number) {
  const batched = await this.queue.pop(batchSize);
  if (!batched) return;

  const items = batched.map((raw) => this.parseQueueItem(raw)).filter((item) => this.validateBatchItem(item));
  // Every popped entry was rejected by validation, so there is nothing to request or index.
  if (items.length === 0) return [] as { doi: string; depth: number }[];

  const responses = await this.batchRequest(items);
  const { papers, doiWithDepth } = this.responsesParser(items, responses);

  // Skip the existence lookup and the bulk insert when there is nothing to store.
  if (papers.length > 0) {
    const bulkPapers = await this.makeBulkIndex(papers);
    if (bulkPapers.length > 0) {
      try {
        // Awaited so that an indexing failure is reported here instead of
        // surfacing as an unhandled promise rejection.
        await this.doBulkIndex(bulkPapers);
      } catch (error: unknown) {
        console.error(`${this.queue.name} bulk index failed`, error);
      }
    }
  }
  return doiWithDepth;
}
/**
 * Requests every item's URL concurrently and waits for all of them to settle.
 *
 * @returns One settled result per item, in the same order as `items`.
 */
batchRequest<T = CrossRefAble>(items: QueueItemParsed[]) {
  const requests = items.map(({ url }) => this.fetchCrossRef<T>(url));
  return Promise.allSettled(requests);
}

/**
 * Routes each settled response to `onFulfilled` or `onRejected` and merges the outcomes.
 *
 * @param items - Queue items, index-aligned with `responses`.
 * @param responses - Settled results of the requests made for `items`.
 * @returns All parsed papers plus every reference DOI tagged with the depth of
 *          the item it came from.
 */
responsesParser(items: QueueItemParsed[], responses: PromiseSettledResult<AxiosResponse<CrossRefAble, any>>[]) {
  const papers: PaperInfoDetail[] = [];
  const doiWithDepth: { doi: string; depth: number }[] = [];

  responses.forEach((response, index) => {
    const item = items[index];
    const params = this.getParamsFromUrl(item.url);

    if (response.status !== 'fulfilled') {
      const failure = response.reason as AxiosError;
      // A 404 means the resource does not exist, so it is dropped without a retry.
      // Every other failure (a timeout, for example) goes to the rejection handler.
      if (failure.response?.status !== 404) {
        this.onRejected(item, params, response, index);
      }
      return;
    }

    const outcome = this.onFulfilled(item, params, response.value, index);
    if (outcome?.papers) {
      papers.push(...outcome.papers);
    }
    if (outcome?.referenceDOIs) {
      doiWithDepth.push(...outcome.referenceDOIs.map((doi) => ({ doi, depth: item.depth })));
    }
  });

  return { papers, doiWithDepth };
}

/**
 * Selects the papers that should be sent to the bulk insert.
 *
 * With `ALLOW_UPDATE` set, every paper is kept. Otherwise the papers are looked
 * up by DOI and only those not reported as found are kept.
 *
 * @param papers - Candidate papers; lookup results are matched to them by position.
 * @returns The papers to index.
 */
async makeBulkIndex(papers: PaperInfoDetail[]): Promise<PaperInfoDetail[]> {
  if (ALLOW_UPDATE) return papers;

  const { docs } = await this.searchService.multiGet(papers.map(({ doi }) => doi));
  const unindexed: PaperInfoDetail[] = [];
  docs.forEach((doc, position) => {
    const alreadyStored = (doc as GetGetResult).found;
    const candidate = papers[position];
    if (!alreadyStored && candidate) {
      unindexed.push(candidate);
    }
  });
  return unindexed;
}
/**
 * Passes the papers to the search service's bulk insert.
 *
 * @returns Whatever `bulkInsert` returns.
 */
doBulkIndex(papers: PaperInfoDetail[]) {
  const insertion = this.searchService.bulkInsert(papers);
  return insertion;
}

/**
 * Collects the DOIs to request for one Crossref item.
 *
 * @param item - Crossref record to inspect.
 * @param depth - Depth of the queue item that produced `item`.
 * @returns The item's own DOI followed by the DOIs of its references that have
 *          one; empty when the item lacks a DOI or title, or when `depth` is
 *          not below `MAX_DEPTH`.
 */
getPapersToRequest(item: CrossRefItem, depth: number) {
  const worthFollowing = this.paperHasInformation(item);
  // Written as a negated `<` so a NaN depth is rejected too.
  if (!worthFollowing || !(depth < MAX_DEPTH)) {
    return [] as string[];
  }

  const ownDoi: string[] = item.DOI ? [item.DOI] : [];
  // Only references that carry a DOI are queued.
  const referencedDois =
    item.reference?.filter((ref) => this.referenceHasInformation(ref)).map((ref) => ref.DOI) ?? [];
  return [...ownDoi, ...referencedDois];
}

/**
 * Checks that a paper carries both a DOI and a title.
 *
 * @returns A truthy value only when both fields are present (the title when the
 *          DOI is truthy, otherwise the falsy DOI itself).
 */
paperHasInformation(paper: CrossRefItem) {
  // Only papers that have both a DOI and a title are stored in the DB.
  const { DOI: doi } = paper;
  return doi && paper.title;
}

/**
 * Checks that a reference carries a DOI.
 *
 * @returns The reference's DOI, which is falsy when it is missing.
 */
referenceHasInformation(reference: ReferenceInfo) {
  const { DOI: doi } = reference;
  return doi;
}
}
50 changes: 50 additions & 0 deletions backend/src/batch/batcher.doi.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import { AxiosInstance, AxiosResponse } from 'axios';
import Redis from 'ioredis';
import { CrossRefPaperResponse, PaperInfoDetail } from 'src/search/entities/crossRef.entity';
import { SearchService } from 'src/search/search.service';
import { CROSSREF_API_PAPER_URL } from 'src/util';
import { DOI_REGEXP, MAX_RETRY } from './batch.config';
import { Batcher, UrlParams, QueueItemParsed } from './batcher.abstract';

/**
 * Batcher that fetches a single paper per queue entry, addressed by DOI.
 */
export class DoiBatcher extends Batcher {
  constructor(redis: Redis, axios: AxiosInstance, searchService: SearchService, name: string) {
    super(redis, axios, searchService, name);
  }

  /** Builds the paper URL for `doi`. */
  makeUrl(doi: string) {
    return CROSSREF_API_PAPER_URL(doi);
  }

  /**
   * Extracts the DOI from a paper URL by removing the `/works/` path segment.
   *
   * @throws TypeError when `url` cannot be parsed as a URL.
   */
  getParamsFromUrl(url: string): UrlParams {
    const u = new URL(url);
    const doi = u.pathname.replace(/\/works\//, '');
    return { doi };
  }

  /**
   * Accepts only entries whose URL parses and whose DOI matches `DOI_REGEXP`.
   * The item's URL is lower-cased as a side effect.
   */
  validateBatchItem(item: QueueItemParsed): boolean {
    let doi: string | undefined;
    try {
      ({ doi } = this.getParamsFromUrl(item.url));
    } catch {
      // A malformed queue entry must not throw here: the batch has already been
      // popped, so an exception would lose every other entry in it.
      return false;
    }
    // Lookups can fail when the DOI contains uppercase letters.
    item.url = item.url.toLowerCase();
    return doi !== undefined && DOI_REGEXP.test(doi);
  }

  /**
   * Parses the fetched paper and collects the DOIs to request next.
   */
  onFulfilled(
    item: QueueItemParsed,
    params: UrlParams,
    res: AxiosResponse<CrossRefPaperResponse, any>,
  ): { papers: PaperInfoDetail[]; referenceDOIs: string[] } {
    const paper = res.data.message;
    const { depth } = item;
    const referenceDOIs = this.getPapersToRequest(paper, depth);
    const p = this.searchService.parsePaperInfoDetail(paper);
    return { papers: [p], referenceDOIs };
  }

  /**
   * Re-queues a failed request until it has been retried `MAX_RETRY` times.
   */
  onRejected(item: QueueItemParsed, params: UrlParams) {
    const { doi } = params;
    const nextRetries = item.retries + 1;
    // Written as a negated `<=` so an entry with a NaN retry count is dropped
    // instead of being retried forever.
    if (!(nextRetries <= MAX_RETRY)) {
      // this.failedQueue.push(item.url);
      return;
    }
    console.log('error', item.url);
    // The counter advances by exactly one per failure, and pagesLeft is left
    // unchanged because the failed request consumed no page.
    item.retries = nextRetries;
    this.pushToQueue(nextRetries, item.depth, item.pagesLeft, false, doi);
    return;
  }
}
57 changes: 57 additions & 0 deletions backend/src/batch/batcher.search.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
import { AxiosInstance, AxiosResponse } from 'axios';
import Redis from 'ioredis';
import { CrossRefResponse } from 'src/search/entities/crossRef.entity';
import { SearchService } from 'src/search/search.service';
import { CROSSREF_API_URL_CURSOR, MAX_ROWS } from 'src/util';
import { MAX_RETRY } from './batch.config';
import { Batcher, QueueItemParsed, UrlParams } from './batcher.abstract';

/**
 * Batcher that pages through keyword search results using a cursor.
 */
export class SearchBatcher extends Batcher {
  constructor(redis: Redis, axios: AxiosInstance, searchService: SearchService, name: string) {
    super(redis, axios, searchService, name);
  }

  /** Builds the search URL for `keyword` at `cursor`. */
  makeUrl(keyword: string, cursor: string) {
    return CROSSREF_API_URL_CURSOR(keyword, cursor);
  }

  /**
   * Reads the `query` and `cursor` query parameters from a search URL.
   * A missing cursor is reported as `*`, which marks the first page.
   */
  getParamsFromUrl(url: string): UrlParams {
    const u = new URL(url);
    const params = new URLSearchParams(u.search);
    const keyword = params.get('query');
    const cursor = params.get('cursor') || '*';
    return { keyword, cursor };
  }

  /** Search entries are always accepted. */
  validateBatchItem(item: QueueItemParsed): boolean {
    return true;
  }

  /**
   * Queues the next result page when there is one, then parses the current page.
   *
   * @returns The papers on this page and the DOIs to request next.
   */
  onFulfilled(item: QueueItemParsed, params: UrlParams, res: AxiosResponse<CrossRefResponse, any>) {
    const { cursor: presentCursor, keyword } = params;
    const { message } = res.data;
    const nextCursor = message['next-cursor'];

    if (presentCursor === '*') {
      // First page: the total result count fixes how many further pages are fetched.
      const maxPage = Math.floor(message['total-results'] / MAX_ROWS);
      this.pushToQueue(0, item.depth + 1, maxPage, true, keyword, nextCursor);
    } else if (item.pagesLeft > 0) {
      // Continue from the cursor this response returned; the request's own
      // cursor is only a fallback for a response that carries none.
      this.pushToQueue(0, item.depth, item.pagesLeft - 1, false, keyword, nextCursor ?? presentCursor);
    }

    const referenceDOIs = message.items.flatMap((paper) => {
      return this.getPapersToRequest(paper, item.depth);
    });
    const papers = message.items.map((paper) => {
      return this.searchService.parsePaperInfoDetail(paper);
    });
    return { papers, referenceDOIs };
  }

  /**
   * Re-queues a failed page request until it has been retried `MAX_RETRY` times.
   */
  onRejected(item: QueueItemParsed, params: UrlParams) {
    const { keyword, cursor } = params;
    const nextRetries = item.retries + 1;
    // Written as a negated `<=` so an entry with a NaN retry count is dropped
    // instead of being retried forever.
    if (!(nextRetries <= MAX_RETRY)) {
      // this.failedQueue.push(item.url);
      return;
    }
    console.log('error', item.url);
    // The counter advances by exactly one per failure, and pagesLeft is left
    // unchanged: the failed page was never fetched, so decrementing it would
    // end pagination one page early for every retry.
    item.retries = nextRetries;
    this.pushToQueue(nextRetries, item.depth, item.pagesLeft, false, keyword, cursor);
    return;
  }
}
Loading