diff --git a/Python/basics/pardo_with_output.py b/Python/basics/pardo_with_output.py
index 4d2b16b..f7471a7 100644
--- a/Python/basics/pardo_with_output.py
+++ b/Python/basics/pardo_with_output.py
@@ -23,6 +23,10 @@ class SplitFn(DoFn):
     def process(self, element):
+        # Generate 3 PCollections from the input:
+        #   1) Even elements, with the 'even' tag
+        #   2) Odd elements, with the 'odd' tag
+        #   3) All elements emitted as the main untagged output
         if element % 2 == 0:
             yield pvalue.TaggedOutput("even", element)
         else:
@@ -40,6 +44,8 @@ def run(argv=None):
             | "Split Output" >> ParDo(SplitFn()).with_outputs("even", "odd")
         )
 
+        # Log each element of both tagged PCollections
+        # and the main untagged PCollection
         odd = output.odd | "odd log" >> Map(
             lambda x: logging.info("odds %d" % x)
         )
diff --git a/Python/bigquery/read_query_bigquery.py b/Python/bigquery/read_query_bigquery.py
index acc9cb0..0c7572a 100644
--- a/Python/bigquery/read_query_bigquery.py
+++ b/Python/bigquery/read_query_bigquery.py
@@ -25,6 +25,9 @@ class ReadQueryOptions(PipelineOptions):
     @classmethod
     def _add_argparse_args(cls, parser):
+        # Add a command line flag to be parsed along with other
+        # normal PipelineOptions. This flag will store the SQL query
+        # to be run against BigQuery.
         parser.add_argument(
             "--query",
             default=(
@@ -37,6 +40,8 @@ def _add_argparse_args(cls, parser):
 def run(argv=None):
     options = ReadQueryOptions()
 
+    # Create a Beam pipeline with 2 steps:
+    # run a query against BigQuery and log the results
     with beam.Pipeline(options=options) as p:
         output = (
             p
diff --git a/Python/bigquery/read_table_ref_bigquery.py b/Python/bigquery/read_table_ref_bigquery.py
index 6962713..63ee5f4 100644
--- a/Python/bigquery/read_table_ref_bigquery.py
+++ b/Python/bigquery/read_table_ref_bigquery.py
@@ -23,12 +23,14 @@ def run(argv=None):
+    # Configure the table we are reading from.
     table = bigquery.TableReference(
         projectId="bigquery-public-data",
         datasetId="samples",
         tableId="github_timeline",
     )
 
+    # Create a Beam pipeline with 2 steps: read from BigQuery and log the data.
     with beam.Pipeline() as p:
         output = (
             p
diff --git a/Python/bigquery/write_bigquery.py b/Python/bigquery/write_bigquery.py
index 709f807..db7255f 100644
--- a/Python/bigquery/write_bigquery.py
+++ b/Python/bigquery/write_bigquery.py
@@ -44,6 +44,8 @@ def run(argv=None):
     class WriteBigQueryOptions(PipelineOptions):
         @classmethod
         def _add_argparse_args(cls, parser):
+            # Add a command line flag to be parsed along
+            # with other normal PipelineOptions
             parser.add_argument(
                 "--output_table", required=True, help="BQ Table to write"
             )
diff --git a/Python/extra_examples/file_system_dynamics.py b/Python/extra_examples/file_system_dynamics.py
index a83609c..b2e3815 100644
--- a/Python/extra_examples/file_system_dynamics.py
+++ b/Python/extra_examples/file_system_dynamics.py
@@ -24,6 +24,10 @@ class WriteFileSystems(DoFn):
     def process(self, element):
+        # Beam's FileSystems module has built-in support for many different
+        # backing storage systems; we use it here to write our element.
+        # Each input element is formatted as a Tuple of the form
+        # (file path, data to write).
         writer = FileSystems.create(element[0])
         writer.write(bytes(element[1], encoding="utf8"))
         writer.close()
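
The comments added above describe the (file path, data) tuple that WriteFileSystems consumes. As a rough, self-contained sketch of the same pattern outside the patch, the tuples can come from an in-memory Create; the /tmp paths below are placeholders and not part of the sample.

import apache_beam as beam
from apache_beam import DoFn, ParDo
from apache_beam.io.filesystems import FileSystems


class WriteFileSystems(DoFn):
    def process(self, element):
        # element[0] is the destination path, element[1] the text to write
        writer = FileSystems.create(element[0])
        writer.write(bytes(element[1], encoding="utf8"))
        writer.close()


with beam.Pipeline() as p:
    (
        p
        | beam.Create([("/tmp/beam_even.txt", "even"), ("/tmp/beam_odd.txt", "odd")])
        | ParDo(WriteFileSystems())
    )
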
diff --git a/Python/gcs/read_textio.py b/Python/gcs/read_textio.py
index 56f7534..257f226 100644
--- a/Python/gcs/read_textio.py
+++ b/Python/gcs/read_textio.py
@@ -27,6 +27,8 @@ def run(argv=None):
     class ReadTextOptions(PipelineOptions):
         @classmethod
         def _add_argparse_args(cls, parser):
+            # Add a command line flag to be parsed along
+            # with other normal PipelineOptions
             parser.add_argument(
                 "--path",
                 default="gs://dataflow-samples/shakespeare/kinglear.txt",
@@ -35,6 +37,11 @@ def _add_argparse_args(cls, parser):
     options = ReadTextOptions()
 
+    # Create a Beam pipeline with 3 steps:
+    #   1) Read text. This will emit one record per line.
+    #   2) Count.Globally(). This will count the number of
+    #      elements in the PCollection.
+    #   3) Log the output.
     with beam.Pipeline(options=options) as p:
         (
             p
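
The three steps listed in the new comment map one-to-one onto transforms. A minimal sketch of that shape, assuming a hypothetical local file in place of the GCS default path:

import logging

import apache_beam as beam
from apache_beam import Map
from apache_beam.io import ReadFromText
from apache_beam.transforms.combiners import Count

with beam.Pipeline() as p:
    (
        p
        | "Read text" >> ReadFromText("/tmp/kinglear.txt")  # one record per line
        | "Count elements" >> Count.Globally()  # a single element: the total count
        | "Log output" >> Map(lambda n: logging.info("line count: %d", n))
    )
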
diff --git a/Python/pubsub/gcloud_logs_filter_with_dlq.py b/Python/pubsub/gcloud_logs_filter_with_dlq.py
index 73e91dc..c654770 100644
--- a/Python/pubsub/gcloud_logs_filter_with_dlq.py
+++ b/Python/pubsub/gcloud_logs_filter_with_dlq.py
@@ -12,46 +12,46 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import logging
+# standard libraries
 import json
+import logging
 
+# third party libraries
 import apache_beam as beam
-from apache_beam import DoFn
-from apache_beam import Filter
-from apache_beam import Map
-from apache_beam import ParDo
-from apache_beam.io import ReadFromPubSub
-from apache_beam.io import WriteToPubSub
+from apache_beam import DoFn, Filter, Map, ParDo
+from apache_beam.io import ReadFromPubSub, WriteToPubSub
 from apache_beam.options.pipeline_options import PipelineOptions
 
-
 PROCESSED_TAG = "processed"
 UNPROCESSED_TAG = "unprocessed"
 
 
 class PubSubOptions(PipelineOptions):
-
     @classmethod
     def _add_argparse_args(cls, parser):
         parser.add_argument(
             "--input_topic",
             default="projects/your-project/topics/your-input-test",
-            help="Input PubSub topic")
+            help="Input PubSub topic",
+        )
         parser.add_argument(
             "--output_topic",
             default="projects/your-project/topics/your-output-test",
-            help="Output PubSub topic")
+            help="Output PubSub topic",
+        )
         parser.add_argument(
             "--dlq_topic",
             default="projects/your-project/topics/your-dlq-test",
-            help="Dead Letter Queue PubSub topic")
+            help="Dead Letter Queue PubSub topic",
+        )
 
 
 def run():
     """
-    This Apache Beam pipeline processes log messages from a Google Cloud Pub/Sub topic.
-    The expected data format follows the standard Google Cloud log format,
-    which can be achieved by routing logs to a Pub/Sub topic via https://console.cloud.google.com/logs/router.
+    This Apache Beam pipeline processes log messages from a Google Cloud
+    Pub/Sub topic. The expected data format follows the standard Google Cloud
+    log format, which can be achieved by routing logs to a Pub/Sub topic via
+    https://console.cloud.google.com/logs/router.
 
     It performs the following steps:
     1. Input Configuration:
@@ -66,40 +66,57 @@ def run():
        b. UNPROCESSED (missing one or both of these fields).
 
     4. Severity Filtering:
-       - For PROCESSED messages, filters out those with severity other than "ERROR".
+       - For PROCESSED messages, filters out those with severity other than
+         "ERROR".
 
     5. Data Transformation:
-       - Extracts timestamp and message content from the 'jsonPayload' field for PROCESSED messages.
+       - Extracts timestamp and message content from the 'jsonPayload' field
+         for PROCESSED messages.
 
     6. Output Handling:
-       - Writes transformed PROCESSED messages to a specified output Pub/Sub topic.
+       - Writes transformed PROCESSED messages to a specified output Pub/Sub
+         topic.
        - Sends UNPROCESSED messages to a Dead Letter Queue (DLQ) topic.
     """
     options = PubSubOptions(streaming=True)
 
     with beam.Pipeline(options=options) as p:
-        split_result = (p | "Read from PubSub" >> ReadFromPubSub(topic=options.input_topic)
-                        | "Parse JSON" >> Map(lambda msg: json.loads(msg))
-                        | "Split Messages" >> ParDo(SplitMessages()).with_outputs(UNPROCESSED_TAG, PROCESSED_TAG))
+        split_result = (
+            p
+            | "Read from PubSub" >> ReadFromPubSub(topic=options.input_topic)
+            | "Parse JSON" >> Map(lambda msg: json.loads(msg))
+            | "Split Messages"
+            >> ParDo(SplitMessages()).with_outputs(
+                UNPROCESSED_TAG, PROCESSED_TAG
+            )
+        )
 
         # Filter processed messages and write to output topic
-        (split_result[PROCESSED_TAG]
-         | "Filter by Severity" >> Filter(filter_by_severity)
-         | "Map to PubsubMessage for output" >> Map(to_pubsub_message_for_output)
-         | "Write to PubSub" >> WriteToPubSub(options.output_topic, with_attributes=True))
+        (
+            split_result[PROCESSED_TAG]
+            | "Filter by Severity" >> Filter(filter_by_severity)
+            | "Map to PubsubMessage for output"
+            >> Map(to_pubsub_message_for_output)
+            | "Write to PubSub"
+            >> WriteToPubSub(options.output_topic, with_attributes=True)
+        )
 
         # Write unprocessed messages to DLQ
-        (split_result[UNPROCESSED_TAG]
-         | "Map to PubsubMessage for DLQ" >> Map(to_pubsub_message_for_dlq)
-         | "Write to DLQ" >> WriteToPubSub(options.dlq_topic, with_attributes=True))
+        (
+            split_result[UNPROCESSED_TAG]
+            | "Map to PubsubMessage for DLQ" >> Map(to_pubsub_message_for_dlq)
+            | "Write to DLQ"
+            >> WriteToPubSub(options.dlq_topic, with_attributes=True)
+        )
 
 
 class SplitMessages(DoFn):
     def process(self, element):
+        # third party libraries
         from apache_beam.pvalue import TaggedOutput
 
-        if ('severity' in element) & ('jsonPayload' in element):
+        if ("severity" in element) & ("jsonPayload" in element):
             yield TaggedOutput(PROCESSED_TAG, element)
         else:
             yield TaggedOutput(UNPROCESSED_TAG, element)
@@ -111,20 +128,26 @@ def filter_by_severity(log):
 
 
 def to_pubsub_message_for_dlq(msg):
+    # third party libraries
     from apache_beam.io import PubsubMessage
 
     return PubsubMessage(data=bytes(json.dumps(msg), "utf-8"), attributes=None)
 
 
 def to_pubsub_message_for_output(log):
+    # third party libraries
     from apache_beam.io import PubsubMessage
 
     # Example transformation: Extract relevant information from the log
     transformed_data = {
         "timestamp": log.get("timestamp"),
-        "message": log.get("jsonPayload").get("message")
+        "message": log.get("jsonPayload").get("message"),
     }
-    data = bytes(f"Error log message: {transformed_data['message']} [{transformed_data['timestamp']}]", "utf-8")
+    data = bytes(
+        f"Error log message: {transformed_data['message']} "
+        f"[{transformed_data['timestamp']}]",
+        "utf-8",
+    )
     return PubsubMessage(data=data, attributes=transformed_data)
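
The pipeline above routes elements by tag and sends the UNPROCESSED branch to a DLQ. Below is a self-contained sketch of that tag-and-route pattern using in-memory data instead of Pub/Sub; the sample records are invented for illustration.

import apache_beam as beam
from apache_beam import DoFn, Map, ParDo
from apache_beam.pvalue import TaggedOutput

PROCESSED_TAG = "processed"
UNPROCESSED_TAG = "unprocessed"


class SplitMessages(DoFn):
    def process(self, element):
        # Elements with both fields go to PROCESSED; everything else to the DLQ tag
        if "severity" in element and "jsonPayload" in element:
            yield TaggedOutput(PROCESSED_TAG, element)
        else:
            yield TaggedOutput(UNPROCESSED_TAG, element)


with beam.Pipeline() as p:
    split = (
        p
        | beam.Create(
            [
                {"severity": "ERROR", "jsonPayload": {"message": "boom"}},
                {"textPayload": "unstructured log line"},
            ]
        )
        | ParDo(SplitMessages()).with_outputs(UNPROCESSED_TAG, PROCESSED_TAG)
    )
    split[PROCESSED_TAG] | "log processed" >> Map(print)
    split[UNPROCESSED_TAG] | "log unprocessed" >> Map(print)
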
diff --git a/Python/pubsub/read_pubsub_multiple.py b/Python/pubsub/read_pubsub_multiple.py
index 787638a..1967106 100644
--- a/Python/pubsub/read_pubsub_multiple.py
+++ b/Python/pubsub/read_pubsub_multiple.py
@@ -26,14 +26,18 @@ def run():
     class ReadPubSubOptions(PipelineOptions):
         @classmethod
         def _add_argparse_args(cls, parser):
+            # Add a command line flag to be parsed along
+            # with other normal PipelineOptions
             parser.add_argument(
                 "--sources",
                 required=True,
-                help="PubSub topics or subscriptions, separated by a coma,"
+                help="PubSub topics or subscriptions, separated by a comma,"
                 "e.g.: projects/a/topics/t1,projects/a/topics/t2.",
             )
 
     options = ReadPubSubOptions(streaming=True)
 
+    # Split the source argument into a list of sources that can be read by
+    # Beam's MultipleReadFromPubSub transform
    sources = [PubSubSourceDescriptor(s) for s in options.sources.split(",")]
 
     with beam.Pipeline(options=options) as p:
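
For reference, a sketch of how the parsed --sources value feeds MultipleReadFromPubSub. The topic paths reuse the placeholder names from the flag's help text and would need real Pub/Sub resources to actually run.

import apache_beam as beam
from apache_beam import Map
from apache_beam.io.gcp.pubsub import (
    MultipleReadFromPubSub,
    PubSubSourceDescriptor,
)
from apache_beam.options.pipeline_options import PipelineOptions

# Same parsing step as the sample, applied to a hard-coded flag value
sources_flag = "projects/a/topics/t1,projects/a/topics/t2"
sources = [PubSubSourceDescriptor(s) for s in sources_flag.split(",")]

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    p | "Read multiple" >> MultipleReadFromPubSub(sources) | "Log" >> Map(print)
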