Skip to content

Commit

Permalink
feat: implement kuai-router & listener
Browse files Browse the repository at this point in the history
  • Loading branch information
PainterPuppets committed Dec 27, 2022
1 parent 434f69f commit d8e1ae2
Show file tree
Hide file tree
Showing 11 changed files with 1,438 additions and 32 deletions.
1,062 changes: 1,036 additions & 26 deletions package-lock.json

Large diffs are not rendered by default.

59 changes: 59 additions & 0 deletions packages/io/__tests__/adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
import { describe, it, expect, afterAll } from '@jest/globals'
import { CoR } from '../src/cor'
import { KuaiRouter } from '../src/router'
import { KoaRouterAdapter } from '../src/adapter'
import Koa from 'koa'

describe('test KoaRouterAdapter', () => {
const koaServer = new Koa()

const cor = new CoR()
const kuaiRouter = new KuaiRouter()
kuaiRouter.get('/', async (ctx, next) => {
ctx.ok('hello root')
await next()
})

kuaiRouter.get('/parent', async (ctx, next) => {
ctx.ok('hello parent')
await next()
})

kuaiRouter.get('/parent/children', async (ctx, next) => {
ctx.ok('hello children')
await next()
})

cor.use(kuaiRouter.middleware())

const koaRouterAdapter = new KoaRouterAdapter(cor)

koaServer.use(koaRouterAdapter.routes()).use(koaRouterAdapter.allowedMethods())

const server = koaServer.listen(4004)

afterAll(() => {
server.close()
})

it(`should find root`, async () => {
const res = await fetch('http://localhost:4004/', { method: 'GET' })
expect(res.status).toEqual(200)
const body = await res.text()
expect(body).toEqual('hello root')
})

it(`should find parent`, async () => {
const res = await fetch('http://localhost:4004/parent', { method: 'GET' })
expect(res.status).toEqual(200)
const body = await res.text()
expect(body).toEqual('hello parent')
})

it(`should find children`, async () => {
const res = await fetch('http://localhost:4004/parent/children', { method: 'GET' })
expect(res.status).toEqual(200)
const body = await res.text()
expect(body).toEqual('hello children')
})
})
29 changes: 29 additions & 0 deletions packages/io/__tests__/cor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,4 +101,33 @@ describe('Test CoR', () => {
expect(mockBefore).toBeCalled()
expect(mockAfter).toBeCalled()
})

it(`should return ok on any middleware`, async () => {
const cor = new CoR()
const mockFn = jest.fn()
const mockFn1 = jest.fn()
const mockFn2 = jest.fn()

cor.use(async (_, next) => {
mockFn()
await next()
})
cor.use(async (ctx, next) => {
mockFn1()
ctx.ok('ok')
await next()
})
cor.use(async (ctx, next) => {
mockFn2()
ctx.ok('hello')
await next()
})

const result = await cor.dispatch({})

expect(result).toEqual('ok')
expect(mockFn).toBeCalled()
expect(mockFn1).toBeCalled()
expect(mockFn2).toBeCalled()
})
})
99 changes: 99 additions & 0 deletions packages/io/__tests__/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { jest, describe, it, expect } from '@jest/globals'
import type { CKBComponents } from '@ckb-lumos/rpc/lib/types/api'
import { TipHeaderListener } from '../src/listener'
import { timeout, distinctUntilChanged, catchError } from 'rxjs'
import type { Subscription } from 'rxjs'

import { ChainSource } from '../src/types'
/* eslint-disable-next-line @typescript-eslint/no-var-requires */
const { scheduler } = require('node:timers/promises')

describe('test Listener', () => {
const mockHeader = {
timestamp: '0x',
number: '0x',
epoch: '0x',
compactTarget: '0x',
dao: '0x',
hash: '0x',
nonce: '0x',
parentHash: '0x',
proposalsHash: '0x',
transactionsRoot: '0x',
extraHash: '0x',
version: '0x',
}
const mockEpoch = {
compactTarget: '0x',
length: '0x',
startNumber: '0x',
number: '0x',
}

const waitSubscriptionClose = async (sub: Subscription) => {
while (!sub.closed) {
await scheduler.wait(1000)
}
}

const mockSource: ChainSource = {
getTipBlockNumber: () => Promise.resolve('0x' + new Date().getSeconds().toString(16).padStart(2, '0')),
getTipHeader: () =>
Promise.resolve({
...mockHeader,
number: '0x' + new Date().getSeconds().toString(16).padStart(2, '0'),
}),
getCurrentEpoch: () => Promise.resolve(mockEpoch),
}

it(`distinctUntilChanged pipe `, async () => {
const tipHeaders: CKBComponents.BlockHeader[] = []
const subscribe = new TipHeaderListener(mockSource, 100)
.getObservable()
.pipe(distinctUntilChanged((a, b) => a.number === b.number))
.subscribe((header) => {
tipHeaders.push(header)
if (tipHeaders.length === 4) {
subscribe.unsubscribe()
expect(tipHeaders[0].number).not.toMatch(tipHeaders[1].number)
expect(tipHeaders[1].number).not.toMatch(tipHeaders[2].number)
expect(tipHeaders[2].number).not.toMatch(tipHeaders[3].number)
}
})

await waitSubscriptionClose(subscribe)
})

it(`custom pipe listener`, async () => {
const slowMockSource: ChainSource = {
...mockSource,
getTipHeader: async () => {
await scheduler.wait(1000 * 60)
return Promise.resolve({
...mockHeader,
number: '0x' + new Date().getSeconds().toString(16).padStart(2, '0'),
})
},
}

const mockErrHadnle = jest.fn()

const subscription = new TipHeaderListener(slowMockSource, 100)
.getObservable()
.pipe(
timeout(1000),
distinctUntilChanged((a, b) => a.number === b.number),
catchError((err) => {
mockErrHadnle(err.message)
return '0x'
}),
)
.subscribe(() => {
subscription.unsubscribe()
})
await waitSubscriptionClose(subscription)

expect(mockErrHadnle).toBeCalled()
expect(mockErrHadnle).toBeCalledWith('Timeout has occurred')
})
})
35 changes: 35 additions & 0 deletions packages/io/__tests__/router.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
import { describe, it, expect } from '@jest/globals'
import { CoR } from '../src/cor'
import { KuaiRouter } from '../src/router'

