1+ /**
2+ * Licensed to the Apache Software Foundation (ASF) under one
3+ * or more contributor license agreements. See the NOTICE file
4+ * distributed with this work for additional information
5+ * regarding copyright ownership. The ASF licenses this file
6+ * to you under the Apache License, Version 2.0 (the
7+ * "License"); you may not use this file except in compliance
8+ * with the License. You may obtain a copy of the License at
9+ *
10+ * http://www.apache.org/licenses/LICENSE-2.0
11+ *
12+ * Unless required by applicable law or agreed to in writing, software
13+ * distributed under the License is distributed on an "AS IS" BASIS,
14+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+ * See the License for the specific language governing permissions and
16+ * limitations under the License.
17+ */
18+
19+ package org .apache .hadoop .hdfs .server .federation .router ;
20+
21+ import org .apache .hadoop .conf .Configuration ;
22+ import org .apache .hadoop .fs .CacheFlag ;
23+ import org .apache .hadoop .fs .FSDataOutputStream ;
24+ import org .apache .hadoop .fs .FileSystem ;
25+ import org .apache .hadoop .fs .Path ;
26+ import org .apache .hadoop .hdfs .protocol .CacheDirectiveEntry ;
27+ import org .apache .hadoop .hdfs .protocol .CacheDirectiveInfo ;
28+ import org .apache .hadoop .hdfs .protocol .CachePoolEntry ;
29+ import org .apache .hadoop .hdfs .protocol .CachePoolInfo ;
30+ import org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster ;
31+ import org .apache .hadoop .hdfs .server .federation .MockResolver ;
32+ import org .apache .hadoop .hdfs .server .federation .RouterConfigBuilder ;
33+ import org .apache .hadoop .fs .BatchedRemoteIterator .BatchedEntries ;
34+ import org .apache .hadoop .ipc .CallerContext ;
35+ import org .junit .After ;
36+ import org .junit .AfterClass ;
37+ import org .junit .Before ;
38+ import org .junit .BeforeClass ;
39+ import org .junit .Test ;
40+ import org .mockito .Mockito ;
41+
42+ import java .io .IOException ;
43+ import java .util .EnumSet ;
44+ import java .util .concurrent .TimeUnit ;
45+
46+ import static org .apache .hadoop .hdfs .server .federation .FederationTestUtils .NAMENODES ;
47+ import static org .apache .hadoop .hdfs .server .federation .MiniRouterDFSCluster .DEFAULT_HEARTBEAT_INTERVAL_MS ;
48+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT ;
49+ import static org .apache .hadoop .hdfs .server .federation .router .RBFConfigKeys .DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT ;
50+ import static org .apache .hadoop .hdfs .server .federation .router .async .AsyncUtil .syncReturn ;
51+ import static org .junit .Assert .assertEquals ;
52+ import static org .junit .Assert .assertTrue ;
53+
54+ public class TestRouterAsyncCacheAdmin {
55+ private static Configuration routerConf ;
56+ /** Federated HDFS cluster. */
57+ private static MiniRouterDFSCluster cluster ;
58+ private static String ns0 ;
59+
60+ /** Random Router for this federated cluster. */
61+ private MiniRouterDFSCluster .RouterContext router ;
62+ private FileSystem routerFs ;
63+ private RouterRpcServer routerRpcServer ;
64+ private RouterAsyncCacheAdmin asyncCacheAdmin ;
65+
66+ @ BeforeClass
67+ public static void setUpCluster () throws Exception {
68+ cluster = new MiniRouterDFSCluster (true , 1 , 2 ,
69+ DEFAULT_HEARTBEAT_INTERVAL_MS , 1000 );
70+ cluster .setNumDatanodesPerNameservice (3 );
71+
72+ cluster .startCluster ();
73+
74+ // Making one Namenode active per nameservice
75+ if (cluster .isHighAvailability ()) {
76+ for (String ns : cluster .getNameservices ()) {
77+ cluster .switchToActive (ns , NAMENODES [0 ]);
78+ cluster .switchToStandby (ns , NAMENODES [1 ]);
79+ }
80+ }
81+ // Start routers with only an RPC service
82+ routerConf = new RouterConfigBuilder ()
83+ .rpc ()
84+ .build ();
85+
86+ // Reduce the number of RPC clients threads to overload the Router easy
87+ routerConf .setInt (RBFConfigKeys .DFS_ROUTER_CLIENT_THREADS_SIZE , 1 );
88+ routerConf .setInt (DFS_ROUTER_RPC_ASYNC_HANDLER_COUNT , 1 );
89+ routerConf .setInt (DFS_ROUTER_RPC_ASYNC_RESPONDER_COUNT , 1 );
90+ // We decrease the DN cache times to make the test faster
91+ routerConf .setTimeDuration (
92+ RBFConfigKeys .DN_REPORT_CACHE_EXPIRE , 1 , TimeUnit .SECONDS );
93+ cluster .addRouterOverrides (routerConf );
94+ // Start routers with only an RPC service
95+ cluster .startRouters ();
96+
97+ // Register and verify all NNs with all routers
98+ cluster .registerNamenodes ();
99+ cluster .waitNamenodeRegistration ();
100+ cluster .waitActiveNamespaces ();
101+ ns0 = cluster .getNameservices ().get (0 );
102+ }
103+
104+ @ AfterClass
105+ public static void shutdownCluster () throws Exception {
106+ if (cluster != null ) {
107+ cluster .shutdown ();
108+ }
109+ }
110+
111+ @ Before
112+ public void setUp () throws IOException {
113+ router = cluster .getRandomRouter ();
114+ routerFs = router .getFileSystem ();
115+ routerRpcServer = router .getRouterRpcServer ();
116+ routerRpcServer .initAsyncThreadPool ();
117+ RouterAsyncRpcClient asyncRpcClient = new RouterAsyncRpcClient (
118+ routerConf , router .getRouter (), routerRpcServer .getNamenodeResolver (),
119+ routerRpcServer .getRPCMonitor (),
120+ routerRpcServer .getRouterStateIdContext ());
121+ RouterRpcServer spy = Mockito .spy (routerRpcServer );
122+ Mockito .when (spy .getRPCClient ()).thenReturn (asyncRpcClient );
123+ asyncCacheAdmin = new RouterAsyncCacheAdmin (spy );
124+
125+ // Create mock locations
126+ MockResolver resolver = (MockResolver ) router .getRouter ().getSubclusterResolver ();
127+ resolver .addLocation ("/" , ns0 , "/" );
128+ FSDataOutputStream fsDataOutputStream = routerFs .create (
129+ new Path ("/testCache.file" ), true );
130+ fsDataOutputStream .write (new byte [1024 ]);
131+ fsDataOutputStream .close ();
132+ }
133+
134+ @ After
135+ public void tearDown () throws IOException {
136+ // clear client context
137+ CallerContext .setCurrent (null );
138+ boolean delete = routerFs .delete (new Path ("/testCache.file" ));
139+ assertTrue (delete );
140+ if (routerFs != null ) {
141+ routerFs .close ();
142+ }
143+ }
144+
145+ @ Test
146+ public void testRouterAsyncCacheAdmin () throws Exception {
147+ asyncCacheAdmin .addCachePool (new CachePoolInfo ("pool" ));
148+ syncReturn (null );
149+
150+ CacheDirectiveInfo path = new CacheDirectiveInfo .Builder ().
151+ setPool ("pool" ).
152+ setPath (new Path ("/testCache.file" )).
153+ build ();
154+ asyncCacheAdmin .addCacheDirective (path , EnumSet .of (CacheFlag .FORCE ));
155+ long result = syncReturn (long .class );
156+ assertEquals (1 , result );
157+
158+ asyncCacheAdmin .listCachePools ("" );
159+ BatchedEntries <CachePoolEntry > cachePoolEntries = syncReturn (BatchedEntries .class );
160+ assertEquals ("pool" , cachePoolEntries .get (0 ).getInfo ().getPoolName ());
161+
162+ CacheDirectiveInfo filter = new CacheDirectiveInfo .Builder ().
163+ setPool ("pool" ).
164+ build ();
165+ asyncCacheAdmin .listCacheDirectives (0 , filter );
166+ BatchedEntries <CacheDirectiveEntry > cacheDirectiveEntries = syncReturn (BatchedEntries .class );
167+ assertEquals (new Path ("/testCache.file" ), cacheDirectiveEntries .get (0 ).getInfo ().getPath ());
168+
169+ CachePoolInfo pool = new CachePoolInfo ("pool" ).setOwnerName ("pool_user" );
170+ asyncCacheAdmin .modifyCachePool (pool );
171+ syncReturn (null );
172+
173+ asyncCacheAdmin .listCachePools ("" );
174+ cachePoolEntries = syncReturn (BatchedEntries .class );
175+ assertEquals ("pool_user" , cachePoolEntries .get (0 ).getInfo ().getOwnerName ());
176+
177+ path = new CacheDirectiveInfo .Builder ().
178+ setPool ("pool" ).
179+ setPath (new Path ("/testCache.file" )).
180+ setReplication ((short ) 2 ).
181+ setId (1L ).
182+ build ();
183+ asyncCacheAdmin .modifyCacheDirective (path , EnumSet .of (CacheFlag .FORCE ));
184+ syncReturn (null );
185+
186+ asyncCacheAdmin .listCacheDirectives (0 , filter );
187+ cacheDirectiveEntries = syncReturn (BatchedEntries .class );
188+ assertEquals (Short .valueOf ((short ) 2 ), cacheDirectiveEntries .get (0 ).getInfo ().getReplication ());
189+
190+ asyncCacheAdmin .removeCacheDirective (1L );
191+ syncReturn (null );
192+ asyncCacheAdmin .removeCachePool ("pool" );
193+ syncReturn (null );
194+ }
195+ }
0 commit comments