55 */
66package org .elasticsearch .xpack .ccr .action ;
77
8+ import com .carrotsearch .hppc .cursors .ObjectObjectCursor ;
89import org .elasticsearch .Version ;
910import org .elasticsearch .action .admin .cluster .state .ClusterStateResponse ;
11+ import org .elasticsearch .action .support .replication .ClusterStateCreationUtils ;
1012import org .elasticsearch .client .Client ;
1113import org .elasticsearch .cluster .ClusterName ;
1214import org .elasticsearch .cluster .ClusterState ;
1820import org .elasticsearch .cluster .routing .ShardRoutingState ;
1921import org .elasticsearch .cluster .routing .TestShardRouting ;
2022import org .elasticsearch .cluster .service .ClusterService ;
23+ import org .elasticsearch .common .UUIDs ;
2124import org .elasticsearch .common .collect .Tuple ;
2225import org .elasticsearch .common .settings .ClusterSettings ;
2326import org .elasticsearch .common .settings .Settings ;
2427import org .elasticsearch .common .unit .ByteSizeValue ;
2528import org .elasticsearch .common .unit .TimeValue ;
29+ import org .elasticsearch .common .util .concurrent .ConcurrentCollections ;
2630import org .elasticsearch .common .util .concurrent .EsRejectedExecutionException ;
2731import org .elasticsearch .index .Index ;
2832import org .elasticsearch .index .IndexSettings ;
4448import java .util .LinkedList ;
4549import java .util .List ;
4650import java .util .Map ;
51+ import java .util .Set ;
4752import java .util .concurrent .CountDownLatch ;
4853import java .util .concurrent .ExecutorService ;
4954import java .util .concurrent .Executors ;
5055import java .util .concurrent .atomic .AtomicInteger ;
56+ import java .util .concurrent .atomic .AtomicReference ;
5157import java .util .function .BiConsumer ;
5258import java .util .function .Consumer ;
5359import java .util .function .Function ;
5763import static org .elasticsearch .xpack .ccr .action .AutoFollowCoordinator .AutoFollower .recordLeaderIndexAsFollowFunction ;
5864import static org .hamcrest .Matchers .equalTo ;
5965import static org .hamcrest .Matchers .greaterThan ;
66+ import static org .hamcrest .Matchers .hasItem ;
6067import static org .hamcrest .Matchers .is ;
6168import static org .hamcrest .Matchers .notNullValue ;
6269import static org .hamcrest .Matchers .nullValue ;
@@ -80,7 +87,7 @@ public void testAutoFollower() {
8087 Map <String , List <String >> followedLeaderIndexUUIDS = new HashMap <>();
8188 followedLeaderIndexUUIDS .put ("remote" , new ArrayList <>());
8289 Map <String , Map <String , String >> autoFollowHeaders = new HashMap <>();
83- autoFollowHeaders .put ("remote" , Collections . singletonMap ("key" , "val" ));
90+ autoFollowHeaders .put ("remote" , Map . of ("key" , "val" ));
8491 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (patterns , followedLeaderIndexUUIDS , autoFollowHeaders );
8592
8693 ClusterState currentState = ClusterState .builder (new ClusterName ("name" ))
@@ -315,7 +322,7 @@ public void testGetLeaderIndicesToFollow() {
315322 Map <String , Map <String , String >> headers = new HashMap <>();
316323 ClusterState clusterState = ClusterState .builder (new ClusterName ("remote" ))
317324 .metaData (MetaData .builder ().putCustom (AutoFollowMetadata .TYPE ,
318- new AutoFollowMetadata (Collections . singletonMap ("remote" , autoFollowPattern ), Collections .emptyMap (), headers )))
325+ new AutoFollowMetadata (Map . of ("remote" , autoFollowPattern ), Collections .emptyMap (), headers )))
319326 .build ();
320327
321328 RoutingTable .Builder routingTableBuilder = RoutingTable .builder ();
@@ -377,7 +384,7 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() {
377384 Map <String , Map <String , String >> headers = new HashMap <>();
378385 ClusterState clusterState = ClusterState .builder (new ClusterName ("remote" ))
379386 .metaData (MetaData .builder ().putCustom (AutoFollowMetadata .TYPE ,
380- new AutoFollowMetadata (Collections . singletonMap ("remote" , autoFollowPattern ), Collections .emptyMap (), headers )))
387+ new AutoFollowMetadata (Map . of ("remote" , autoFollowPattern ), Collections .emptyMap (), headers )))
381388 .build ();
382389
383390 // 1 shard started and another not started:
@@ -416,9 +423,29 @@ public void testGetLeaderIndicesToFollow_shardsNotStarted() {
416423 assertThat (result .get (1 ).getName (), equalTo ("index2" ));
417424 }
418425
426+ public void testGetLeaderIndicesToFollowWithClosedIndices () {
427+ final AutoFollowPattern autoFollowPattern = new AutoFollowPattern ("remote" , Collections .singletonList ("*" ),
428+ null , null , null , null , null , null , null , null , null , null , null );
429+
430+ // index is opened
431+ ClusterState remoteState = ClusterStateCreationUtils .stateWithActivePrimary ("test-index" , true , randomIntBetween (1 , 3 ), 0 );
432+ List <Index > result = AutoFollower .getLeaderIndicesToFollow (autoFollowPattern , remoteState , Collections .emptyList ());
433+ assertThat (result .size (), equalTo (1 ));
434+ assertThat (result , hasItem (remoteState .metaData ().index ("test-index" ).getIndex ()));
435+
436+ // index is closed
437+ remoteState = ClusterState .builder (remoteState )
438+ .metaData (MetaData .builder (remoteState .metaData ())
439+ .put (IndexMetaData .builder (remoteState .metaData ().index ("test-index" )).state (IndexMetaData .State .CLOSE ).build (), true )
440+ .build ())
441+ .build ();
442+ result = AutoFollower .getLeaderIndicesToFollow (autoFollowPattern , remoteState , Collections .emptyList ());
443+ assertThat (result .size (), equalTo (0 ));
444+ }
445+
419446 public void testRecordLeaderIndexAsFollowFunction () {
420447 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (Collections .emptyMap (),
421- Collections . singletonMap ("pattern1" , Collections .emptyList ()), Collections .emptyMap ());
448+ Map . of ("pattern1" , Collections .emptyList ()), Collections .emptyMap ());
422449 ClusterState clusterState = new ClusterState .Builder (new ClusterName ("name" ))
423450 .metaData (new MetaData .Builder ().putCustom (AutoFollowMetadata .TYPE , autoFollowMetadata ))
424451 .build ();
@@ -445,7 +472,7 @@ public void testRecordLeaderIndexAsFollowFunctionNoEntry() {
445472
446473 public void testCleanFollowedLeaderIndices () {
447474 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (Collections .emptyMap (),
448- Collections . singletonMap ("pattern1" , Arrays .asList ("index1" , "index2" , "index3" )), Collections .emptyMap ());
475+ Map . of ("pattern1" , Arrays .asList ("index1" , "index2" , "index3" )), Collections .emptyMap ());
449476 ClusterState clusterState = new ClusterState .Builder (new ClusterName ("name" ))
450477 .metaData (new MetaData .Builder ().putCustom (AutoFollowMetadata .TYPE , autoFollowMetadata ))
451478 .build ();
@@ -474,7 +501,7 @@ public void testCleanFollowedLeaderIndices() {
474501
475502 public void testCleanFollowedLeaderIndicesNoChanges () {
476503 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (Collections .emptyMap (),
477- Collections . singletonMap ("pattern1" , Arrays .asList ("index1" , "index2" , "index3" )), Collections .emptyMap ());
504+ Map . of ("pattern1" , Arrays .asList ("index1" , "index2" , "index3" )), Collections .emptyMap ());
478505 ClusterState clusterState = new ClusterState .Builder (new ClusterName ("name" ))
479506 .metaData (new MetaData .Builder ().putCustom (AutoFollowMetadata .TYPE , autoFollowMetadata ))
480507 .build ();
@@ -507,7 +534,7 @@ public void testCleanFollowedLeaderIndicesNoChanges() {
507534
508535 public void testCleanFollowedLeaderIndicesNoEntry () {
509536 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (Collections .emptyMap (),
510- Collections . singletonMap ("pattern2" , Arrays .asList ("index1" , "index2" , "index3" )), Collections .emptyMap ());
537+ Map . of ("pattern2" , Arrays .asList ("index1" , "index2" , "index3" )), Collections .emptyMap ());
511538 ClusterState clusterState = new ClusterState .Builder (new ClusterName ("name" ))
512539 .metaData (new MetaData .Builder ().putCustom (AutoFollowMetadata .TYPE , autoFollowMetadata ))
513540 .build ();
@@ -717,7 +744,7 @@ public void testWaitForMetadataVersion() {
717744 Map <String , List <String >> followedLeaderIndexUUIDS = new HashMap <>();
718745 followedLeaderIndexUUIDS .put ("remote" , new ArrayList <>());
719746 Map <String , Map <String , String >> autoFollowHeaders = new HashMap <>();
720- autoFollowHeaders .put ("remote" , Collections . singletonMap ("key" , "val" ));
747+ autoFollowHeaders .put ("remote" , Map . of ("key" , "val" ));
721748 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (patterns , followedLeaderIndexUUIDS , autoFollowHeaders );
722749
723750 final LinkedList <ClusterState > leaderStates = new LinkedList <>();
@@ -763,7 +790,9 @@ void updateAutoFollowMetadata(Function<ClusterState, ClusterState> updateFunctio
763790 autoFollower .start ();
764791 assertThat (allResults .size (), equalTo (states .length ));
765792 for (int i = 0 ; i < states .length ; i ++) {
766- assertThat (allResults .get (i ).autoFollowExecutionResults .containsKey (new Index ("logs-" + i , "_na_" )), is (true ));
793+ final String indexName = "logs-" + i ;
794+ assertThat (allResults .get (i ).autoFollowExecutionResults .keySet ().stream ()
795+ .anyMatch (index -> index .getName ().equals (indexName )), is (true ));
767796 }
768797 }
769798
@@ -778,7 +807,7 @@ public void testWaitForTimeOut() {
778807 Map <String , List <String >> followedLeaderIndexUUIDS = new HashMap <>();
779808 followedLeaderIndexUUIDS .put ("remote" , new ArrayList <>());
780809 Map <String , Map <String , String >> autoFollowHeaders = new HashMap <>();
781- autoFollowHeaders .put ("remote" , Collections . singletonMap ("key" , "val" ));
810+ autoFollowHeaders .put ("remote" , Map . of ("key" , "val" ));
782811 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (patterns , followedLeaderIndexUUIDS , autoFollowHeaders );
783812
784813 ClusterState [] states = new ClusterState [16 ];
@@ -836,7 +865,7 @@ public void testAutoFollowerSoftDeletesDisabled() {
836865 Map <String , List <String >> followedLeaderIndexUUIDS = new HashMap <>();
837866 followedLeaderIndexUUIDS .put ("remote" , new ArrayList <>());
838867 Map <String , Map <String , String >> autoFollowHeaders = new HashMap <>();
839- autoFollowHeaders .put ("remote" , Collections . singletonMap ("key" , "val" ));
868+ autoFollowHeaders .put ("remote" , Map . of ("key" , "val" ));
840869 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (patterns , followedLeaderIndexUUIDS , autoFollowHeaders );
841870
842871 ClusterState currentState = ClusterState .builder (new ClusterName ("name" ))
@@ -902,14 +931,14 @@ public void testAutoFollowerFollowerIndexAlreadyExists() {
902931 Map <String , List <String >> followedLeaderIndexUUIDS = new HashMap <>();
903932 followedLeaderIndexUUIDS .put ("remote" , new ArrayList <>());
904933 Map <String , Map <String , String >> autoFollowHeaders = new HashMap <>();
905- autoFollowHeaders .put ("remote" , Collections . singletonMap ("key" , "val" ));
934+ autoFollowHeaders .put ("remote" , Map . of ("key" , "val" ));
906935 AutoFollowMetadata autoFollowMetadata = new AutoFollowMetadata (patterns , followedLeaderIndexUUIDS , autoFollowHeaders );
907936
908937 ClusterState currentState = ClusterState .builder (new ClusterName ("name" ))
909938 .metaData (MetaData .builder ()
910939 .put (IndexMetaData .builder ("logs-20190101" )
911940 .settings (settings (Version .CURRENT ).put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true ))
912- .putCustom (Ccr .CCR_CUSTOM_METADATA_KEY , Collections . singletonMap (Ccr .CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY ,
941+ .putCustom (Ccr .CCR_CUSTOM_METADATA_KEY , Map . of (Ccr .CCR_CUSTOM_METADATA_LEADER_INDEX_UUID_KEY ,
913942 remoteState .metaData ().index ("logs-20190101" ).getIndexUUID ()))
914943 .numberOfShards (1 )
915944 .numberOfReplicas (0 ))
@@ -1045,6 +1074,85 @@ void updateAutoFollowMetadata(
10451074 }
10461075 }
10471076
1077+ public void testClosedIndicesAreNotAutoFollowed () {
1078+ final Client client = mock (Client .class );
1079+ when (client .getRemoteClusterClient (anyString ())).thenReturn (client );
1080+
1081+ final String pattern = "pattern1" ;
1082+ final ClusterState localState = ClusterState .builder (new ClusterName ("local" ))
1083+ .metaData (MetaData .builder ()
1084+ .putCustom (AutoFollowMetadata .TYPE ,
1085+ new AutoFollowMetadata (Map .of (pattern , new AutoFollowPattern ("remote" , List .of ("docs-*" ), null ,
1086+ null , null , null , null , null , null , null , null , null , null )),
1087+ Map .of (pattern , List .of ()), Map .of (pattern , Map .of ()))))
1088+ .build ();
1089+
1090+ ClusterState remoteState = null ;
1091+ final int nbLeaderIndices = randomInt (15 );
1092+ for (int i = 0 ; i < nbLeaderIndices ; i ++) {
1093+ String indexName = "docs-" + i ;
1094+ if (remoteState == null ) {
1095+ remoteState = createRemoteClusterState (indexName , true );
1096+ } else {
1097+ remoteState = createRemoteClusterState (remoteState , indexName );
1098+ }
1099+ if (randomBoolean ()) {
1100+ // randomly close the index
1101+ remoteState = ClusterState .builder (remoteState .getClusterName ())
1102+ .routingTable (remoteState .routingTable ())
1103+ .metaData (MetaData .builder (remoteState .metaData ())
1104+ .put (IndexMetaData .builder (remoteState .metaData ().index (indexName )).state (IndexMetaData .State .CLOSE ).build (), true )
1105+ .build ())
1106+ .build ();
1107+ }
1108+ }
1109+
1110+ final ClusterState finalRemoteState = remoteState ;
1111+ final AtomicReference <ClusterState > lastModifiedClusterState = new AtomicReference <>(localState );
1112+ final List <AutoFollowCoordinator .AutoFollowResult > results = new ArrayList <>();
1113+ final Set <Object > followedIndices = ConcurrentCollections .newConcurrentSet ();
1114+ final AutoFollower autoFollower =
1115+ new AutoFollower ("remote" , results ::addAll , localClusterStateSupplier (localState ), () -> 1L , Runnable ::run ) {
1116+ @ Override
1117+ void getRemoteClusterState (String remoteCluster ,
1118+ long metadataVersion ,
1119+ BiConsumer <ClusterStateResponse , Exception > handler ) {
1120+ assertThat (remoteCluster , equalTo ("remote" ));
1121+ handler .accept (new ClusterStateResponse (new ClusterName ("remote" ), finalRemoteState , false ), null );
1122+ }
1123+
1124+ @ Override
1125+ void createAndFollow (Map <String , String > headers ,
1126+ PutFollowAction .Request followRequest ,
1127+ Runnable successHandler ,
1128+ Consumer <Exception > failureHandler ) {
1129+ followedIndices .add (followRequest .getLeaderIndex ());
1130+ successHandler .run ();
1131+ }
1132+
1133+ @ Override
1134+ void updateAutoFollowMetadata (Function <ClusterState , ClusterState > updateFunction , Consumer <Exception > handler ) {
1135+ lastModifiedClusterState .updateAndGet (updateFunction ::apply );
1136+ handler .accept (null );
1137+ }
1138+
1139+ @ Override
1140+ void cleanFollowedRemoteIndices (ClusterState remoteClusterState , List <String > patterns ) {
1141+ // Ignore, to avoid invoking updateAutoFollowMetadata(...) twice
1142+ }
1143+ };
1144+ autoFollower .start ();
1145+
1146+ assertThat (results , notNullValue ());
1147+ assertThat (results .size (), equalTo (1 ));
1148+
1149+ for (ObjectObjectCursor <String , IndexMetaData > index : remoteState .metaData ().indices ()) {
1150+ boolean expect = index .value .getState () == IndexMetaData .State .OPEN ;
1151+ assertThat (results .get (0 ).autoFollowExecutionResults .containsKey (index .value .getIndex ()), is (expect ));
1152+ assertThat (followedIndices .contains (index .key ), is (expect ));
1153+ }
1154+ }
1155+
10481156 private static ClusterState createRemoteClusterState (String indexName , boolean enableSoftDeletes ) {
10491157 Settings .Builder indexSettings ;
10501158 indexSettings = settings (Version .CURRENT ).put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), enableSoftDeletes );
@@ -1067,19 +1175,21 @@ private static ClusterState createRemoteClusterState(String indexName, boolean e
10671175
10681176 private static ClusterState createRemoteClusterState (ClusterState previous , String indexName ) {
10691177 IndexMetaData indexMetaData = IndexMetaData .builder (indexName )
1070- .settings (settings (Version .CURRENT ).put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true ))
1178+ .settings (settings (Version .CURRENT )
1179+ .put (IndexSettings .INDEX_SOFT_DELETES_SETTING .getKey (), true )
1180+ .put (IndexMetaData .SETTING_INDEX_UUID , UUIDs .randomBase64UUID (random ())))
10711181 .numberOfShards (1 )
10721182 .numberOfReplicas (0 )
10731183 .build ();
1074- ClusterState .Builder csBuilder = ClusterState .builder (new ClusterName ( "remote" ))
1184+ ClusterState .Builder csBuilder = ClusterState .builder (previous . getClusterName ( ))
10751185 .metaData (MetaData .builder (previous .metaData ())
10761186 .version (previous .metaData ().version () + 1 )
10771187 .put (indexMetaData , true ));
10781188
10791189 ShardRouting shardRouting =
10801190 TestShardRouting .newShardRouting (indexName , 0 , "1" , true , ShardRoutingState .INITIALIZING ).moveToStarted ();
10811191 IndexRoutingTable indexRoutingTable = IndexRoutingTable .builder (indexMetaData .getIndex ()).addShard (shardRouting ).build ();
1082- csBuilder .routingTable (RoutingTable .builder ().add (indexRoutingTable ).build ()).build ();
1192+ csBuilder .routingTable (RoutingTable .builder (previous . routingTable () ).add (indexRoutingTable ).build ()).build ();
10831193
10841194 return csBuilder .build ();
10851195 }
0 commit comments