hdfs url fed to Apache.Beam should not contain the <hostname:port> part #740

Closed

alionkun opened this issue Oct 11, 2019 · 5 comments

@alionkun
I used HDFS as the distributed file system to hold my training dataset and pipeline_root.
My HDFS directory structure looks like this:

  1. training dataset files are stored in hdfs://hdfs-master/my_model/dataset
  2. pipeline_root is hdfs://hdfs-master/my_model/pipelines

After I triggered my pipeline DAG, I got the following error messages:

ERROR - Match operation failed with exceptions {'hdfs://hdfs-master/my_model/dataset/*': 
BeamIOError("List operation failed with exceptions {'hdfs://hdfs-master/my_model/dataset': 
HdfsError('File /hdfs-master/my_model/dataset does not exist.',)}",)}

It seems that Apache Beam treats the host:port part of the HDFS URL as part of the file path, and therefore finds nothing.

After some source-code reading and analysis, I found that TFX uses both TensorFlow APIs and Apache Beam APIs to access the training dataset and artifacts.

The problem:
TensorFlow APIs accept HDFS URLs like hdfs://host:port/path/to/file, but Apache Beam APIs accept HDFS URLs like hdfs://path/to/file. So when we hand something like hdfs://host:port/path/to/file to TFX, we get the error above.
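To illustrate the mismatch, here is a minimal sketch using only the standard library (nothing TFX-specific); the comments about Beam's behavior reflect my reading of the error above:

from urllib.parse import urlparse

url = 'hdfs://hdfs-master/my_model/dataset'
parsed = urlparse(url)
print(parsed.netloc)  # 'hdfs-master'       <- TensorFlow treats this as the namenode address
print(parsed.path)    # '/my_model/dataset' <- the actual file path

# Beam, by contrast, effectively folds everything after 'hdfs://' into the
# path, so it looks for '/hdfs-master/my_model/dataset' on the cluster --
# exactly the non-existent path reported in the HdfsError above.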

How to fix:
Considering that TFX stores artifact URLs in MLMD and these URLs should be complete, the fastest way to fix this is for TFX to adjust every URL it passes to an Apache Beam API by removing the host:port part.

I made my own patch and validated it in my TFX cluster; everything looks good.

I hope an official release will include a fix.

diff --git a/tfx/components/evaluator/executor.py b/tfx/components/evaluator/executor.py
index 7d55556..b94e5d1 100644
--- a/tfx/components/evaluator/executor.py
+++ b/tfx/components/evaluator/executor.py
@@ -105,12 +105,12 @@ class Executor(base_executor.BaseExecutor):
       # pylint: disable=expression-not-assigned
       (pipeline
        | 'ReadData' >> beam.io.ReadFromTFRecord(
-           file_pattern=io_utils.all_files_pattern(
-               artifact_utils.get_split_uri(input_dict['examples'], 'eval')))
+           file_pattern=io_utils.url_for_beam(io_utils.all_files_pattern(
+               artifact_utils.get_split_uri(input_dict['examples'], 'eval'))))
        |
        'ExtractEvaluateAndWriteResults' >> tfma.ExtractEvaluateAndWriteResults(
            eval_shared_model=eval_shared_model,
            slice_spec=slice_spec,
-           output_path=output_uri))
+           output_path=io_utils.url_for_beam(output_uri)))
     tf.logging.info(
         'Evaluation complete. Results written to {}.'.format(output_uri))
diff --git a/tfx/components/example_gen/base_example_gen_executor.py b/tfx/components/example_gen/base_example_gen_executor.py
index ae2c8d9..c301ec8 100644
--- a/tfx/components/example_gen/base_example_gen_executor.py
+++ b/tfx/components/example_gen/base_example_gen_executor.py
@@ -30,6 +30,7 @@ from tfx.components.base import base_executor
 from tfx.components.example_gen import utils
 from tfx.proto import example_gen_pb2
 from tfx.types import artifact_utils
+from tfx.utils import io_utils
 from google.protobuf import json_format
 
 # Default file name for TFRecord output file prefix.
@@ -58,7 +59,7 @@ def _WriteSplit(example_split: beam.pvalue.PCollection,
           | 'Shuffle' >> beam.transforms.Reshuffle()
           # TODO(jyzhao): multiple output format.
           | 'Write' >> beam.io.WriteToTFRecord(
-              os.path.join(output_split_path, DEFAULT_FILE_NAME),
+              io_utils.url_for_beam(os.path.join(output_split_path, DEFAULT_FILE_NAME)),
               file_name_suffix='.gz'))
 
 
diff --git a/tfx/components/example_gen/csv_example_gen/executor.py b/tfx/components/example_gen/csv_example_gen/executor.py
index 9c3e443..035eb2c 100644
--- a/tfx/components/example_gen/csv_example_gen/executor.py
+++ b/tfx/components/example_gen/csv_example_gen/executor.py
@@ -94,7 +94,7 @@ def _CsvToExample(  # pylint: disable=invalid-name
   decoder = csv_decoder.DecodeCSVToDict
   return (pipeline
           | 'ReadFromText' >> beam.io.ReadFromText(
-              file_pattern=csv_pattern, skip_header_lines=1)
+              file_pattern=io_utils.url_for_beam(csv_pattern), skip_header_lines=1)
           | 'ParseCSV' >> decoder(column_names)
           | 'ToTFExample' >> beam.Map(_dict_to_example))
 
diff --git a/tfx/components/example_gen/custom_executors/avro_executor.py b/tfx/components/example_gen/custom_executors/avro_executor.py
index 9373970..1ccfe6c 100644
--- a/tfx/components/example_gen/custom_executors/avro_executor.py
+++ b/tfx/components/example_gen/custom_executors/avro_executor.py
@@ -56,7 +56,7 @@ def _AvroToExample(  # pylint: disable=invalid-name
       'Processing input avro data {} to TFExample.'.format(avro_pattern))
 
   return (pipeline
-          | 'ReadFromAvro' >> beam.io.ReadFromAvro(avro_pattern)
+          | 'ReadFromAvro' >> beam.io.ReadFromAvro(io_utils.url_for_beam(avro_pattern))
           | 'ToTFExample' >> beam.Map(dict_to_example))
 
 
diff --git a/tfx/components/example_gen/custom_executors/parquet_executor.py b/tfx/components/example_gen/custom_executors/parquet_executor.py
index b121199..74d3fcb 100644
--- a/tfx/components/example_gen/custom_executors/parquet_executor.py
+++ b/tfx/components/example_gen/custom_executors/parquet_executor.py
@@ -57,7 +57,7 @@ def _ParquetToExample(  # pylint: disable=invalid-name
 
   return (pipeline
           # TODO(jyzhao): support per column read by input_config.
-          | 'ReadFromParquet' >> beam.io.ReadFromParquet(parquet_pattern)
+          | 'ReadFromParquet' >> beam.io.ReadFromParquet(io_utils.url_for_beam(parquet_pattern))
           | 'ToTFExample' >> beam.Map(dict_to_example))
 
 
diff --git a/tfx/components/example_gen/import_example_gen/executor.py b/tfx/components/example_gen/import_example_gen/executor.py
index 88bad2d..4e70af7 100644
--- a/tfx/components/example_gen/import_example_gen/executor.py
+++ b/tfx/components/example_gen/import_example_gen/executor.py
@@ -59,7 +59,7 @@ def _ImportExample(  # pylint: disable=invalid-name
   return (pipeline
           # TODO(jyzhao): support multiple input format.
           | 'ReadFromTFRecord' >>
-          beam.io.ReadFromTFRecord(file_pattern=input_split_pattern)
+          beam.io.ReadFromTFRecord(file_pattern=io_utils.url_for_beam(input_split_pattern))
           # TODO(jyzhao): consider move serialization out of base example gen.
           | 'ToTFExample' >> beam.Map(tf.train.Example.FromString))
 
diff --git a/tfx/components/model_validator/executor.py b/tfx/components/model_validator/executor.py
index 704c596..071ee31 100644
--- a/tfx/components/model_validator/executor.py
+++ b/tfx/components/model_validator/executor.py
@@ -90,14 +90,14 @@ class Executor(base_executor.BaseExecutor):
     with beam.Pipeline(argv=self._get_beam_pipeline_args()) as pipeline:
       eval_data = (
           pipeline | 'ReadData' >> beam.io.ReadFromTFRecord(
-              file_pattern=io_utils.all_files_pattern(eval_examples_uri)))
+              file_pattern=io_utils.url_for_beam(io_utils.all_files_pattern(eval_examples_uri))))
 
       current_model = tfma.default_eval_shared_model(
           eval_saved_model_path=path_utils.eval_model_path(current_model_dir))
       (eval_data | 'EvalCurrentModel' >> tfma.ExtractEvaluateAndWriteResults(  # pylint: disable=expression-not-assigned
           eval_shared_model=current_model,
           slice_spec=slice_spec,
-          output_path=current_model_eval_result_path))
+          output_path=io_utils.url_for_beam(current_model_eval_result_path)))
 
       if blessed_model_dir is not None:
         blessed_model = tfma.default_eval_shared_model(
@@ -105,7 +105,7 @@ class Executor(base_executor.BaseExecutor):
         (eval_data | 'EvalBlessedModel' >> tfma.ExtractEvaluateAndWriteResults(  # pylint: disable=expression-not-assigned
             eval_shared_model=blessed_model,
             slice_spec=slice_spec,
-            output_path=blessed_model_eval_result_path))
+            output_path=io_utils.url_for_beam(blessed_model_eval_result_path)))
 
     current_model_eval_result = tfma.load_eval_result(
         output_path=current_model_eval_result_path)
diff --git a/tfx/components/statistics_gen/executor.py b/tfx/components/statistics_gen/executor.py
index 4b20fa7..32d8172 100644
--- a/tfx/components/statistics_gen/executor.py
+++ b/tfx/components/statistics_gen/executor.py
@@ -76,12 +76,12 @@ class Executor(base_executor.BaseExecutor):
         _ = (
             p
             | 'ReadData.' + split >>
-            beam.io.ReadFromTFRecord(file_pattern=input_uri)
+            beam.io.ReadFromTFRecord(file_pattern=io_utils.url_for_beam(input_uri))
             | 'DecodeData.' + split >> tf_example_decoder.DecodeTFExample()
             | 'GenerateStatistics.' + split >>
             stats_api.GenerateStatistics(stats_options)
             | 'WriteStatsOutput.' + split >> beam.io.WriteToTFRecord(
-                output_path,
+                io_utils.url_for_beam(output_path),
                 shard_name_template='',
                 coder=beam.coders.ProtoCoder(
                     statistics_pb2.DatasetFeatureStatisticsList)))
diff --git a/tfx/components/transform/executor.py b/tfx/components/transform/executor.py
index b7c4c39..b7865d4 100644
--- a/tfx/components/transform/executor.py
+++ b/tfx/components/transform/executor.py
@@ -48,6 +48,7 @@ from tfx.components.util import value_utils
 from tfx.types import artifact_utils
 from tfx.utils import import_utils
 from tfx.utils import io_utils
+import tempfile
 
 
 RAW_EXAMPLE_KEY = 'raw_example'
@@ -336,7 +337,7 @@ class Executor(base_executor.BaseExecutor):
     result = (
         pipeline
         | 'Read' >> beam.io.ReadFromTFRecord(
-            dataset.file_pattern,
+            io_utils.url_for_beam(dataset.file_pattern),
             coder=beam.coders.BytesCoder(),
             # TODO(b/114938612): Eventually remove this override.
             validate=False)
@@ -369,7 +370,7 @@ class Executor(base_executor.BaseExecutor):
     return (pcollection
             | 'DropNoneKeys' >> beam.Values()
             | 'Write' >> beam.io.WriteToTFRecord(
-                transformed_example_path,
+                io_utils.url_for_beam(transformed_example_path),
                 file_name_suffix='.gz',
                 coder=beam.coders.ProtoCoder(example_pb2.Example)))
 
@@ -515,7 +516,7 @@ class Executor(base_executor.BaseExecutor):
     tf.io.gfile.makedirs(os.path.dirname(stats_output_path))
     # TODO(b/117601471): Replace with utility method to write stats.
     return (pcollection_stats | 'Write' >> beam.io.WriteToText(
-        stats_output_path,
+        io_utils.url_for_beam(stats_output_path),
         append_trailing_newlines=False,
         shard_name_template='',  # To force unsharded output.
         coder=beam.coders.ProtoCoder(
@@ -844,7 +845,7 @@ class Executor(base_executor.BaseExecutor):
 
     with self._CreatePipeline(outputs) as p:
       with tft_beam.Context(
-          temp_dir=temp_path,
+          temp_dir=tempfile.mkdtemp(),
           desired_batch_size=desired_batch_size,
           passthrough_keys={_TRANSFORM_INTERNAL_FEATURE_FOR_KEY},
           use_deep_copy_optimization=True):
diff --git a/tfx/utils/io_utils.py b/tfx/utils/io_utils.py
index 2cc89ab..8fafa58 100644
--- a/tfx/utils/io_utils.py
+++ b/tfx/utils/io_utils.py
@@ -165,3 +165,11 @@ class SchemaReader(object):
     contents = file_io.read_file_to_string(schema_path)
     text_format.Parse(contents, result)
     return result
+
+def url_for_beam(url: Text) -> Text:
+  """Strips the host[:port] part from an HDFS URL, as expected by Beam."""
+  if url.startswith('hdfs://'):
+    i = url.find('/', len('hdfs://'))  # first '/' after the authority
+    if i != -1:  # guard against URLs with no path component
+      url = 'hdfs:/' + url[i:]
+  return url
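For reference, a quick sanity check of what the helper is expected to do with the URLs from this issue (not part of the patch itself):

from tfx.utils import io_utils

# The authority (host, or host:port) is dropped, yielding the form Beam expects.
assert io_utils.url_for_beam('hdfs://hdfs-master/my_model/dataset') == 'hdfs://my_model/dataset'
assert io_utils.url_for_beam('hdfs://hdfs-master:8020/my_model/dataset') == 'hdfs://my_model/dataset'
# Non-HDFS URIs and plain local paths pass through unchanged.
assert io_utils.url_for_beam('/local/path') == '/local/path'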
@zhitaoli
Contributor

zhitaoli commented Oct 11, 2019

Thanks @alionkun for reporting this. Amazing work digging into the details of both the TensorFlow and Beam code bases to understand this.

Let me circulate this back to the relevant engineers maintaining the Beam and TensorFlow gfile APIs and see whether they can unify their conventions first. If that cannot be achieved, we can discuss a fix on the TFX side.

My sense is that I'd rather have the HDFS path (likely from pipeline_root) use the format compatible with the hadoop fs command, which is more widely understood by Hadoop users. That format aligns better with TensorFlow's and keeps host:port inside.

@zhitaoli
Contributor

@alionkun The Beam team was kind enough to file https://issues.apache.org/jira/browse/BEAM-8399 to track this fix. Can you voice your thoughts there, especially to confirm whether a host:port format is guaranteed to be used (i.e., the URL always carries a ':')?

Depending on the progress of that issue, let's see whether the TFX team should carry a patch like the one you suggested.

@alionkun
Author

@zhitaoli I have added a comment on that issue.

But I wonder whether the Beam team will support this in the short term :)
HDFS is the de facto standard DFS outside Google; a fix on a predictable timeline would help TFX adoption.

@zhitaoli
Contributor

> @zhitaoli I have added a comment on that issue.
>
> But I wonder whether the Beam team will support this in the short term :)
> HDFS is the de facto standard DFS outside Google; a fix on a predictable timeline would help TFX adoption.

I would like to try to fix this from the Beam side directly. I'm discussing with some relevant folks whether we can fast-track this in the next version of Beam (since the fix is much more isolated there than a patch on the TFX side).

@zhitaoli
Contributor

Update: this is being fixed on the Beam side directly: apache/beam#10223 (comment)

Once that is merged and released in Beam, TFX will pick it up in a future release.

For now I'll close this one on the TFX side.
