1818
1919package org .apache .hadoop .hive .metastore ;
2020
21+ import org .apache .commons .lang3 .tuple .Pair ;
2122import org .apache .hadoop .conf .Configuration ;
2223import org .apache .hadoop .hdfs .MiniDFSCluster ;
24+ import org .apache .hadoop .hive .common .TableName ;
2325import org .apache .hadoop .hive .metastore .conf .MetastoreConf ;
2426import org .apache .hadoop .hive .metastore .conf .MetastoreConf .ConfVars ;
27+ import org .apache .hadoop .hive .metastore .leader .LeaderElection ;
28+ import org .apache .hadoop .hive .metastore .leader .LeaderElectionContext ;
29+ import org .apache .hadoop .hive .metastore .leader .LeaderElectionFactory ;
30+ import org .apache .hadoop .hive .metastore .leader .LeaseLeaderElection ;
2531import org .apache .hadoop .hive .metastore .security .HadoopThriftAuthBridge ;
2632import org .apache .hadoop .hive .ql .stats .StatsUpdaterThread ;
2733import org .apache .hadoop .hive .ql .txn .compactor .Cleaner ;
3137import org .slf4j .Logger ;
3238import org .slf4j .LoggerFactory ;
3339
40+ import java .io .IOException ;
41+ import java .util .ArrayList ;
3442import java .util .HashMap ;
43+ import java .util .List ;
3544import java .util .Map ;
3645import java .util .Set ;
46+ import java .util .concurrent .CountDownLatch ;
47+ import java .util .concurrent .ScheduledExecutorService ;
3748import java .util .concurrent .TimeUnit ;
3849
3950/**
4051 * Base class for HMS leader config testing.
4152 */
42- class MetastoreHousekeepingLeaderTestBase {
53+ abstract class MetastoreHousekeepingLeaderTestBase {
4354 private static final Logger LOG = LoggerFactory .getLogger (MetastoreHousekeepingLeaderTestBase .class );
4455 private static HiveMetaStoreClient client ;
45- protected static Configuration conf = MetastoreConf . newMetastoreConf () ;
56+ protected Configuration conf ;
4657 private static Warehouse warehouse ;
4758 private static boolean isServerStarted = false ;
4859 private static int port ;
@@ -54,12 +65,15 @@ class MetastoreHousekeepingLeaderTestBase {
5465 static Map <String , Boolean > threadNames = new HashMap <>();
5566 static Map <Class <? extends Thread >, Boolean > threadClasses = new HashMap <>();
5667
57- void internalSetup (final String leaderHostName , boolean configuredLeader ) throws Exception {
68+ void setup (final String leaderHostName , Configuration configuration ) throws Exception {
69+ this .conf = configuration ;
5870 MetaStoreTestUtils .setConfForStandloneMode (conf );
5971 MetastoreConf .setVar (conf , ConfVars .THRIFT_BIND_HOST , "localhost" );
60- MetastoreConf .setVar (conf , ConfVars .METASTORE_HOUSEKEEPING_LEADER_HOSTNAME , leaderHostName );
6172 MetastoreConf .setVar (conf , ConfVars .METASTORE_HOUSEKEEPING_LEADER_ELECTION ,
62- configuredLeader ? "host" : "lock" );
73+ leaderHostName != null ? LeaderElectionFactory .Method .HOST .name () : LeaderElectionFactory .Method .LOCK .name ());
74+ if (leaderHostName != null ) {
75+ MetastoreConf .setVar (conf , ConfVars .METASTORE_HOUSEKEEPING_LEADER_HOSTNAME , leaderHostName );
76+ }
6377 MetastoreConf .setBoolVar (conf , MetastoreConf .ConfVars .COMPACTOR_INITIATOR_ON , true );
6478 MetastoreConf .setBoolVar (conf , MetastoreConf .ConfVars .COMPACTOR_CLEANER_ON , true );
6579
@@ -193,5 +207,199 @@ private void resetThreadStatus() {
193207 threadNames .forEach ((name , status ) -> threadNames .put (name , false ));
194208 threadClasses .forEach ((thread , status ) -> threadClasses .put (thread , false ));
195209 }
210+
211+ static class CombinedLeaderElector implements AutoCloseable {
212+ List <Pair <TableName , LeaderElection <TableName >>> elections = new ArrayList <>();
213+ private final Configuration configuration ;
214+ private String name ;
215+
216+ CombinedLeaderElector (Configuration conf ) throws IOException {
217+ this .configuration = conf ;
218+ for (LeaderElectionContext .TTYPE type : LeaderElectionContext .TTYPE .values ()) {
219+ TableName table = type .getTableName ();
220+ elections .add (Pair .of (table , new LeaseLeaderElection ()));
221+ }
222+ }
223+
224+ public void tryBeLeader () throws Exception {
225+ int i = 0 ;
226+ for (Pair <TableName , LeaderElection <TableName >> election : elections ) {
227+ LeaderElection <TableName > le = election .getRight ();
228+ le .setName (name + "-" + i ++);
229+ le .tryBeLeader (configuration , election .getLeft ());
230+ }
231+ }
232+
233+ public boolean isLeader () {
234+ boolean isLeader = true ;
235+ for (Pair <TableName , LeaderElection <TableName >> election : elections ) {
236+ isLeader &= election .getRight ().isLeader ();
237+ }
238+ return isLeader ;
239+ }
240+
241+ public void setName (String name ) {
242+ this .name = name ;
243+ }
244+
245+ @ Override
246+ public void close () throws Exception {
247+ for (Pair <TableName , LeaderElection <TableName >> election : elections ) {
248+ election .getRight ().close ();
249+ }
250+ }
251+ }
252+
253+ static class ReleaseAndRequireLease extends LeaseLeaderElection {
254+ private static CountDownLatch latch ;
255+ private final Configuration configuration ;
256+ private final boolean needRenewLease ;
257+ private TableName tableName ;
258+
259+ public static void setMonitor (CountDownLatch latch ) {
260+ ReleaseAndRequireLease .latch = latch ;
261+ }
262+ public static void reset () {
263+ ReleaseAndRequireLease .latch = null ;
264+ }
265+
266+ public ReleaseAndRequireLease (Configuration conf , boolean needRenewLease ) throws IOException {
267+ super ();
268+ this .configuration = conf ;
269+ this .needRenewLease = needRenewLease ;
270+ }
271+
272+ @ Override
273+ public void setName (String name ) {
274+ super .setName (name );
275+ LeaderElectionContext .TTYPE type = null ;
276+ for (LeaderElectionContext .TTYPE value : LeaderElectionContext .TTYPE .values ()) {
277+ if (value .getName ().equalsIgnoreCase (name )) {
278+ type = value ;
279+ break ;
280+ }
281+ }
282+ if (type == null ) {
283+ // This shouldn't happen at all
284+ throw new AssertionError ("Unknown elector name: " + name );
285+ }
286+ this .tableName = type .getTableName ();
287+ }
288+
289+ @ Override
290+ protected void notifyListener () {
291+ ScheduledExecutorService service = null ;
292+ if (!isLeader ) {
293+ try {
294+ service = ThreadPool .getPool ();
295+ } catch (Exception ignored ) {
296+ }
297+ }
298+ super .notifyListener ();
299+ if (isLeader ) {
300+ if (!needRenewLease ) {
301+ super .shutdownWatcher ();
302+ // In our tests, the time spent on notifying the listener might be greater than the lease timeout,
303+ // which makes the leader loss the leadership quickly after wake up, and kill all housekeeping services.
304+ // Make sure the leader is still valid while notifying the listener, and switch to ReleaseAndRequireWatcher
305+ // after all listeners finish their work.
306+ heartbeater = new ReleaseAndRequireWatcher (configuration , tableName );
307+ heartbeater .startWatch ();
308+ }
309+ } else {
310+ if (service != null ) {
311+ // If the housekeeping task is running behind
312+ Assert .assertTrue (service .isShutdown ());
313+ // Interrupt all sleeping tasks
314+ service .shutdownNow ();
315+ try {
316+ // This is the last one get notified, sleep some time to make sure all other
317+ // services have been stopped before return
318+ Thread .sleep (12000 );
319+ } catch (InterruptedException ignore ) {
320+ }
321+ }
322+ }
323+ if (latch != null ) {
324+ latch .countDown ();
325+ }
326+ }
327+
328+ // For testing purpose only, lock would become timeout and then acquire it again
329+ private class ReleaseAndRequireWatcher extends LeaseWatcher {
330+ long timeout ;
331+ public ReleaseAndRequireWatcher (Configuration conf ,
332+ TableName tableName ) {
333+ super (conf , tableName );
334+ timeout = MetastoreConf .getTimeVar (conf ,
335+ MetastoreConf .ConfVars .TXN_TIMEOUT , TimeUnit .MILLISECONDS ) + 3000 ;
336+ setName ("ReleaseAndRequireWatcher-" + ((name != null ) ? name + "-" : "" ) + ID .incrementAndGet ());
337+ }
338+
339+ @ Override
340+ public void beforeRun () {
341+ try {
342+ Thread .sleep (timeout );
343+ } catch (InterruptedException e ) {
344+ // ignore this
345+ }
346+ }
347+
348+ @ Override
349+ public void runInternal () {
350+ shutDown ();
351+ // The timeout lock should be cleaned,
352+ // sleep some time to let others take the chance to become the leader
353+ try {
354+ Thread .sleep (5000 );
355+ } catch (InterruptedException e ) {
356+ // ignore
357+ }
358+ // Acquire the lock again
359+ conf = new Configuration (conf );
360+ reclaim ();
361+ }
362+ }
363+ }
364+
365+ public void checkHouseKeepingThreadExistence (boolean isLeader ) throws Exception {
366+ searchHousekeepingThreads ();
367+
368+ // Verify existence of threads
369+ for (Map .Entry <String , Boolean > entry : threadNames .entrySet ()) {
370+ if (entry .getValue ()) {
371+ LOG .info ("Found thread with name {}" , entry .getKey ());
372+ } else {
373+ LOG .info ("No thread found with name {}" , entry .getKey ());
374+ }
375+ if (isLeader ) {
376+ Assert .assertTrue ("No thread with name " + entry .getKey () + " found." , entry .getValue ());
377+ } else {
378+ Assert .assertFalse ("Thread with name " + entry .getKey () + " found." , entry .getValue ());
379+ }
380+ }
381+
382+ for (Map .Entry <Class <? extends Thread >, Boolean > entry : threadClasses .entrySet ()) {
383+ if (isLeader ) {
384+ if (entry .getValue ()) {
385+ LOG .info ("Found thread for {}" , entry .getKey ().getSimpleName ());
386+ }
387+ Assert .assertTrue ("No thread found for class " + entry .getKey ().getSimpleName (), entry .getValue ());
388+ } else {
389+ // A non-leader HMS will still run the configured number of Compaction worker threads.
390+ if (entry .getKey () == Worker .class ) {
391+ if (entry .getValue ()) {
392+ LOG .info ("Thread found for " + entry .getKey ().getSimpleName ());
393+ }
394+ } else {
395+ if (!entry .getValue ()) {
396+ LOG .info ("No thread found for " + entry .getKey ().getSimpleName ());
397+ }
398+ Assert .assertFalse ("Thread found for class " + entry .getKey ().getSimpleName (),
399+ entry .getValue ());
400+ }
401+ }
402+ }
403+ }
196404}
197405
0 commit comments