@@ -351,6 +351,7 @@ class SystemStore extends EventEmitter {
351351 this . is_finished_initial_load = false ;
352352 this . START_REFRESH_THRESHOLD = 10 * 60 * 1000 ;
353353 this . FORCE_REFRESH_THRESHOLD = 60 * 60 * 1000 ;
354+ this . SYSTEM_STORE_LOAD_CONCURRENCY = config . SYSTEM_STORE_LOAD_CONCURRENCY || 5 ;
354355 this . _load_serial = new semaphore . Semaphore ( 1 , { warning_timeout : this . START_REFRESH_THRESHOLD } ) ;
355356 for ( const col of COLLECTIONS ) {
356357 db_client . instance ( ) . define_collection ( col ) ;
@@ -411,26 +412,29 @@ class SystemStore extends EventEmitter {
411412 // because it might not see the latest changes if we don't reload right after make_changes.
412413 return this . _load_serial . surround ( async ( ) => {
413414 try {
414- dbg . log3 ( 'SystemStore: loading ...' ) ;
415+ dbg . log3 ( 'SystemStore: loading ... this.last_update_time =' , this . last_update_time , ", since =" , since ) ;
416+
417+ const new_data = new SystemStoreData ( ) ;
415418
416419 // If we get a load request with an timestamp older then our last update time
417420 // we ensure we load everyting from that timestamp by updating our last_update_time.
418421 if ( ! _ . isUndefined ( since ) && since < this . last_update_time ) {
419- dbg . log0 ( 'SystemStore.load: Got load request with a timestamp older then my last update time' ) ;
422+ dbg . log0 ( 'SystemStore.load: Got load request with a timestamp' , since , ' older than my last update time', this . last_update_time ) ;
420423 this . last_update_time = since ;
421424 }
422425 this . master_key_manager . load_root_key ( ) ;
423- const new_data = new SystemStoreData ( ) ;
424426 let millistamp = time_utils . millistamp ( ) ;
425427 await this . _register_for_changes ( ) ;
426428 await this . _read_new_data_from_db ( new_data ) ;
427429 const secret = await os_utils . read_server_secret ( ) ;
428430 this . _server_secret = secret ;
429- dbg . log1 ( 'SystemStore: fetch took' , time_utils . millitook ( millistamp ) ) ;
430- dbg . log1 ( 'SystemStore: fetch size' , size_utils . human_size ( JSON . stringify ( new_data ) . length ) ) ;
431- dbg . log1 ( 'SystemStore: fetch data' , util . inspect ( new_data , {
432- depth : 4
433- } ) ) ;
431+ if ( dbg . should_log ( 1 ) ) { //param should match below logs' level
432+ dbg . log1 ( 'SystemStore: fetch took' , time_utils . millitook ( millistamp ) ) ;
433+ dbg . log1 ( 'SystemStore: fetch size' , size_utils . human_size ( JSON . stringify ( new_data ) . length ) ) ;
434+ dbg . log1 ( 'SystemStore: fetch data' , util . inspect ( new_data , {
435+ depth : 4
436+ } ) ) ;
437+ }
434438 this . old_db_data = this . _update_data_from_new ( this . old_db_data || { } , new_data ) ;
435439 this . data = _ . cloneDeep ( this . old_db_data ) ;
436440 millistamp = time_utils . millistamp ( ) ;
@@ -506,7 +510,7 @@ class SystemStore extends EventEmitter {
506510 }
507511 } ;
508512 await db_client . instance ( ) . connect ( ) ;
509- await P . map ( COLLECTIONS , async col => {
513+ await P . map_with_concurrency ( this . SYSTEM_STORE_LOAD_CONCURRENCY , COLLECTIONS , async col => {
510514 const res = await db_client . instance ( ) . collection ( col . name )
511515 . find ( newly_updated_query , {
512516 projection : { last_update : 0 }
@@ -598,7 +602,7 @@ class SystemStore extends EventEmitter {
598602 * @property {Object } [remove]
599603 *
600604 */
601- async make_changes ( changes ) {
605+ async make_changes ( changes , publish = true ) {
602606 // Refreshing must be done outside the semapore lock because refresh
603607 // might call load that is locking on the same semaphore.
604608 await this . refresh ( ) ;
@@ -611,7 +615,7 @@ class SystemStore extends EventEmitter {
611615 if ( any_news ) {
612616 if ( this . is_standalone ) {
613617 await this . load ( last_update ) ;
614- } else {
618+ } else if ( publish ) {
615619 // notify all the cluster (including myself) to reload
616620 await server_rpc . client . redirector . publish_to_cluster ( {
617621 method_api : 'server_inter_process_api' ,
0 commit comments