1818
1919package org .apache .hadoop .yarn .server .router ;
2020
21+ import org .apache .commons .collections .CollectionUtils ;
2122import org .apache .commons .lang3 .math .NumberUtils ;
2223import org .apache .hadoop .classification .InterfaceAudience .Private ;
2324import org .apache .hadoop .classification .InterfaceAudience .Public ;
2425import org .apache .hadoop .classification .InterfaceStability .Unstable ;
2526import org .apache .hadoop .conf .Configuration ;
27+ import org .apache .hadoop .conf .StorageUnit ;
2628import org .apache .hadoop .security .UserGroupInformation ;
2729import org .apache .hadoop .security .token .Token ;
30+ import org .apache .hadoop .thirdparty .protobuf .GeneratedMessageV3 ;
2831import org .apache .hadoop .util .ReflectionUtils ;
2932import org .apache .hadoop .util .StringUtils ;
3033import org .apache .hadoop .yarn .api .records .ReservationRequest ;
3134import org .apache .hadoop .yarn .api .records .Priority ;
3235import org .apache .hadoop .yarn .api .records .ReservationRequestInterpreter ;
3336import org .apache .hadoop .yarn .api .records .Resource ;
3437import org .apache .hadoop .yarn .api .records .ReservationRequests ;
38+ import org .apache .hadoop .yarn .api .records .impl .pb .ApplicationSubmissionContextPBImpl ;
39+ import org .apache .hadoop .yarn .api .records .impl .pb .ContainerLaunchContextPBImpl ;
40+ import org .apache .hadoop .yarn .conf .YarnConfiguration ;
41+ import org .apache .hadoop .yarn .proto .YarnProtos .StringStringMapProto ;
42+ import org .apache .hadoop .yarn .proto .YarnProtos .StringBytesMapProto ;
43+ import org .apache .hadoop .yarn .proto .YarnProtos .ApplicationACLMapProto ;
44+ import org .apache .hadoop .yarn .proto .YarnProtos .StringLocalResourceMapProto ;
3545import org .apache .hadoop .yarn .server .resourcemanager .webapp .dao .ReservationDefinitionInfo ;
3646import org .apache .hadoop .yarn .server .resourcemanager .webapp .dao .ReservationRequestsInfo ;
3747import org .apache .hadoop .yarn .server .resourcemanager .webapp .dao .ReservationRequestInfo ;
3848import org .apache .hadoop .yarn .api .records .ReservationDefinition ;
49+ import org .apache .hadoop .yarn .api .records .ContainerLaunchContext ;
3950import org .apache .hadoop .yarn .server .resourcemanager .webapp .dao .ResourceInfo ;
4051import org .apache .hadoop .yarn .exceptions .YarnException ;
4152import org .apache .hadoop .yarn .exceptions .YarnRuntimeException ;
4253import org .apache .hadoop .yarn .security .client .RMDelegationTokenIdentifier ;
4354import org .slf4j .Logger ;
4455import org .slf4j .LoggerFactory ;
4556
57+ import java .io .ByteArrayOutputStream ;
58+ import java .io .ObjectOutputStream ;
4659import java .lang .reflect .InvocationTargetException ;
4760import java .lang .reflect .Method ;
4861import java .util .ArrayList ;
@@ -624,4 +637,118 @@ public static ReservationDefinition convertReservationDefinition(
624637
625638 return definition ;
626639 }
640+
641+ /**
642+ * Checks if the ApplicationSubmissionContext submitted with the application
643+ * is valid.
644+ *
645+ * Current checks:
646+ * - if its size is within limits.
647+ *
648+ * @param appContext the app context to check.
649+ * @throws IOException if an IO error occurred.
650+ * @throws YarnException yarn exception.
651+ */
652+ @ Public
653+ @ Unstable
654+ public static void checkAppSubmissionContext (ApplicationSubmissionContextPBImpl appContext ,
655+ Configuration conf ) throws IOException , YarnException {
656+ // Prevents DoS over the ApplicationClientProtocol by checking the context
657+ // the application was submitted with for any excessively large fields.
658+ double bytesOfMaxAscSize = conf .getStorageSize (
659+ YarnConfiguration .ROUTER_ASC_INTERCEPTOR_MAX_SIZE ,
660+ YarnConfiguration .DEFAULT_ROUTER_ASC_INTERCEPTOR_MAX_SIZE , StorageUnit .BYTES );
661+ if (appContext != null ) {
662+ int bytesOfSerializedSize = appContext .getProto ().getSerializedSize ();
663+ if (bytesOfSerializedSize >= bytesOfMaxAscSize ) {
664+ logContainerLaunchContext (appContext );
665+ String applicationId = appContext .getApplicationId ().toString ();
666+ String limit = StringUtils .byteDesc ((long ) bytesOfMaxAscSize );
667+ String appContentSize = StringUtils .byteDesc (bytesOfSerializedSize );
668+ String errMsg = String .format (
669+ "The size of the ApplicationSubmissionContext of the application %s is " +
670+ "above the limit %s, size = %s." , applicationId , limit , appContentSize );
671+ LOG .error (errMsg );
672+ throw new YarnException (errMsg );
673+ }
674+ }
675+ }
676+
677+ /**
678+ * Private helper for checkAppSubmissionContext that logs the fields in the
679+ * context for debugging.
680+ *
681+ * @param appContext the app context.
682+ * @throws IOException if an IO error occurred.
683+ */
684+ @ Private
685+ @ Unstable
686+ private static void logContainerLaunchContext (ApplicationSubmissionContextPBImpl appContext )
687+ throws IOException {
688+ if (appContext == null || appContext .getAMContainerSpec () == null ||
689+ !(appContext .getAMContainerSpec () instanceof ContainerLaunchContextPBImpl )) {
690+ return ;
691+ }
692+
693+ ContainerLaunchContext launchContext = appContext .getAMContainerSpec ();
694+ ContainerLaunchContextPBImpl clc = (ContainerLaunchContextPBImpl ) launchContext ;
695+ LOG .warn ("ContainerLaunchContext size: {}." , clc .getProto ().getSerializedSize ());
696+
697+ // ContainerLaunchContext contains:
698+ // 1) Map<String, LocalResource> localResources,
699+ List <StringLocalResourceMapProto > lrs = clc .getProto ().getLocalResourcesList ();
700+ logContainerLaunchContext ("LocalResource size: {}. Length: {}." , lrs );
701+
702+ // 2) Map<String, String> environment, List<String> commands,
703+ List <StringStringMapProto > envs = clc .getProto ().getEnvironmentList ();
704+ logContainerLaunchContext ("Environment size: {}. Length: {}." , envs );
705+
706+ List <String > cmds = clc .getCommands ();
707+ if (CollectionUtils .isNotEmpty (cmds )) {
708+ LOG .warn ("Commands size: {}. Length: {}." , cmds .size (), serialize (cmds ).length );
709+ }
710+
711+ // 3) Map<String, ByteBuffer> serviceData,
712+ List <StringBytesMapProto > serviceData = clc .getProto ().getServiceDataList ();
713+ logContainerLaunchContext ("ServiceData size: {}. Length: {}." , serviceData );
714+
715+ // 4) Map<ApplicationAccessType, String> acls
716+ List <ApplicationACLMapProto > acls = clc .getProto ().getApplicationACLsList ();
717+ logContainerLaunchContext ("ACLs size: {}. Length: {}." , acls );
718+ }
719+
720+ /**
721+ * Log ContainerLaunchContext Data SerializedSize.
722+ *
723+ * @param format format of logging.
724+ * @param lists data list.
725+ * @param <R> generic type R.
726+ */
727+ private static <R extends GeneratedMessageV3 > void logContainerLaunchContext (String format ,
728+ List <R > lists ) {
729+ if (CollectionUtils .isNotEmpty (lists )) {
730+ int sumLength = 0 ;
731+ for (R item : lists ) {
732+ sumLength += item .getSerializedSize ();
733+ }
734+ LOG .warn (format , lists .size (), sumLength );
735+ }
736+ }
737+
738+ /**
739+ * Serialize an object in ByteArray.
740+ *
741+ * @return obj ByteArray.
742+ * @throws IOException if an IO error occurred.
743+ */
744+ @ Private
745+ @ Unstable
746+ private static byte [] serialize (Object obj ) throws IOException {
747+ try (ByteArrayOutputStream b = new ByteArrayOutputStream ()) {
748+ try (ObjectOutputStream o = new ObjectOutputStream (b )) {
749+ o .writeObject (obj );
750+ }
751+ return b .toByteArray ();
752+ }
753+ }
627754}
0 commit comments