-
Notifications
You must be signed in to change notification settings - Fork 2
/
createStore.js
229 lines (207 loc) · 7.13 KB
/
createStore.js
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
import { Observable, Subject } from './rx-ext';
import { Scheduler } from 'rxjs';
const INIT = '@udeo/INIT';
const HYDRATE = '@udeo/HYDRATE';
const CLEAR_STATE = '@udeo/CLEAR_STATE';
const CONNECT_STREAM = '@udeo/CONNECT_STREAM';
/**
* @typedef moduleDefinition
* @type {Object}
* @property {Function} flow - Provides the module's data flow.
* Returns an array of action streams to be reduced.
* @property {Function} reducer - The reducer of the provided flow.
* Returns the next version of the module's state (a plain old reducer function).
*/
/**
* Creates a store which houses a collection of state streams. It adds a state
* stream for each module definition provided.
*
* @param {Object} definitions - The module definitions. Each module definition
* provides two functions used to form the module's state stream.
*
* @param {Object} [preloadedState] - The initial state. Can be used to hydrate
* the store from state generated by the server in universal apps.
*
* @returns {Object} store - An Udeo store that allows you to subscribe to the state streams,
* dispatch actions and read the current state of the application.
*/
export function createStore(definitions, preloadedState = {}) {
// The raw action stream used to dispatch actions
const dispatch$ = new Subject();
// Collection of state streams by module - state streams are lazily connected upon subscribe
const stateStreams = {};
// Current state of the application - records the latest output of all state streams
let currentState = preloadedState;
// A collection of side effect streams by action type
const sideEffectStreams = {};
/**
* Adds a state stream for each module definition provided
*/
function addStreams(defs) {
Object.keys(defs).forEach(moduleName => {
const moduleResources = defs[moduleName];
// Use preloaded state or use dummy action to get initial state from reducer
if (!currentState[moduleName]) {
currentState[moduleName] = moduleResources.reducer(undefined, { type: INIT });
}
// The API provided to each flow function in addition to the dispatch$
const api = {
getState,
getState$,
// Provides a way to get a single action stream by type
getAction$: (actionType) => getSideEffect$(moduleName, actionType),
dispatch,
};
// The dispatch$ is to be used to filter out actions that have been dispatched
const streams = moduleResources.flow(dispatch$, api);
stateStreams[moduleName] = createState$(
moduleName,
streams,
moduleResources.reducer
);
});
}
/**
* Dispacthes the given action
*/
function dispatch(action) {
dispatch$.next(action);
}
/**
* Gets the current state of the application or module if module name is provided
*/
function getState(moduleName) {
return moduleName ? currentState[moduleName] : currentState;
}
/**
* Gets the state stream for given module
*/
function getState$(moduleName) {
let state$ = stateStreams[moduleName];
if (!state$) {
// We're still busy adding the stream so return a deferred observable
state$ = Observable.defer(() => stateStreams[moduleName]);
}
return state$;
}
/**
* Adds a side effect stream for the given action type. This side effect stream
* will flow as the action stream for the given action type flows through.
* The side effect stream will only flow if the action type in question is flowing
* from its origin. Thus we need to keep track of which module requested the
* side effect stream
*/
function getSideEffect$(requestingModule, actionType) {
let register = sideEffectStreams[actionType];
if (!register) {
register = {
sideEffect$: new Subject(),
modules: { [requestingModule]: true }
};
sideEffectStreams[actionType] = register;
} else {
register.modules[requestingModule] = true;
}
return register.sideEffect$.delay(0, Scheduler.asap);
}
const metaDispatch$ = new Subject();
/**
* Hydrates the state of the given module by dispatching a HYDRATE action.
* This action is automatically handled by the module's state stream
*/
function hydrate(moduleName, h) {
metaDispatch$.next({
type: HYDRATE,
payload: h,
meta: moduleName,
});
}
/**
* Clears the state of the given module (setting it back to the initial state) by
* dispatching a CLEAR_STATE action. This action is automatically handled by
* the module's state stream
*/
function clearState(moduleName) {
metaDispatch$.next({
type: CLEAR_STATE,
meta: moduleName,
});
}
let newStateMiddleware = () => {};
/**
* Set middleware to be invoked upon state changes
*/
function setMiddleware(middleware) {
newStateMiddleware = middleware;
}
/**
* Uses the provided action streams and reducer to build a state stream.
* The given action streams will be combined with more general action streams
* of type: HYDRATE and CLEAR_STATE
* These general action streams will be auto reduced for each module
*/
function createState$(moduleName, streams, reducer) {
// Higher order reducer that handles the HYDRATE and CLEAR_STATE actions
const moduleReducer = (state, action) => {
switch (action.type) {
case HYDRATE:
return {
...state,
...action.payload,
hydrated: true,
};
case CLEAR_STATE:
return reducer(undefined, action);
default:
return reducer(state, action);
}
};
// Combine module's action streams with common module actions streams
const combinedAction$ = Observable.merge(
...streams,
metaDispatch$.filter(action => action.meta === moduleName)
);
return combinedAction$
// Side effects
.do(action => {
const register = sideEffectStreams[action.type];
// Let action flow through registered side effect stream if the current module is the origin
if (register && !register.modules[moduleName]) {
register.sideEffect$.next(action);
}
})
// Start with a connect action. This is called when the state stream is
// subscribed to for the first time, or when it has been re-subscribed to
// after going cold.
.startWith({ type: CONNECT_STREAM })
// Handle connect action to pick up the state where we left it off
.map(action => action.type === CONNECT_STREAM ? currentState[moduleName] : action)
// Reduce state (with higher order reducer) for dispatched actions
.scan((state, action) => {
const newState = moduleReducer(state, action);
newStateMiddleware(moduleName, action, state, newState);
return newState;
})
// Record new state
.do(newState => {
currentState = {
...currentState,
[moduleName]: newState,
};
})
// Provide latest from state stream on subscribe
.publishReplay(1)
.refCount();
}
// Init
addStreams(definitions);
// Store API
return {
dispatch,
getState$,
getState,
hydrate,
clearState,
setMiddleware,
};
}