13-6 [BE] [Paper detail - Paper info] Request batch job (#82)
leesungbin committed Dec 2, 2022
1 parent a34f12b commit f807ca1
Showing 15 changed files with 520 additions and 134 deletions.
7 changes: 7 additions & 0 deletions backend/src/batch/batch.config.ts
@@ -0,0 +1,7 @@
export const TIME_INTERVAL = 3 * 1000; // batch tick interval in ms
export const MAX_RETRY = 3; // maximum re-enqueue attempts for a failed request
export const MAX_DEPTH = 1; // how deep to follow reference DOIs from a seed paper
export const SEARCH_BATCH_SIZE = 10; // keyword-search requests sent per tick
export const DOI_BATCH_SIZE = 40; // DOI lookups sent per tick
export const DOI_REGEXP = /^\d{2}\.\d+\/.*/; // bare DOI shape, e.g. 10.1000/182
export const ALLOW_UPDATE = process.env.ALLOW_UPDATE === 'true'; // avoid eval() on environment input
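
For context, a quick sanity check of DOI_REGEXP against illustrative DOI strings (the example values below are not taken from this commit):

import { DOI_REGEXP } from './batch.config';

// Illustrative DOIs only — the pattern expects a "<numeric prefix>.<digits>/<suffix>" shape.
console.log(DOI_REGEXP.test('10.1000/182')); // true
console.log(DOI_REGEXP.test('10.48550/arxiv.2212.00001')); // true
console.log(DOI_REGEXP.test('https://doi.org/10.1000/182')); // false — only bare DOIs pass
console.log(DOI_REGEXP.test('not-a-doi')); // false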
20 changes: 20 additions & 0 deletions backend/src/batch/batch.queue.ts
@@ -0,0 +1,20 @@
import Redis from 'ioredis';

// Thin wrapper around a Redis list used as a FIFO work queue.
export class RedisQueue {
  constructor(private redis: Redis, public name: string) {}

  async push(value: string, pushLeft = false) {
    if (pushLeft) {
      await this.redis.lpush(this.name, value);
    } else {
      await this.redis.rpush(this.name, value);
    }
    return value;
  }

  async pop(count = 1) {
    return await this.redis.lpop(this.name, count);
  }

  async size() {
    return await this.redis.llen(this.name);
  }
}
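
For reference, a minimal usage sketch of RedisQueue, assuming a Redis instance is reachable on the default localhost port (the queue name and values below are illustrative):

import Redis from 'ioredis';
import { RedisQueue } from './batch.queue';

async function demo() {
  const redis = new Redis(); // 127.0.0.1:6379 by default
  const queue = new RedisQueue(redis, 'demo queue');

  await queue.push('first'); // RPUSH — appended to the tail
  await queue.push('urgent', true); // LPUSH — jumps to the head

  console.log(await queue.size()); // 2
  console.log(await queue.pop(2)); // [ 'urgent', 'first' ] — LPOP takes from the head
  await redis.quit();
}

demo();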
60 changes: 60 additions & 0 deletions backend/src/batch/batch.service.ts
@@ -0,0 +1,60 @@
import { InjectRedis } from '@liaoliaots/nestjs-redis';
import { HttpService } from '@nestjs/axios';
import { Injectable } from '@nestjs/common';
import { Interval } from '@nestjs/schedule';
import Redis from 'ioredis';
import { SearchService } from 'src/search/search.service';
import { TIME_INTERVAL, SEARCH_BATCH_SIZE, DOI_BATCH_SIZE } from './batch.config';
import { DoiBatcher } from './batcher.doi';
import { SearchBatcher } from './batcher.search';

@Injectable()
export class BatchService {
  searchBatcher: SearchBatcher;
  doiBatcher: DoiBatcher;

  constructor(
    @InjectRedis() private readonly redis: Redis,
    private readonly httpService: HttpService,
    private readonly searchService: SearchService,
  ) {
    this.searchBatcher = new SearchBatcher(
      this.redis,
      this.httpService.axiosRef,
      this.searchService,
      'url batch queue',
    );
    this.doiBatcher = new DoiBatcher(this.redis, this.httpService.axiosRef, this.searchService, 'paper batch queue');
  }

  keywordToRedisKey(keyword: string) {
    return `s:${keyword}`;
  }

  async keywordExist(keyword: string) {
    const key = this.keywordToRedisKey(keyword);
    return (await this.redis.ttl(key)) >= 0;
  }

  async setKeyword(keyword: string) {
    if (await this.keywordExist(keyword)) return false;
    const key = this.keywordToRedisKey(keyword);
    // Mark the keyword as seen for 24 hours so repeated searches are not re-crawled.
    await this.redis.set(key, 1, 'EX', 60 * 60 * 24);
    this.searchBatcher.pushToQueue(0, 0, -1, true, keyword);
    return true;
  }

  async setDoi(doi: string) {
    this.doiBatcher.pushToQueue(0, 0, -1, true, doi);
  }

  @Interval(TIME_INTERVAL)
  async batchSearchQueue(batchSize = SEARCH_BATCH_SIZE) {
    const referencesDoiWithDepth = await this.searchBatcher.runBatch(batchSize);
    referencesDoiWithDepth?.forEach((v) => {
      this.doiBatcher.pushToQueue(0, v.depth + 1, -1, false, v.doi);
    });
  }

  @Interval(TIME_INTERVAL)
  async batchDoiQueue(batchSize = DOI_BATCH_SIZE) {
    const referencesDoiWithDepth = await this.doiBatcher.runBatch(batchSize);
    referencesDoiWithDepth?.forEach((v) => {
      this.doiBatcher.pushToQueue(0, v.depth + 1, -1, false, v.doi);
    });
  }
}
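
The @Interval() handlers above only run if the schedule module is registered. The module wiring is not among the files shown in this excerpt, so the following is a hypothetical sketch of what a BatchModule could look like (the module name, Redis connection options, and the SearchModule import path are assumptions):

import { Module } from '@nestjs/common';
import { HttpModule } from '@nestjs/axios';
import { ScheduleModule } from '@nestjs/schedule';
import { RedisModule } from '@liaoliaots/nestjs-redis';
import { SearchModule } from 'src/search/search.module'; // assumed path
import { BatchService } from './batch.service';

@Module({
  imports: [
    ScheduleModule.forRoot(), // enables the @Interval(TIME_INTERVAL) polling
    HttpModule, // provides HttpService (axios)
    RedisModule.forRoot({ config: { host: 'localhost', port: 6379 } }), // assumed connection options
    SearchModule, // assumed to export SearchService
  ],
  providers: [BatchService],
})
export class BatchModule {}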
171 changes: 171 additions & 0 deletions backend/src/batch/batcher.abstract.ts
@@ -0,0 +1,171 @@
import { GetGetResult } from '@elastic/elasticsearch/lib/api/types';
import { Injectable } from '@nestjs/common';
import { AxiosError, AxiosInstance, AxiosResponse } from 'axios';
import Redis from 'ioredis';
import {
  CrossRefItem,
  CrossRefPaperResponse,
  CrossRefResponse,
  PaperInfoDetail,
  ReferenceInfo,
} from 'src/search/entities/crossRef.entity';
import { SearchService } from 'src/search/search.service';
import { ALLOW_UPDATE, MAX_DEPTH } from './batch.config';
import { RedisQueue } from './batch.queue';

export interface QueueItemParsed {
  retries: number;
  depth: number;
  pagesLeft: number;
  url: string;
}
export interface UrlParams {
  doi?: string;
  keyword?: string;
  cursor?: string;
}

type CrossRefAble = CrossRefResponse | CrossRefPaperResponse;

@Injectable()
export abstract class Batcher {
  queue: RedisQueue;
  // failedQueue: RedisQueue;
  constructor(
    private readonly redis: Redis,
    private readonly axios: AxiosInstance,
    readonly searchService: SearchService,
    readonly name: string,
  ) {
    this.queue = new RedisQueue(this.redis, name);
    // this.failedQueue = new RedisQueue(this.redis, name);
  }

  abstract makeUrl(...params: string[]): string;
  abstract getParamsFromUrl(url: string): UrlParams;
  abstract onFulfilled(
    item: QueueItemParsed,
    params: UrlParams,
    res: AxiosResponse<CrossRefAble, any>,
    i?: number,
  ): { papers: PaperInfoDetail[]; referenceDOIs: string[] }; // DOIs of the papers' references
  abstract onRejected(item: QueueItemParsed, params: UrlParams, res?: PromiseRejectedResult, i?: number): any;
  abstract validateBatchItem(item: QueueItemParsed): boolean;

  parseQueueItem(value: string) {
    const splits = value.split(':');
    return {
      retries: parseInt(splits[0]),
      depth: parseInt(splits[1]),
      pagesLeft: parseInt(splits[2]),
      url: splits.slice(3).join(':'),
    } as QueueItemParsed;
  }

  pushToQueue(retries = 0, depth = 0, page = -1, pushLeft = false, ...params: string[]) {
    const url = this.makeUrl(...params);
    this.queue.push(`${retries}:${depth}:${page}:${url}`, pushLeft);
  }

  async batchLog(queue: RedisQueue, batched: string[]) {
    const urlQueueSize = await queue.size();
    const batchedSize = batched?.length || 0;
    (urlQueueSize || batchedSize) && console.log(`${queue.name} size`, urlQueueSize, ', batch size ', batchedSize);
  }

  fetchCrossRef<T = CrossRefAble>(url: string) {
    return this.axios.get<T>(url);
  }

  async runBatch(batchSize: number) {
    const queue = this.queue;
    // const failedQueue = this.failedQueue;
    const batched = await queue.pop(batchSize);

    // await this.batchLog(queue, batched);
    if (!batched) return;
    const items = batched.map((item) => this.parseQueueItem(item)).filter((item) => this.validateBatchItem(item));
    const responses = await this.batchRequest(items);
    const { papers, doiWithDepth } = this.responsesParser(items, responses);
    const bulkPapers = await this.makeBulkIndex(papers);
    this.doBulkIndex(bulkPapers);
    return doiWithDepth;
  }

  batchRequest<T = CrossRefAble>(items: QueueItemParsed[]) {
    return Promise.allSettled(items.map((item) => this.fetchCrossRef<T>(item.url)));
  }

  responsesParser(items: QueueItemParsed[], responses: PromiseSettledResult<AxiosResponse<CrossRefAble, any>>[]) {
    return responses
      .map((res, i) => {
        const params = this.getParamsFromUrl(items[i].url);
        if (res.status === 'fulfilled') {
          return this.onFulfilled(items[i], params, res.value, i);
        } else {
          const error = res.reason as AxiosError;
          // Resource not found.
          if (error.response?.status === 404) return;

          // e.g. timeout of 20000ms exceeded
          this.onRejected(items[i], params, res, i);
        }
      })
      .reduce(
        (acc, cur, i) => {
          if (cur?.papers) Array.prototype.push.apply(acc.papers, cur.papers);
          if (cur?.referenceDOIs) {
            const doiWithDepth = cur.referenceDOIs.map((doi) => {
              return { doi, depth: items[i].depth };
            });
            Array.prototype.push.apply(acc.doiWithDepth, doiWithDepth);
          }
          return acc;
        },
        { papers: [] as PaperInfoDetail[], doiWithDepth: [] as { doi: string; depth: number }[] },
      );
  }

  async makeBulkIndex(papers: PaperInfoDetail[]): Promise<PaperInfoDetail[]> {
    if (ALLOW_UPDATE) return papers;
    const dois = papers.map((paper) => {
      return paper.doi;
    });
    const { docs } = await this.searchService.multiGet(dois);
    const indexes = docs
      .map((doc, i) => {
        if ((doc as GetGetResult).found) return;
        return papers[i];
      })
      .filter(Boolean);

    // console.log(`${this.queue.name} skipped papers:`, papers.length - indexes.length);
    return indexes;
  }

  doBulkIndex(papers: PaperInfoDetail[]) {
    return this.searchService.bulkInsert(papers);
  }

  getPapersToRequest(item: CrossRefItem, depth: number) {
    const hasHope = this.paperHasInformation(item);
    const papers: string[] = [];
    if (hasHope && depth < MAX_DEPTH) {
      if (item.DOI) {
        papers.push(item.DOI);
      }
      item.reference?.forEach((ref) => {
        // Push references that have a DOI into the paper queue.
        if (this.referenceHasInformation(ref)) {
          // return [ref.DOI, 0, depth + 1];
          papers.push(ref.DOI);
        }
      });
    }
    return papers;
  }

  paperHasInformation(paper: CrossRefItem) {
    // Only store papers that have both a DOI and a title in the DB.
    return paper.DOI && paper.title;
  }

  referenceHasInformation(reference: ReferenceInfo) {
    return reference['DOI'];
  }
}
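
To illustrate the colon-delimited item format that pushToQueue writes and parseQueueItem reads back (the URL below is an assumption of what CROSSREF_API_PAPER_URL produces, with an illustrative DOI):

// 'retries:depth:pagesLeft:url' — the URL itself contains ':', hence the slice(3).join(':').
const encoded = '0:1:-1:https://api.crossref.org/works/10.1000/182';

const splits = encoded.split(':');
const item = {
  retries: parseInt(splits[0]), // 0
  depth: parseInt(splits[1]), // 1
  pagesLeft: parseInt(splits[2]), // -1 (DOI items are not paginated)
  url: splits.slice(3).join(':'), // 'https://api.crossref.org/works/10.1000/182'
};
console.log(item);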
50 changes: 50 additions & 0 deletions backend/src/batch/batcher.doi.ts
@@ -0,0 +1,50 @@
import { AxiosInstance, AxiosResponse } from 'axios';
import Redis from 'ioredis';
import { CrossRefPaperResponse, PaperInfoDetail } from 'src/search/entities/crossRef.entity';
import { SearchService } from 'src/search/search.service';
import { CROSSREF_API_PAPER_URL } from 'src/util';
import { DOI_REGEXP, MAX_RETRY } from './batch.config';
import { Batcher, UrlParams, QueueItemParsed } from './batcher.abstract';

export class DoiBatcher extends Batcher {
  constructor(redis: Redis, axios: AxiosInstance, searchService: SearchService, name: string) {
    super(redis, axios, searchService, name);
  }

  makeUrl(doi: string) {
    return CROSSREF_API_PAPER_URL(doi);
  }

  getParamsFromUrl(url: string): UrlParams {
    const u = new URL(url);
    const doi = u.pathname.replace(/\/works\//, '');
    return { doi };
  }

  validateBatchItem(item: QueueItemParsed): boolean {
    const { doi } = this.getParamsFromUrl(item.url);
    // Uppercase DOIs sometimes fail to resolve in the lookup, so normalize the URL to lowercase.
    item.url = item.url.toLowerCase();
    return DOI_REGEXP.test(doi);
  }

  onFulfilled(
    item: QueueItemParsed,
    params: UrlParams,
    res: AxiosResponse<CrossRefPaperResponse, any>,
  ): { papers: PaperInfoDetail[]; referenceDOIs: string[] } {
    const paper = res.data.message;
    const { depth } = item;
    const referenceDOIs = this.getPapersToRequest(paper, depth);
    const p = this.searchService.parsePaperInfoDetail(paper);
    return { papers: [p], referenceDOIs };
  }

  onRejected(item: QueueItemParsed, params: UrlParams) {
    const { doi } = params;
    if (item.retries + 1 > MAX_RETRY) {
      // this.failedQueue.push(item.url);
      return;
    }
    console.log('error', item.url);
    // Re-enqueue with the retry count incremented once.
    this.pushToQueue(item.retries + 1, item.depth, item.pagesLeft - 1, false, doi);
  }
}
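
A small sketch of the DOI extraction and lowercase normalization above, assuming CROSSREF_API_PAPER_URL builds a standard CrossRef works URL (the DOI is illustrative):

const url = 'https://api.crossref.org/works/10.1000/XYZ.123'; // assumed URL shape
const doi = new URL(url).pathname.replace(/\/works\//, ''); // '10.1000/XYZ.123'

// validateBatchItem lowercases the whole URL before fetching, since some
// uppercase DOIs were observed to miss in lookups.
console.log(doi.toLowerCase(), /^\d{2}\.\d+\/.*/.test(doi)); // '10.1000/xyz.123' true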
57 changes: 57 additions & 0 deletions backend/src/batch/batcher.search.ts
@@ -0,0 +1,57 @@
import { AxiosInstance, AxiosResponse } from 'axios';
import Redis from 'ioredis';
import { CrossRefResponse } from 'src/search/entities/crossRef.entity';
import { SearchService } from 'src/search/search.service';
import { CROSSREF_API_URL_CURSOR, MAX_ROWS } from 'src/util';
import { MAX_RETRY } from './batch.config';
import { Batcher, QueueItemParsed, UrlParams } from './batcher.abstract';

export class SearchBatcher extends Batcher {
  constructor(redis: Redis, axios: AxiosInstance, searchService: SearchService, name: string) {
    super(redis, axios, searchService, name);
  }

  makeUrl(keyword: string, cursor: string) {
    return CROSSREF_API_URL_CURSOR(keyword, cursor);
  }

  getParamsFromUrl(url: string): UrlParams {
    const u = new URL(url);
    const params = new URLSearchParams(u.search);
    const keyword = params.get('query');
    const cursor = params.get('cursor') || '*';
    return { keyword, cursor };
  }

  validateBatchItem(item: QueueItemParsed): boolean {
    return true;
  }

  onFulfilled(item: QueueItemParsed, params: UrlParams, res: AxiosResponse<CrossRefResponse, any>) {
    const { cursor: presentCursor, keyword } = params;
    if (presentCursor === '*') {
      // First page: read the next cursor and the total page count, then enqueue the follow-up request.
      const cursor = res.data.message['next-cursor'];
      const maxPage = Math.floor(res.data.message['total-results'] / MAX_ROWS);
      this.pushToQueue(0, item.depth + 1, maxPage, true, keyword, cursor);
    } else if (item.pagesLeft > 0) {
      this.pushToQueue(0, item.depth, item.pagesLeft - 1, false, keyword, presentCursor);
    }
    const referenceDOIs = res.data.message.items.flatMap((paper) => {
      return this.getPapersToRequest(paper, item.depth);
    });
    const papers = res.data.message.items.map((paper) => {
      return this.searchService.parsePaperInfoDetail(paper);
    });
    return { papers, referenceDOIs };
  }

  onRejected(item: QueueItemParsed, params: UrlParams) {
    const { keyword, cursor } = params;
    if (item.retries + 1 > MAX_RETRY) {
      // this.failedQueue.push(item.url);
      return;
    }
    console.log('error', item.url);
    // Re-enqueue with the retry count incremented once.
    this.pushToQueue(item.retries + 1, item.depth, item.pagesLeft - 1, false, keyword, cursor);
  }
}
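
And a sketch of the cursor parameters SearchBatcher reads back out of a queued URL, assuming CROSSREF_API_URL_CURSOR builds a standard CrossRef query URL like the one below (the keyword and rows value are illustrative):

const url = 'https://api.crossref.org/works?query=graph%20neural%20networks&rows=20&cursor=*';
const search = new URLSearchParams(new URL(url).search);

console.log(search.get('query')); // 'graph neural networks'
console.log(search.get('cursor') || '*'); // '*' — first page; the response's 'next-cursor' drives later pages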
