Skip to content

Commit

Permalink
Merge pull request #43 from PrefectHQ/subscription-enhancements
Browse files Browse the repository at this point in the history
Enhancement: useSubscription reactive wrapper and expose executed
  • Loading branch information
pleek91 authored Mar 28, 2022
2 parents baaaf88 + ca99370 commit 33ef896
Show file tree
Hide file tree
Showing 8 changed files with 153 additions and 90 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
{
"name": "@prefecthq/vue-compositions",
"private": false,
"version": "0.1.19",
"version": "0.1.20",
"description": "A collection of reusable vue compositions.",
"main": "index.ts",
"scripts": {
Expand Down
34 changes: 28 additions & 6 deletions src/subscribe/channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,16 +28,15 @@ class ChannelSignatureManager {
ChannelSignatureManager.actionIds.set(action, actionId)
}

const unreffed = unrefArgs(args)
const stringArgs = JSON.stringify(unreffed)
const unwrapped = unrefArgs(args)
const stringArgs = JSON.stringify(unwrapped)

return `${actionId}-${stringArgs}`
}
}

export default class Channel<T extends Action = Action> {
public readonly signature: ChannelSignature
public executed: boolean = false

private readonly manager: Manager
private readonly action: T
Expand All @@ -46,6 +45,8 @@ export default class Channel<T extends Action = Action> {
private response: ActionResponse<T> | undefined = undefined
private timer: ReturnType<typeof setInterval> | null = null
private lastExecution: number = 0
private _loading: boolean = false
private _executed: boolean = false

public constructor(manager: Manager, action: T, args: ActionArguments<T>) {
this.signature = ChannelSignatureManager.get(action, args)
Expand All @@ -62,7 +63,26 @@ export default class Channel<T extends Action = Action> {
return Math.min(...intervals)
}

private get executed(): boolean {
return this._executed
}

private set executed(executed: boolean) {
this._executed = executed

for (const subscription of this.subscriptions.values()) {
subscription.executed.value = executed
}
}

// conflicting rules
// eslint-disable-next-line @typescript-eslint/member-ordering
private get loading(): boolean {
return this._loading
}

private set loading(loading: boolean) {
this._loading = loading
for (const subscription of this.subscriptions.values()) {
subscription.loading.value = loading
}
Expand All @@ -85,7 +105,9 @@ export default class Channel<T extends Action = Action> {

this.subscriptions.set(subscription.id, subscription)

if (!this.executed) {
if (this.executed || this.loading) {
subscription.executed.value = this.executed
} else {
this.execute()
}

Expand All @@ -108,7 +130,6 @@ export default class Channel<T extends Action = Action> {
const args = unrefArgs(this.args)

this.loading = true
this.executed = true
this.lastExecution = Date.now()

this.setInterval()
Expand All @@ -121,6 +142,7 @@ export default class Channel<T extends Action = Action> {
this.errored = true
this.error = error
} finally {
this.executed = true
this.loading = false
}
}
Expand All @@ -129,7 +151,7 @@ export default class Channel<T extends Action = Action> {
return this.subscriptions.has(id)
}

private setResponse(response: Awaited<ReturnType<T>>) {
private setResponse(response: Awaited<ReturnType<T>>): void {
this.response = response

for (const subscription of this.subscriptions.values()) {
Expand Down
51 changes: 16 additions & 35 deletions src/subscribe/subscribe.ts
Original file line number Diff line number Diff line change
@@ -1,68 +1,49 @@
import { getCurrentInstance, isReactive, isRef, onUnmounted, shallowReactive, unref, watch } from 'vue'
import { getCurrentInstance, onUnmounted, reactive, watch } from 'vue'
import Manager from './manager'
import Subscription from './subscription'
import { Action, ActionArguments, SubscribeArguments, SubscriptionOptions } from './types'
import { watchableArgs } from './utilities'
import { Action, ActionArguments, SubscribeArguments, UseSubscription } from './types'
import { mapSubscription, watchableArgs } from './utilities'

const defaultManager = new Manager()

/**
* @deprecated use useSubscription instead
*/
export function subscribe<T extends Action>(
action: T,
args: ActionArguments<T>,
options: SubscriptionOptions = {},
): Subscription<T> {
export function useSubscription<T extends Action>(...[action, args, options = {}]: SubscribeArguments<T>): UseSubscription<T> {
const manager = options.manager ?? defaultManager
const subscription = shallowReactive(manager.subscribe(action, args, options))
const argsWithDefault = args ?? ([] as unknown as ActionArguments<T>)
const subscription = manager.subscribe(action, argsWithDefault, options)
const response = reactive(mapSubscription(subscription))

let unwatch: ReturnType<typeof watch> | undefined
const watchable = watchableArgs(argsWithDefault)

if (
isRef(args) ||
isReactive(args) ||
(unref(args) as Parameters<T>).some(isRef) ||
(unref(args) as Parameters<T>).some(isReactive)
) {
const argsToWatch = watchableArgs(args)

if (watchable !== null) {
unwatch = watch(
argsToWatch,
watchable,
() => {
if (!subscription.isSubscribed()) {
if (!response.isSubscribed()) {
unwatch!()
return
}

subscription.unsubscribe()
response.unsubscribe()

const newSubscription = manager.subscribe(action, args, options)
const newSubscription = manager.subscribe(action, argsWithDefault, options)

newSubscription.response.value ??= subscription.response.value

Object.assign(subscription, newSubscription)
Object.assign(response, mapSubscription(newSubscription))
},
{ deep: true },
)
}

if (getCurrentInstance()) {
onUnmounted(() => {
subscription.unsubscribe()
response.unsubscribe()

if (unwatch) {
unwatch()
}
})
}

return subscription
}

export function useSubscription<T extends Action>(...[action, args, options]: SubscribeArguments<T>): Subscription<T> {
const argsWithDefault = args ?? ([] as unknown as ActionArguments<T>)
const optionsWithDefault = options ?? {}

return subscribe(action, argsWithDefault, optionsWithDefault)
return response
}
11 changes: 6 additions & 5 deletions src/subscribe/subscription.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ export default class Subscription<T extends Action> {
public response: Ref<ActionResponse<T>| undefined> = ref(undefined)
public errored: Ref<boolean> = ref(false)
public error: Ref<unknown> = ref(null)
public executed: Ref<boolean> = ref(false)

private readonly channel: Channel<T>

Expand All @@ -42,7 +43,7 @@ export default class Subscription<T extends Action> {

public promise(): Promise<Subscription<T>> {
return new Promise((resolve, reject) => {
if (this.channel.executed) {
if (this.executed.value) {
if (this.errored.value) {
reject(this.error.value)
return
Expand All @@ -52,17 +53,17 @@ export default class Subscription<T extends Action> {
return
}

const loadingWatcher = watch(this.loading, () => {
if (!this.loading.value) {
const executedWatcher = watch(this.executed, () => {
if (this.executed.value) {
erroredWatcher()
loadingWatcher()
executedWatcher()
resolve(this)
}
})

const erroredWatcher = watch(this.errored, () => {
if (this.errored.value) {
loadingWatcher()
executedWatcher()
erroredWatcher()
reject(this.error.value)
}
Expand Down
30 changes: 28 additions & 2 deletions src/subscribe/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
/* eslint-disable @typescript-eslint/no-explicit-any */
import { Ref, UnwrapRef } from 'vue'
import Manager from './manager'
import Subscription from './subscription'

type ToPossibleRefs<T> = {
[K in keyof T]: T[K] | Ref<UnwrapRef<T[K]>>
Expand All @@ -23,5 +24,30 @@ type OnlyRequired<T extends any[], U extends any[] = []> = Partial<T> extends T
type ActionParamsRequired<T extends Action> = OnlyRequired<Parameters<T>>

export type SubscribeArguments<T extends Action> = ActionParamsRequired<T> extends never[]
? [action: T, args?: ActionArguments<T>, options?: SubscriptionOptions ]
: [action: T, args: ActionArguments<T>, options?: SubscriptionOptions ]
? [action: T, args?: ActionArguments<T>, options?: SubscriptionOptions]
: [action: T, args: ActionArguments<T>, options?: SubscriptionOptions]


export type MappedSubscription<T extends Action> = {
loading: Subscription<T>['loading'],
response: Subscription<T>['response'],
errored: Subscription<T>['errored'],
error: Subscription<T>['error'],
executed: Subscription<T>['executed'],
refresh: Subscription<T>['refresh'],
unsubscribe: Subscription<T>['unsubscribe'],
isSubscribed: Subscription<T>['isSubscribed'],
promise: () => Promise<UseSubscription<T>>,
}

export type UseSubscription<T extends Action> = {
loading: boolean,
response: ActionResponse<T> | undefined,
errored: boolean,
error: unknown,
executed: boolean,
refresh: Subscription<T>['refresh'],
unsubscribe: Subscription<T>['unsubscribe'],
isSubscribed: Subscription<T>['isSubscribed'],
promise: () => Promise<UseSubscription<T>>,
}
25 changes: 21 additions & 4 deletions src/subscribe/utilities.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { isReactive, isRef, unref, WatchSource } from 'vue'
import { Action, ActionArguments } from './types'
import { isReactive, isRef, reactive, unref, WatchSource } from 'vue'
import Subscription from './subscription'
import { Action, ActionArguments, MappedSubscription } from './types'

export function unrefArgs<T extends Action>(args: ActionArguments<T>): Parameters<T> {
const argsUnref = unref(args) as Parameters<T>

return argsUnref.map(unref) as Parameters<T>
}

export function watchableArgs<T extends Action>(args: ActionArguments<T>): WatchSource | WatchSource[] {
export function watchableArgs<T extends Action>(args: ActionArguments<T>): WatchSource | WatchSource[] | null {
if (isRef(args) || isReactive(args)) {
// can't quite figure out the types here. But the tests around reactive arguments pass so I believe this is correct
// eslint-disable-next-line @typescript-eslint/no-explicit-any
Expand All @@ -18,5 +19,21 @@ export function watchableArgs<T extends Action>(args: ActionArguments<T>): Watch
return args.filter(arg => isRef(arg) || isReactive(arg))
}

return []
return null
}

export function mapSubscription<T extends Action>(subscription: Subscription<T>): MappedSubscription<T> {
const { loading, error, errored, response, executed } = subscription

return {
loading,
error,
errored,
response,
executed,
refresh: () => subscription.refresh(),
unsubscribe: () => subscription.unsubscribe(),
isSubscribed: () => subscription.isSubscribed(),
promise: () => subscription.promise().then(subscription => reactive(mapSubscription(subscription))),
}
}
Loading

0 comments on commit 33ef896

Please sign in to comment.