diff --git a/feathr-impl/src/main/java/com/linkedin/feathr/common/util/MvelContextUDFs.java b/feathr-impl/src/main/java/com/linkedin/feathr/common/util/MvelContextUDFs.java index 16c8079e9..7e583b784 100644 --- a/feathr-impl/src/main/java/com/linkedin/feathr/common/util/MvelContextUDFs.java +++ b/feathr-impl/src/main/java/com/linkedin/feathr/common/util/MvelContextUDFs.java @@ -37,9 +37,9 @@ public class MvelContextUDFs { private MvelContextUDFs() { } // register all the udfs defined in this class to the specific parser config - public static void registerUDFs(ParserConfiguration parserConfig) { + public static void registerUDFs(Class clazz, ParserConfiguration parserConfig) { // Scans the class (MvelContextUDFs) for any methods annotated with ExportToMvel - for (Method method : MvelContextUDFs.class.getMethods()) { + for (Method method : clazz.getMethods()) { if (method.isAnnotationPresent(ExportToMvel.class)) { if (!Modifier.isStatic(method.getModifiers())) { throw new Error("MVEL context set up incorrectly. Imported method " + method + " must be static but is not."); @@ -53,7 +53,7 @@ public static void registerUDFs(ParserConfiguration parserConfig) { // parser context. Basically this is a way of adding UDFs. @Retention(RetentionPolicy.RUNTIME) @Target(ElementType.METHOD) - private @interface ExportToMvel { } + public @interface ExportToMvel { } /** * Get the class type of the input object diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/MvelContext.java b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/MvelContext.java index 1ce8136c9..337481445 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/MvelContext.java +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/MvelContext.java @@ -8,6 +8,7 @@ import org.apache.avro.generic.GenericEnumSymbol; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.apache.spark.broadcast.Broadcast; import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema; import org.mvel2.MVEL; import org.mvel2.ParserConfiguration; @@ -31,10 +32,13 @@ private MvelContext() { } private static final ParserConfiguration PARSER_CONFIG = new ParserConfiguration(); - + // External UDF register class + public static Optional>> mvelAlienUDFRegisterClazz = Optional.empty(); + // Flag to avoid init external UDF everytime + public static Boolean alienUDFInitialized = false; static { - MvelContextUDFs.registerUDFs(PARSER_CONFIG); + MvelContextUDFs.registerUDFs(MvelContextUDFs.class, PARSER_CONFIG); loadJavaClasses(); @@ -98,7 +102,10 @@ private static void loadJavaClasses() { // ensure the class is loaded in memory. public static void ensureInitialized() { - + if (!alienUDFInitialized && mvelAlienUDFRegisterClazz.isPresent()) { + MvelContextUDFs.registerUDFs(mvelAlienUDFRegisterClazz.get().getValue(), PARSER_CONFIG); + alienUDFInitialized = true; + } } /** diff --git a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala index 67371464f..ba63a654c 100644 --- a/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala +++ b/feathr-impl/src/main/scala/com/linkedin/feathr/offline/mvel/plugins/FeathrExpressionExecutionContext.scala @@ -1,6 +1,7 @@ package com.linkedin.feathr.offline.mvel.plugins import com.linkedin.feathr.common.FeatureValue +import com.linkedin.feathr.offline.mvel.MvelContext import org.apache.spark.SparkContext import org.apache.spark.broadcast.Broadcast import org.mvel2.ConversionHandler @@ -8,6 +9,7 @@ import org.mvel2.conversion.ArrayHandler import org.mvel2.util.ReflectionUtil.{isAssignableFrom, toNonPrimitiveType} import java.io.Serializable +import java.util.Optional import scala.collection.mutable /** @@ -43,13 +45,21 @@ class FeathrExpressionExecutionContext extends Serializable { * @param typeAdaptor the type adaptor that can convert between the "other" representation and {@link FeatureValue} * @param < T> type parameter for the "other" feature value class */ - def setupExecutorMvelContext[T](clazz: Class[T], typeAdaptor: FeatureValueTypeAdaptor[T], sc: SparkContext): Unit = { + def setupExecutorMvelContext[T](clazz: Class[T], + typeAdaptor: FeatureValueTypeAdaptor[T], + sc: SparkContext, + mvelExtContext: Option[Class[Any]] = None): Unit = { localFeatureValueTypeAdaptors.put(clazz.getCanonicalName, typeAdaptor.asInstanceOf[FeatureValueTypeAdaptor[AnyRef]]) featureValueTypeAdaptors = sc.broadcast(localFeatureValueTypeAdaptors) // Add a converter that can convert external data to feature value addConversionHandler(classOf[FeatureValue], new ExternalDataToFeatureValueHandler(featureValueTypeAdaptors), sc) // Add a converter that can convert a feature value to external data addConversionHandler(clazz, new FeatureValueToExternalDataHandler(typeAdaptor), sc) + MvelContext.mvelAlienUDFRegisterClazz = if (mvelExtContext.isDefined) { + Optional.of(sc.broadcast(mvelExtContext.get)) + } else { + Optional.empty() + } } /** diff --git a/feathr-impl/src/test/java/com/linkedin/feathr/common/AlienMvelContextUDFs.java b/feathr-impl/src/test/java/com/linkedin/feathr/common/AlienMvelContextUDFs.java new file mode 100644 index 000000000..0b9f04442 --- /dev/null +++ b/feathr-impl/src/test/java/com/linkedin/feathr/common/AlienMvelContextUDFs.java @@ -0,0 +1,37 @@ +package com.linkedin.feathr.common; + +import com.linkedin.feathr.common.util.MvelContextUDFs; + +/** + * + * MVEL is an open-source expression language and runtime that makes it easy to write concise statements that operate + * on structured data objects (such as Avro records), among other things. + * + * This class contains all the udfs used in Mvel for both online and offline + */ +public class AlienMvelContextUDFs { + // The udf naming in this class should use a_b_c form. (to be consistent with existing Spark built-in UDFs). + private AlienMvelContextUDFs() { } + + + /** + * convert input to upper case string + * @param input input string + * @return upper case input + */ + @MvelContextUDFs.ExportToMvel + public static String toUpperCaseExt(String input) { + return input.toUpperCase(); + } + /** + * convert input to upper case string + * @param input input string + * @return upper case input + */ + @MvelContextUDFs.ExportToMvel + public static String toUpperCase(String input) { + return input.toUpperCase(); + } + +} + diff --git a/feathr-impl/src/test/java/com/linkedin/feathr/common/util/MvelUDFExpressionTests.java b/feathr-impl/src/test/java/com/linkedin/feathr/common/util/MvelUDFExpressionTests.java index 8ef18f292..734c1fbeb 100644 --- a/feathr-impl/src/test/java/com/linkedin/feathr/common/util/MvelUDFExpressionTests.java +++ b/feathr-impl/src/test/java/com/linkedin/feathr/common/util/MvelUDFExpressionTests.java @@ -1,9 +1,10 @@ package com.linkedin.feathr.common.util; import java.io.Serializable; -import java.net.URISyntaxException; import java.util.ArrayList; import java.util.List; + +import com.linkedin.feathr.common.AlienMvelContextUDFs; import org.mvel2.MVEL; import org.mvel2.ParserConfiguration; import org.mvel2.ParserContext; @@ -23,10 +24,18 @@ public class MvelUDFExpressionTests { @BeforeClass public void setup() { - MvelContextUDFs.registerUDFs(PARSER_CONFIG); + MvelContextUDFs.registerUDFs(MvelContextUDFs.class, PARSER_CONFIG); + MvelContextUDFs.registerUDFs(AlienMvelContextUDFs.class, PARSER_CONFIG); parserContext = new ParserContext(PARSER_CONFIG); } + @Test + public void testToUpperCaseExt() { + String getTopTerm = "toUpperCaseExt('hello')"; + Serializable compiledExpression = MVEL.compileExpression(getTopTerm, parserContext); + String res = (String) (MVEL.executeExpression(compiledExpression, "")); + Assert.assertEquals(res, "HELLO"); + } @Test public void testGetTopTerm() { diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala index 02964dab2..a9185f7a2 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/AnchoredFeaturesIntegTest.scala @@ -294,7 +294,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest { | anchors: { | anchor1: { | source: "anchorAndDerivations/nullValueSource.avro.json" - | key: "mId" + | key: "toUpperCaseExt(mId)" | features: { | featureWithNull: "isPresent(value) ? toNumeric(value) : 0" | } diff --git a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathr.scala b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathr.scala index e9fb8840e..b12c2452d 100644 --- a/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathr.scala +++ b/feathr-impl/src/test/scala/com/linkedin/feathr/offline/TestFeathr.scala @@ -1,7 +1,7 @@ package com.linkedin.feathr.offline import com.linkedin.feathr.common -import com.linkedin.feathr.common.JoiningFeatureParams +import com.linkedin.feathr.common.{AlienMvelContextUDFs, JoiningFeatureParams} import com.linkedin.feathr.offline.client.FeathrClient import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader} import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext @@ -31,7 +31,11 @@ abstract class TestFeathr { @BeforeClass def setup(): Unit = { setupSpark() - mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue], new AlienFeatureValueTypeAdaptor(), ss.sparkContext) + mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue], + new AlienFeatureValueTypeAdaptor(), + ss.sparkContext, + Some(classOf[AlienMvelContextUDFs].asInstanceOf[Class[Any]]) + ) } /** diff --git a/gradle.properties b/gradle.properties index b9c4f8a70..e7206fd95 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,3 @@ -version=0.10.4-rc1 +version=0.10.4-rc2 SONATYPE_AUTOMATIC_RELEASE=true POM_ARTIFACT_ID=feathr_2.12