Add kafka-client dependency for databricks on Azure
jaymo001 committed Apr 21, 2022
1 parent c77eb4a commit 72acc2b
Showing 7 changed files with 28 additions and 19 deletions.
4 changes: 2 additions & 2 deletions build.sbt
@@ -10,7 +10,6 @@ val localAndCloudDiffDependencies = Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion,
"org.apache.spark" %% "spark-hive" % sparkVersion,
"org.apache.spark" %% "spark-catalyst" % sparkVersion,
"com.google.guava" % "guava" % "17.0",
"org.apache.logging.log4j" % "log4j-core" % "2.17.1",
"com.typesafe" % "config" % "1.3.2",
"com.fasterxml.jackson.core" % "jackson-databind" % "2.6.5",
@@ -23,9 +22,10 @@ val localAndCloudDiffDependencies = Seq(

val cloudProvidedDeps = localAndCloudDiffDependencies.map(x => x % "provided")


val localAndCloudCommonDependencies = Seq(
"com.microsoft.azure" % "azure-eventhubs-spark_2.12" % "2.3.15",
"org.apache.kafka" % "kafka-clients" % "3.1.0",
"com.google.guava" % "guava" % "31.1-jre",
"org.testng" % "testng" % "6.14.3" % Test,
"org.mockito" % "mockito-core" % "3.1.0" % Test,
"nl.jqno.equalsverifier" % "equalsverifier" % "3.1.12" % Test,
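The kafka-clients artifact added above is the client library a streaming job uses to reach a Kafka-compatible broker such as the Azure Event Hubs Kafka endpoint, authenticating with the SASL JAAS string carried by the KAFKA_SASL_JAAS_CONFIG secret declared in the config files below. A minimal sketch of that usage follows; it is not code from this commit, and the broker address, topic name, and environment-variable wiring are assumptions.

// Illustrative only: broker address, topic, and env-var wiring are assumptions, not part of this commit.
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

public class KafkaSaslConfigSketch {
  public static void main(String[] args) {
    Properties props = new Properties();
    // Event Hubs exposes its Kafka-compatible endpoint on port 9093 (placeholder namespace).
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "<eventhubs-namespace>.servicebus.windows.net:9093");
    props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
    props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
    // The JAAS string comes from the KAFKA_SASL_JAAS_CONFIG secret listed in feathr_config.yaml.
    props.put(SaslConfigs.SASL_JAAS_CONFIG, System.getenv("KAFKA_SASL_JAAS_CONFIG"));
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "feathr-streaming-example");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("<topic-name>"));
      for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(10))) {
        System.out.println(record.value());
      }
    }
  }
}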
@@ -32,6 +32,7 @@ project_config:
- JDBC_TABLE
- JDBC_USER
- JDBC_PASSWORD
- KAFKA_SASL_JAAS_CONFIG

offline_store:
# paths starts with abfss:// or abfs://
4 changes: 3 additions & 1 deletion feathr_project/test/test_azure_kafka_e2e.py
@@ -1,10 +1,12 @@
import os
from pathlib import Path
import pytest
from feathr import (BackfillTime, MaterializationSettings)
from feathr import RedisSink
from test_fixture import kafka_test_setup


@pytest.mark.skipif(os.environ.get('SPARK_CONFIG__SPARK_CLUSTER') != "azure_synapse",
reason="skip for databricks, as it cannot stop streaming job automatically for now.")
def test_feathr_kafa_streaming_features():
"""
Test FeathrClient() materialize_features can ingest streaming feature correctly
7 changes: 4 additions & 3 deletions feathr_project/test/test_user_workspace/feathr_config.yaml
@@ -32,6 +32,7 @@ project_config:
- JDBC_TABLE
- JDBC_USER
- JDBC_PASSWORD
- KAFKA_SASL_JAAS_CONFIG

offline_store:
# paths starts with abfss:// or abfs://
@@ -66,7 +67,7 @@ offline_store:
spark_config:
# choice for spark runtime. Currently support: azure_synapse, databricks
# The `databricks` configs will be ignored if `azure_synapse` is set and vice versa.
spark_cluster: 'azure_synapse'
spark_cluster: 'databricks'
# configure number of parts for the spark output for feature generation job
spark_result_output_parts: '1'

@@ -83,11 +84,11 @@ spark_config:
feathr_runtime_location: "../../target/scala-2.12/feathr-assembly-0.1.0.jar"
databricks:
# workspace instance
workspace_instance_url: 'https://adb-2474129336842816.16.azuredatabricks.net/'
workspace_instance_url: 'https://adb-5638037984879289.9.azuredatabricks.net/'
workspace_token_value: ''
# config string including run time information, spark version, machine size, etc.
# the config follows the format in the databricks documentation: https://docs.microsoft.com/en-us/azure/databricks/dev-tools/api/2.0/jobs
config_template: {'run_name':'','new_cluster':{'spark_version':'9.1.x-scala2.12','node_type_id':'Standard_D3_v2','num_workers':2,'spark_conf':{}},'libraries':[{'jar':''}],'spark_jar_task':{'main_class_name':'','parameters':['']}}
config_template: {'run_name':'','new_cluster':{'spark_version':'9.1.x-scala2.12','node_type_id':'Standard_F4s','num_workers':2,'spark_conf':{}},'libraries':[{'jar':''}],'spark_jar_task':{'main_class_name':'','parameters':['']}}
# Feathr Job location. Support local paths, path start with http(s)://, and paths start with dbfs:/
work_dir: 'dbfs:/feathr_getting_started'
# this is the default location so end users don't have to compile the runtime again.
src/main/java/com/linkedin/feathr/common/FeatureDependencyGraph.java
@@ -1,7 +1,7 @@
package com.linkedin.feathr.common;

import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
import com.linkedin.feathr.common.exception.FeathrException;

import java.io.Serializable;
import java.util.ArrayList;
@@ -16,7 +16,6 @@
import java.util.TreeMap;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.*;
import static java.util.stream.Collectors.*;


@@ -56,7 +55,6 @@ public FeatureDependencyGraph(Map<String, Set<ErasedEntityTaggedFeature>> depend
// Going with #2 right now, but in the future with stricter typing we may want to switch to #1.
dependencyFeatures.forEach((featureName, inputFeatures) -> {
Node node = nodes.computeIfAbsent(featureName, Node::new);
Preconditions.checkArgument(!inputFeatures.isEmpty());
node._dependencies = inputFeatures.stream()
.map(x -> x.getFeatureName().toString())
.distinct()
@@ -205,10 +203,9 @@ public List<String> getPlan(Collection<String> features) {
}

private void checkReachable(Pair<Boolean, String> reachableWithError, String feature) {
checkArgument(
reachableWithError.fst,
"Requirement failed. Feature %s can't be resolved in the dependency graph.",
feature);
if (!reachableWithError.fst) {
throw new IllegalArgumentException("Requirement failed. Feature " + feature + " can't be resolved in the dependency graph.");
}
}

/**
13 changes: 8 additions & 5 deletions src/main/java/com/linkedin/feathr/common/TaggedFeatureUtils.java
@@ -3,13 +3,16 @@
import java.util.List;
import java.util.stream.Collectors;

import static com.google.common.base.Preconditions.*;


public class TaggedFeatureUtils {
private TaggedFeatureUtils() { }


public static void checkArgument(Boolean value) {
if (!value) {
throw new IllegalArgumentException("Unexpected false");
}
}
public static ErasedEntityTaggedFeature eraseStringTags(TaggedFeatureName input, List<String> keyNames) {
/*
* This function should be run rarely (during setup etc).
@@ -21,9 +24,9 @@ public static ErasedEntityTaggedFeature eraseStringTags(TaggedFeatureName input,
checkArgument(keyNames.size() == keyNames.stream().distinct().count());

List<Integer> keyBindingIndexes = input.getKeyTag().stream().map(keyNames::indexOf).collect(Collectors.toList());
checkArgument(!keyBindingIndexes.contains(-1),
"input %s contained some key not present in %s", input, keyNames);

if(keyBindingIndexes.contains(-1)) {
throw new IllegalArgumentException("input " + input + " contained some key not present in " + keyNames);
}
return new ErasedEntityTaggedFeature(keyBindingIndexes, input.getFeatureName());
}

@@ -8,7 +8,6 @@
import java.util.Map;
import java.util.stream.IntStream;

import static com.google.common.base.Preconditions.*;
import static java.util.Collections.*;
import static java.util.stream.Collectors.*;
import static com.linkedin.feathr.common.FeatureValue.DEFAULT_VALUE;
@@ -313,6 +312,12 @@ public static float safeToFloat(Object item) {
}
}

private static <T> void checkNotNull(T item) {
if (null == item) {
throw new RuntimeException("Unexpected null");
}
}

/**
* Convert handle a Map as a term->value vector. Expect the keys to be safely coercible to string, and values to be safely
* coercible to float
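The Java changes in this commit largely follow one pattern: calls to Guava's Preconditions are replaced with plain checks, dropping the Guava imports from these common classes. A sketch of how those per-file checks could be gathered into a single dependency-free helper is shown below; the FeathrPreconditions class is hypothetical and not part of the commit.

// Hypothetical consolidation of the inline checks this commit introduces; not part of the commit.
package com.linkedin.feathr.common;

public final class FeathrPreconditions {
  private FeathrPreconditions() { }

  // Mirrors the removed Preconditions.checkArgument(boolean) calls.
  public static void checkArgument(boolean condition) {
    checkArgument(condition, "Unexpected false");
  }

  // Mirrors the removed Preconditions.checkArgument(boolean, template, args) calls,
  // formatting the message with String.format-style placeholders.
  public static void checkArgument(boolean condition, String messageTemplate, Object... args) {
    if (!condition) {
      throw new IllegalArgumentException(String.format(messageTemplate, args));
    }
  }

  // Mirrors the private checkNotNull helper added in this commit.
  public static <T> T checkNotNull(T item) {
    if (item == null) {
      throw new RuntimeException("Unexpected null");
    }
    return item;
  }
}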
