Skip to content

Commit

Permalink
Rebasing changes
Browse files Browse the repository at this point in the history
  • Loading branch information
voonhous committed Dec 18, 2019
1 parent 7571ba7 commit 023f490
Show file tree
Hide file tree
Showing 9 changed files with 857 additions and 299 deletions.
22 changes: 11 additions & 11 deletions .prow/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -145,18 +145,18 @@ presubmits:
postsubmits:
gojek/feast:
- name: publish-python-sdk
decorate: true
decorate: true
spec:
containers:
- image: python:3
command:
- sh
- -c
- -c
- |
.prow/scripts/publish-python-sdk.sh \
--directory-path sdk/python --repository pypi
volumeMounts:
- name: pypirc
- name: pypirc
mountPath: /root/.pypirc
subPath: .pypirc
readOnly: true
Expand All @@ -170,7 +170,7 @@ postsubmits:
- ^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$

- name: publish-docker-images
decorate: true
decorate: true
spec:
containers:
- image: google/cloud-sdk:273.0.0
Expand All @@ -182,14 +182,14 @@ postsubmits:
--archive-uri gs://feast-templocation-kf-feast/.m2.2019-10-24.tar \
--output-dir $PWD/
if [ $PULL_BASE_REF == "master" ]; then
if [ $PULL_BASE_REF == "master" ]; then
.prow/scripts/publish-docker-image.sh \
--repository gcr.io/kf-feast/feast-core \
--tag dev \
--file infra/docker/core/Dockerfile \
--google-service-account-file /etc/gcloud/service-account.json
.prow/scripts/publish-docker-image.sh \
--repository gcr.io/kf-feast/feast-serving \
--tag dev \
Expand All @@ -203,13 +203,13 @@ postsubmits:
docker push gcr.io/kf-feast/feast-serving:${PULL_BASE_SHA}
else
.prow/scripts/publish-docker-image.sh \
--repository gcr.io/kf-feast/feast-core \
--tag ${PULL_BASE_REF:1} \
--file infra/docker/core/Dockerfile \
--google-service-account-file /etc/gcloud/service-account.json
.prow/scripts/publish-docker-image.sh \
--repository gcr.io/kf-feast/feast-serving \
--tag ${PULL_BASE_REF:1} \
Expand Down Expand Up @@ -244,7 +244,7 @@ postsubmits:
- ^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$

- name: publish-helm-chart
decorate: true
decorate: true
spec:
containers:
- image: google/cloud-sdk:273.0.0-slim
Expand All @@ -253,7 +253,7 @@ postsubmits:
- -c
- |
gcloud auth activate-service-account --key-file /etc/gcloud/service-account.json
curl -s https://get.helm.sh/helm-v2.16.1-linux-amd64.tar.gz | tar -C /tmp -xz
mv /tmp/linux-amd64/helm /usr/bin/helm
helm init --client-only
Expand Down
Empty file added sdk/__init__.py
Empty file.
196 changes: 140 additions & 56 deletions sdk/python/feast/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@

import logging
import os
import sys
import time
from collections import OrderedDict
from typing import Dict, Union
from typing import List

