Skip to content

Commit

Permalink
Addressed the review feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
Ark-kun committed May 5, 2020
1 parent b83e5d7 commit d5d4b46
Show file tree
Hide file tree
Showing 18 changed files with 108 additions and 90 deletions.
11 changes: 6 additions & 5 deletions components/tfx/Evaluator/with_URI_IO/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ def Evaluator(
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -34,20 +33,22 @@ def Evaluator(
component_class_args = {}

for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj

for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -81,7 +82,7 @@ def Evaluator(
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
11 changes: 6 additions & 5 deletions components/tfx/Evaluator/with_URI_IO/component.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ implementation:
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -56,20 +55,22 @@ implementation:
component_class_args = {}
for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj
for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -103,7 +104,7 @@ implementation:
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ def BigQueryExampleGen(
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -28,20 +27,22 @@ def BigQueryExampleGen(
component_class_args = {}

for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj

for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -75,7 +76,7 @@ def BigQueryExampleGen(
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ implementation:
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -46,20 +45,22 @@ implementation:
component_class_args = {}
for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj
for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -93,7 +94,7 @@ implementation:
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
11 changes: 6 additions & 5 deletions components/tfx/ExampleGen/CsvExampleGen/with_URI_IO/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def CsvExampleGen(
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -29,20 +28,22 @@ def CsvExampleGen(
component_class_args = {}

for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj

for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -76,7 +77,7 @@ def CsvExampleGen(
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ implementation:
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -48,20 +47,22 @@ implementation:
component_class_args = {}
for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj
for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -95,7 +96,7 @@ implementation:
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ def ImportExampleGen(
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -29,20 +28,22 @@ def ImportExampleGen(
component_class_args = {}

for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj

for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -76,7 +77,7 @@ def ImportExampleGen(
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ implementation:
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -48,20 +47,22 @@ implementation:
component_class_args = {}
for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj
for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -95,7 +96,7 @@ implementation:
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
11 changes: 6 additions & 5 deletions components/tfx/ExampleValidator/with_URI_IO/component.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ def ExampleValidator(
import os
import tempfile
import tensorflow
import uuid
from google.protobuf import json_format, message
from tfx.types import channel_utils, artifact_utils
from tfx.components.base import base_executor
Expand All @@ -27,20 +26,22 @@ def ExampleValidator(
component_class_args = {}

for name, execution_parameter in component_class.SPEC_CLASS.PARAMETERS.items():
argument_value_obj = argument_value = arguments.get(name, None)
argument_value = arguments.get(name, None)
if argument_value is None:
continue
parameter_type = execution_parameter.type
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message): # Maybe FIX: execution_parameter.type can also be a tuple
if isinstance(parameter_type, type) and issubclass(parameter_type, message.Message):
argument_value_obj = parameter_type()
json_format.Parse(argument_value, argument_value_obj)
else:
argument_value_obj = argument_value
component_class_args[name] = argument_value_obj

for name, channel_parameter in component_class.SPEC_CLASS.INPUTS.items():
artifact_path = arguments.get(name + '_uri', None) or arguments.get(name + '_path', None)
if artifact_path:
artifact = channel_parameter.type()
artifact.uri = artifact_path + '/' # ?
artifact.uri = artifact_path.rstrip('/') + '/' # Some TFX components require that the artifact URIs end with a slash
if channel_parameter.type.PROPERTIES and 'split_names' in channel_parameter.type.PROPERTIES:
# Recovering splits
subdirs = tensorflow.io.gfile.listdir(artifact_path)
Expand Down Expand Up @@ -74,7 +75,7 @@ def ExampleValidator(
executor_context = base_executor.BaseExecutor.Context(
beam_pipeline_args=beam_pipeline_args,
tmp_dir=tempfile.gettempdir(),
unique_id=uuid.uuid4(),
unique_id='tfx_component',
)
executor = component_class_instance.executor_spec.executor_class(executor_context)
executor.Do(
Expand Down
Loading

0 comments on commit d5d4b46

Please sign in to comment.