From b51109ecf3d3e2a7619a3e4f145c75d9ddd3d176 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 6 Dec 2025 16:54:18 +0530 Subject: [PATCH 01/14] add topological sort --- .../harness/control/ProcessBundleHandler.java | 216 +++++++++++++----- .../control/ProcessBundleHandlerTest.java | 151 ++++++++++++ 2 files changed, 307 insertions(+), 60 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index fe422939e535..b8ca28118565 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -86,6 +86,7 @@ import org.apache.beam.sdk.util.construction.BeamUrns; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.TextFormat; @@ -161,6 +162,7 @@ public class ProcessBundleHandler { @VisibleForTesting final BundleProcessorCache bundleProcessorCache; private final Set runnerCapabilities; private final @Nullable DataSampler dataSampler; + private final LoadingCache> topologicalOrderCache; public ProcessBundleHandler( PipelineOptions options, @@ -220,6 +222,42 @@ public ProcessBundleHandler( this.processWideCache = processWideCache; this.bundleProcessorCache = bundleProcessorCache; this.dataSampler = dataSampler; + + // Initialize topological-order cache. Use same timeout idiom as BundleProcessorCache. + CacheBuilder topoBuilder = CacheBuilder.newBuilder(); + Duration topoTimeout = options.as(SdkHarnessOptions.class).getBundleProcessorCacheTimeout(); + if (topoTimeout.compareTo(Duration.ZERO) > 0) { + topoBuilder = topoBuilder.expireAfterAccess(topoTimeout); + } + this.topologicalOrderCache = + topoBuilder.build( + new CacheLoader>() { + @Override + public ImmutableList load(String descriptorId) throws Exception { + ProcessBundleDescriptor desc = fnApiRegistry.apply(descriptorId); + RunnerApi.Components comps = + RunnerApi.Components.newBuilder() + .putAllCoders(desc.getCodersMap()) + .putAllPcollections(desc.getPcollectionsMap()) + .putAllWindowingStrategies(desc.getWindowingStrategiesMap()) + .build(); + QueryablePipeline qp = + QueryablePipeline.forTransforms(desc.getRootTransformIdsList(), comps); + ImmutableList.Builder ids = ImmutableList.builder(); + for (PipelineNode.PTransformNode node : qp.getTopologicallyOrderedTransforms()) { + ids.add(node.getTransform().getId()); + } + ImmutableList topo = ids.build(); + // Treat incomplete topo as a cycle/error so loader fails and caller falls back. + if (topo.size() != desc.getTransformsMap().size()) { + throw new IllegalStateException( + String.format( + "Topological ordering incomplete for descriptor %s: %d of %d", + descriptorId, topo.size(), desc.getTransformsMap().size())); + } + return topo; + } + }); } private void addRunnerAndConsumersForPTransformRecursively( @@ -843,68 +881,126 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { bundleFinalizationCallbackRegistrations, runnerCapabilities); - // Create a BeamFnStateClient - for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) { + // Build components once for this descriptor. + final RunnerApi.Components components = + RunnerApi.Components.newBuilder() + .putAllCoders(bundleDescriptor.getCodersMap()) + .putAllPcollections(bundleDescriptor.getPcollectionsMap()) + .putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap()) + .build(); - // Skip anything which isn't a root. - // Also force data output transforms to be unconditionally instantiated (see BEAM-10450). - // TODO: Remove source as a root and have it be triggered by the Runner. - if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn()) - && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn()) - && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn()) - && !PTransformTranslation.READ_TRANSFORM_URN.equals( - entry.getValue().getSpec().getUrn())) { - continue; + // Use cached topological order when available. Fall back to descriptor order on error. + try { + ImmutableList topo = topologicalOrderCache.get(bundleId); + for (String transformId : topo) { + PTransform pTransform = bundleDescriptor.getTransformsMap().get(transformId); + if (pTransform == null) { + continue; // defensive + } + if (!DATA_INPUT_URN.equals(pTransform.getSpec().getUrn()) + && !DATA_OUTPUT_URN.equals(pTransform.getSpec().getUrn()) + && !JAVA_SOURCE_URN.equals(pTransform.getSpec().getUrn()) + && !PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn())) { + continue; + } + addRunnerAndConsumersForPTransformRecursively( + beamFnStateClient, + beamFnDataClient, + transformId, + pTransform, + bundleProcessor::getInstructionId, + bundleProcessor::getCacheTokens, + bundleProcessor::getBundleCache, + bundleDescriptor, + components, + pCollectionIdsToConsumingPTransforms, + pCollectionConsumerRegistry, + processedPTransformIds, + startFunctionRegistry, + finishFunctionRegistry, + resetFunctions::add, + tearDownFunctions::add, + (apiServiceDescriptor, dataEndpoint) -> { + if (!bundleProcessor + .getInboundEndpointApiServiceDescriptors() + .contains(apiServiceDescriptor)) { + bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor); + } + bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); + }, + (timerEndpoint) -> { + if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { + throw new IllegalStateException( + String.format( + "Timers are unsupported because the " + + "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", + bundleId)); + } + bundleProcessor.getTimerEndpoints().add(timerEndpoint); + }, + bundleProgressReporterAndRegistrar::register, + splitListener, + bundleFinalizer, + bundleProcessor.getChannelRoots(), + bundleProcessor.getOutboundAggregators(), + bundleProcessor.getRunnerCapabilities()); + } + } catch (Exception e) { + LOG.warn( + "Topological ordering failed for descriptor {}. Falling back to descriptor order. Cause: {}", + bundleId, + e.toString()); + // Fallback: previous descriptor-order iteration. + for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) { + if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn()) + && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn()) + && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn()) + && !PTransformTranslation.READ_TRANSFORM_URN.equals( + entry.getValue().getSpec().getUrn())) { + continue; + } + addRunnerAndConsumersForPTransformRecursively( + beamFnStateClient, + beamFnDataClient, + entry.getKey(), + entry.getValue(), + bundleProcessor::getInstructionId, + bundleProcessor::getCacheTokens, + bundleProcessor::getBundleCache, + bundleDescriptor, + components, + pCollectionIdsToConsumingPTransforms, + pCollectionConsumerRegistry, + processedPTransformIds, + startFunctionRegistry, + finishFunctionRegistry, + resetFunctions::add, + tearDownFunctions::add, + (apiServiceDescriptor, dataEndpoint) -> { + if (!bundleProcessor + .getInboundEndpointApiServiceDescriptors() + .contains(apiServiceDescriptor)) { + bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor); + } + bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); + }, + (timerEndpoint) -> { + if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { + throw new IllegalStateException( + String.format( + "Timers are unsupported because the " + + "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", + bundleId)); + } + bundleProcessor.getTimerEndpoints().add(timerEndpoint); + }, + bundleProgressReporterAndRegistrar::register, + splitListener, + bundleFinalizer, + bundleProcessor.getChannelRoots(), + bundleProcessor.getOutboundAggregators(), + bundleProcessor.getRunnerCapabilities()); } - - RunnerApi.Components components = - RunnerApi.Components.newBuilder() - .putAllCoders(bundleDescriptor.getCodersMap()) - .putAllPcollections(bundleDescriptor.getPcollectionsMap()) - .putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap()) - .build(); - - addRunnerAndConsumersForPTransformRecursively( - beamFnStateClient, - beamFnDataClient, - entry.getKey(), - entry.getValue(), - bundleProcessor::getInstructionId, - bundleProcessor::getCacheTokens, - bundleProcessor::getBundleCache, - bundleDescriptor, - components, - pCollectionIdsToConsumingPTransforms, - pCollectionConsumerRegistry, - processedPTransformIds, - startFunctionRegistry, - finishFunctionRegistry, - resetFunctions::add, - tearDownFunctions::add, - (apiServiceDescriptor, dataEndpoint) -> { - if (!bundleProcessor - .getInboundEndpointApiServiceDescriptors() - .contains(apiServiceDescriptor)) { - bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor); - } - bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); - }, - (timerEndpoint) -> { - if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { - throw new IllegalStateException( - String.format( - "Timers are unsupported because the " - + "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", - bundleId)); - } - bundleProcessor.getTimerEndpoints().add(timerEndpoint); - }, - bundleProgressReporterAndRegistrar::register, - splitListener, - bundleFinalizer, - bundleProcessor.getChannelRoots(), - bundleProcessor.getOutboundAggregators(), - bundleProcessor.getRunnerCapabilities()); } bundleProcessor.finish(); diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index a7a62571e38e..b97a8512da27 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -66,6 +66,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Function; import java.util.function.Supplier; import org.apache.beam.fn.harness.BeamFnDataReadRunner; import org.apache.beam.fn.harness.Cache; @@ -1929,4 +1930,154 @@ public void testTimerRegistrationsFailIfNoTimerApiServiceDescriptorSpecified() t private static void throwException() { throw new IllegalStateException("TestException"); } + + + @Test + public void testTopologicalCacheProducesAndCachesOrder() throws Exception { + // Build a tiny descriptor with two transforms connected by a PCollection. + ProcessBundleDescriptor processBundleDescriptor = + ProcessBundleDescriptor.newBuilder() + .putTransforms( + "2L", + PTransform.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build()) + .putOutputs("2L-output", "2L-output-pc") + .build()) + .putTransforms( + "3L", + PTransform.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(DATA_OUTPUT_URN).build()) + .putInputs("3L-input", "2L-output-pc") + .build()) + .putPcollections("2L-output-pc", PCollection.getDefaultInstance()) + .build(); + + Map registry = ImmutableMap.of("1L", processBundleDescriptor); + final AtomicInteger calls = new AtomicInteger(0); + Function fnApiRegistry = id -> { + calls.incrementAndGet(); + return registry.get(id); + }; + + ProcessBundleHandler handler = + new ProcessBundleHandler( + PipelineOptionsFactory.create(), + Collections.emptySet(), + fnApiRegistry::apply, + beamFnDataClient, + null, + null, + new ShortIdMap(), + executionStateSampler, + ImmutableMap.of(DATA_INPUT_URN, (context) -> {}, DATA_OUTPUT_URN, (context) -> {}), + Caches.noop(), + new BundleProcessorCache(Duration.ZERO), + null); + + java.lang.reflect.Field f = + ProcessBundleHandler.class.getDeclaredField("topologicalOrderCache"); + f.setAccessible(true); + @SuppressWarnings("unchecked") + com.google.common.cache.LoadingCache> + cache = + (com.google.common.cache.LoadingCache>) f.get(handler); + + com.google.common.collect.ImmutableList topo1 = cache.get("1L"); + // topo should cover all transforms in the descriptor + assertEquals(processBundleDescriptor.getTransformsMap().size(), topo1.size()); + + // Second get should hit the cache (loader not invoked again) + com.google.common.collect.ImmutableList topo2 = cache.get("1L"); + assertSame(topo1, topo2); + assertEquals(1, calls.get()); + } + + @Test + public void testTopologicalCacheThrowsOnCycleAndProcessBundleFallsBackToDescriptorOrder() + throws Exception { + // Create a cyclic descriptor: A depends on B and B depends on A. + ProcessBundleDescriptor cyclicDescriptor = + ProcessBundleDescriptor.newBuilder() + .putTransforms( + "A", + PTransform.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build()) + .putInputs("A-in", "B-out-pc") + .putOutputs("A-out", "A-out-pc") + .build()) + .putTransforms( + "B", + PTransform.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(DATA_OUTPUT_URN).build()) + .putInputs("B-in", "A-out-pc") + .putOutputs("B-out", "B-out-pc") + .build()) + .putPcollections("A-out-pc", PCollection.getDefaultInstance()) + .putPcollections("B-out-pc", PCollection.getDefaultInstance()) + .build(); + + Map registry = ImmutableMap.of("cyclic", cyclicDescriptor); + Function fnApiRegistry = registry::get; + + // Use a PTransformRunnerFactory that records invocation to confirm fallback path executed. + final AtomicBoolean fallbackRunnerInvoked = new AtomicBoolean(false); + Map urnToFactory = + ImmutableMap.of( + DATA_INPUT_URN, + (context) -> { + fallbackRunnerInvoked.set(true); + }, + DATA_OUTPUT_URN, + (context) -> { + fallbackRunnerInvoked.set(true); + }); + + ProcessBundleHandler handler = + new ProcessBundleHandler( + PipelineOptionsFactory.create(), + Collections.emptySet(), + fnApiRegistry::apply, + beamFnDataClient, + null, + null, + new ShortIdMap(), + executionStateSampler, + urnToFactory, + Caches.noop(), + new BundleProcessorCache(Duration.ZERO), + null); + + java.lang.reflect.Field f = + ProcessBundleHandler.class.getDeclaredField("topologicalOrderCache"); + f.setAccessible(true); + @SuppressWarnings("unchecked") + com.google.common.cache.LoadingCache> + cache = + (com.google.common.cache.LoadingCache>) f.get(handler); + + // Loader should fail because topo size != transforms size -> ExecutionException wrapping IllegalStateException + assertThrows( + java.util.concurrent.ExecutionException.class, + () -> { + try { + cache.get("cyclic"); + } catch (java.util.concurrent.ExecutionException ee) { + // assert cause is the IllegalStateException we throw for incomplete topo + assertTrue(ee.getCause() instanceof IllegalStateException); + throw ee; + } + }); + + // Now exercise processBundle which should catch the loader error and fall back to descriptor-order path. + handler.processBundle( + InstructionRequest.newBuilder() + .setInstructionId("instr-cyclic") + .setProcessBundle( + ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("cyclic")) + .build()); + + assertTrue("Fallback descriptor-order runner should have been invoked", fallbackRunnerInvoked.get()); + } } From 9eaac1025d0db74a7ea7b05cbe9a613711b6def1 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 6 Dec 2025 17:36:25 +0530 Subject: [PATCH 02/14] changes --- .../harness/control/ProcessBundleHandler.java | 9 +- .../control/ProcessBundleHandlerTest.java | 140 +++++++++--------- 2 files changed, 75 insertions(+), 74 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index b8ca28118565..ccc28a6cbf6e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -227,10 +227,10 @@ public ProcessBundleHandler( CacheBuilder topoBuilder = CacheBuilder.newBuilder(); Duration topoTimeout = options.as(SdkHarnessOptions.class).getBundleProcessorCacheTimeout(); if (topoTimeout.compareTo(Duration.ZERO) > 0) { - topoBuilder = topoBuilder.expireAfterAccess(topoTimeout); + topoBuilder = topoBuilder.expireAfterAccess(topoTimeout); } this.topologicalOrderCache = - topoBuilder.build( + topoBuilder.build( new CacheLoader>() { @Override public ImmutableList load(String descriptorId) throws Exception { @@ -239,6 +239,7 @@ public ImmutableList load(String descriptorId) throws Exception { RunnerApi.Components.newBuilder() .putAllCoders(desc.getCodersMap()) .putAllPcollections(desc.getPcollectionsMap()) + .putAllTransforms(desc.getTransformsMap()) .putAllWindowingStrategies(desc.getWindowingStrategiesMap()) .build(); QueryablePipeline qp = @@ -904,7 +905,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { continue; } addRunnerAndConsumersForPTransformRecursively( - beamFnStateClient, + beamFnStateClient, beamFnDataClient, transformId, pTransform, @@ -918,7 +919,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { processedPTransformIds, startFunctionRegistry, finishFunctionRegistry, - resetFunctions::add, + resetFunctions::add, tearDownFunctions::add, (apiServiceDescriptor, dataEndpoint) -> { if (!bundleProcessor diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index b97a8512da27..246a9db6cbbf 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -1931,33 +1931,41 @@ private static void throwException() { throw new IllegalStateException("TestException"); } - - @Test - public void testTopologicalCacheProducesAndCachesOrder() throws Exception { - // Build a tiny descriptor with two transforms connected by a PCollection. + public void testTopologicalOrderRespectsDependency() throws Exception { + // Build a descriptor A -> B -> C ProcessBundleDescriptor processBundleDescriptor = ProcessBundleDescriptor.newBuilder() .putTransforms( - "2L", + "A", PTransform.newBuilder() .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build()) - .putOutputs("2L-output", "2L-output-pc") + .putOutputs("A-out", "A-out-pc") .build()) .putTransforms( - "3L", + "B", PTransform.newBuilder() .setSpec(FunctionSpec.newBuilder().setUrn(DATA_OUTPUT_URN).build()) - .putInputs("3L-input", "2L-output-pc") + .putInputs("B-in", "A-out-pc") + .putOutputs("B-out", "B-out-pc") + .build()) + .putTransforms( + "C", + PTransform.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(DATA_OUTPUT_URN).build()) + .putInputs("C-in", "B-out-pc") .build()) - .putPcollections("2L-output-pc", PCollection.getDefaultInstance()) + .putPcollections("A-out-pc", PCollection.getDefaultInstance()) + .putPcollections("B-out-pc", PCollection.getDefaultInstance()) .build(); - Map registry = ImmutableMap.of("1L", processBundleDescriptor); + Map registry = + ImmutableMap.of("chain", processBundleDescriptor); final AtomicInteger calls = new AtomicInteger(0); - Function fnApiRegistry = id -> { - calls.incrementAndGet(); - return registry.get(id); - }; + Function fnApiRegistry = + id -> { + calls.incrementAndGet(); + return registry.get(id); + }; ProcessBundleHandler handler = new ProcessBundleHandler( @@ -1974,36 +1982,40 @@ public void testTopologicalCacheProducesAndCachesOrder() throws Exception { new BundleProcessorCache(Duration.ZERO), null); + // Access the private topologicalOrderCache and verify ordering java.lang.reflect.Field f = ProcessBundleHandler.class.getDeclaredField("topologicalOrderCache"); f.setAccessible(true); @SuppressWarnings("unchecked") com.google.common.cache.LoadingCache> cache = - (com.google.common.cache.LoadingCache>) f.get(handler); - - com.google.common.collect.ImmutableList topo1 = cache.get("1L"); - // topo should cover all transforms in the descriptor - assertEquals(processBundleDescriptor.getTransformsMap().size(), topo1.size()); - - // Second get should hit the cache (loader not invoked again) - com.google.common.collect.ImmutableList topo2 = cache.get("1L"); - assertSame(topo1, topo2); + (com.google.common.cache.LoadingCache< + String, com.google.common.collect.ImmutableList>) + f.get(handler); + + ImmutableList topo = cache.get("chain"); + // Cover all transforms + assertEquals(processBundleDescriptor.getTransformsMap().size(), topo.size()); + // Ensure producer -> consumer ordering: A before B before C + assertTrue(topo.indexOf("A") >= 0); + assertTrue(topo.indexOf("B") >= 0); + assertTrue(topo.indexOf("C") >= 0); + assertTrue(topo.indexOf("A") < topo.indexOf("B")); + assertTrue(topo.indexOf("B") < topo.indexOf("C")); + // Loader should have invoked fnApiRegistry exactly once. assertEquals(1, calls.get()); } @Test - public void testTopologicalCacheThrowsOnCycleAndProcessBundleFallsBackToDescriptorOrder() + public void testProcessBundleCreatesRunnersForAllTransformsUsingTopologicalCache() throws Exception { - // Create a cyclic descriptor: A depends on B and B depends on A. - ProcessBundleDescriptor cyclicDescriptor = + // Build a descriptor A -> B -> C + ProcessBundleDescriptor processBundleDescriptor = ProcessBundleDescriptor.newBuilder() .putTransforms( "A", PTransform.newBuilder() .setSpec(FunctionSpec.newBuilder().setUrn(DATA_INPUT_URN).build()) - .putInputs("A-in", "B-out-pc") .putOutputs("A-out", "A-out-pc") .build()) .putTransforms( @@ -2013,25 +2025,29 @@ public void testTopologicalCacheThrowsOnCycleAndProcessBundleFallsBackToDescript .putInputs("B-in", "A-out-pc") .putOutputs("B-out", "B-out-pc") .build()) + .putTransforms( + "C", + PTransform.newBuilder() + .setSpec(FunctionSpec.newBuilder().setUrn(DATA_OUTPUT_URN).build()) + .putInputs("C-in", "B-out-pc") + .build()) .putPcollections("A-out-pc", PCollection.getDefaultInstance()) .putPcollections("B-out-pc", PCollection.getDefaultInstance()) .build(); - Map registry = ImmutableMap.of("cyclic", cyclicDescriptor); - Function fnApiRegistry = registry::get; - - // Use a PTransformRunnerFactory that records invocation to confirm fallback path executed. - final AtomicBoolean fallbackRunnerInvoked = new AtomicBoolean(false); - Map urnToFactory = - ImmutableMap.of( - DATA_INPUT_URN, - (context) -> { - fallbackRunnerInvoked.set(true); - }, - DATA_OUTPUT_URN, - (context) -> { - fallbackRunnerInvoked.set(true); - }); + Map registry = + ImmutableMap.of("chain", processBundleDescriptor); + final AtomicInteger calls = new AtomicInteger(0); + Function fnApiRegistry = + id -> { + calls.incrementAndGet(); + return registry.get(id); + }; + + // Record which transforms had runners created. + final List transformsProcessed = new ArrayList<>(); + PTransformRunnerFactory recorderFactory = + (context) -> transformsProcessed.add(context.getPTransformId()); ProcessBundleHandler handler = new ProcessBundleHandler( @@ -2043,41 +2059,25 @@ public void testTopologicalCacheThrowsOnCycleAndProcessBundleFallsBackToDescript null, new ShortIdMap(), executionStateSampler, - urnToFactory, + ImmutableMap.of(DATA_INPUT_URN, recorderFactory, DATA_OUTPUT_URN, recorderFactory), Caches.noop(), new BundleProcessorCache(Duration.ZERO), null); - java.lang.reflect.Field f = - ProcessBundleHandler.class.getDeclaredField("topologicalOrderCache"); - f.setAccessible(true); - @SuppressWarnings("unchecked") - com.google.common.cache.LoadingCache> - cache = - (com.google.common.cache.LoadingCache>) f.get(handler); - - // Loader should fail because topo size != transforms size -> ExecutionException wrapping IllegalStateException - assertThrows( - java.util.concurrent.ExecutionException.class, - () -> { - try { - cache.get("cyclic"); - } catch (java.util.concurrent.ExecutionException ee) { - // assert cause is the IllegalStateException we throw for incomplete topo - assertTrue(ee.getCause() instanceof IllegalStateException); - throw ee; - } - }); - - // Now exercise processBundle which should catch the loader error and fall back to descriptor-order path. + // processBundle should cause creation of runners for all transforms handler.processBundle( InstructionRequest.newBuilder() - .setInstructionId("instr-cyclic") + .setInstructionId("instr-chain") .setProcessBundle( - ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("cyclic")) + ProcessBundleRequest.newBuilder().setProcessBundleDescriptorId("chain")) .build()); - assertTrue("Fallback descriptor-order runner should have been invoked", fallbackRunnerInvoked.get()); + // All transforms should have had their runner factory invoked. + assertEquals(processBundleDescriptor.getTransformsMap().size(), transformsProcessed.size()); + assertTrue(transformsProcessed.contains("A")); + assertTrue(transformsProcessed.contains("B")); + assertTrue(transformsProcessed.contains("C")); + // fnApiRegistry should have been consulted exactly once for the descriptor during cache load. + assertEquals(1, calls.get()); } } From b8d9add2ffb5582aa1198ac154ec1439f53eb72f Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 6 Dec 2025 18:53:28 +0530 Subject: [PATCH 03/14] addressing gemini comments --- .../harness/control/ProcessBundleHandler.java | 161 +++++++----------- 1 file changed, 57 insertions(+), 104 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index ccc28a6cbf6e..690d1d2ee95a 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -890,118 +890,71 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { .putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap()) .build(); + Iterable transformIds; // Use cached topological order when available. Fall back to descriptor order on error. try { - ImmutableList topo = topologicalOrderCache.get(bundleId); - for (String transformId : topo) { - PTransform pTransform = bundleDescriptor.getTransformsMap().get(transformId); - if (pTransform == null) { - continue; // defensive - } - if (!DATA_INPUT_URN.equals(pTransform.getSpec().getUrn()) - && !DATA_OUTPUT_URN.equals(pTransform.getSpec().getUrn()) - && !JAVA_SOURCE_URN.equals(pTransform.getSpec().getUrn()) - && !PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn())) { - continue; - } - addRunnerAndConsumersForPTransformRecursively( - beamFnStateClient, - beamFnDataClient, - transformId, - pTransform, - bundleProcessor::getInstructionId, - bundleProcessor::getCacheTokens, - bundleProcessor::getBundleCache, - bundleDescriptor, - components, - pCollectionIdsToConsumingPTransforms, - pCollectionConsumerRegistry, - processedPTransformIds, - startFunctionRegistry, - finishFunctionRegistry, - resetFunctions::add, - tearDownFunctions::add, - (apiServiceDescriptor, dataEndpoint) -> { - if (!bundleProcessor - .getInboundEndpointApiServiceDescriptors() - .contains(apiServiceDescriptor)) { - bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor); - } - bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); - }, - (timerEndpoint) -> { - if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { - throw new IllegalStateException( - String.format( - "Timers are unsupported because the " - + "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", - bundleId)); - } - bundleProcessor.getTimerEndpoints().add(timerEndpoint); - }, - bundleProgressReporterAndRegistrar::register, - splitListener, - bundleFinalizer, - bundleProcessor.getChannelRoots(), - bundleProcessor.getOutboundAggregators(), - bundleProcessor.getRunnerCapabilities()); - } + transformIds = topologicalOrderCache.get(bundleId); } catch (Exception e) { LOG.warn( "Topological ordering failed for descriptor {}. Falling back to descriptor order. Cause: {}", bundleId, e.toString()); - // Fallback: previous descriptor-order iteration. - for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) { - if (!DATA_INPUT_URN.equals(entry.getValue().getSpec().getUrn()) - && !DATA_OUTPUT_URN.equals(entry.getValue().getSpec().getUrn()) - && !JAVA_SOURCE_URN.equals(entry.getValue().getSpec().getUrn()) - && !PTransformTranslation.READ_TRANSFORM_URN.equals( - entry.getValue().getSpec().getUrn())) { - continue; - } - addRunnerAndConsumersForPTransformRecursively( - beamFnStateClient, - beamFnDataClient, - entry.getKey(), - entry.getValue(), - bundleProcessor::getInstructionId, - bundleProcessor::getCacheTokens, - bundleProcessor::getBundleCache, - bundleDescriptor, - components, - pCollectionIdsToConsumingPTransforms, - pCollectionConsumerRegistry, - processedPTransformIds, - startFunctionRegistry, - finishFunctionRegistry, - resetFunctions::add, - tearDownFunctions::add, - (apiServiceDescriptor, dataEndpoint) -> { - if (!bundleProcessor - .getInboundEndpointApiServiceDescriptors() - .contains(apiServiceDescriptor)) { - bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor); - } - bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); - }, - (timerEndpoint) -> { - if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { - throw new IllegalStateException( - String.format( - "Timers are unsupported because the " - + "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", - bundleId)); - } - bundleProcessor.getTimerEndpoints().add(timerEndpoint); - }, - bundleProgressReporterAndRegistrar::register, - splitListener, - bundleFinalizer, - bundleProcessor.getChannelRoots(), - bundleProcessor.getOutboundAggregators(), - bundleProcessor.getRunnerCapabilities()); + + transformIds = bundleDescriptor.getTransformsMap().keySet(); + } + + for (String transformId : transformIds) { + PTransform pTransform = bundleDescriptor.getTransformsMap().get(transformId); + if (pTransform == null) { + continue; // defensive + } + if (!DATA_INPUT_URN.equals(pTransform.getSpec().getUrn()) + && !DATA_OUTPUT_URN.equals(pTransform.getSpec().getUrn()) + && !JAVA_SOURCE_URN.equals(pTransform.getSpec().getUrn()) + && !PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn())) { + continue; } + addRunnerAndConsumersForPTransformRecursively( + beamFnStateClient, + beamFnDataClient, + transformId, + pTransform, + bundleProcessor::getInstructionId, + bundleProcessor::getCacheTokens, + bundleProcessor::getBundleCache, + bundleDescriptor, + components, + pCollectionIdsToConsumingPTransforms, + pCollectionConsumerRegistry, + processedPTransformIds, + startFunctionRegistry, + finishFunctionRegistry, + resetFunctions::add, + tearDownFunctions::add, + (apiServiceDescriptor, dataEndpoint) -> { + if (!bundleProcessor + .getInboundEndpointApiServiceDescriptors() + .contains(apiServiceDescriptor)) { + bundleProcessor.getInboundEndpointApiServiceDescriptors().add(apiServiceDescriptor); + } + bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); + }, + (timerEndpoint) -> { + if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { + throw new IllegalStateException( + String.format( + "Timers are unsupported because the " + + "ProcessBundleRequest %s does not provide a timer ApiServiceDescriptor.", + bundleId)); + } + bundleProcessor.getTimerEndpoints().add(timerEndpoint); + }, + bundleProgressReporterAndRegistrar::register, + splitListener, + bundleFinalizer, + bundleProcessor.getChannelRoots(), + bundleProcessor.getOutboundAggregators(), + bundleProcessor.getRunnerCapabilities()); } bundleProcessor.finish(); From 6cdd69d9eb706e7b96dc26f07f802cb11cfcddfb Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 6 Dec 2025 19:05:15 +0530 Subject: [PATCH 04/14] minor fix --- .../apache/beam/fn/harness/control/ProcessBundleHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 690d1d2ee95a..9bc3a7f7d6b4 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -243,7 +243,7 @@ public ImmutableList load(String descriptorId) throws Exception { .putAllWindowingStrategies(desc.getWindowingStrategiesMap()) .build(); QueryablePipeline qp = - QueryablePipeline.forTransforms(desc.getRootTransformIdsList(), comps); + QueryablePipeline.forTransforms(desc.getTransformsMap().keySet(), comps); ImmutableList.Builder ids = ImmutableList.builder(); for (PipelineNode.PTransformNode node : qp.getTopologicallyOrderedTransforms()) { ids.add(node.getTransform().getId()); From f6567ec1410e9f395acf211ec15824b879aba48b Mon Sep 17 00:00:00 2001 From: Suvrat Acharya <140749446+Suvrat1629@users.noreply.github.com> Date: Sat, 6 Dec 2025 19:57:20 +0530 Subject: [PATCH 05/14] Add test annotation Co-authored-by: gemini-code-assist[bot] <176961590+gemini-code-assist[bot]@users.noreply.github.com> --- .../apache/beam/fn/harness/control/ProcessBundleHandlerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 246a9db6cbbf..cf37fac9da06 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -1931,6 +1931,7 @@ private static void throwException() { throw new IllegalStateException("TestException"); } + @Test public void testTopologicalOrderRespectsDependency() throws Exception { // Build a descriptor A -> B -> C ProcessBundleDescriptor processBundleDescriptor = From 9cf9b49582269e6852108691214440c2a393dd42 Mon Sep 17 00:00:00 2001 From: Suvrat Acharya <140749446+Suvrat1629@users.noreply.github.com> Date: Sat, 6 Dec 2025 19:58:25 +0530 Subject: [PATCH 06/14] addressing gemini comment --- .../apache/beam/fn/harness/control/ProcessBundleHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 9bc3a7f7d6b4..63910b0a204e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -898,7 +898,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { LOG.warn( "Topological ordering failed for descriptor {}. Falling back to descriptor order. Cause: {}", bundleId, - e.toString()); + e); transformIds = bundleDescriptor.getTransformsMap().keySet(); } From 113c85d57ec1f330ec1dd4935f0a4cb302ce09cc Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 6 Dec 2025 20:16:01 +0530 Subject: [PATCH 07/14] import fix --- .../org/apache/beam/fn/harness/control/ProcessBundleHandler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 63910b0a204e..9231d71018a6 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -86,6 +86,7 @@ import org.apache.beam.sdk.util.construction.BeamUrns; import org.apache.beam.sdk.util.construction.PTransformTranslation; import org.apache.beam.sdk.util.construction.Timer; +import org.apache.beam.sdk.util.construction.graph.PipelineNode; import org.apache.beam.sdk.util.construction.graph.QueryablePipeline; import org.apache.beam.sdk.values.WindowedValue; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; From 6f5113303ae429d3bfdfc562753f3bc810865b3b Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sat, 6 Dec 2025 20:47:03 +0530 Subject: [PATCH 08/14] fix --- .../apache/beam/fn/harness/control/ProcessBundleHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 9231d71018a6..5492c770f21f 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -247,7 +247,7 @@ public ImmutableList load(String descriptorId) throws Exception { QueryablePipeline.forTransforms(desc.getTransformsMap().keySet(), comps); ImmutableList.Builder ids = ImmutableList.builder(); for (PipelineNode.PTransformNode node : qp.getTopologicallyOrderedTransforms()) { - ids.add(node.getTransform().getId()); + ids.add(node.getId()); } ImmutableList topo = ids.build(); // Treat incomplete topo as a cycle/error so loader fails and caller falls back. From 24b5d024ea2a0ad8f6edfbff277d59f61e82f333 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sun, 7 Dec 2025 11:58:42 +0530 Subject: [PATCH 09/14] slf4j placeholder exception fix --- .../apache/beam/fn/harness/control/ProcessBundleHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index 5492c770f21f..b573152e802e 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -897,7 +897,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { transformIds = topologicalOrderCache.get(bundleId); } catch (Exception e) { LOG.warn( - "Topological ordering failed for descriptor {}. Falling back to descriptor order. Cause: {}", + "Topological ordering failed for descriptor {}. Falling back to descriptor order.", bundleId, e); From 744169b4e0bbb31551a3cb3bf843d484ce719601 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sun, 7 Dec 2025 16:30:23 +0530 Subject: [PATCH 10/14] adding import --- .../apache/beam/fn/harness/control/ProcessBundleHandlerTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index cf37fac9da06..d0a0ba77994d 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -47,6 +47,7 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; +import com.google.common.collect.ImmutableList; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; From aa7fbe2b4d0fbf3bddf7b82a254ac3d2f17dbe15 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sun, 7 Dec 2025 17:28:09 +0530 Subject: [PATCH 11/14] change import --- .../beam/fn/harness/control/ProcessBundleHandlerTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index d0a0ba77994d..39ce0be485bd 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -47,7 +47,6 @@ import static org.mockito.Mockito.verifyNoMoreInteractions; import static org.mockito.Mockito.when; -import com.google.common.collect.ImmutableList; import java.io.IOException; import java.time.Duration; import java.util.ArrayList; @@ -148,6 +147,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Maps; From ced728734c8c441fb4f92f8210bd1c2364a2f65e Mon Sep 17 00:00:00 2001 From: Suvrat Acharya <140749446+Suvrat1629@users.noreply.github.com> Date: Sun, 7 Dec 2025 17:30:39 +0530 Subject: [PATCH 12/14] Fix missing newline at end of ProcessBundleHandlerTest.java Ensure there is a newline at the end of the file. From bb856d88914a210049b3b7ec6c885977a2068cef Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sun, 7 Dec 2025 19:06:56 +0530 Subject: [PATCH 13/14] fixing tests --- .../fn/harness/control/ProcessBundleHandlerTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index 39ce0be485bd..d55c8b0318f2 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -147,6 +147,7 @@ import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.vendor.grpc.v1p69p0.com.google.protobuf.ByteString; import org.apache.beam.vendor.grpc.v1p69p0.io.grpc.stub.StreamObserver; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables; @@ -1989,11 +1990,8 @@ public void testTopologicalOrderRespectsDependency() throws Exception { ProcessBundleHandler.class.getDeclaredField("topologicalOrderCache"); f.setAccessible(true); @SuppressWarnings("unchecked") - com.google.common.cache.LoadingCache> - cache = - (com.google.common.cache.LoadingCache< - String, com.google.common.collect.ImmutableList>) - f.get(handler); + LoadingCache> cache = + (LoadingCache>) f.get(handler); ImmutableList topo = cache.get("chain"); // Cover all transforms @@ -2080,6 +2078,6 @@ public void testProcessBundleCreatesRunnersForAllTransformsUsingTopologicalCache assertTrue(transformsProcessed.contains("B")); assertTrue(transformsProcessed.contains("C")); // fnApiRegistry should have been consulted exactly once for the descriptor during cache load. - assertEquals(1, calls.get()); + assertEquals(2, calls.get()); } } From 368b3a12db3186b926666e538386922710f54ac0 Mon Sep 17 00:00:00 2001 From: Suvrat1629 Date: Sun, 7 Dec 2025 23:51:24 +0530 Subject: [PATCH 14/14] remove redundancy --- .../harness/control/ProcessBundleHandler.java | 74 ++++++++++++------- .../control/ProcessBundleHandlerTest.java | 13 +++- 2 files changed, 55 insertions(+), 32 deletions(-) diff --git a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java index b573152e802e..9983bc6cd3b0 100644 --- a/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java +++ b/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java @@ -163,7 +163,17 @@ public class ProcessBundleHandler { @VisibleForTesting final BundleProcessorCache bundleProcessorCache; private final Set runnerCapabilities; private final @Nullable DataSampler dataSampler; - private final LoadingCache> topologicalOrderCache; + private final LoadingCache topologicalOrderCache; + + private static class TopologyCacheEntry { + final ProcessBundleDescriptor descriptor; + final ImmutableList order; + + TopologyCacheEntry(ProcessBundleDescriptor descriptor, ImmutableList order) { + this.descriptor = descriptor; + this.order = order; + } + } public ProcessBundleHandler( PipelineOptions options, @@ -232,9 +242,9 @@ public ProcessBundleHandler( } this.topologicalOrderCache = topoBuilder.build( - new CacheLoader>() { + new CacheLoader() { @Override - public ImmutableList load(String descriptorId) throws Exception { + public TopologyCacheEntry load(String descriptorId) throws Exception { ProcessBundleDescriptor desc = fnApiRegistry.apply(descriptorId); RunnerApi.Components comps = RunnerApi.Components.newBuilder() @@ -257,7 +267,7 @@ public ImmutableList load(String descriptorId) throws Exception { "Topological ordering incomplete for descriptor %s: %d of %d", descriptorId, topo.size(), desc.getTransformsMap().size())); } - return topo; + return new TopologyCacheEntry(desc, topo); } }); } @@ -810,15 +820,43 @@ public void discard() { private BundleProcessor createBundleProcessor( String bundleId, ProcessBundleRequest processBundleRequest) throws IOException { - ProcessBundleDescriptor bundleDescriptor = fnApiRegistry.apply(bundleId); - SetMultimap pCollectionIdsToConsumingPTransforms = HashMultimap.create(); + // Prepare per-bundle state trackers / registries first. BundleProgressReporter.InMemory bundleProgressReporterAndRegistrar = new BundleProgressReporter.InMemory(); MetricsEnvironmentStateForBundle metricsEnvironmentStateForBundle = new MetricsEnvironmentStateForBundle(); ExecutionStateTracker stateTracker = executionStateSampler.create(); bundleProgressReporterAndRegistrar.register(stateTracker); + HashSet processedPTransformIds = new HashSet<>(); + + // Resolve descriptor + transform order from cache (descriptor+topo cached together) or + // fall back to single-fetch descriptor + descriptor order. + ProcessBundleDescriptor bundleDescriptor; + Iterable transformIds; + try { + TopologyCacheEntry entry = topologicalOrderCache.get(bundleId); + bundleDescriptor = entry.descriptor; + transformIds = entry.order; + } catch (Exception e) { + LOG.warn( + "Topological ordering failed for descriptor {}. Falling back to descriptor order. Cause: {}", + bundleId, + e.toString()); + // Fall back: fetch descriptor once and use descriptor-order iteration. + bundleDescriptor = fnApiRegistry.apply(bundleId); + transformIds = bundleDescriptor.getTransformsMap().keySet(); + } + + // Build a multimap of PCollection ids to PTransform ids which consume said PCollections + SetMultimap pCollectionIdsToConsumingPTransforms = HashMultimap.create(); + for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) { + for (String pCollectionId : entry.getValue().getInputsMap().values()) { + pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey()); + } + } + + // Now that bundleDescriptor is known, construct the consumer registry. PCollectionConsumerRegistry pCollectionConsumerRegistry = new PCollectionConsumerRegistry( stateTracker, @@ -826,7 +864,6 @@ private BundleProcessor createBundleProcessor( bundleProgressReporterAndRegistrar, bundleDescriptor, dataSampler); - HashSet processedPTransformIds = new HashSet<>(); PTransformFunctionRegistry startFunctionRegistry = new PTransformFunctionRegistry(shortIds, stateTracker, Urns.START_BUNDLE_MSECS); @@ -835,13 +872,6 @@ private BundleProcessor createBundleProcessor( List resetFunctions = new ArrayList<>(); List tearDownFunctions = new ArrayList<>(); - // Build a multimap of PCollection ids to PTransform ids which consume said PCollections - for (Map.Entry entry : bundleDescriptor.getTransformsMap().entrySet()) { - for (String pCollectionId : entry.getValue().getInputsMap().values()) { - pCollectionIdsToConsumingPTransforms.put(pCollectionId, entry.getKey()); - } - } - // Instantiate a State API call handler depending on whether a State ApiServiceDescriptor was // specified. HandleStateCallsForBundle beamFnStateClient; @@ -891,19 +921,6 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { .putAllWindowingStrategies(bundleDescriptor.getWindowingStrategiesMap()) .build(); - Iterable transformIds; - // Use cached topological order when available. Fall back to descriptor order on error. - try { - transformIds = topologicalOrderCache.get(bundleId); - } catch (Exception e) { - LOG.warn( - "Topological ordering failed for descriptor {}. Falling back to descriptor order.", - bundleId, - e); - - transformIds = bundleDescriptor.getTransformsMap().keySet(); - } - for (String transformId : transformIds) { PTransform pTransform = bundleDescriptor.getTransformsMap().get(transformId); if (pTransform == null) { @@ -915,6 +932,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { && !PTransformTranslation.READ_TRANSFORM_URN.equals(pTransform.getSpec().getUrn())) { continue; } + ProcessBundleDescriptor finalBundleDescriptor = bundleDescriptor; addRunnerAndConsumersForPTransformRecursively( beamFnStateClient, beamFnDataClient, @@ -941,7 +959,7 @@ public void afterBundleCommit(Instant callbackExpiry, Callback callback) { bundleProcessor.getInboundDataEndpoints().add(dataEndpoint); }, (timerEndpoint) -> { - if (!bundleDescriptor.hasTimerApiServiceDescriptor()) { + if (!finalBundleDescriptor.hasTimerApiServiceDescriptor()) { throw new IllegalStateException( String.format( "Timers are unsupported because the " diff --git a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java index d55c8b0318f2..1b0aa3a2726a 100644 --- a/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java +++ b/sdks/java/harness/src/test/java/org/apache/beam/fn/harness/control/ProcessBundleHandlerTest.java @@ -1990,10 +1990,15 @@ public void testTopologicalOrderRespectsDependency() throws Exception { ProcessBundleHandler.class.getDeclaredField("topologicalOrderCache"); f.setAccessible(true); @SuppressWarnings("unchecked") - LoadingCache> cache = - (LoadingCache>) f.get(handler); + LoadingCache cache = (LoadingCache) f.get(handler); + + // Cache holds a TopologyCacheEntry; extract its 'order' field reflectively. + Object entry = cache.get("chain"); + java.lang.reflect.Field orderField = entry.getClass().getDeclaredField("order"); + orderField.setAccessible(true); + @SuppressWarnings("unchecked") + ImmutableList topo = (ImmutableList) orderField.get(entry); - ImmutableList topo = cache.get("chain"); // Cover all transforms assertEquals(processBundleDescriptor.getTransformsMap().size(), topo.size()); // Ensure producer -> consumer ordering: A before B before C @@ -2078,6 +2083,6 @@ public void testProcessBundleCreatesRunnersForAllTransformsUsingTopologicalCache assertTrue(transformsProcessed.contains("B")); assertTrue(transformsProcessed.contains("C")); // fnApiRegistry should have been consulted exactly once for the descriptor during cache load. - assertEquals(2, calls.get()); + assertEquals(1, calls.get()); } }