import grpc
import time
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
Expand All @@ -35,11 +35,12 @@
)
from feast.core.CoreService_pb2_grpc import CoreServiceStub
from feast.core.FeatureSet_pb2 import FeatureSetStatus
from feast.exceptions import format_grpc_exception
from feast.feature_set import FeatureSet, Entity
from feast.job import Job
from feast.loaders.abstract_producer import get_producer
from feast.loaders.file import export_dataframe_to_staging_location
from feast.loaders.ingest import ingest_table_to_kafka
from feast.loaders.ingest import KAFKA_CHUNK_PRODUCTION_TIMEOUT
from feast.loaders.ingest import get_feature_row_chunks
from feast.serving.ServingService_pb2 import GetFeastServingInfoResponse
from feast.serving.ServingService_pb2 import (
GetOnlineFeaturesRequest,
Expand Down Expand Up @@ -259,7 +260,7 @@ def _apply_feature_set(self, feature_set: FeatureSet):
print(f"No change detected or applied: {feature_set.name}")

# Deep copy from the returned feature set to the local feature set
feature_set.update_from_feature_set(applied_fs)
feature_set._update_from_feature_set(applied_fs)

def list_feature_sets(self) -> List[FeatureSet]:
"""
Expand Down Expand Up @@ -472,35 +473,55 @@ def get_online_features(
) # type: GetOnlineFeaturesResponse

def ingest(
self,
feature_set: Union[str, FeatureSet],
source: Union[pd.DataFrame, str],
version: int = None,
force_update: bool = False,
max_workers: int = CPU_COUNT,
disable_progress_bar: bool = False,
chunk_size: int = 5000,
timeout: int = None,
):
self,
feature_set: Union[str, FeatureSet],
source: Union[pd.DataFrame, str],
chunk_size: int = 10000,
version: int = None,
force_update: bool = False,
max_workers: int = max(CPU_COUNT - 1, 1),
disable_progress_bar: bool = False,
timeout: int = KAFKA_CHUNK_PRODUCTION_TIMEOUT
) -> None:
"""
Loads feature data into Feast for a specific feature set.
Args:
feature_set: Name of feature set or a feature set object
source: Either a file path or Pandas Dataframe to ingest into Feast
feature_set (typing.Union[str, FeatureSet]):
Feature set object or the string name of the feature set
(without a version).
source (typing.Union[pd.DataFrame, str]):
Either a file path or Pandas Dataframe to ingest into Feast
Files that are currently supported:
* parquet
* csv
* json
version: Feature set version
force_update: Automatically update feature set based on source data
prior to ingesting. This will also register changes to Feast
max_workers: Number of worker processes to use to encode values
disable_progress_bar: Disable printing of progress statistics
chunk_size: Maximum amount of rows to load into memory and ingest at
a time
timeout: Seconds to wait before ingestion times out
* parquet
* csv
* json
chunk_size (int):
Amount of rows to load and ingest at a time.
version (int):
Feature set version.
force_update (bool):
Automatically update feature set based on source data prior to
ingesting. This will also register changes to Feast.
max_workers (int):
Number of worker processes to use to encode values.
disable_progress_bar (bool):
Disable printing of progress statistics.
timeout (int):
Timeout in seconds to wait for completion.
Returns:
None:
None
"""

if isinstance(feature_set, FeatureSet):
name = feature_set.name
if version is None:
Expand All @@ -510,15 +531,21 @@ def ingest(
else:
raise Exception(f"Feature set name must be provided")

table = _read_table_from_source(source)
# Read table and get row count
tmp_table_name = _read_table_from_source(
source, chunk_size, max_workers
)

pq_file = pq.ParquetFile(tmp_table_name)

# Update the feature set based on DataFrame schema
if force_update:
# Use a small as reference DataFrame to infer fields
ref_df = table.to_batches(max_chunksize=20)[0].to_pandas()
row_count = pq_file.metadata.num_rows

feature_set.infer_fields_from_df(
ref_df, discard_unused_fields=True, replace_existing_features=True
# Update the feature set based on PyArrow table of first row group
if force_update:
feature_set.infer_fields_from_pa(
table=pq_file.read_row_group(0),
discard_unused_fields=True,
replace_existing_features=True
)
self.apply(feature_set)
current_time = time.time()
Expand All @@ -538,22 +565,49 @@ def ingest(
if timeout is not None:
timeout = timeout - int(time.time() - current_time)

if feature_set.source.source_type == "Kafka":
print("Ingesting to kafka...")
ingest_table_to_kafka(
feature_set=feature_set,
table=table,
max_workers=max_workers,
disable_pbar=disable_progress_bar,
chunk_size=chunk_size,
timeout=timeout,
)
else:
raise Exception(
f"Could not determine source type for feature set "
f'"{feature_set.name}" with source type '
f'"{feature_set.source.source_type}"'
)
try:
# Kafka configs
brokers = feature_set.get_kafka_source_brokers()
topic = feature_set.get_kafka_source_topic()
producer = get_producer(brokers, row_count, disable_progress_bar)

# Loop optimization declarations
produce = producer.produce
flush = producer.flush

# Transform and push data to Kafka
if feature_set.source.source_type == "Kafka":
for chunk in get_feature_row_chunks(
file=tmp_table_name,
row_groups=list(range(pq_file.num_row_groups)),
fs=feature_set,
max_workers=max_workers):

# Push FeatureRow one chunk at a time to kafka
for serialized_row in chunk:
produce(topic=topic, value=serialized_row)

# Force a flush after each chunk
flush(timeout=timeout)

# Remove chunk from memory
del chunk

else:
raise Exception(
f"Could not determine source type for feature set "
f'"{feature_set.name}" with source type '
f'"{feature_set.source.source_type}"'
)

# Print ingestion statistics
producer.print_results()
finally:
# Remove parquet file(s) that were created earlier
print("Removing temporary file(s)...")
os.remove(tmp_table_name)

return None


def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest]:
Expand Down Expand Up @@ -583,18 +637,38 @@ def _build_feature_set_request(feature_ids: List[str]) -> List[FeatureSetRequest
return list(feature_set_request.values())


def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:
def _read_table_from_source(
source: Union[pd.DataFrame, str],
chunk_size: int,
max_workers: int
) -> str:
"""
Infers a data source type (path or Pandas Dataframe) and reads it in as
a PyArrow Table.
The PyArrow Table that is read will be written to a parquet file with row
group size determined by the minimum of:
* (table.num_rows / max_workers)
* chunk_size
The parquet file that is created will be passed as file path to the
multiprocessing pool workers.
Args:
source: Either a string path or Pandas Dataframe
source (Union[pd.DataFrame, str]):
Either a string path or Pandas DataFrame.
chunk_size (int):
Number of worker processes to use to encode values.
max_workers (int):
Amount of rows to load and ingest at a time.
Returns:
PyArrow table
str: Path to parquet file that was created.
"""
# Pandas dataframe detected

# Pandas DataFrame detected
if isinstance(source, pd.DataFrame):
table = pa.Table.from_pandas(df=source)

Expand All @@ -618,4 +692,14 @@ def _read_table_from_source(source: Union[pd.DataFrame, str]) -> pa.lib.Table:

# Ensure that PyArrow table is initialised
assert isinstance(table, pa.lib.Table)
return table

# Write table as parquet file with a specified row_group_size
tmp_table_name = f"{int(time.time())}.parquet"
row_group_size = min(int(table.num_rows/max_workers), chunk_size)
pq.write_table(table=table, where=tmp_table_name,
row_group_size=row_group_size)

# Remove table from memory
del table

return tmp_table_name
Loading

0 comments on commit 023f490

Please sign in to comment.