Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support external/alien MVEL UDF #1018

Merged
merged 1 commit into from
Jan 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ public class MvelContextUDFs {
private MvelContextUDFs() { }

// register all the udfs defined in this class to the specific parser config
public static void registerUDFs(ParserConfiguration parserConfig) {
public static void registerUDFs(Class<?> clazz, ParserConfiguration parserConfig) {
// Scans the class (MvelContextUDFs) for any methods annotated with ExportToMvel
for (Method method : MvelContextUDFs.class.getMethods()) {
for (Method method : clazz.getMethods()) {
if (method.isAnnotationPresent(ExportToMvel.class)) {
if (!Modifier.isStatic(method.getModifiers())) {
throw new Error("MVEL context set up incorrectly. Imported method " + method + " must be static but is not.");
Expand All @@ -53,7 +53,7 @@ public static void registerUDFs(ParserConfiguration parserConfig) {
// parser context. Basically this is a way of adding UDFs.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
private @interface ExportToMvel { }
public @interface ExportToMvel { }

/**
* Get the class type of the input object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import org.apache.avro.generic.GenericEnumSymbol;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema;
import org.mvel2.MVEL;
import org.mvel2.ParserConfiguration;
Expand All @@ -31,10 +32,13 @@ private MvelContext() { }

private static final ParserConfiguration PARSER_CONFIG = new ParserConfiguration();


// External UDF register class
public static Optional<Broadcast<Class<?>>> mvelAlienUDFRegisterClazz = Optional.empty();
// Flag to avoid init external UDF everytime
public static Boolean alienUDFInitialized = false;
static {

MvelContextUDFs.registerUDFs(PARSER_CONFIG);
MvelContextUDFs.registerUDFs(MvelContextUDFs.class, PARSER_CONFIG);

loadJavaClasses();

Expand Down Expand Up @@ -98,7 +102,10 @@ private static void loadJavaClasses() {

// ensure the class is loaded in memory.
public static void ensureInitialized() {

if (!alienUDFInitialized && mvelAlienUDFRegisterClazz.isPresent()) {
MvelContextUDFs.registerUDFs(mvelAlienUDFRegisterClazz.get().getValue(), PARSER_CONFIG);
alienUDFInitialized = true;
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package com.linkedin.feathr.offline.mvel.plugins

import com.linkedin.feathr.common.FeatureValue
import com.linkedin.feathr.offline.mvel.MvelContext
import org.apache.spark.SparkContext
import org.apache.spark.broadcast.Broadcast
import org.mvel2.ConversionHandler
import org.mvel2.conversion.ArrayHandler
import org.mvel2.util.ReflectionUtil.{isAssignableFrom, toNonPrimitiveType}

import java.io.Serializable
import java.util.Optional
import scala.collection.mutable

/**
Expand Down Expand Up @@ -43,13 +45,21 @@ class FeathrExpressionExecutionContext extends Serializable {
* @param typeAdaptor the type adaptor that can convert between the "other" representation and {@link FeatureValue}
* @param < T> type parameter for the "other" feature value class
*/
def setupExecutorMvelContext[T](clazz: Class[T], typeAdaptor: FeatureValueTypeAdaptor[T], sc: SparkContext): Unit = {
def setupExecutorMvelContext[T](clazz: Class[T],
typeAdaptor: FeatureValueTypeAdaptor[T],
sc: SparkContext,
mvelExtContext: Option[Class[Any]] = None): Unit = {
localFeatureValueTypeAdaptors.put(clazz.getCanonicalName, typeAdaptor.asInstanceOf[FeatureValueTypeAdaptor[AnyRef]])
featureValueTypeAdaptors = sc.broadcast(localFeatureValueTypeAdaptors)
// Add a converter that can convert external data to feature value
addConversionHandler(classOf[FeatureValue], new ExternalDataToFeatureValueHandler(featureValueTypeAdaptors), sc)
// Add a converter that can convert a feature value to external data
addConversionHandler(clazz, new FeatureValueToExternalDataHandler(typeAdaptor), sc)
MvelContext.mvelAlienUDFRegisterClazz = if (mvelExtContext.isDefined) {
Optional.of(sc.broadcast(mvelExtContext.get))
} else {
Optional.empty()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.linkedin.feathr.common;

import com.linkedin.feathr.common.util.MvelContextUDFs;

/**
*
* MVEL is an open-source expression language and runtime that makes it easy to write concise statements that operate
* on structured data objects (such as Avro records), among other things.
*
* This class contains all the udfs used in Mvel for both online and offline
*/
public class AlienMvelContextUDFs {
// The udf naming in this class should use a_b_c form. (to be consistent with existing Spark built-in UDFs).
private AlienMvelContextUDFs() { }


/**
* convert input to upper case string
* @param input input string
* @return upper case input
*/
@MvelContextUDFs.ExportToMvel
public static String toUpperCaseExt(String input) {
return input.toUpperCase();
}
/**
* convert input to upper case string
* @param input input string
* @return upper case input
*/
@MvelContextUDFs.ExportToMvel
public static String toUpperCase(String input) {
return input.toUpperCase();
}

}

Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
package com.linkedin.feathr.common.util;

import java.io.Serializable;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.List;

import com.linkedin.feathr.common.AlienMvelContextUDFs;
import org.mvel2.MVEL;
import org.mvel2.ParserConfiguration;
import org.mvel2.ParserContext;
Expand All @@ -23,10 +24,18 @@ public class MvelUDFExpressionTests {

@BeforeClass
public void setup() {
MvelContextUDFs.registerUDFs(PARSER_CONFIG);
MvelContextUDFs.registerUDFs(MvelContextUDFs.class, PARSER_CONFIG);
MvelContextUDFs.registerUDFs(AlienMvelContextUDFs.class, PARSER_CONFIG);
parserContext = new ParserContext(PARSER_CONFIG);
}

@Test
public void testToUpperCaseExt() {
String getTopTerm = "toUpperCaseExt('hello')";
Serializable compiledExpression = MVEL.compileExpression(getTopTerm, parserContext);
String res = (String) (MVEL.executeExpression(compiledExpression, ""));
Assert.assertEquals(res, "HELLO");
}

@Test
public void testGetTopTerm() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ class AnchoredFeaturesIntegTest extends FeathrIntegTest {
| anchors: {
| anchor1: {
| source: "anchorAndDerivations/nullValueSource.avro.json"
| key: "mId"
| key: "toUpperCaseExt(mId)"
| features: {
| featureWithNull: "isPresent(value) ? toNumeric(value) : 0"
| }
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.linkedin.feathr.offline

import com.linkedin.feathr.common
import com.linkedin.feathr.common.JoiningFeatureParams
import com.linkedin.feathr.common.{AlienMvelContextUDFs, JoiningFeatureParams}
import com.linkedin.feathr.offline.client.FeathrClient
import com.linkedin.feathr.offline.config.{FeathrConfig, FeathrConfigLoader}
import com.linkedin.feathr.offline.mvel.plugins.FeathrExpressionExecutionContext
Expand Down Expand Up @@ -31,7 +31,11 @@ abstract class TestFeathr {
@BeforeClass
def setup(): Unit = {
setupSpark()
mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue], new AlienFeatureValueTypeAdaptor(), ss.sparkContext)
mvelContext.setupExecutorMvelContext(classOf[AlienFeatureValue],
new AlienFeatureValueTypeAdaptor(),
ss.sparkContext,
Some(classOf[AlienMvelContextUDFs].asInstanceOf[Class[Any]])
)
}

/**
Expand Down
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
version=0.10.4-rc1
version=0.10.4-rc2
SONATYPE_AUTOMATIC_RELEASE=true
jaymo001 marked this conversation as resolved.
Show resolved Hide resolved
POM_ARTIFACT_ID=feathr_2.12