3.7.0 rc2 #1236

Merged
merged 4 commits on Feb 26, 2024
2 changes: 1 addition & 1 deletion java/beam/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.7.0-RC1</version>
<version>3.7.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -24,6 +24,7 @@
import com.logicalclocks.hsfs.beam.StreamFeatureGroup;
import com.logicalclocks.hsfs.metadata.DatasetApi;
import com.logicalclocks.hsfs.engine.EngineBase;
import com.logicalclocks.hsfs.engine.FeatureGroupUtils;
import com.logicalclocks.hsfs.metadata.HopsworksInternalClient;
import org.apache.avro.Schema;

@@ -34,6 +35,7 @@

public class BeamEngine extends EngineBase {
private static BeamEngine INSTANCE = null;
private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

public static synchronized BeamEngine getInstance() throws FeatureStoreException {
if (INSTANCE == null) {
@@ -71,7 +73,7 @@ public String addFile(String filePath) throws IOException, FeatureStoreException
}
String targetPath = System.getProperty("java.io.tmpdir") + filePath.substring(filePath.lastIndexOf("/"));
try (FileOutputStream outputStream = new FileOutputStream(targetPath)) {
outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
outputStream.write(DatasetApi.readContent(filePath, featureGroupUtils.getDatasetType(filePath)));
}
return targetPath;
}
2 changes: 1 addition & 1 deletion java/flink/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.7.0-RC1</version>
<version>3.7.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

2 changes: 1 addition & 1 deletion java/hsfs/pom.xml
@@ -5,7 +5,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.7.0-RC1</version>
<version>3.7.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -240,4 +240,11 @@ private void checkListdiff(List<String> primaryPartitionKeyNames, List<String> f
public Subject getSubject(FeatureGroupBase featureGroup) throws FeatureStoreException, IOException {
return kafkaApi.getSubject(featureGroup.getFeatureStore(), getFgName(featureGroup));
}

public String getDatasetType(String path) {
if (Pattern.compile("^(?:hdfs://|)/apps/hive/warehouse/*").matcher(path).find()) {
return "HIVEDB";
}
return "DATASET";
}
}
@@ -0,0 +1,76 @@
/*
* Copyright (c) 2024. Hopsworks AB
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
*
* See the License for the specific language governing permissions and limitations under the License.
*
*/

package com.logicalclocks.hsfs.engine;

import org.junit.Assert;
import org.junit.jupiter.api.Test;

import com.fasterxml.jackson.core.JsonProcessingException;

public class TestFeatureGroupUtils {

private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

@Test
void testGetDatasetTypeHIVEDB() throws JsonProcessingException {
// Arrange
String path = "/apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks";

// Act
String databaseType = featureGroupUtils.getDatasetType(path);

// Assert
Assert.assertEquals("HIVEDB", databaseType);
}

@Test
void testGetDatasetTypeHIVEDBWithDfs() throws JsonProcessingException {
// Arrange
String path = "hdfs:///apps/hive/warehouse/temp_featurestore.db/storage_connector_resources/kafka__tstore.jks";

// Act
String databaseType = featureGroupUtils.getDatasetType(path);

// Assert
Assert.assertEquals("HIVEDB", databaseType);
}

@Test
void testGetDatasetTypeDATASET() throws JsonProcessingException {
// Arrange
String path = "/Projects/temp/Resources/kafka__tstore.jks";

// Act
String databaseType = featureGroupUtils.getDatasetType(path);

// Assert
Assert.assertEquals("DATASET", databaseType);
}

@Test
void testGetDatasetTypeDATASETWithDfs() throws JsonProcessingException {
// Arrange
String path = "hdfs:///Projects/temp/Resources/kafka__tstore.jks";

// Act
String databaseType = featureGroupUtils.getDatasetType(path);

// Assert
Assert.assertEquals("DATASET", databaseType);
}
}
2 changes: 1 addition & 1 deletion java/pom.xml
@@ -7,7 +7,7 @@
<groupId>com.logicalclocks</groupId>
<artifactId>hsfs-parent</artifactId>
<packaging>pom</packaging>
<version>3.7.0-RC1</version>
<version>3.7.0-RC2</version>
<modules>
<module>hsfs</module>
<module>spark</module>
2 changes: 1 addition & 1 deletion java/spark/pom.xml
@@ -22,7 +22,7 @@
<parent>
<artifactId>hsfs-parent</artifactId>
<groupId>com.logicalclocks</groupId>
<version>3.7.0-RC1</version>
<version>3.7.0-RC2</version>
</parent>
<modelVersion>4.0.0</modelVersion>

@@ -123,7 +123,6 @@
public class SparkEngine extends EngineBase {

private final StorageConnectorUtils storageConnectorUtils = new StorageConnectorUtils();

private FeatureGroupUtils featureGroupUtils = new FeatureGroupUtils();

private static SparkEngine INSTANCE = null;
@@ -143,7 +142,6 @@ public static void setInstance(SparkEngine sparkEngine) {
@Getter
private SparkSession sparkSession;

private FeatureGroupUtils utils = new FeatureGroupUtils();
private HudiEngine hudiEngine = new HudiEngine();

private SparkEngine() {
@@ -657,7 +655,7 @@ private Dataset<Row> onlineFeatureGroupToAvro(FeatureGroupBase featureGroupBase,

public void writeEmptyDataframe(FeatureGroupBase featureGroup)
throws IOException, FeatureStoreException, ParseException {
String fgTableName = utils.getTableName(featureGroup);
String fgTableName = featureGroupUtils.getTableName(featureGroup);
Dataset emptyDf = sparkSession.table(fgTableName).limit(0);
writeOfflineDataframe(featureGroup, emptyDf, HudiOperationType.UPSERT, new HashMap<>(), null);
}
@@ -681,8 +679,8 @@ private void writeSparkDataset(FeatureGroupBase featureGroup, Dataset<Row> datas
.mode(SaveMode.Append)
// write options cannot be null
.options(writeOptions == null ? new HashMap<>() : writeOptions)
.partitionBy(utils.getPartitionColumns(featureGroup))
.saveAsTable(utils.getTableName(featureGroup));
.partitionBy(featureGroupUtils.getPartitionColumns(featureGroup))
.saveAsTable(featureGroupUtils.getTableName(featureGroup));
}

public String profile(Dataset<Row> df, List<String> restrictToColumns, Boolean correlation,
@@ -935,7 +933,7 @@ public String addFile(String filePath) throws FeatureStoreException {
java.nio.file.Path targetPath = Paths.get(SparkFiles.getRootDirectory(), fileName);

try (FileOutputStream outputStream = new FileOutputStream(targetPath.toString())) {
outputStream.write(DatasetApi.readContent(filePath, "HIVEDB"));
outputStream.write(DatasetApi.readContent(filePath, featureGroupUtils.getDatasetType(filePath)));
} catch (IOException e) {
throw new FeatureStoreException("Error setting up file: " + filePath, e);
}
35 changes: 31 additions & 4 deletions python/hsfs/core/job.py
@@ -15,7 +15,9 @@
#

import humps
from hsfs import engine
import time

from hsfs import util
from hsfs.client.exceptions import FeatureStoreException
from hsfs.core import job_api
from hsfs.core import job_schedule as js
@@ -58,7 +60,7 @@ def __init__(
@classmethod
def from_response_json(cls, json_dict):
# Job config should not be decamelized when updated
config = json_dict.pop("config")
config = json_dict.pop("config", None)
json_decamelized = humps.decamelize(json_dict)
json_decamelized["config"] = config
return cls(**json_decamelized)
@@ -116,10 +118,10 @@ def run(self, args: str = None, await_termination: bool = True):
self._job_api.launch(self.name, args=args)
print(
"Job started successfully, you can follow the progress at \n{}".format(
engine.get_instance().get_job_url(self.href)
util.get_job_url(self.href)
)
)
engine.get_instance().wait_for_job(self, await_termination=await_termination)
self._wait_for_job(await_termination=await_termination)

def get_state(self):
"""Get the state of the job.
@@ -223,3 +225,28 @@ def _update_schedule(self, job_schedule):
self._name, job_schedule.to_dict()
)
return self._job_schedule

def _wait_for_job(self, await_termination=True):
# If the user passed the wait_for_job option consider it,
# otherwise use the default True
while await_termination:
executions = self._job_api.last_execution(self)
if len(executions) > 0:
execution = executions[0]
else:
return

if execution.final_status.lower() == "succeeded":
return
elif execution.final_status.lower() == "failed":
raise FeatureStoreException(
"The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
)
elif execution.final_status.lower() == "killed":
raise FeatureStoreException("The Hopsworks Job was stopped")
elif execution.state.lower() == "framework_failure":
raise FeatureStoreException(
"The Hopsworks Job monitoring failed, could not determine the final status"
)

time.sleep(3)
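
With the polling logic moved onto `Job._wait_for_job`, callers that block on `run()` get failures surfaced as `FeatureStoreException`. A minimal, hypothetical usage sketch (the `job` object and the error handling are illustrative, not part of this diff):

```python
from hsfs.client.exceptions import FeatureStoreException

# `job` is assumed to be an hsfs.core.job.Job instance obtained elsewhere.
try:
    # run() launches the job and, with await_termination=True (the default),
    # calls _wait_for_job(), which polls the last execution every 3 seconds
    # until it reaches a final state.
    job.run(await_termination=True)
except FeatureStoreException as err:
    # Raised when the execution ends as "failed" or "killed", or when the
    # monitoring reports "framework_failure".
    print(f"Hopsworks job did not succeed: {err}")
```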
57 changes: 9 additions & 48 deletions python/hsfs/engine/python.py
@@ -17,7 +17,6 @@
import pandas as pd
import numpy as np
import boto3
import time
import re
import ast
import warnings
@@ -39,7 +38,6 @@

from io import BytesIO
from pyhive import hive
from urllib.parse import urlparse
from typing import TypeVar, Optional, Dict, Any
from confluent_kafka import Consumer, Producer, TopicPartition, KafkaError
from tqdm.auto import tqdm
@@ -65,7 +63,7 @@
)
from hsfs.constructor import query
from hsfs.training_dataset_split import TrainingDatasetSplit
from hsfs.client import exceptions, hopsworks
from hsfs.client import hopsworks
from hsfs.feature_group import FeatureGroup
from thrift.transport.TTransport import TTransportException
from pyhive.exc import OperationalError
@@ -384,11 +382,11 @@ def profile_by_spark(self, metadata_instance):
job = stat_api.compute(metadata_instance)
print(
"Statistics Job started successfully, you can follow the progress at \n{}".format(
self.get_job_url(job.href)
util.get_job_url(job.href)
)
)

self.wait_for_job(job)
job._wait_for_job()
return job

def profile(
@@ -807,15 +805,13 @@ def write_training_dataset(
td_job = td_api.compute(training_dataset, td_app_conf)
print(
"Training dataset job started successfully, you can follow the progress at \n{}".format(
self.get_job_url(td_job.href)
util.get_job_url(td_job.href)
)
)

self.wait_for_job(
td_job,
await_termination=user_write_options.get("wait_for_job", True),
td_job._wait_for_job(
await_termination=user_write_options.get("wait_for_job", True)
)

return td_job

def _create_hive_connection(self, feature_store, hive_config=None):
@@ -882,22 +878,6 @@ def save_empty_dataframe(self, feature_group):
"""Wrapper around save_dataframe in order to provide no-op."""
pass

def get_job_url(self, href: str):
"""Use the endpoint returned by the API to construct the UI url for jobs

Args:
href (str): the endpoint returned by the API
"""
url = urlparse(href)
url_splits = url.path.split("/")
project_id = url_splits[4]
job_name = url_splits[6]
ui_url = url._replace(
path="p/{}/jobs/named/{}/executions".format(project_id, job_name)
)
ui_url = client.get_instance().replace_public_host(ui_url)
return ui_url.geturl()

def _get_app_options(self, user_write_options={}):
"""
Generate the options that should be passed to the application doing the ingestion.
@@ -916,27 +896,6 @@ def _get_app_options(self, user_write_options={}):
spark_job_configuration=spark_job_configuration,
)

def wait_for_job(self, job, await_termination=True):
# If the user passed the wait_for_job option consider it,
# otherwise use the default True
while await_termination:
executions = self._job_api.last_execution(job)
if len(executions) > 0:
execution = executions[0]
else:
return

if execution.final_status.lower() == "succeeded":
return
elif execution.final_status.lower() == "failed":
raise exceptions.FeatureStoreException(
"The Hopsworks Job failed, use the Hopsworks UI to access the job logs"
)
elif execution.final_status.lower() == "killed":
raise exceptions.FeatureStoreException("The Hopsworks Job was stopped")

time.sleep(3)

def add_file(self, file):
if not file:
return file
@@ -947,7 +906,9 @@ def add_file(self, file):

local_file = os.path.join("/tmp", os.path.basename(file))
if not os.path.exists(local_file):
content_stream = self._dataset_api.read_content(file, "HIVEDB")
content_stream = self._dataset_api.read_content(
file, util.get_dataset_type(file)
)
bytesio_object = BytesIO(content_stream.content)
# Write the stuff
with open(local_file, "wb") as f:
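
The Python engine now passes `util.get_dataset_type(file)` to `read_content` instead of the hard-coded `"HIVEDB"` (the Spark engine below does the same). The body of that helper is not shown in this diff; a rough sketch of what it presumably does, mirroring the Java `FeatureGroupUtils.getDatasetType` added above (the exact Python implementation is an assumption):

```python
import re

def get_dataset_type(path: str) -> str:
    # Paths under the Hive warehouse (optionally prefixed with hdfs://)
    # are read as HIVEDB; everything else is a regular project dataset.
    if re.search(r"^(?:hdfs://|)/apps/hive/warehouse/*", path):
        return "HIVEDB"
    return "DATASET"
```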
2 changes: 1 addition & 1 deletion python/hsfs/engine/spark.py
@@ -792,7 +792,7 @@ def add_file(self, file):
if isinstance(client.get_instance(), client.external.Client):
tmp_file = os.path.join(SparkFiles.getRootDirectory(), file_name)
print("Reading key file from storage connector.")
response = self._dataset_api.read_content(tmp_file, "HIVEDB")
response = self._dataset_api.read_content(file, util.get_dataset_type(file))

with open(tmp_file, "wb") as f:
f.write(response.content)