|
21 | 21 | import org.junit.Before; |
22 | 22 | import static org.mockito.Matchers.argThat; |
23 | 23 | import static org.mockito.Mockito.doNothing; |
| 24 | +import static org.mockito.Mockito.doAnswer; |
24 | 25 | import static org.mockito.Mockito.spy; |
25 | 26 |
|
26 | 27 | import java.util.ArrayList; |
|
37 | 38 | import org.apache.commons.logging.Log; |
38 | 39 | import org.apache.commons.logging.LogFactory; |
39 | 40 | import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; |
| 41 | +import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest; |
40 | 42 | import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest; |
41 | 43 | import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; |
42 | 44 | import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; |
43 | 45 | import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; |
44 | 46 | import org.apache.hadoop.yarn.api.protocolrecords.KillApplicationRequest; |
45 | 47 | import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; |
| 48 | +import org.apache.hadoop.yarn.api.records.ApplicationId; |
46 | 49 | import org.apache.hadoop.yarn.api.records.ApplicationReport; |
47 | 50 | import org.apache.hadoop.yarn.api.records.Container; |
48 | 51 | import org.apache.hadoop.yarn.api.records.ContainerId; |
49 | 52 | import org.apache.hadoop.yarn.api.records.ContainerState; |
| 53 | +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; |
50 | 54 | import org.apache.hadoop.yarn.api.records.NMToken; |
51 | 55 | import org.apache.hadoop.yarn.api.records.NodeId; |
52 | 56 | import org.apache.hadoop.yarn.api.records.ResourceRequest; |
|
57 | 61 | import org.apache.hadoop.yarn.event.AsyncDispatcher; |
58 | 62 | import org.apache.hadoop.yarn.event.Dispatcher; |
59 | 63 | import org.apache.hadoop.yarn.event.EventHandler; |
| 64 | +import org.apache.hadoop.yarn.server.resourcemanager.TestRMRestart.TestSecurityMockRM; |
| 65 | +import org.apache.hadoop.yarn.server.resourcemanager.recovery.MemoryRMStateStore; |
60 | 66 | import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; |
61 | 67 | import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; |
62 | 68 | import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; |
|
73 | 79 | import org.apache.log4j.Logger; |
74 | 80 | import org.junit.Test; |
75 | 81 | import org.mockito.ArgumentMatcher; |
| 82 | +import org.mockito.invocation.InvocationOnMock; |
| 83 | +import org.mockito.stubbing.Answer; |
76 | 84 |
|
77 | 85 | @SuppressWarnings({"unchecked", "rawtypes"}) |
78 | 86 | public class TestRM extends ParameterizedSchedulerTestBase { |
@@ -638,4 +646,107 @@ protected Dispatcher createDispatcher() { |
638 | 646 | Assert.assertEquals(appsSubmitted + 1, metrics.getAppsSubmitted()); |
639 | 647 | } |
640 | 648 |
|
| 649 | + // Test Kill an app while the app is finishing in the meanwhile. |
| 650 | + @Test (timeout = 30000) |
| 651 | + public void testKillFinishingApp() throws Exception{ |
| 652 | + |
| 653 | + // this dispatcher ignores RMAppAttemptEventType.KILL event |
| 654 | + final Dispatcher dispatcher = new AsyncDispatcher() { |
| 655 | + @Override |
| 656 | + public EventHandler getEventHandler() { |
| 657 | + |
| 658 | + class EventArgMatcher extends ArgumentMatcher<AbstractEvent> { |
| 659 | + @Override |
| 660 | + public boolean matches(Object argument) { |
| 661 | + if (argument instanceof RMAppAttemptEvent) { |
| 662 | + if (((RMAppAttemptEvent) argument).getType().equals( |
| 663 | + RMAppAttemptEventType.KILL)) { |
| 664 | + return true; |
| 665 | + } |
| 666 | + } |
| 667 | + return false; |
| 668 | + } |
| 669 | + } |
| 670 | + |
| 671 | + EventHandler handler = spy(super.getEventHandler()); |
| 672 | + doNothing().when(handler).handle(argThat(new EventArgMatcher())); |
| 673 | + return handler; |
| 674 | + } |
| 675 | + }; |
| 676 | + |
| 677 | + MockRM rm1 = new MockRM(conf){ |
| 678 | + @Override |
| 679 | + protected Dispatcher createDispatcher() { |
| 680 | + return dispatcher; |
| 681 | + } |
| 682 | + }; |
| 683 | + rm1.start(); |
| 684 | + MockNM nm1 = |
| 685 | + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); |
| 686 | + nm1.registerNode(); |
| 687 | + RMApp app1 = rm1.submitApp(200); |
| 688 | + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| 689 | + |
| 690 | + rm1.killApp(app1.getApplicationId()); |
| 691 | + |
| 692 | + FinishApplicationMasterRequest req = |
| 693 | + FinishApplicationMasterRequest.newInstance( |
| 694 | + FinalApplicationStatus.SUCCEEDED, "", ""); |
| 695 | + am1.unregisterAppAttempt(req,true); |
| 696 | + |
| 697 | + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHING); |
| 698 | + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); |
| 699 | + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED); |
| 700 | + rm1.waitForState(app1.getApplicationId(), RMAppState.FINISHED); |
| 701 | + } |
| 702 | + |
| 703 | + // Test Kill an app while the app is failing |
| 704 | + @Test (timeout = 30000) |
| 705 | + public void testKillFailingApp() throws Exception{ |
| 706 | + |
| 707 | + // this dispatcher ignores RMAppAttemptEventType.KILL event |
| 708 | + final Dispatcher dispatcher = new AsyncDispatcher() { |
| 709 | + @Override |
| 710 | + public EventHandler getEventHandler() { |
| 711 | + |
| 712 | + class EventArgMatcher extends ArgumentMatcher<AbstractEvent> { |
| 713 | + @Override |
| 714 | + public boolean matches(Object argument) { |
| 715 | + if (argument instanceof RMAppAttemptEvent) { |
| 716 | + if (((RMAppAttemptEvent) argument).getType().equals( |
| 717 | + RMAppAttemptEventType.KILL)) { |
| 718 | + return true; |
| 719 | + } |
| 720 | + } |
| 721 | + return false; |
| 722 | + } |
| 723 | + } |
| 724 | + |
| 725 | + EventHandler handler = spy(super.getEventHandler()); |
| 726 | + doNothing().when(handler).handle(argThat(new EventArgMatcher())); |
| 727 | + return handler; |
| 728 | + } |
| 729 | + }; |
| 730 | + |
| 731 | + MockRM rm1 = new MockRM(conf){ |
| 732 | + @Override |
| 733 | + protected Dispatcher createDispatcher() { |
| 734 | + return dispatcher; |
| 735 | + } |
| 736 | + }; |
| 737 | + rm1.start(); |
| 738 | + MockNM nm1 = |
| 739 | + new MockNM("127.0.0.1:1234", 8192, rm1.getResourceTrackerService()); |
| 740 | + nm1.registerNode(); |
| 741 | + RMApp app1 = rm1.submitApp(200); |
| 742 | + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); |
| 743 | + |
| 744 | + rm1.killApp(app1.getApplicationId()); |
| 745 | + |
| 746 | + // fail the app by sending container_finished event. |
| 747 | + nm1.nodeHeartbeat(am1.getApplicationAttemptId(), 1, ContainerState.COMPLETE); |
| 748 | + rm1.waitForState(am1.getApplicationAttemptId(), RMAppAttemptState.FAILED); |
| 749 | + // app is killed, not launching a new attempt |
| 750 | + rm1.waitForState(app1.getApplicationId(), RMAppState.KILLED); |
| 751 | + } |
641 | 752 | } |
0 commit comments