@@ -38,6 +38,7 @@ const size_utils = require('../../util/size_utils');
3838const os_utils = require ( '../../util/os_utils' ) ;
3939const config = require ( '../../../config' ) ;
4040const db_client = require ( '../../util/db_client' ) ;
41+ const { decode_json } = require ( '../../util/postgres_client' ) ;
4142
4243const { RpcError } = require ( '../../rpc' ) ;
4344const master_key_manager = require ( './master_key_manager' ) ;
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');
152153
153154const accounts_by_email_lowercase = [ ] ;
154155
156+ const SOURCE = Object . freeze ( {
157+ DB : 'DB' ,
158+ CORE : 'CORE' ,
159+ } ) ;
155160
156161/**
157162 *
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
352357 this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
353358 this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
354359 this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
360+ this . source = ( process . env . HOSTNAME && process . env . HOSTNAME . indexOf ( "endpoint" ) === - 1 ) ? SOURCE . DB : SOURCE . CORE ;
361+ dbg . log0 ( "system store source is" , this . source ) ;
355362 this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
356363 for ( const col of COLLECTIONS ) {
357364 db_client . instance ( ) . define_collection ( col ) ;
@@ -406,8 +413,6 @@ class SystemStore extends EventEmitter {
406413 dbg . warn ( `system_store.refresh: system_store.data.time > FORCE_REFRESH_THRESHOLD, since_load = ${ since_load } , FORCE_REFRESH_THRESHOLD = ${ this . FORCE_REFRESH_THRESHOLD } ` ) ;
407414 res = this . load ( ) ;
408415 }
409- //call refresh periodically
410- P . delay_unblocking ( this . START_REFRESH_THRESHOLD ) . then ( this . refresh ) ;
411416 return res ;
412417 }
413418
@@ -418,18 +423,23 @@ class SystemStore extends EventEmitter {
418423 try {
419424 dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since ) ;
420425
421- const new_data = new SystemStoreData ( ) ;
422-
423426 // If we get a load request with an timestamp older then our last update time
424427 // we ensure we load everyting from that timestamp by updating our last_update_time.
425428 if ( ! _ . isUndefined ( since ) && since < this . last_update_time ) {
426429 dbg . log0 ( 'SystemStore.load: Got load request with a timestamp' , since , 'older than my last update time' , this . last_update_time ) ;
427430 this . last_update_time = since ;
428431 }
429432 this . master_key_manager . load_root_key ( ) ;
433+ const new_data = new SystemStoreData ( ) ;
430434 let millistamp = time_utils . millistamp ( ) ;
431435 await this . _register_for_changes ( ) ;
432- await this . _read_new_data_from_db ( new_data ) ;
436+ if ( this . source === SOURCE . DB ) {
437+ await this . _read_new_data_from_db ( new_data ) ;
438+ } else {
439+ this . data = new SystemStoreData ( ) ;
440+ await this . _read_new_data_from_core ( this . data ) ;
441+ }
442+
433443 const secret = await os_utils . read_server_secret ( ) ;
434444 this . _server_secret = secret ;
435445 if ( dbg . should_log ( 1 ) ) { //param should match below logs' level
@@ -439,8 +449,10 @@ class SystemStore extends EventEmitter {
439449 depth : 4
440450 } ) ) ;
441451 }
442- this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
443- this . data = _ . cloneDeep ( this . old_db_data ) ;
452+ if ( this . source === SOURCE . DB ) {
453+ this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
454+ this . data = _ . cloneDeep ( this . old_db_data ) ;
455+ }
444456 millistamp = time_utils . millistamp ( ) ;
445457 this . data . rebuild ( ) ;
446458 dbg . log1 ( 'SystemStore: rebuild took' , time_utils . millitook ( millistamp ) ) ;
@@ -462,6 +474,12 @@ class SystemStore extends EventEmitter {
462474 } ) ;
463475 }
464476
477+ //return the latest copy of in-memory data
478+ async recent_db_data ( ) {
479+ //return this.db_clone;
480+ return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
481+ }
482+
465483 _update_data_from_new ( data , new_data ) {
466484 COLLECTIONS . forEach ( col => {
467485 const old_items = data [ col . name ] ;
@@ -527,6 +545,27 @@ class SystemStore extends EventEmitter {
527545 this . last_update_time = now ;
528546 }
529547
548+ async _read_new_data_from_core ( target ) {
549+ dbg . log0 ( "_read_new_data_from_core" ) ;
550+ const res = await server_rpc . client . system . get_system_store ( ) ;
551+ for ( const key of Object . keys ( res ) ) {
552+ const collection = COLLECTIONS_BY_NAME [ key ] ;
553+ if ( collection ) {
554+ target [ key ] = [ ] ;
555+ _ . each ( res [ key ] , item => {
556+ //these two lines will transform string values into appropriately typed objects
557+ //(SensitiveString, ObjectId) according to schema
558+ const after = decode_json ( collection . schema , item ) ;
559+ db_client . instance ( ) . validate ( key , after ) ;
560+ target [ key ] . push ( after ) ;
561+ } ) ;
562+ } else {
563+ target [ key ] = res [ key ] ;
564+ }
565+ }
566+ return res ;
567+ }
568+
530569 _check_schema ( col , item , warn ) {
531570 return db_client . instance ( ) . validate ( col . name , item , warn ) ;
532571 }
@@ -619,7 +658,7 @@ class SystemStore extends EventEmitter {
619658 if ( any_news ) {
620659 if ( this . is_standalone ) {
621660 await this . load ( last_update ) ;
622- } else if ( publish ) {
661+ } else /* if (publish)*/ {
623662 // notify all the cluster (including myself) to reload
624663 await server_rpc . client . redirector . publish_to_cluster ( {
625664 method_api : 'server_inter_process_api' ,
@@ -855,3 +894,4 @@ SystemStore._instance = undefined;
855894// EXPORTS
856895exports . SystemStore = SystemStore ;
857896exports . get_instance = SystemStore . get_instance ;
897+ exports . SOURCE = SOURCE ;
0 commit comments