@@ -341,10 +341,6 @@ class SystemStore extends EventEmitter {
341341 */
342342 static get_instance ( options = { } ) {
343343 const { standalone } = options ;
344- //load from core if enabled and this is an endpoint
345- const is_endpoint = process . env . HOSTNAME && process . env . HOSTNAME . indexOf ( "endpoint" ) !== - 1 ;
346- this . source = options . source ||
347- ( ( config . SYSTEM_STORE_SOURCE . toUpperCase ( ) === 'CORE' && is_endpoint ) ? SOURCE . CORE : SOURCE . DB ) ;
348344 SystemStore . _instance = SystemStore . _instance || new SystemStore ( { standalone } ) ;
349345 return SystemStore . _instance ;
350346 }
@@ -361,18 +357,13 @@ class SystemStore extends EventEmitter {
361357 this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
362358 this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
363359 this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
364- this . source = options . source || SOURCE . DB ;
360+ this . source = options . source || config . SYSTEM_STORE_SOURCE ;
361+ this . source = this . source . toUpperCase ( ) ;
365362 dbg . log0 ( "system store source is" , this . source ) ;
366363 this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
367- for ( const col of COLLECTIONS ) {
368- try {
364+ if ( options . skip_define_for_tests !== true ) {
365+ for ( const col of COLLECTIONS ) {
369366 db_client . instance ( ) . define_collection ( col ) ;
370- } catch ( e ) {
371- if ( e . message ?. indexOf ( "already defined" ) > - 1 ) {
372- dbg . warn ( "Ignoring already defined error" ) ;
373- } else {
374- throw e ;
375- }
376367 }
377368 }
378369 js_utils . deep_freeze ( COLLECTIONS ) ;
@@ -432,7 +423,7 @@ class SystemStore extends EventEmitter {
432423 //then endpoints skip it.
433424 //endpoints will be updated in the next load_system_store()
434425 //once core's in memory system store is updated.
435- if ( this . source . toUpperCase ( ) === 'CORE' && load_from_core_step && load_from_core_step . toUpperCase ( ) === 'CORE' ) {
426+ if ( load_from_core_step && ( this . source !== load_from_core_step ) ) {
436427 return ;
437428 }
438429
@@ -504,6 +495,9 @@ class SystemStore extends EventEmitter {
504495
505496 //return the latest copy of in-memory data
506497 async recent_db_data ( ) {
498+ if ( this . source === SOURCE . CORE ) {
499+ throw new RpcError ( 'BAD_REQUEST' , 'recent_db_data is not available for CORE source' ) ;
500+ }
507501 return this . _load_serial . surround ( async ( ) => this . old_db_data ) ;
508502 }
509503
@@ -693,22 +687,20 @@ class SystemStore extends EventEmitter {
693687 method_api : 'server_inter_process_api' ,
694688 method_name : 'load_system_store' ,
695689 target : '' ,
696- request_params : { since : last_update , load_from_core_step : 'CORE' }
690+ request_params : { since : last_update , load_from_core_step : SOURCE . DB }
697691 } ) ;
698692
699693 //if endpoints are loading system store from core, we need to wait until
700694 //above publish_to_cluster() will update core's in-memory db.
701695 //the next publist_to_cluster() will make endpoints load the updated
702696 //system store from core
703- if ( config . SYSTEM_STORE_SOURCE . toUpperCase ( ) === 'CORE' ) {
704- dbg . log2 ( "second phase publish" ) ;
705- await server_rpc . client . redirector . publish_to_cluster ( {
706- method_api : 'server_inter_process_api' ,
707- method_name : 'load_system_store' ,
708- target : '' ,
709- request_params : { since : last_update , load_from_core_step : 'ENDPOINT' }
710- } ) ;
711- }
697+ dbg . log2 ( "second phase publish" ) ;
698+ await server_rpc . client . redirector . publish_to_cluster ( {
699+ method_api : 'server_inter_process_api' ,
700+ method_name : 'load_system_store' ,
701+ target : '' ,
702+ request_params : { since : last_update , load_from_core_step : SOURCE . CORE }
703+ } ) ;
712704 }
713705 }
714706 }
0 commit comments