From 3b66d88c5a1fb180011b944f36a2484181e5eaec Mon Sep 17 00:00:00 2001 From: Frank Zhao Date: Sun, 13 Mar 2022 08:53:00 +0800 Subject: [PATCH] feat: add oss client and hypercrx task (#772) Signed-off-by: Frankzhaopku --- package-lock.json | 21 +++++++++++ package.json | 1 + src/config.ts | 17 ++++++--- src/cron/tasks/hacking_force_month.ts | 4 +- src/cron/tasks/hypercrx_actor.ts | 50 +++++++++++++++++++++++++ src/cron/tasks/hypercrx_repo.ts | 53 +++++++++++++++++++++++++++ src/db/clickhouse.ts | 6 +-- src/db/neo4j.ts | 14 ++++--- src/oss/ali.ts | 4 +- src/utils.ts | 14 +++++++ 10 files changed, 167 insertions(+), 17 deletions(-) create mode 100644 src/cron/tasks/hypercrx_actor.ts create mode 100644 src/cron/tasks/hypercrx_repo.ts diff --git a/package-lock.json b/package-lock.json index 97cb733dc..0ed8ee789 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1062,6 +1062,27 @@ "minimist": "^1.1.0" } }, + "p-finally": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/p-finally/-/p-finally-1.0.0.tgz", + "integrity": "sha1-P7z7FbiZpEEjs0ttzBi3JDNqLK4=" + }, + "p-timeout": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/p-timeout/-/p-timeout-3.2.0.tgz", + "integrity": "sha512-rhIwUycgwwKcP9yTOOFK/AKsAopjjCakVqLHePO3CC6Mir1Z99xT+R63jZxAT5lFZLa2inS5h+ZS2GvR99/FBg==", + "requires": { + "p-finally": "^1.0.0" + } + }, + "p-wait-for": { + "version": "3.2.0", + "resolved": "https://registry.npmjs.org/p-wait-for/-/p-wait-for-3.2.0.tgz", + "integrity": "sha512-wpgERjNkLrBiFmkMEjuZJEWKKDrNfHCKA1OhyN1wg1FrLkULbviEy6py1AyJUgZ72YWFbZ38FIpnqvVqAlDUwA==", + "requires": { + "p-timeout": "^3.0.0" + } + }, "pac-proxy-agent": { "version": "5.0.0", "resolved": "https://registry.npmjs.org/pac-proxy-agent/-/pac-proxy-agent-5.0.0.tgz", diff --git a/package.json b/package.json index 9731a652a..15bf334db 100644 --- a/package.json +++ b/package.json @@ -37,6 +37,7 @@ "lodash": "^4.17.21", "neo4j-driver": "^4.4.1", "node-cron": "^3.0.0", + "p-wait-for": "^3.1.0", "parse-neo4j": "^0.6.11", "pope": "^3.0.0", "request": "^2.88.2", diff --git a/src/config.ts b/src/config.ts index 290ce34f1..c5701400d 100644 --- a/src/config.ts +++ b/src/config.ts @@ -1,5 +1,7 @@ import { merge } from "lodash"; +let inited = false; + let config = { general: { owner: 'X-lab2017', @@ -29,8 +31,13 @@ let config = { } }; -import('./local_config').then(localConfig => { - config = merge(config, localConfig.default); -}).catch(() => {}); - -export default () => config; +export default async () => { + if (!inited) { + try { + await import('./local_config').then(localConfig => { + config = merge(config, localConfig.default); + }); + } catch {} + } + return config; +} diff --git a/src/cron/tasks/hacking_force_month.ts b/src/cron/tasks/hacking_force_month.ts index 182395515..773b5f546 100644 --- a/src/cron/tasks/hacking_force_month.ts +++ b/src/cron/tasks/hacking_force_month.ts @@ -7,7 +7,7 @@ import { getClient } from '../../oss/ali'; const task: Task = { cron: '0 0 5 * *', // runs on the 5th day of every month at 00:00 enable: true, - immediate: true, + immediate: false, callback: async () => { const chineseUserIds = getGitHubData([':regions/China']).githubUsers; const q = `MATCH (u:User) WHERE u.id IN [${chineseUserIds.join(',')}] RETURN u;`; @@ -32,7 +32,7 @@ const task: Task = { }); }); }); - const client = getClient(); + const client = await getClient(); await client.put('/hacking_force/total.json', Buffer.from(JSON.stringify(result))); console.log('Run hacking force total task done.'); } diff --git a/src/cron/tasks/hypercrx_actor.ts b/src/cron/tasks/hypercrx_actor.ts new file mode 100644 index 000000000..56df3e8cb --- /dev/null +++ b/src/cron/tasks/hypercrx_actor.ts @@ -0,0 +1,50 @@ +import { existsSync, mkdirSync, writeFileSync } from 'fs'; +import { Task } from '..'; +import { getClient as getNeo4jClient, parseRecord } from '../../db/neo4j'; +import { forEveryMonth } from '../../metrics/basic'; + +const task: Task = { + cron: '0 0 7 * *', // runs on the 5th day of every month at 00:00 + enable: true, + immediate: false, + callback: async () => { + const neo4jClient = await getNeo4jClient(); + const session = neo4jClient.session(); + const now = new Date(); + const lastMonth = new Date(now.getTime() - 30 * 24 * 60 * 60); + const year = lastMonth.getFullYear(), month = lastMonth.getMonth() + 1; + const q = `MATCH (u:User) WHERE u.activity_${year}${month} IS NOT NULL RETURN u;`; + const result = session.run(q); + let count = 0; + result.subscribe({ + onNext: async r => { + const user = parseRecord(r); + const userInfo = { + login: user.login, + id: user.id, + activity: {}, + influence: {}, + }; + forEveryMonth(2015, 1, year, month, (y, m) => { + userInfo.activity[`${y}-${m}`] = user[`activity_${y}${m}`] ?? 0; + userInfo.influence[`${y}-${m}`] = user[`open_rank_${y}${m}`] ?? 0; + }); + const dir = `./local_files/hypercrx_actor/${user.login.charAt(0).toLowerCase()}`; + if (!existsSync(dir)) { + mkdirSync(dir); + } + writeFileSync(`${dir}/${user.login}.json`, JSON.stringify(userInfo)); + count++; + if (count % 10000 === 0) console.log(`Finish write ${count} files.`); + }, + onCompleted: () => { + session.close().then(() => { + console.log(`Process ${count} users done.`); + }); + }, + onError: console.log, + }); + } +}; + +module.exports = task; diff --git a/src/cron/tasks/hypercrx_repo.ts b/src/cron/tasks/hypercrx_repo.ts new file mode 100644 index 000000000..c41c7fd9d --- /dev/null +++ b/src/cron/tasks/hypercrx_repo.ts @@ -0,0 +1,53 @@ +import { writeFileSync, existsSync, mkdirSync } from 'fs'; +import { Task } from '..'; +import { getClient as getNeo4jClient, parseRecord } from '../../db/neo4j'; +import { forEveryMonth } from '../../metrics/basic'; + +const task: Task = { + cron: '0 0 6 * *', // runs on the 5th day of every month at 00:00 + enable: true, + immediate: false, + callback: async () => { + const neo4jClient = await getNeo4jClient(); + const session = neo4jClient.session(); + const now = new Date(); + const lastMonth = new Date(now.getTime() - 30 * 24 * 60 * 60); + const year = lastMonth.getFullYear(), month = lastMonth.getMonth() + 1; + const q = `MATCH (r:Repo) WHERE r.activity_${year}${month} IS NOT NULL RETURN r;`; + const result = session.run(q); + let count = 0; + result.subscribe({ + onNext: async r => { + const repo = parseRecord(r); + const repoInfo = { + id: repo.id, + name: repo.name, + org: repo.org_login, + org_id: repo.org_id, + activity: {}, + influence: {}, + }; + forEveryMonth(2015, 1, year, month, (y, m) => { + repoInfo.activity[`${y}-${m}`] = repo[`activity_${y}${m}`] ?? 0; + repoInfo.influence[`${y}-${m}`] = repo[`open_rank_${y}${m}`] ?? 0; + }); + const [owner, name] = repoInfo.name.split('/'); + const dir = `./local_files/hypercrx_repo/${owner}`; + if (!existsSync(dir)) { + mkdirSync(dir); + } + writeFileSync(`${dir}/${name}.json`, JSON.stringify(repoInfo)); + count++; + if (count % 10000 === 0) console.log(`Finish write ${count} files.`); + }, + onCompleted: () => { + session.close().then(() => { + console.log(`Process ${count} repos done.`); + }); + }, + onError: console.log, + }); + } +}; + +module.exports = task; diff --git a/src/db/clickhouse.ts b/src/db/clickhouse.ts index 2401fc8a9..f10ab0a01 100644 --- a/src/db/clickhouse.ts +++ b/src/db/clickhouse.ts @@ -3,13 +3,13 @@ import getConfig from '../config'; let client; -function getClient() { +async function getClient() { if (client) return client; - client = new ClickHouse(getConfig().db.clickhouse); + client = new ClickHouse((await getConfig()).db.clickhouse); return client; } export async function query(q: string): Promise { - const client = getClient(); + const client = await getClient(); return (await client.querying(q) as any).data; } diff --git a/src/db/neo4j.ts b/src/db/neo4j.ts index b2325745d..7d11aa069 100644 --- a/src/db/neo4j.ts +++ b/src/db/neo4j.ts @@ -4,15 +4,19 @@ import getConfig from '../config'; let driver: any; -function getClient() { +export async function getClient() { if (driver) return driver; - driver = neo4j.driver(getConfig().db.neo4j.host); + driver = neo4j.driver((await getConfig()).db.neo4j.host); return driver; } -export async function query(query: string, params?: any) { - const session = getClient().session(); +export async function query(query: string, params?: any): Promise { + const session = (await getClient()).session(); const r = await session.run(query, params); await session.close(); - return parser.parse(r) as T; + return parser.parse(r) as T[]; +} + +export function parseRecord(record: any): T { + return parser.parseRecord(record)._fields[0]; } diff --git a/src/oss/ali.ts b/src/oss/ali.ts index a2b22a464..7cd1db6e3 100644 --- a/src/oss/ali.ts +++ b/src/oss/ali.ts @@ -3,9 +3,9 @@ import getConfig from '../config'; let client: OSS; -export function getClient(): OSS { +export async function getClient(): Promise { if (client) return client; - const config = getConfig(); + const config = await getConfig(); client = new OSS(config.oss.ali); return client; }; diff --git a/src/utils.ts b/src/utils.ts index 7f7d3dbbf..012fcefaf 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -1,5 +1,6 @@ import { existsSync, readFileSync } from "fs"; import { load } from 'js-yaml'; +import pWaitFor from 'p-wait-for'; export function readFileAsObj(path: string) { if (!existsSync(path)) { @@ -25,3 +26,16 @@ export function readFileAsObj(path: string) { } return null; } + +export async function waitFor(mill: number): Promise { + return new Promise(resolve => { + setTimeout(() => { + resolve(); + }, mill); + }); +} + +export async function waitUntil(func: () => boolean, options?: object): Promise { + if (func()) return; + return pWaitFor(func, Object.assign({ interval: 1000 }, options)); +}