diff --git a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto index 777ce8636b7b..4b40c7fa4e4e 100644 --- a/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto +++ b/model/fn-execution/src/main/proto/org/apache/beam/model/fn_execution/v1/beam_fn_api.proto @@ -52,7 +52,7 @@ import "google/protobuf/duration.proto"; message FnApiTransforms { enum Runner { // DataSource is a Root Transform, and a source of data for downstream - // transforms in the same ProcessBundleDescriptor. + // transforms in the same ProcessBundleDescriptor. // It represents a stream of values coming in from an external source/over // a data channel, typically from the runner. It's not the PCollection itself // but a description of how to get the portion of the PCollection for a given @@ -82,7 +82,7 @@ message FnApiTransforms { // request will be sent with the matching instruction ID and transform ID. // Each PCollection that exits the ProcessBundleDescriptor subgraph will have // it's own DataSink, keyed by a transform ID determined by the runner. - // + // // The DataSink will take in a stream of elements for a given instruction ID // and encode them for transmission to the remote sink. The coder ID must be // for a windowed value coder. @@ -924,6 +924,35 @@ message StateKey { bytes window = 3; } + // Represents a request for the keys and values associated with a specified window in a PCollection. See + // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further + // details. + // + // This is expected to be more efficient than iterating over the keys and + // looking up the values one at a time. 
If a runner chooses not to implement + // this protocol, or a key has too many values to fit into a single response, + // the runner is free to fail the request and a fallback to point lookups + // will be performed by the SDK. + // + // Can only be used to perform StateGetRequests on side inputs of the URN + // beam:side_input:multimap:v1. + // + // For a PCollection<KV<K, V>>, the response data stream will be a + // concatenation of all KVs associated with the specified window, + // encoded with the KV<K, Iterable<V>> coder. + // See + // https://s.apache.org/beam-fn-api-send-and-receive-data for further + // details. + message MultimapKeysValuesSideInput { + // (Required) The id of the PTransform containing a side input. + string transform_id = 1; + // (Required) The id of the side input. + string side_input_id = 2; + // (Required) The window (after mapping the currently executing elements + // window into the side input windows domain) encoded in a nested context. + bytes window = 3; + } + // Represents a request for an unordered set of values associated with a // specified user key and window for a PTransform. 
See // https://s.apache.org/beam-fn-state-api-and-bundle-processing for further @@ -999,6 +1028,7 @@ message StateKey { BagUserState bag_user_state = 3; IterableSideInput iterable_side_input = 4; MultimapKeysSideInput multimap_keys_side_input = 5; + MultimapKeysValuesSideInput multimap_keys_values_side_input = 8; MultimapKeysUserState multimap_keys_user_state = 6; MultimapUserState multimap_user_state = 7; } diff --git a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java index ea23e28ddb66..6a16ca18fef9 100644 --- a/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java +++ b/runners/java-fn-execution/src/test/java/org/apache/beam/runners/fnexecution/control/RemoteExecutionTest.java @@ -915,9 +915,10 @@ public Coder valueCoder() { // Expect the following requests for the first bundle: // * one to read iterable side input // * one to read keys from multimap side input + // * one to attempt multimap side input bulk read // * one to read key1 iterable from multimap side input // * one to read key2 iterable from multimap side input - assertEquals(4, stateRequestHandler.receivedRequests.size()); + assertEquals(5, stateRequestHandler.receivedRequests.size()); assertEquals( stateRequestHandler.receivedRequests.get(0).getStateKey().getIterableSideInput(), BeamFnApi.StateKey.IterableSideInput.newBuilder() @@ -931,14 +932,20 @@ public Coder valueCoder() { .setTransformId(transformId) .build()); assertEquals( - stateRequestHandler.receivedRequests.get(2).getStateKey().getMultimapSideInput(), + stateRequestHandler.receivedRequests.get(2).getStateKey().getMultimapKeysValuesSideInput(), + BeamFnApi.StateKey.MultimapKeysValuesSideInput.newBuilder() + .setSideInputId(multimapView.getTagInternal().getId()) + .setTransformId(transformId) + .build()); + 
assertEquals( + stateRequestHandler.receivedRequests.get(3).getStateKey().getMultimapSideInput(), BeamFnApi.StateKey.MultimapSideInput.newBuilder() .setSideInputId(multimapView.getTagInternal().getId()) .setTransformId(transformId) .setKey(encode("key1")) .build()); assertEquals( - stateRequestHandler.receivedRequests.get(3).getStateKey().getMultimapSideInput(), + stateRequestHandler.receivedRequests.get(4).getStateKey().getMultimapSideInput(), BeamFnApi.StateKey.MultimapSideInput.newBuilder() .setSideInputId(multimapView.getTagInternal().getId()) .setTransformId(transformId) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java index 4bba03875774..619eea6cc70f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/state/MultimapSideInput.java @@ -20,13 +20,21 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.function.Function; import org.apache.beam.fn.harness.Cache; import org.apache.beam.fn.harness.Caches; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateRequest; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.transforms.Materializations.MultimapView; import org.apache.beam.sdk.util.ByteStringOutputStream; +import org.apache.beam.sdk.values.KV; import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString; /** @@ -38,11 +46,14 @@ }) public class MultimapSideInput implements MultimapView { + private static final 
int BULK_READ_SIZE = 100; + private final Cache cache; private final BeamFnStateClient beamFnStateClient; private final StateRequest keysRequest; private final Coder keyCoder; private final Coder valueCoder; + private volatile Function> bulkReadResult; public MultimapSideInput( Cache cache, @@ -71,17 +82,66 @@ public Iterable get() { @Override public Iterable get(K k) { - ByteStringOutputStream output = new ByteStringOutputStream(); - try { - keyCoder.encode(k, output); - } catch (IOException e) { - throw new IllegalStateException( - String.format( - "Failed to encode key %s for side input id %s.", - k, keysRequest.getStateKey().getMultimapKeysSideInput().getSideInputId()), - e); + ByteString encodedKey = encodeKey(k); + + if (bulkReadResult == null) { + synchronized (this) { + if (bulkReadResult == null) { + Map> bulkRead = new HashMap<>(); + StateKey bulkReadStateKey = + StateKey.newBuilder() + .setMultimapKeysValuesSideInput( + StateKey.MultimapKeysValuesSideInput.newBuilder() + .setTransformId( + keysRequest.getStateKey().getMultimapKeysSideInput().getTransformId()) + .setSideInputId( + keysRequest.getStateKey().getMultimapKeysSideInput().getSideInputId()) + .setWindow( + keysRequest.getStateKey().getMultimapKeysSideInput().getWindow())) + .build(); + + StateRequest bulkReadRequest = + keysRequest.toBuilder().setStateKey(bulkReadStateKey).build(); + try { + Iterator>> entries = + StateFetchingIterators.readAllAndDecodeStartingFrom( + Caches.subCache(cache, "ValuesForKey", encodedKey), + beamFnStateClient, + bulkReadRequest, + KvCoder.of(keyCoder, IterableCoder.of(valueCoder))) + .iterator(); + while (bulkRead.size() < BULK_READ_SIZE && entries.hasNext()) { + KV> entry = entries.next(); + bulkRead.put(encodeKey(entry.getKey()), entry.getValue()); + } + if (entries.hasNext()) { + bulkReadResult = bulkRead::get; + } else { + bulkReadResult = + key -> { + Iterable result = bulkRead.get(key); + if (result == null) { + // As we read the entire set of values, we don't 
have to do a lookup to know + // this key doesn't exist. + // Missing keys are treated as empty iterables in this multimap. + return Collections.emptyList(); + } else { + return result; + } + }; + } + } catch (Exception exn) { + bulkReadResult = bulkRead::get; + } + } + } + } + + Iterable bulkReadValues = bulkReadResult.apply(encodedKey); + if (bulkReadValues != null) { + return bulkReadValues; } - ByteString encodedKey = output.toByteString(); + StateKey stateKey = StateKey.newBuilder() .setMultimapSideInput( @@ -98,4 +158,18 @@ public Iterable get(K k) { return StateFetchingIterators.readAllAndDecodeStartingFrom( Caches.subCache(cache, "ValuesForKey", encodedKey), beamFnStateClient, request, valueCoder); } + + private ByteString encodeKey(K k) { + ByteStringOutputStream output = new ByteStringOutputStream(); + try { + keyCoder.encode(k, output); + } catch (IOException e) { + throw new IllegalStateException( + String.format( + "Failed to encode key %s for side input id %s.", + k, keysRequest.getStateKey().getMultimapKeysSideInput().getSideInputId()), + e); + } + return output.toByteString(); + } } diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java index 4aaaa3d945d9..5447d6f0c6d6 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/FakeBeamFnStateClient.java @@ -136,6 +136,10 @@ public CompletableFuture handle(StateRequest.Builder requestBuild if (key.getTypeCase() == TypeCase.MULTIMAP_SIDE_INPUT || key.getTypeCase() == TypeCase.RUNNER) { assertEquals(RequestCase.GET, request.getRequestCase()); } + if (key.getTypeCase() == TypeCase.MULTIMAP_KEYS_VALUES_SIDE_INPUT && !data.containsKey(key)) { + // Allow testing this not being supported rather than blindly returning the empty list. 
+ throw new UnsupportedOperationException("No multimap keys values states provided."); + } switch (request.getRequestCase()) { case GET: diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java index a4c20e3593ac..17ebf4234396 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/state/MultimapSideInputTest.java @@ -27,6 +27,8 @@ import org.apache.beam.model.fnexecution.v1.BeamFnApi.StateKey; import org.apache.beam.sdk.coders.ByteArrayCoder; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.IterableCoder; +import org.apache.beam.sdk.coders.KvCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.util.ByteStringOutputStream; import org.apache.beam.sdk.values.KV; @@ -50,12 +52,38 @@ public class MultimapSideInputTest { private static final byte[] B = "B".getBytes(StandardCharsets.UTF_8); private static final byte[] UNKNOWN = "UNKNOWN".getBytes(StandardCharsets.UTF_8); + @Test + public void testGetWithBulkRead() throws Exception { + FakeBeamFnStateClient fakeBeamFnStateClient = + new FakeBeamFnStateClient( + ImmutableMap.of( + keysValuesStateKey(), + KV.of( + KvCoder.of(ByteArrayCoder.of(), IterableCoder.of(StringUtf8Coder.of())), + asList(KV.of(A, asList("A1", "A2", "A3")), KV.of(B, asList("B1", "B2")))))); + + MultimapSideInput multimapSideInput = + new MultimapSideInput<>( + Caches.noop(), + fakeBeamFnStateClient, + "instructionId", + keysStateKey(), + ByteArrayCoder.of(), + StringUtf8Coder.of()); + assertArrayEquals( + new String[] {"A1", "A2", "A3"}, Iterables.toArray(multimapSideInput.get(A), String.class)); + assertArrayEquals( + new String[] {"B1", "B2"}, Iterables.toArray(multimapSideInput.get(B), String.class)); + assertArrayEquals( + new String[] 
{}, Iterables.toArray(multimapSideInput.get(UNKNOWN), String.class)); + } + @Test public void testGet() throws Exception { FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient( ImmutableMap.of( - stateKey(), KV.of(ByteArrayCoder.of(), asList(A, B)), + keysStateKey(), KV.of(ByteArrayCoder.of(), asList(A, B)), key(A), KV.of(StringUtf8Coder.of(), asList("A1", "A2", "A3")), key(B), KV.of(StringUtf8Coder.of(), asList("B1", "B2")))); @@ -64,7 +92,7 @@ public void testGet() throws Exception { Caches.noop(), fakeBeamFnStateClient, "instructionId", - stateKey(), + keysStateKey(), ByteArrayCoder.of(), StringUtf8Coder.of()); assertArrayEquals( @@ -82,7 +110,7 @@ public void testGetCached() throws Exception { FakeBeamFnStateClient fakeBeamFnStateClient = new FakeBeamFnStateClient( ImmutableMap.of( - stateKey(), KV.of(ByteArrayCoder.of(), asList(A, B)), + keysStateKey(), KV.of(ByteArrayCoder.of(), asList(A, B)), key(A), KV.of(StringUtf8Coder.of(), asList("A1", "A2", "A3")), key(B), KV.of(StringUtf8Coder.of(), asList("B1", "B2")))); @@ -94,7 +122,7 @@ public void testGetCached() throws Exception { cache, fakeBeamFnStateClient, "instructionId", - stateKey(), + keysStateKey(), ByteArrayCoder.of(), StringUtf8Coder.of()); assertArrayEquals( @@ -117,7 +145,7 @@ public void testGetCached() throws Exception { throw new IllegalStateException("Unexpected call for test."); }, "instructionId", - stateKey(), + keysStateKey(), ByteArrayCoder.of(), StringUtf8Coder.of()); assertArrayEquals( @@ -132,7 +160,7 @@ public void testGetCached() throws Exception { } } - private StateKey stateKey() throws IOException { + private StateKey keysStateKey() throws IOException { return StateKey.newBuilder() .setMultimapKeysSideInput( StateKey.MultimapKeysSideInput.newBuilder() @@ -142,6 +170,16 @@ private StateKey stateKey() throws IOException { .build(); } + private StateKey keysValuesStateKey() throws IOException { + return StateKey.newBuilder() + .setMultimapKeysValuesSideInput( + 
StateKey.MultimapKeysValuesSideInput.newBuilder() + .setTransformId("ptransformId") + .setSideInputId("sideInputId") + .setWindow(ByteString.copyFromUtf8("encodedWindow"))) + .build(); + } + private StateKey key(byte[] key) throws IOException { ByteStringOutputStream out = new ByteStringOutputStream(); ByteArrayCoder.of().encode(key, out); diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py index bc60b5dd86cd..fbcb58e16133 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/execution.py @@ -23,6 +23,7 @@ import copy import itertools import logging +import struct import typing import uuid import weakref @@ -351,7 +352,7 @@ def append(self, elements_data): self._values_by_window[key, window].append(value) def encoded_items(self): - # type: () -> Iterator[Tuple[bytes, bytes, bytes]] + # type: () -> Iterator[Tuple[bytes, bytes, bytes, int]] value_coder_impl = self._value_coder.get_impl() key_coder_impl = self._key_coder.get_impl() for (key, window), values in self._values_by_window.items(): @@ -360,7 +361,7 @@ def encoded_items(self): output_stream = create_OutputStream() for value in values: value_coder_impl.encode_to_stream(value, output_stream, True) - yield encoded_key, encoded_window, output_stream.get() + yield encoded_key, encoded_window, output_stream.get(), len(values) class GenericNonMergingWindowFn(window.NonMergingWindowFn): @@ -979,7 +980,7 @@ def commit_side_inputs_to_state( elements_by_window.append(element_data) if func_spec.urn == common_urns.side_inputs.ITERABLE.urn: - for _, window, elements_data in elements_by_window.encoded_items(): + for _, window, elements_data, _ in elements_by_window.encoded_items(): state_key = beam_fn_api_pb2.StateKey( iterable_side_input=beam_fn_api_pb2.StateKey.IterableSideInput( transform_id=consuming_transform_id, @@ -987,7 
+988,8 @@ def commit_side_inputs_to_state( window=window)) self.state_servicer.append_raw(state_key, elements_data) elif func_spec.urn == common_urns.side_inputs.MULTIMAP.urn: - for key, window, elements_data in elements_by_window.encoded_items(): + for (key, window, elements_data, + elements_count) in elements_by_window.encoded_items(): state_key = beam_fn_api_pb2.StateKey( multimap_side_input=beam_fn_api_pb2.StateKey.MultimapSideInput( transform_id=consuming_transform_id, @@ -995,6 +997,17 @@ def commit_side_inputs_to_state( window=window, key=key)) self.state_servicer.append_raw(state_key, elements_data) + + kv_iter_state_key = beam_fn_api_pb2.StateKey( + multimap_keys_values_side_input=beam_fn_api_pb2.StateKey. + MultimapKeysValuesSideInput( + transform_id=consuming_transform_id, + side_input_id=tag, + window=window)) + self.state_servicer.append_raw( + kv_iter_state_key, + # KV<K, Iterable<V>> encoding. + key + struct.pack('>i', elements_count) + elements_data) else: raise ValueError("Unknown access pattern: '%s'" % func_spec.urn) diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py index de55235368e3..b0b4b1957dd8 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/worker_handlers.py @@ -945,6 +945,15 @@ def get_worker(self, worker_id): class StateServicer(beam_fn_api_pb2_grpc.BeamFnStateServicer, sdk_worker.StateHandler): + _SUPPORTED_STATE_TYPES = frozenset([ + 'runner', + 'multimap_side_input', + 'multimap_keys_values_side_input', + 'iterable_side_input', + 'bag_user_state', + 'multimap_user_state' + ]) + class CopyOnWriteState(object): def __init__(self, underlying): # type: (DefaultDict[bytes, Buffer]) -> None @@ -1038,6 +1047,11 @@ def get_raw(self, continuation_token=None # type: Optional[bytes] ): # type: (...) 
-> Tuple[bytes, Optional[bytes]] + + if state_key.WhichOneof('type') not in self._SUPPORTED_STATE_TYPES: + raise NotImplementedError( + 'Unknown state type: ' + state_key.WhichOneof('type')) + with self._lock: full_state = self._state[self._to_key(state_key)] if self._use_continuation_tokens: @@ -1104,24 +1118,27 @@ def State(self, # Note that this eagerly mutates state, assuming any failures are fatal. # Thus it is safe to ignore instruction_id. for request in request_stream: - request_type = request.WhichOneof('request') - if request_type == 'get': - data, continuation_token = self._state.get_raw( - request.state_key, request.get.continuation_token) - yield beam_fn_api_pb2.StateResponse( - id=request.id, - get=beam_fn_api_pb2.StateGetResponse( - data=data, continuation_token=continuation_token)) - elif request_type == 'append': - self._state.append_raw(request.state_key, request.append.data) - yield beam_fn_api_pb2.StateResponse( - id=request.id, append=beam_fn_api_pb2.StateAppendResponse()) - elif request_type == 'clear': - self._state.clear(request.state_key) - yield beam_fn_api_pb2.StateResponse( - id=request.id, clear=beam_fn_api_pb2.StateClearResponse()) - else: - raise NotImplementedError('Unknown state request: %s' % request_type) + try: + request_type = request.WhichOneof('request') + if request_type == 'get': + data, continuation_token = self._state.get_raw( + request.state_key, request.get.continuation_token) + yield beam_fn_api_pb2.StateResponse( + id=request.id, + get=beam_fn_api_pb2.StateGetResponse( + data=data, continuation_token=continuation_token)) + elif request_type == 'append': + self._state.append_raw(request.state_key, request.append.data) + yield beam_fn_api_pb2.StateResponse( + id=request.id, append=beam_fn_api_pb2.StateAppendResponse()) + elif request_type == 'clear': + self._state.clear(request.state_key) + yield beam_fn_api_pb2.StateResponse( + id=request.id, clear=beam_fn_api_pb2.StateClearResponse()) + else: + raise 
NotImplementedError('Unknown state request: %s' % request_type) + except Exception as exn: + yield beam_fn_api_pb2.StateResponse(id=request.id, error=str(exn)) class SingletonStateHandlerFactory(sdk_worker.StateHandlerFactory): diff --git a/sdks/python/apache_beam/runners/worker/bundle_processor.py b/sdks/python/apache_beam/runners/worker/bundle_processor.py index 7ff0ad258bc2..b35997c4250f 100644 --- a/sdks/python/apache_beam/runners/worker/bundle_processor.py +++ b/sdks/python/apache_beam/runners/worker/bundle_processor.py @@ -378,6 +378,11 @@ def __reduce__(self): class StateBackedSideInputMap(object): + + _BULK_READ_LIMIT = 100 + _BULK_READ_FULLY = "fully" + _BULK_READ_PARTIALLY = "partially" + def __init__(self, state_handler, # type: sdk_worker.CachingStateHandler transform_id, # type: str @@ -417,12 +422,53 @@ def __getitem__(self, window): side_input_id=self._tag, window=self._target_window_coder.encode(target_window), key=b'')) + kv_iter_state_key = beam_fn_api_pb2.StateKey( + multimap_keys_values_side_input=beam_fn_api_pb2.StateKey. + MultimapKeysValuesSideInput( + transform_id=self._transform_id, + side_input_id=self._tag, + window=self._target_window_coder.encode(target_window))) cache = {} - key_coder_impl = self._element_coder.key_coder().get_impl() + key_coder = self._element_coder.key_coder() + key_coder_impl = key_coder.get_impl() value_coder = self._element_coder.value_coder() class MultiMap(object): + _bulk_read = None + _lock = threading.Lock() + def __getitem__(self, key): + if self._bulk_read is None: + with self._lock: + if self._bulk_read is None: + try: + # Attempt to bulk read the key-values over the iterable + # protocol which, if supported, can be much more efficient + # than point lookups if it fits into memory. 
+ for ix, (k, vs) in enumerate(_StateBackedIterable( + state_handler, + kv_iter_state_key, + coders.TupleCoder( + (key_coder, coders.IterableCoder(value_coder))))): + cache[k] = vs + if ix > StateBackedSideInputMap._BULK_READ_LIMIT: + self._bulk_read = ( + StateBackedSideInputMap._BULK_READ_PARTIALLY) + break + else: + # We reached the end of the iteration without breaking. + self._bulk_read = ( + StateBackedSideInputMap._BULK_READ_FULLY) + except Exception: + _LOGGER.error( + "Iterable access of map side inputs unsupported.", + exc_info=True) + self._bulk_read = ( + StateBackedSideInputMap._BULK_READ_PARTIALLY) + + if (self._bulk_read == StateBackedSideInputMap._BULK_READ_FULLY): + return cache.get(key, []) + if key not in cache: keyed_state_key = beam_fn_api_pb2.StateKey() keyed_state_key.CopyFrom(state_key) @@ -430,6 +476,7 @@ def __getitem__(self, key): key_coder_impl.encode_nested(key)) cache[key] = _StateBackedIterable( state_handler, keyed_state_key, value_coder) + return cache[key] def __reduce__(self):