-
Notifications
You must be signed in to change notification settings - Fork 4.3k
/
Copy pathpvalue.py
692 lines (566 loc) · 23 KB
/
pvalue.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""PValue, PCollection: one node of a dataflow graph.
A node of a dataflow processing graph is a PValue. Currently, there is only
one type: PCollection (a potentially very large set of arbitrary values).
Once created, a PValue belongs to a pipeline and has an associated
transform (of type PTransform), which describes how the value will be
produced when the pipeline gets executed.
"""
# pytype: skip-file
import collections
import itertools
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Generic
from typing import Iterator
from typing import Optional
from typing import Sequence
from typing import TypeVar
from typing import Union
from apache_beam import coders
from apache_beam import typehints
from apache_beam.internal import pickler
from apache_beam.portability import common_urns
from apache_beam.portability import python_urns
from apache_beam.portability.api import beam_runner_api_pb2
if TYPE_CHECKING:
from apache_beam.transforms import sideinputs
from apache_beam.transforms.core import ParDo
from apache_beam.transforms.core import Windowing
from apache_beam.pipeline import AppliedPTransform
from apache_beam.pipeline import Pipeline
from apache_beam.runners.pipeline_context import PipelineContext
# Public names exported by this module via ``from ... import *``.
__all__ = [
'PCollection',
'TaggedOutput',
'AsSideInput',
'AsSingleton',
'AsIter',
'AsList',
'AsDict',
'AsMultiMap',
'EmptySideInput',
'Row',
]
# Type parameter used by the generic PCollection class below.
T = TypeVar('T')
class PValue(object):
  """Base class for PCollection.

  Dataflow users should not construct PValue objects directly in their
  pipelines.

  A PValue has the following main characteristics:
    (1) Belongs to a pipeline. Added during object initialization.
    (2) Has a transform that can compute the value if executed.
    (3) Has a value which is meaningful if the transform was executed.
  """

  def __init__(
      self,
      pipeline: 'Pipeline',
      tag: Optional[str] = None,
      element_type: Optional[Union[type, 'typehints.TypeConstraint']] = None,
      windowing: Optional['Windowing'] = None,
      is_bounded=True,
  ):
    """Initializes a PValue with all arguments hidden behind keyword arguments.

    Args:
      pipeline: Pipeline object for this PValue.
      tag: Tag of this PValue.
      element_type: The type of this PValue.
      windowing: Optional windowing; stored as ``_windowing`` only when it is
        truthy, otherwise the attribute is left unset.
      is_bounded: Stored as-is on the ``is_bounded`` attribute.
    """
    self.pipeline = pipeline
    self.tag = tag
    self.element_type = element_type
    self.is_bounded = is_bounded
    self.requires_deterministic_key_coder = None
    # The AppliedPTransform instance for the application of the PTransform
    # generating this PValue. It starts out as None and gets filled in when a
    # transform is applied.
    self.producer: Optional[AppliedPTransform] = None
    if windowing:
      self._windowing = windowing

  def _str_internal(self):
    # Shared by __str__ and __repr__: "<ClassName>[<producer label>.<tag>]".
    producer_label = self.producer.full_label if self.producer else None
    return "%s[%s.%s]" % (self.__class__.__name__, producer_label, self.tag)

  def __str__(self):
    return self._str_internal()

  def __repr__(self):
    description = self._str_internal()
    address = hex(id(self))
    return '<%s at %s>' % (description, address)

  def apply(self, *args, **kwargs):
    """Applies a transform or callable to a PValue.

    This PValue is spliced into the positional arguments at index 1 (or
    becomes the only positional argument when none were given) and the
    resulting argument list is forwarded to ``pipeline.apply()``.

    Args:
      *args: positional arguments.
      **kwargs: keyword arguments.

    Returns:
      Whatever ``self.pipeline.apply`` returns.
    """
    forwarded = args[:1] + (self, ) + args[1:]
    return self.pipeline.apply(*forwarded, **kwargs)

  def __or__(self, ptransform):
    # ``pvalue | transform`` is shorthand for pipeline.apply(transform, pvalue).
    return self.pipeline.apply(ptransform, self)
class PCollection(PValue, Generic[T]):
  """A multiple values (potentially huge) container.

  Dataflow users should not construct PCollection objects directly in their
  pipelines.
  """

  def __eq__(self, other):
    """Two PCollections are equal when their tag and producer are equal."""
    if not isinstance(other, PCollection):
      # Let Python try the reflected comparison / identity fallback instead of
      # implicitly returning None for foreign types.
      return NotImplemented
    return self.tag == other.tag and self.producer == other.producer

  def __hash__(self):
    # Kept consistent with __eq__: same (tag, producer) -> same hash.
    return hash((self.tag, self.producer))

  @property
  def windowing(self) -> 'Windowing':
    """The windowing of this PCollection.

    If no windowing was stored yet, it is derived from the producer's
    transform (``get_windowing`` applied to the producer's inputs) and cached
    on the instance.
    """
    if not hasattr(self, '_windowing'):
      assert self.producer is not None and self.producer.transform is not None
      self._windowing = self.producer.transform.get_windowing(
          self.producer.inputs)
    return self._windowing

  def __reduce_ex__(self, unused_version):
    # Pickling a PCollection is almost always the wrong thing to do, but we
    # can't prohibit it as it often gets implicitly picked up (e.g. as part
    # of a closure). Unpickling therefore yields an
    # _InvalidUnpickledPCollection instance rather than a PCollection.
    return _InvalidUnpickledPCollection, ()

  @staticmethod
  def from_(pcoll: PValue, is_bounded: Optional[bool] = None) -> 'PCollection':
    """Create a PCollection, using another PCollection as a starting point.

    Args:
      pcoll: The PValue whose pipeline (and, by default, boundedness) is
        reused.
      is_bounded: Overrides the boundedness; when None, ``pcoll.is_bounded``
        is used.

    Returns:
      A new PCollection in the same pipeline as ``pcoll``.
    """
    if is_bounded is None:
      is_bounded = pcoll.is_bounded
    return PCollection(pcoll.pipeline, is_bounded=is_bounded)

  def to_runner_api(
      self, context: 'PipelineContext') -> beam_runner_api_pb2.PCollection:
    """Builds the runner API proto for this PCollection using ``context``."""
    return beam_runner_api_pb2.PCollection(
        unique_name=self._unique_name(),
        coder_id=context.coder_id_from_element_type(
            self.element_type, self.requires_deterministic_key_coder),
        is_bounded=beam_runner_api_pb2.IsBounded.BOUNDED
        if self.is_bounded else beam_runner_api_pb2.IsBounded.UNBOUNDED,
        windowing_strategy_id=context.windowing_strategies.get_id(
            self.windowing))

  def _unique_name(self) -> str:
    """Returns the name used as ``unique_name`` in the runner API proto."""
    if self.producer:
      # Length-prefixed producer label followed by the tag.
      return '%d%s.%s' % (
          len(self.producer.full_label), self.producer.full_label, self.tag)
    else:
      return 'PCollection%s' % id(self)

  @staticmethod
  def from_runner_api(
      proto: beam_runner_api_pb2.PCollection,
      context: 'PipelineContext') -> 'PCollection':
    """Creates a PCollection from its runner API proto."""
    # Producer and tag will be filled in later, the key point is that the same
    # object is returned for the same pcollection id.
    # We pass None for the PCollection's Pipeline to avoid a cycle during
    # deserialization. It will be populated soon after this call, in
    # Pipeline.from_runner_api(). This brief period is the only time that
    # PCollection.pipeline is allowed to be None.
    return PCollection(
        None,  # type: ignore[arg-type]
        element_type=context.element_type_from_coder_id(proto.coder_id),
        windowing=context.windowing_strategies.get_by_id(
            proto.windowing_strategy_id),
        is_bounded=proto.is_bounded == beam_runner_api_pb2.IsBounded.BOUNDED)
class _InvalidUnpickledPCollection(object):
  """Placeholder produced when a pickled PCollection is loaded.

  ``PCollection.__reduce_ex__`` names this class as its reconstructor, so
  unpickling a PCollection yields an instance of this empty class.
  """
class PBegin(PValue):
  """A pipeline begin marker used as input to create/read transforms.

  Used internally to represent the input of Create and Read transforms, so
  that every transform can uniformly take PValue(s) as input. It adds nothing
  on top of PValue.
  """
class PDone(PValue):
  """The output of a transform that has a trivial result, such as Write.

  It adds nothing on top of PValue.
  """
class DoOutputsTuple(object):
  """An object grouping the multiple outputs of a ParDo or FlatMap transform."""

  def __init__(
      self,
      pipeline: 'Pipeline',
      transform: 'ParDo',
      tags: Sequence[str],
      main_tag: Optional[str],
      allow_unknown_tags: Optional[bool] = None,
  ):
    """Initializes the output group.

    Args:
      pipeline: Pipeline that the output PCollections are created in.
      transform: The transform whose ``output_tags`` set is extended whenever
        a tagged output is first accessed.
      tags: The declared (non-main) output tags.
      main_tag: Tag that refers to the main output, if any.
      allow_unknown_tags: Whether tags outside ``tags`` may be accessed. When
        None, unknown tags are allowed exactly when ``tags`` is empty.
    """
    self._pipeline = pipeline
    self._tags = tags
    self._main_tag = main_tag
    self._transform = transform
    self._allow_unknown_tags = (
        not tags if allow_unknown_tags is None else allow_unknown_tags)
    # The ApplyPTransform instance for the application of the multi FlatMap
    # generating this value. The field gets initialized when a transform
    # gets applied.
    self.producer: Optional[AppliedPTransform] = None
    # Dictionary of PCollections already associated with tags.
    self._pcolls: Dict[Optional[str], PCollection] = {}

  def __str__(self):
    return '<%s>' % self._str_internal()

  def __repr__(self):
    return '<%s at %s>' % (self._str_internal(), hex(id(self)))

  def _str_internal(self):
    return '%s main_tag=%s tags=%s transform=%s' % (
        self.__class__.__name__, self._main_tag, self._tags, self._transform)

  def __iter__(self) -> Iterator[PCollection]:
    """Yields the main output (if a main tag is set), then one PCollection
    per declared tag, in the order of ``tags``."""
    if self._main_tag is not None:
      yield self[self._main_tag]
    for tag in self._tags:
      yield self[tag]

  def __getattr__(self, tag: str) -> PCollection:
    """Exposes outputs as attributes, e.g. ``outputs.some_tag``.

    Raises:
      AttributeError: For dunder names, which are never treated as tags.
    """
    # Special methods which may be accessed before the object is
    # fully constructed (e.g. in unpickling). ``object`` defines no
    # __getattr__ to delegate to, so raise the AttributeError explicitly.
    if tag[:2] == tag[-2:] == '__':
      raise AttributeError(
          "'%s' object has no attribute '%s'" % (type(self).__name__, tag))
    return self[tag]

  def __getitem__(self, tag: Union[int, str, None]) -> PCollection:
    """Returns the PCollection for ``tag``, creating it on first access.

    Args:
      tag: An output tag. Ints are converted to their string form; the main
        tag is mapped to None.

    Raises:
      ValueError: If tags were declared, ``tag`` is not among them (nor the
        main tag) and unknown tags are not allowed.
    """
    # Accept int tags so that we can look at Partition tags with the
    # same ints that we used in the partition function.
    # TODO(gildea): Consider requiring string-based tags everywhere.
    # This will require a partition function that does not return ints.
    if isinstance(tag, int):
      tag = str(tag)
    if tag == self._main_tag:
      tag = None
    elif self._tags and tag not in self._tags and not self._allow_unknown_tags:
      raise ValueError(
          "Tag '%s' is neither the main tag '%s' "
          "nor any of the tags %s" % (tag, self._main_tag, self._tags))
    # Check if we accessed this tag before.
    if tag in self._pcolls:
      return self._pcolls[tag]

    assert self.producer is not None
    if tag is not None:
      self._transform.output_tags.add(tag)
      # The tagged output is bounded only if every main input is bounded.
      is_bounded = all(i.is_bounded for i in self.producer.main_inputs.values())
      pcoll = PCollection(
          self._pipeline,
          tag=tag,
          element_type=typehints.Any,
          is_bounded=is_bounded)
      # Transfer the producer from the DoOutputsTuple to the resulting
      # PCollection.
      pcoll.producer = self.producer.parts[0]
      # Add this as an output to both the inner ParDo and the outer _MultiParDo
      # PTransforms.
      if tag not in self.producer.parts[0].outputs:
        self.producer.parts[0].add_output(pcoll, tag)
        self.producer.add_output(pcoll, tag)
    else:
      # Main output is output of inner ParDo.
      pval = self.producer.parts[0].outputs[None]
      assert isinstance(pval,
                        PCollection), ("DoOutputsTuple should follow a ParDo.")
      pcoll = pval
    self._pcolls[tag] = pcoll
    return pcoll
class TaggedOutput(object):
  """A value paired with the string tag of the output it belongs to.

  ParDo, Map, and FlatMap transforms can emit values on multiple outputs which
  are distinguished by string tags. The DoFn will return plain values
  if it wants to emit on the main output and TaggedOutput objects
  if it wants to emit a value on a specific tagged output.
  """

  def __init__(self, tag: str, value: Any) -> None:
    """Stores ``tag`` and ``value``.

    Raises:
      TypeError: If ``tag`` is not a ``str``.
    """
    tag_is_string = isinstance(tag, str)
    if not tag_is_string:
      message = (
          'Attempting to create a TaggedOutput with non-string tag %s' %
          (tag, ))
      raise TypeError(message)
    self.tag, self.value = tag, value
class AsSideInput(object):
  """Marker specifying that a PCollection will be used as a side input.

  When a PCollection is supplied as a side input to a PTransform, it is
  necessary to indicate how the PCollection should be made available
  as a PTransform side argument (e.g. in the form of an iterable, mapping,
  or single value). This class is the superclass of all the various
  options, and should not be instantiated directly. (See instead AsSingleton,
  AsIter, etc.)
  """

  def __init__(self, pcoll: PCollection) -> None:
    """Wraps ``pcoll`` and derives the default window mapping function from
    the window fn of its windowing."""
    from apache_beam.transforms import sideinputs
    self.pvalue = pcoll
    window_fn = pcoll.windowing.windowfn
    self._window_mapping_fn = sideinputs.default_window_mapping_fn(window_fn)

  def _view_options(self):
    """Internal options corresponding to specific view.

    Intended for internal use by runner implementations.

    Returns:
      A dict with the ``window_mapping_fn`` and ``coder`` entries for the
      given view.
    """
    return dict(
        window_mapping_fn=self._window_mapping_fn,
        coder=self._windowed_coder())

  @property
  def element_type(self):
    # Base views make no claim about the type of the materialized value.
    return typehints.Any

  def _windowed_coder(self):
    """Returns a WindowedValueCoder built from the element coder and the
    window coder of the wrapped PCollection."""
    # Fall back to this view's element type when the PCollection has none.
    coded_type = self.pvalue.element_type or self.element_type
    value_coder = coders.registry.get_coder(coded_type)
    window_coder = self.pvalue.windowing.windowfn.get_window_coder()
    return coders.WindowedValueCoder(value_coder, window_coder)

  # TODO(robertwb): Get rid of _from_runtime_iterable and _view_options
  # in favor of _side_input_data().
  def _side_input_data(self) -> 'SideInputData':
    """Returns SideInputData using the ITERABLE access pattern and a view
    function that defers to this class's ``_from_runtime_iterable``."""
    view_options = self._view_options()
    from_runtime_iterable = type(self)._from_runtime_iterable
    return SideInputData(
        common_urns.side_inputs.ITERABLE.urn,
        self._window_mapping_fn,
        lambda iterable: from_runtime_iterable(iterable, view_options))

  def to_runner_api(
      self, context: 'PipelineContext') -> beam_runner_api_pb2.SideInput:
    """Serializes this view via its SideInputData."""
    side_input_data = self._side_input_data()
    return side_input_data.to_runner_api(context)

  @staticmethod
  def from_runner_api(
      proto: beam_runner_api_pb2.SideInput,
      context: 'PipelineContext') -> '_UnpickledSideInput':
    """Rebuilds a side input view from its runner API proto."""
    side_input_data = SideInputData.from_runner_api(proto, context)
    return _UnpickledSideInput(side_input_data)

  @staticmethod
  def _from_runtime_iterable(it, options):
    # Subclasses provide the actual materialization.
    raise NotImplementedError

  def requires_keyed_input(self):
    return False
class _UnpickledSideInput(AsSideInput):
  """Side input view backed by an already-built SideInputData.

  Created by ``AsSideInput.from_runner_api``; the base ``__init__`` is
  deliberately not invoked, so no PCollection is bound here.
  """

  def __init__(self, side_input_data: 'SideInputData') -> None:
    self._data = side_input_data
    self._window_mapping_fn = side_input_data.window_mapping_fn

  @staticmethod
  def _from_runtime_iterable(it, options):
    """Shapes ``it`` according to the access pattern stored in
    ``options['data']`` and hands the result to its view function.

    Raises:
      ValueError: If the access pattern is neither ITERABLE nor MULTIMAP.
    """
    data = options['data']
    pattern = data.access_pattern
    if pattern == common_urns.side_inputs.ITERABLE.urn:
      raw_view = it
    elif pattern == common_urns.side_inputs.MULTIMAP.urn:
      # Group the (key, value) pairs into key -> list of values.
      raw_view = collections.defaultdict(list)
      for key, value in it:
        raw_view[key].append(value)
    else:
      raise ValueError('Unknown access_pattern: %s' % pattern)
    return data.view_fn(raw_view)

  def _view_options(self):
    options = {'data': self._data}
    # For non-fn-api runners.
    options['window_mapping_fn'] = self._data.window_mapping_fn
    options['coder'] = self._windowed_coder()
    return options

  def _side_input_data(self):
    return self._data
class SideInputData(object):
  """All of the data about a side input except for the bound PCollection."""

  def __init__(
      self,
      access_pattern: str,
      window_mapping_fn: 'sideinputs.WindowMappingFn',
      view_fn):
    """Stores the access pattern URN, window mapping function and view
    function unchanged."""
    self.access_pattern = access_pattern
    self.window_mapping_fn = window_mapping_fn
    self.view_fn = view_fn

  def to_runner_api(
      self, context: 'PipelineContext') -> beam_runner_api_pb2.SideInput:
    """Builds the SideInput proto; both functions are shipped pickled."""
    access_spec = beam_runner_api_pb2.FunctionSpec(urn=self.access_pattern)
    view_spec = beam_runner_api_pb2.FunctionSpec(
        urn=python_urns.PICKLED_VIEWFN, payload=pickler.dumps(self.view_fn))
    mapping_spec = beam_runner_api_pb2.FunctionSpec(
        urn=python_urns.PICKLED_WINDOW_MAPPING_FN,
        payload=pickler.dumps(self.window_mapping_fn))
    return beam_runner_api_pb2.SideInput(
        access_pattern=access_spec,
        view_fn=view_spec,
        window_mapping_fn=mapping_spec)

  @staticmethod
  def from_runner_api(
      proto: beam_runner_api_pb2.SideInput,
      unused_context: 'PipelineContext') -> 'SideInputData':
    """Rebuilds SideInputData from a proto written by ``to_runner_api``."""
    # Only the pickled encodings produced by to_runner_api are understood.
    assert proto.view_fn.urn == python_urns.PICKLED_VIEWFN
    assert (
        proto.window_mapping_fn.urn == python_urns.PICKLED_WINDOW_MAPPING_FN)
    # NOTE(review): the payloads are unpickled as-is, so the proto is
    # presumably expected to come from a trusted source -- confirm.
    window_mapping_fn = pickler.loads(proto.window_mapping_fn.payload)
    view_fn = pickler.loads(proto.view_fn.payload)
    return SideInputData(proto.access_pattern.urn, window_mapping_fn, view_fn)
class AsSingleton(AsSideInput):
  """Marker specifying that an entire PCollection is to be used as a side input.

  When a PCollection is supplied as a side input to a PTransform, it is
  necessary to indicate whether the entire PCollection should be made available
  as a PTransform side argument (in the form of an iterable), or whether just
  one value should be pulled from the PCollection and supplied as the side
  argument (as an ordinary value).

  Wrapping a PCollection side input argument to a PTransform in this container
  (e.g., data.apply('label', MyPTransform(), AsSingleton(my_side_input) )
  selects the latter behavior.

  The input PCollection must contain exactly one value per window, unless a
  default is given, in which case it may be empty.
  """
  # Sentinel distinguishing "no default supplied" from any real default,
  # including None.
  _NO_DEFAULT = object()

  def __init__(
      self, pcoll: PCollection, default_value: Any = _NO_DEFAULT) -> None:
    """Wraps ``pcoll``; ``default_value`` is used when the input is empty."""
    super().__init__(pcoll)
    self.default_value = default_value

  def __repr__(self):
    return 'AsSingleton(%s)' % self.pvalue

  def _view_options(self):
    """Returns the base view options, plus a ``default`` entry when a default
    value was supplied."""
    base = super()._view_options()
    # Identity check: ``!=`` would invoke the default value's own comparison
    # operators, which may raise or return a non-boolean (e.g. array-likes).
    if self.default_value is not AsSingleton._NO_DEFAULT:
      return dict(base, default=self.default_value)
    return base

  @staticmethod
  def _from_runtime_iterable(it, options):
    """Extracts the single element of ``it``.

    Returns:
      The only element; or, for an empty iterable, ``options['default']`` if
      present and otherwise an EmptySideInput instance.

    Raises:
      ValueError: If ``it`` holds more than one element.
    """
    # Pull at most two elements: enough to tell empty / one / too many apart
    # without consuming the whole iterable.
    head = list(itertools.islice(it, 2))
    if not head:
      return options.get('default', EmptySideInput())
    elif len(head) == 1:
      return head[0]
    raise ValueError(
        'PCollection of size %d with more than one element accessed as a '
        'singleton view. First two elements encountered are "%s", "%s".' %
        (len(head), str(head[0]), str(head[1])))

  @property
  def element_type(self):
    return self.pvalue.element_type
class AsIter(AsSideInput):
  """Marker specifying that an entire PCollection is to be used as a side input.

  When a PCollection is supplied as a side input to a PTransform, it is
  necessary to indicate whether the entire PCollection should be made available
  as a PTransform side argument (in the form of an iterable), or whether just
  one value should be pulled from the PCollection and supplied as the side
  argument (as an ordinary value).

  Wrapping a PCollection side input argument to a PTransform in this container
  (e.g., data.apply('label', MyPTransform(), AsIter(my_side_input) ) selects the
  former behavior.
  """

  def __repr__(self):
    return 'AsIter(%s)' % self.pvalue

  @staticmethod
  def _from_runtime_iterable(it, options):
    # The runtime iterable already is the view; ``options`` is unused.
    return it

  def _side_input_data(self) -> SideInputData:
    """Returns SideInputData with the ITERABLE access pattern and an identity
    view function."""
    return SideInputData(
        access_pattern=common_urns.side_inputs.ITERABLE.urn,
        window_mapping_fn=self._window_mapping_fn,
        view_fn=lambda iterable: iterable)

  @property
  def element_type(self):
    wrapped_type = self.pvalue.element_type
    return typehints.Iterable[wrapped_type]
class AsList(AsSideInput):
  """Marker specifying that an entire PCollection is to be used as a side input.

  Intended for use in side-argument specification---the same places where
  AsSingleton and AsIter are used, but forces materialization of this
  PCollection as a list.

  Args:
    pcoll: Input pcollection.

  Returns:
    An AsList-wrapper around a PCollection whose one element is a list
    containing all elements in pcoll.
  """

  @staticmethod
  def _from_runtime_iterable(it, options):
    # Materialize every element; ``options`` is unused.
    elements = list(it)
    return elements

  def _side_input_data(self) -> SideInputData:
    """Returns SideInputData with the ITERABLE access pattern and ``list`` as
    the view function."""
    return SideInputData(
        access_pattern=common_urns.side_inputs.ITERABLE.urn,
        window_mapping_fn=self._window_mapping_fn,
        view_fn=list)
class AsDict(AsSideInput):
  """Marker specifying a PCollection to be used as an indexable side input.

  Intended for use in side-argument specification---the same places where
  AsSingleton and AsIter are used, but returns an interface that allows
  key lookup.

  Args:
    pcoll: Input pcollection. All elements should be key-value pairs (i.e.
      2-tuples) with unique keys.

  Returns:
    An AsDict-wrapper around a PCollection whose one element is a dict with
    entries for uniquely-keyed pairs in pcoll.
  """

  @staticmethod
  def _from_runtime_iterable(it, options):
    # Build the mapping from the (key, value) pairs; ``options`` is unused.
    mapping = dict(it)
    return mapping

  def _side_input_data(self) -> SideInputData:
    """Returns SideInputData with the ITERABLE access pattern and ``dict`` as
    the view function."""
    return SideInputData(
        access_pattern=common_urns.side_inputs.ITERABLE.urn,
        window_mapping_fn=self._window_mapping_fn,
        view_fn=dict)
class AsMultiMap(AsSideInput):
  """Marker specifying a PCollection to be used as an indexable side input.

  Similar to AsDict, but multiple values may be associated per key, and
  the keys are fetched lazily rather than all having to fit in memory.
  Intended for use in side-argument specification---the same places where
  AsSingleton and AsIter are used, but returns an interface that allows
  key lookup.
  """

  @staticmethod
  def _from_runtime_iterable(it, options):
    # Legacy implementation: eagerly group the (key, value) pairs into a
    # key -> list-of-values mapping.
    grouped = collections.defaultdict(list)
    for key, value in it:
      grouped[key].append(value)
    return grouped

  def _side_input_data(self) -> SideInputData:
    """Returns SideInputData with the MULTIMAP access pattern and an identity
    view function."""
    return SideInputData(
        access_pattern=common_urns.side_inputs.MULTIMAP.urn,
        window_mapping_fn=self._window_mapping_fn,
        view_fn=lambda x: x)

  def requires_keyed_input(self):
    return True
class EmptySideInput(object):
  """Value indicating when a singleton side input was empty.

  When a PCollection furnished as a singleton side input to a PTransform turns
  out to be empty, an instance of this class is supplied to the DoFn in the
  place where a value from a non-empty PCollection would have gone, alerting
  the DoFn that the side input was empty. Users may want to check whether a
  side input value is an EmptySideInput, but will very likely never need to
  create instances of this class themselves.
  """
class Row(object):
  """A dynamic schema'd row object.

  This object's attributes are initialized from the keywords passed into its
  constructor, e.g. Row(x=3, y=4) will create a Row with two attributes x and y.

  More importantly, when a Row object is returned from a `Map`, `FlatMap`, or
  `DoFn` type inference is able to deduce the schema of the resulting
  PCollection, e.g.

    pc | beam.Map(lambda x: Row(x=x, y=0.5 * x))

  when applied to a PCollection of ints will produce a PCollection with schema
  `(x=int, y=float)`.

  Note that in Beam 2.30.0 and later, Row objects are sensitive to field order.
  So `Row(x=3, y=4)` is not considered equal to `Row(y=4, x=3)`.
  """

  def __init__(self, **kwargs):
    # Every keyword becomes an instance attribute, in the order given.
    self.__dict__.update(kwargs)

  def as_dict(self):
    """Returns a new dict mapping field names to values."""
    return dict(self.__dict__)

  # For compatibility with named tuples.
  _asdict = as_dict

  def __iter__(self):
    """Yields the field values in field order."""
    yield from self.__dict__.values()

  def __repr__(self):
    return 'Row(%s)' % ', '.join('%s=%r' % kv for kv in self.__dict__.items())

  def __hash__(self):
    """Hashes the ordered (name, value) pairs, consistent with ``__eq__``.

    Raises:
      TypeError: If any field value is itself unhashable.
    """
    # A dict items view is not hashable, so hash an ordered tuple of the
    # items instead; the ordering matches the order-sensitive __eq__.
    return hash(tuple(self.__dict__.items()))

  def __eq__(self, other):
    """Rows are equal when they have the same type and the same fields with
    equal values in the same order."""
    return (
        type(self) == type(other) and
        len(self.__dict__) == len(other.__dict__) and all(
            s == o
            for s, o in zip(self.__dict__.items(), other.__dict__.items())))

  def __reduce__(self):
    # Pickle as the ordered sequence of (name, value) pairs so that field
    # order is passed to _make_Row on unpickling.
    return _make_Row, tuple(self.__dict__.items())
def _make_Row(*items):
  """Rebuilds a Row from ``(name, value)`` pairs; used by ``Row.__reduce__``."""
  fields = dict(items)
  return Row(**fields)