Skip to content

Commit

Permalink
Support external MVEL UDF (#1018)
Browse files Browse the repository at this point in the history
  • Loading branch information
jaymo001 authored Jan 30, 2023
1 parent 453b97f commit b85e753
Show file tree
Hide file tree
Showing 8 changed files with 80 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public class MvelContextUDFs {
private MvelContextUDFs() { }

// register all the udfs defined in this class to the specific parser config
public static void registerUDFs(ParserConfiguration parserConfig) {
public static void registerUDFs(Class<?> clazz, ParserConfiguration parserConfig) {
// Scans the class (MvelContextUDFs) for any methods annotated with ExportToMvel
for (Method method : MvelContextUDFs.class.getMethods()) {
for (Method method : clazz.getMethods()) {
if (method.isAnnotationPresent(ExportToMvel.class)) {
if (!Modifier.isStatic(method.getModifiers())) {
throw new Error("MVEL context set up incorrectly. Imported method " + method + " must be static but is not.");
Expand All @@ -53,7 +53,7 @@ public static void registerUDFs(ParserConfiguration parserConfig) {
// parser context. Basically this is a way of adding UDFs.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
private @interface ExportToMvel { }
public @interface ExportToMvel { }

/**
* Get the class type of the input object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.mvel2.MVEL;
import org.mvel2.ParserConfiguration;
Expand All @@ -31,10 +32,13 @@ private MvelContext() { }

private static final ParserConfiguration PARSER_CONFIG = new ParserConfiguration();


// External UDF register class
public static Optional<Broadcast<Class<?>>> mvelAlienUDFRegisterClazz = Optional.empty();
// Flag to avoid init external UDF everytime
public static Boolean alienUDFInitialized = false;
static {

MvelContextUDFs.registerUDFs(PARSER_CONFIG);
MvelContextUDFs.registerUDFs(MvelContextUDFs.class, PARSER_CONFIG);

loadJavaClasses();

Expand Down Expand Up @@ -98,7 +102,10 @@ private static void loadJavaClasses() {

// ensure the class is loaded in memory.
public static void ensureInitialized() {

if (!alienUDFInitialized && mvelAlienUDFRegisterClazz.isPresent()) {
MvelContextUDFs.registerUDFs(mvelAlienUDFRegisterClazz.get().getValue(), PARSER_CONFIG);
alienUDFInitialized = true;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.linkedin.feathr.offline.mvel.plugins

import com.linkedin.feathr.common.FeatureValue
import com.linkedin.feathr.offline.mvel.MvelContext
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.mvel2.ConversionHandler
import org.mvel2.conversion.ArrayHandler
import org.mvel2.util.ReflectionUtil.{isAssignableFrom, toNonPrimitiveType}

import java.io.Serializable
import java.util.Optional
import scala.collection.mutable

/**
Expand Down Expand Up @@ -43,13 +45,21 @@ class FeathrExpressionExecutionContext extends Serializable {
* @param typeAdaptor the type adaptor that can convert between the "other" representation and {@link FeatureValue}
* @param < T> type parameter for the "other" feature value class
*/
def setupExecutorMvelContext[T](clazz: Class[T], typeAdaptor: FeatureValueTypeAdaptor[T], sc: SparkContext): Unit = {
def setupExecutorMvelContext[T](clazz: Class[T],
typeAdaptor: FeatureValueTypeAdaptor[T],
sc: SparkContext,
mvelExtContext: Option[Class[Any]] = None): Unit = {
localFeatureValueTypeAdaptors.put(clazz.getCanonicalName, typeAdaptor.asInstanceOf[FeatureValueTypeAdaptor[AnyRef]])
featureValueTypeAdaptors = sc.broadcast(localFeatureValueTypeAdaptors)
// Add a converter that can convert external data to feature value
addConversionHandler(classOf[FeatureValue], new ExternalDataToFeatureValueHandler(featureValueTypeAdaptors), sc)
// Add a converter that can convert a feature value to external data
addConversionHandler(clazz, new FeatureValueToExternalDataHandler(typeAdaptor), sc)
MvelContext.mvelAlienUDFRegisterClazz = if (mvelExtContext.isDefined) {
Optional.of(sc.broadcast(mvelExtContext.get))
} else {
Optional.empty()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linkedin.feathr.common;

import com.linkedin.feathr.common.util.MvelContextUDFs;

/**
*
* MVEL is an open-source expression language and runtime that makes it easy to write concise statements that operate
* on structured data objects (such as Avro records), among other things.
*
* This class contains all the udfs used in Mvel for both online and offline
*/
public class AlienMvelContextUDFs {
// The udf naming in this class should use a_b_c form. (to be consistent with existing Spark built-in UDFs).
private AlienMvelContextUDFs() { }


/**
* convert input to upper case string
* @param input input string
* @return upper case input
*/
@MvelContextUDFs.ExportToMvel
public static String toUpperCaseExt(String input) {
return input.toUpperCase();
}
/**
* convert input to upper case string
* @param input input string
* @return upper case input
*/
@MvelContextUDFs.ExportToMvel
public static String toUpperCase(String input) {
return input.toUpperCase();
}

}

Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.linkedin.feathr.common.util;

import java.io.Serializable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import com.linkedin.feathr.common.AlienMvelContextUDFs;
import org.mvel2.MVEL;
import org.mvel2.ParserConfiguration;
import org.mvel2.ParserContext;
Expand All @@ -23,10 +24,18 @@ public class MvelUDFExpressionTests {

@BeforeClass
public void setup() {
MvelContextUDFs.registerUDFs(PARSER_CONFIG);
MvelContextUDFs.registerUDFs(MvelContextUDFs.class, PARSER_CONFIG);
MvelContextUDFs.registerUDFs(AlienMvelContextUDFs.class, PARSER_CONFIG);
parserContext = new ParserContext(PARSER_CONFIG);
}

@Test
public void testToUpperCaseExt() {
String getTopTerm = "toUpperCaseExt('hello')";
Serializable compiledExpression = MVEL.compileExpression(getTopTerm, parserContext);
String res = (String) (MVEL.executeExpression(compiledExpression, ""));
Assert.assertEquals(res, "HELLO");
}

@Test
public void testGetTopTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
| anchors: {
| anchor1: {
| source: "anchorAndDerivations/nullValueSource.avro.json"
| key: "mId"
| key: "toUpperCaseExt(mId)"
| features: {
| featureWithNull: "isPresent(value) ? toNumeric(value) : 0"
| }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.feathr.offline

import com.linkedin.feathr.common
import com.linkedin.feathr.common.JoiningFeatureParams
import com.linkedin.feathr.common.{AlienMvelContextUDFs, JoiningFeatureParams}
import com.linkedin.feathr.offline.client.FeathrClient
import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
Expand Down Expand Up @@ -31,7 +31,11 @@ abstract class TestFeathr {
@BeforeClass
def setup(): Unit = {
setupSpark()
mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue], new AlienFeatureValueTypeAdaptor(), ss.sparkContext)
mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue],
new AlienFeatureValueTypeAdaptor(),
ss.sparkContext,
Some(classOf[AlienMvelContextUDFs].asInstanceOf[Class[Any]])
)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=0.10.4-rc1
version=0.10.4-rc2
SONATYPE_AUTOMATIC_RELEASE=true
POM_ARTIFACT_ID=feathr_2.12

0 comments on commit b85e753

Please sign in to comment.