Skip to content

Commit

Permalink
Add Server#sendOnConnect()
Browse files Browse the repository at this point in the history
  • Loading branch information
ai committed Sep 1, 2024
1 parent 0c6a855 commit c817819
Show file tree
Hide file tree
Showing 6 changed files with 130 additions and 60 deletions.
25 changes: 24 additions & 1 deletion base-server/index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@ import type {
} from 'node:http'
import type { LogFn } from 'pino'

import type { ChannelContext, Context } from '../context/index.js'
import type {
ChannelContext,
ConnectContext,
Context
} from '../context/index.js'
import type { ServerClient } from '../server-client/index.js'

interface TypeOptions {
Expand All @@ -40,6 +44,12 @@ interface ChannelOptions {
queue?: string
}

interface ConnectLoader<Headers extends object = {}> {
(ctx: ConnectContext<Headers>, lastSynced: number):
| [Action, ServerMeta][]
| Promise<[Action, ServerMeta][]>
}

type ServerNodeConstructor = new (...args: any[]) => ServerNode

export interface ServerMeta extends Meta {
Expand Down Expand Up @@ -1106,6 +1116,19 @@ export class BaseServer<
*/
sendAction(action: Action, meta: ServerMeta): Promise<void> | void

/**
* Change a way how server loads actions history for the client.
*
* ```js
* server.sendOnConnect(async (ctx, lastSynced) => {
* return db.loadActions({ user: ctx.userId, after: lastSynced })
* })
* ```
*
* @param loader Callback which loads list of actions and meta.
*/
sendOnConnect(loader: ConnectLoader<Headers>)

/**
* Send `logux/subscribed` if client was not already subscribed.
*
Expand Down
4 changes: 4 additions & 0 deletions base-server/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -892,6 +892,10 @@ export class BaseServer {
}
}

sendOnConnect(loader) {
this.connectLoader = loader
}

setTimeout(callback, ms) {
this.lastTimeout += 1
let id = this.lastTimeout
Expand Down
96 changes: 48 additions & 48 deletions context/index.d.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,10 @@
import type { AnyAction } from '@logux/core'

import type { ServerMeta } from '../base-server/index.js'
import type { ServerClient } from '../server-client/index.js'
import type { Server } from '../server/index.js'

/**
* Action context.
*
* ```js
* server.type('FOO', {
* access (ctx, action, meta) {
* return ctx.isSubprotocol('3.x') ? check3(action) : check4(action)
* }
* })
* ```
*/
export class Context<Data extends object = {}, Headers extends object = {}> {
export class ConnectContext<Headers extends object = {}> {
/**
* Unique persistence client ID.
*
Expand All @@ -24,23 +14,6 @@ export class Context<Data extends object = {}, Headers extends object = {}> {
*/
clientId: string

/**
* Open structure to save some data between different steps of processing.
*
* ```js
* server.type('RENAME', {
* access (ctx, action, meta) {
* ctx.data.user = findUser(ctx.userId)
* return ctx.data.user.hasAccess(action.projectId)
* }
* process (ctx, action, meta) {
* return ctx.data.user.rename(action.projectId, action.name)
* }
* })
* ```
*/
data: Data

/**
* Client’s headers.
*
Expand All @@ -53,15 +26,6 @@ export class Context<Data extends object = {}, Headers extends object = {}> {
*/
headers: Headers

/**
* Was action created by Logux server.
*
* ```js
* access: (ctx, action, meta) => ctx.isServer
* ```
*/
isServer: boolean

/**
* Unique node ID.
*
Expand Down Expand Up @@ -92,17 +56,9 @@ export class Context<Data extends object = {}, Headers extends object = {}> {
* }
* ```
*/
userId: 'server' | string
userId: string

/**
* @param nodeId Unique node ID.
* @param clientId Unique persistence client ID.
* @param userId User ID taken node ID.
* @param subprotocol Action creator application subprotocol version
* in SemVer format.
* @param server Logux server
*/
constructor(server: Server, meta: ServerMeta)
constructor(server: Server, client: ServerClient)

/**
* Check creator subprotocol version. It uses `semver` npm package
Expand Down Expand Up @@ -135,6 +91,50 @@ export class Context<Data extends object = {}, Headers extends object = {}> {
sendBack(action: AnyAction, meta?: Partial<ServerMeta>): Promise<void>
}

/**
* Action context.
*
* ```js
* server.type('FOO', {
* access (ctx, action, meta) {
* return ctx.isSubprotocol('3.x') ? check3(action) : check4(action)
* }
* })
* ```
*/
export class Context<
Data extends object = {},
Headers extends object = {}
> extends ConnectContext<Headers> {
/**
* Open structure to save some data between different steps of processing.
*
* ```js
* server.type('RENAME', {
* access (ctx, action, meta) {
* ctx.data.user = findUser(ctx.userId)
* return ctx.data.user.hasAccess(action.projectId)
* }
* process (ctx, action, meta) {
* return ctx.data.user.rename(action.projectId, action.name)
* }
* })
* ```
*/
data: Data

/**
* Was action created by Logux server.
*
* ```js
* access: (ctx, action, meta) => ctx.isServer
* ```
*/
isServer: boolean

constructor(server: Server, meta: ServerMeta)
}

/**
* Subscription context.
*
Expand Down
29 changes: 18 additions & 11 deletions context/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,25 @@ export class Context {
this.server = server
this.data = {}

let parsed = parseId(meta.id)
this.nodeId = parsed.nodeId
this.userId = parsed.userId
this.clientId = parsed.clientId
this.isServer = this.userId === 'server'

let client = server.clientIds.get(this.clientId)

if (meta.subprotocol) {
this.subprotocol = meta.subprotocol
} else if (client) {
let client
if (meta.node) {
client = meta
this.nodeId = client.nodeId
this.userId = client.userId
this.clientId = client.clientId
this.subprotocol = client.node.remoteSubprotocol
} else {
let parsed = parseId(meta.id)
this.nodeId = parsed.nodeId
this.userId = parsed.userId
this.clientId = parsed.clientId
this.isServer = this.userId === 'server'
client = server.clientIds.get(this.clientId)
if (meta.subprotocol) {
this.subprotocol = meta.subprotocol
} else if (client) {
this.subprotocol = client.node.remoteSubprotocol
}
}

if (client) {
Expand Down
14 changes: 14 additions & 0 deletions server-client/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import fastq from 'fastq'
import semver from 'semver'

import { ALLOWED_META } from '../allowed-meta/index.js'
import { Context } from '../context/index.js'
import { filterMeta } from '../filter-meta/index.js'
import { FilteredNode } from '../filtered-node/index.js'

Expand Down Expand Up @@ -95,6 +96,19 @@ export class ServerClient {
this.node.setLocalHeaders({ env: 'development' })
}

if (app.connectLoader) {
this.node.syncSinceQuery = async lastSynced => {
let context = new Context(app, this)
let entries = await app.connectLoader(context, lastSynced)
let added = entries.reduce((max, entry) => {
let meta = filterMeta(entry[1])
entry[1] = meta
return meta.added > max ? meta.added : max
}, 0)
return { added, entries }
}
}

this.node.catch(err => {
err.connectionId = this.key
this.app.emitter.emit('error', err)
Expand Down
22 changes: 22 additions & 0 deletions server-client/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2519,3 +2519,25 @@ it('replaces Node class if necessary', async () => {
await setTimeout(10)
expect(actions(client)).toEqual([{ type: 'FOO' }])
})

it('allows to change how server loads initial actions', async () => {
let app = createServer({})
app.sendOnConnect(async (ctx, lastSync) => {
expect(ctx.clientId).toEqual('10:client')
expect(ctx.subprotocol).toEqual('0.0.1')
expect(lastSync).toEqual(0)
return [
[
{ type: 'FOO' },
{ added: 0, id: '1 server:uuid 0', reasons: [], server: '', time: 2 }
],
[
{ type: 'BAR' },
{ added: 0, id: '1 server:uuid 0', reasons: [], server: '', time: 1 }
]
]
})
let client = await connectClient(app, '10:client:uuid')
await setTimeout(10)
expect(actions(client)).toEqual([{ type: 'BAR' }, { type: 'FOO' }])
})

0 comments on commit c817819

Please sign in to comment.