describe('test KuaiRouter', () => {
it(`should use kuaiRouter`, async () => {
const cor = new CoR()
const kuaiRouter = new KuaiRouter()
kuaiRouter.get('/', async (ctx, next) => {
ctx.ok('hello root')
await next()
})

kuaiRouter.get('/parent', async (ctx, next) => {
ctx.ok('hello parent')
await next()
})

kuaiRouter.get('/parent/children', async (ctx, next) => {
ctx.ok('hello children')
await next()
})

cor.use(kuaiRouter.middleware())

const rootResult = await cor.dispatch({ method: 'GET', path: '/' })
expect(rootResult).toMatch('hello root')

const parentResult = await cor.dispatch({ method: 'GET', path: '/parent' })
expect(parentResult).toMatch('hello parent')

const childrenResult = await cor.dispatch({ method: 'GET', path: '/parent/children' })
expect(childrenResult).toMatch('hello children')
})
})
6 changes: 2 additions & 4 deletions packages/io/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@
module.exports = {
preset: 'ts-jest',
testEnvironment: 'node',
globals: {
'ts-jest': {
tsConfig: 'tsconfig.test.json',
},
transform: {
'^.+\\.tsx?$': 'ts-jest',
},
}
9 changes: 7 additions & 2 deletions packages/io/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,14 @@
"url": "https://github.com/ckb-js/kuai/issues"
},
"dependencies": {
"koa-compose": "4.1.0"
"@ckb-lumos/rpc": "0.19.0",
"koa-compose": "4.1.0",
"koa-router": "12.0.0",
"rxjs": "7.8.0"
},
"devDependencies": {
"@types/koa-compose": "3.2.5"
"@types/koa-compose": "3.2.5",
"@types/koa-router": "7.4.4",
"koa": "2.14.1"
}
}
26 changes: 26 additions & 0 deletions packages/io/src/adapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import KoaRouter from 'koa-router'
import { CoR } from './cor'

export class KoaRouterAdapter extends KoaRouter {
constructor(private readonly cor: CoR) {
super()
}

middleware(): KoaRouter.IMiddleware {
return this.routes()
}

routes(): KoaRouter.IMiddleware {
return async (ctx, next) => {
ctx.body = await this.cor.dispatch({
method: ctx.method,
path: ctx.path,
params: ctx.params,
// todo: support body & query
// query: ctx.query,
// body: ctx.request.body,
})
await next()
}
}
}
47 changes: 47 additions & 0 deletions packages/io/src/listener.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import type { CKBComponents } from '@ckb-lumos/rpc/lib/types/api'

import { Observable } from 'rxjs'
import { Listener as IListener, ChainSource } from './types'
import type { Subscription } from 'rxjs'

export class Listener<T> implements IListener<T> {
protected _observable: Observable<T>

constructor(private pollingInterval: number = 1000) {
this._observable = this.polling()
}

protected emit(): Promise<T> | T {
throw new Error('Method not implemented.')
}

protected polling(): Observable<T> {
return new Observable<T>((subscriber) => {
const timer = setInterval(async () => {
subscriber.next(await this.emit())
}, this.pollingInterval)

return () => {
clearInterval(timer)
}
})
}

public on(listen: (obj: T) => void): Subscription {
return this._observable.subscribe(listen)
}
}

export class TipHeaderListener extends Listener<CKBComponents.BlockHeader> {
constructor(private source: ChainSource, pollingInterval = 1000) {
super(pollingInterval)
}

protected async emit(): Promise<CKBComponents.BlockHeader> {
return this.source.getTipHeader()
}

public getObservable(): Observable<CKBComponents.BlockHeader> {
return this._observable
}
}
Loading

0 comments on commit d8e1ae2

Please sign in to comment.