@@ -38,8 +38,9 @@ const size_utils = require('../../util/size_utils');
3838const os_utils = require ( '../../util/os_utils' ) ;
3939const config = require ( '../../../config' ) ;
4040const db_client = require ( '../../util/db_client' ) ;
41+ const { decode_json } = require ( '../../util/postgres_client' ) ;
4142
42- const { RpcError } = require ( '../../rpc' ) ;
43+ const { RpcError, RPC_BUFFERS } = require ( '../../rpc' ) ;
4344const master_key_manager = require ( './master_key_manager' ) ;
4445
4546const COLLECTIONS = [ {
@@ -152,6 +153,10 @@ const COLLECTIONS_BY_NAME = _.keyBy(COLLECTIONS, 'name');
152153
153154const accounts_by_email_lowercase = [ ] ;
154155
156+ const SOURCE = Object . freeze ( {
157+ DB : 'DB' ,
158+ CORE : 'CORE' ,
159+ } ) ;
155160
156161/**
157162 *
@@ -352,6 +357,8 @@ class SystemStore extends EventEmitter {
352357 this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
353358 this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
354359 this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
360+ this . source = ( process . env . HOSTNAME && process . env . HOSTNAME . indexOf ( "endpoint" ) === - 1 ) ? SOURCE . DB : SOURCE . CORE ;
361+ dbg . log0 ( "system store source is" , this . source ) ;
355362 this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
356363 for ( const col of COLLECTIONS ) {
357364 db_client . instance ( ) . define_collection ( col ) ;
@@ -414,18 +421,23 @@ class SystemStore extends EventEmitter {
414421 try {
415422 dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since ) ;
416423
417- const new_data = new SystemStoreData ( ) ;
418-
419424 // If we get a load request with an timestamp older then our last update time
420425 // we ensure we load everyting from that timestamp by updating our last_update_time.
421426 if ( ! _ . isUndefined ( since ) && since < this . last_update_time ) {
422427 dbg . log0 ( 'SystemStore.load: Got load request with a timestamp' , since , 'older than my last update time' , this . last_update_time ) ;
423428 this . last_update_time = since ;
424429 }
425430 this . master_key_manager . load_root_key ( ) ;
431+ const new_data = new SystemStoreData ( ) ;
426432 let millistamp = time_utils . millistamp ( ) ;
427433 await this . _register_for_changes ( ) ;
428- await this . _read_new_data_from_db ( new_data ) ;
434+ if ( this . source === SOURCE . DB ) {
435+ await this . _read_new_data_from_db ( new_data ) ;
436+ } else {
437+ this . data = new SystemStoreData ( ) ;
438+ await this . _read_new_data_from_core ( this . data ) ;
439+ }
440+
429441 const secret = await os_utils . read_server_secret ( ) ;
430442 this . _server_secret = secret ;
431443 if ( dbg . should_log ( 1 ) ) { //param should match below logs' level
@@ -435,8 +447,10 @@ class SystemStore extends EventEmitter {
435447 depth : 4
436448 } ) ) ;
437449 }
438- this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
439- this . data = _ . cloneDeep ( this . old_db_data ) ;
450+ if ( this . source === SOURCE . DB ) {
451+ this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
452+ this . data = _ . cloneDeep ( this . old_db_data ) ;
453+ }
440454 millistamp = time_utils . millistamp ( ) ;
441455 this . data . rebuild ( ) ;
442456 dbg . log1 ( 'SystemStore: rebuild took' , time_utils . millitook ( millistamp ) ) ;
@@ -458,6 +472,11 @@ class SystemStore extends EventEmitter {
458472 } ) ;
459473 }
460474
475+ //return the latest copy of in-memory data
476+ async recent_db_data ( ) {
477+ return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
478+ }
479+
461480 _update_data_from_new ( data , new_data ) {
462481 COLLECTIONS . forEach ( col => {
463482 const old_items = data [ col . name ] ;
@@ -523,6 +542,28 @@ class SystemStore extends EventEmitter {
523542 this . last_update_time = now ;
524543 }
525544
545+ async _read_new_data_from_core ( target ) {
546+ dbg . log3 ( "_read_new_data_from_core begins" ) ;
547+ const res = await server_rpc . client . system . get_system_store ( ) ;
548+ const ss = JSON . parse ( res [ RPC_BUFFERS ] . data . toString ( ) ) ;
549+ dbg . log3 ( "_read_new_data_from_core new system store" , ss ) ;
550+ for ( const key of Object . keys ( ss ) ) {
551+ const collection = COLLECTIONS_BY_NAME [ key ] ;
552+ if ( collection ) {
553+ target [ key ] = [ ] ;
554+ _ . each ( ss [ key ] , item => {
555+ //these two lines will transform string values into appropriately typed objects
556+ //(SensitiveString, ObjectId) according to schema
557+ const after = decode_json ( collection . schema , item ) ;
558+ db_client . instance ( ) . validate ( key , after ) ;
559+ target [ key ] . push ( after ) ;
560+ } ) ;
561+ } else {
562+ target [ key ] = ss [ key ] ;
563+ }
564+ }
565+ }
566+
526567 _check_schema ( col , item , warn ) {
527568 return db_client . instance ( ) . validate ( col . name , item , warn ) ;
528569 }
@@ -851,3 +892,4 @@ SystemStore._instance = undefined;
851892// EXPORTS
852893exports . SystemStore = SystemStore ;
853894exports . get_instance = SystemStore . get_instance ;
895+ exports . SOURCE = SOURCE ;
0 commit comments