From 02083d64b8fab01fb2b98d5894da5bf90e9f85f4 Mon Sep 17 00:00:00 2001
From: "James Z.M. Gao"
Date: Fri, 14 Mar 2014 18:50:08 +0800
Subject: [PATCH] fix compile error for hadoop CDH 4.4+

Using a macro, we work around the API difference between hadoop
2.0-alpha and 2.1-beta, and fix the compilation error that occurs when
SPARK_HADOOP_VERSION is set to 2.0.0-cdh4.4.0. That is, the yarn-alpha
project now also builds against hadoop CDH 4.4 and later.
---
 .../yarn/YarnAllocationHandlerMacro.scala     | 46 +++++++++++++++++++
 .../deploy/yarn/YarnAllocationHandler.scala   |  4 +-
 2 files changed, 48 insertions(+), 2 deletions(-)
 create mode 100644 core/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerMacro.scala

diff --git a/core/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerMacro.scala b/core/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerMacro.scala
new file mode 100644
index 000000000000..004f10b3f587
--- /dev/null
+++ b/core/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandlerMacro.scala
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.deploy.yarn
+
+import scala.language.experimental.macros
+import scala.reflect.macros.Context
+
+private[yarn] object YarnAllocationHandlerMacro {
+  def getAMResp(resp: Any): Any = macro getAMRespImpl
+
+  /**
+   * From Hadoop CDH 4.4.0+ (2.1.0-beta) onwards, AMResponse is merged into
+   * AllocateResponse, so there is no getAMResponse() to call; the
+   * AllocateResponse is used directly. This macro tests at compile time
+   * whether AMResponse exists and generates a different expression for
+   * each case.
+   *
+   * The macro is currently only used by the alpha version of Spark's yarn
+   * code. It stays in the core project because macro implementations must
+   * be compiled before the code that expands them (the two-stage
+   * compilation required by the Scala macro system).
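+   *
+   * Illustrative expansion (the name `resp` below is only for this example):
+   * {{{
+   *   YarnAllocationHandlerMacro.getAMResp(resp)
+   *   // expands to resp.getAMResponse() when AMResponse is on the classpath
+   *   // (hadoop 2.0-alpha), and to resp itself when it is not
+   *   // (hadoop 2.1-beta / CDH 4.4+).
+   * }}}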
+   */
+  def getAMRespImpl(c: Context)(resp: c.Expr[Any]): c.Expr[Any] = {
+    try {
+      import c.universe._
+      // Throws if AMResponse is not on the compile-time classpath.
+      c.mirror.staticClass("org.apache.hadoop.yarn.api.records.AMResponse")
+      // AMResponse exists (hadoop 2.0-alpha): expand to resp.getAMResponse().
+      c.Expr[Any](Apply(Select(resp.tree, newTermName("getAMResponse")), List()))
+    } catch {
+      // AMResponse is gone (hadoop 2.1-beta / CDH 4.4+): use the response as is.
+      case _: Throwable => resp
+    }
+  }
+}
diff --git a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
index 2056667af50c..243e41598276 100644
--- a/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
+++ b/yarn/alpha/src/main/scala/org/apache/spark/deploy/yarn/YarnAllocationHandler.scala
@@ -33,7 +33,7 @@ import org.apache.spark.util.Utils
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.yarn.api.AMRMProtocol
-import org.apache.hadoop.yarn.api.records.{AMResponse, ApplicationAttemptId}
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId
 import org.apache.hadoop.yarn.api.records.{Container, ContainerId, ContainerStatus}
 import org.apache.hadoop.yarn.api.records.{Priority, Resource, ResourceRequest}
 import org.apache.hadoop.yarn.api.protocolrecords.{AllocateRequest, AllocateResponse}
@@ -103,7 +103,7 @@ private[yarn] class YarnAllocationHandler(
     // this much.
 
     // Keep polling the Resource Manager for containers
-    val amResp = allocateExecutorResources(executorsToRequest).getAMResponse
+    val amResp = YarnAllocationHandlerMacro.getAMResp(allocateExecutorResources(executorsToRequest))
     val _allocatedContainers = amResp.getAllocatedContainers()
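
A minimal sketch of the call-site pattern the second hunk relies on (the value name `response` is hypothetical; only getAMResp and getAllocatedContainers() come from the patch itself): whichever expression the macro generates, the result provides getAllocatedContainers(), so the code after the call is the same on both the 2.0-alpha and the 2.1-beta/CDH 4.4+ APIs.

    // Sketch only: `response` stands for the result of the allocate call.
    val amResp = YarnAllocationHandlerMacro.getAMResp(response)
    // AMResponse (2.0-alpha) and AllocateResponse (2.1-beta / CDH 4.4+) both
    // provide getAllocatedContainers(), so this line is version-agnostic.
    val allocatedContainers = amResp.getAllocatedContainers()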