Skip to content

Commit

Permalink
Port SDK changes v3.7.0-v3.8.0
Browse files Browse the repository at this point in the history
- added InvalidArchive and InvalidAsset
- modify getAllTransitivelyReferencedCustomResourceUrnsAsync into getAllTransitivelyReferencedResourceUrnsAsync
- DefaultRunner enhancements

- IGNORED DefaultRunner fixes from C#, filed as separate issue #59
  • Loading branch information
pawelprazak committed Feb 5, 2022
1 parent b687596 commit b80a26e
Show file tree
Hide file tree
Showing 9 changed files with 286 additions and 52 deletions.
8 changes: 8 additions & 0 deletions sdk/jvm/pulumi/src/main/java/io/pulumi/core/Archive.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.pulumi.core;

import io.grpc.Internal;
import io.pulumi.core.internal.Constants;

import java.util.Map;
Expand Down Expand Up @@ -44,4 +45,11 @@ public RemoteArchive(String uri) {
super(Constants.AssetOrArchiveUriName, uri);
}
}

@Internal
public static final class InvalidArchive extends Archive {
public InvalidArchive() {
super(Constants.ArchiveAssetsName, Map.<String, AssetOrArchive>of());
}
}
}
8 changes: 8 additions & 0 deletions sdk/jvm/pulumi/src/main/java/io/pulumi/core/Asset.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.pulumi.core;

import io.grpc.Internal;
import io.pulumi.core.internal.Constants;

/**
Expand Down Expand Up @@ -39,4 +40,11 @@ public RemoteAsset(String uri) {
super(Constants.AssetOrArchiveUriName, uri);
}
}

@Internal
public static final class InvalidAsset extends Asset {
public InvalidAsset() {
super(Constants.AssetTextName, "");
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.*;
import java.util.logging.Level;
Expand Down Expand Up @@ -562,7 +563,7 @@ private CompletableFuture<PrepareResult> prepareResourceAsync(
var allDirectDependencies = new HashSet<>(explicitDirectDependencies);

var allDirectDependencyUrnsFuture =
CompletableFutures.builder(getAllTransitivelyReferencedCustomResourceUrnsAsync(explicitDirectDependencies));
CompletableFutures.builder(getAllTransitivelyReferencedResourceUrnsAsync(explicitDirectDependencies));
var propertyToDirectDependencyUrnFutures = new HashMap<String, CompletableFuture<ImmutableSet<String>>>();

for (var entry : propertyToDirectDependencies.entrySet()) {
Expand All @@ -571,7 +572,7 @@ private CompletableFuture<PrepareResult> prepareResourceAsync(

allDirectDependencies.addAll(directDependencies);

var urns = getAllTransitivelyReferencedCustomResourceUrnsAsync(
var urns = getAllTransitivelyReferencedResourceUrnsAsync(
ImmutableSet.copyOf(directDependencies)
);
allDirectDependencyUrnsFuture.accumulate(urns, (s1, s2) -> Sets.union(s1, s2).immutableCopy());
Expand Down Expand Up @@ -624,34 +625,46 @@ private CompletableFuture<List<Resource>> gatherExplicitDependenciesAsync(Input<
return TypedInputOutput.cast(resources).view(InputOutputData::getValueNullable);
}

private CompletableFuture<ImmutableSet<String>> getAllTransitivelyReferencedCustomResourceUrnsAsync(
private CompletableFuture<ImmutableSet<String>> getAllTransitivelyReferencedResourceUrnsAsync(
ImmutableSet<Resource> resources
) {
// Go through 'resources', but transitively walk through **Component** resources,
// collecting any of their child resources. This way, a Component acts as an
// aggregation really of all the reachable custom resources it parents. This walking
// will transitively walk through other child ComponentResources, but will stop when it
// hits custom resources. In other words, if we had:
// Go through 'resources', but transitively walk through **Component** resources, collecting any
// of their child resources. This way, a Component acts as an aggregation really of all the
// reachable resources it parents. This walking will stop when it hits custom resources.
//
// Comp1
// / \
// Cust1 Comp2
// / \
// Cust2 Cust3
// /
// Cust4
// This function also terminates at remote components, whose children are not known to the Node SDK directly.
// Remote components will always wait on all of their children, so ensuring we return the remote component
// itself here and waiting on it will accomplish waiting on all of it's children regardless of whether they
// are returned explicitly here.
//
// Then the transitively reachable custom resources of Comp1 will be [Cust1, Cust2, Cust3].
// It will *not* include 'Cust4'.

// To do this, first we just get the transitively reachable set of resources (not diving
// into custom resources). In the above picture, if we start with 'Comp1', this will be
// [Comp1, Cust1, Comp2, Cust2, Cust3]
// In other words, if we had:
//
// Comp1
// / | \
// Cust1 Comp2 Remote1
// / \ \
// Cust2 Cust3 Comp3
// / \
// Cust4 Cust5
//
// Then the transitively reachable resources of Comp1 will be [Cust1, Cust2, Cust3, Remote1]. It
// will *not* include:
// * Cust4 because it is a child of a custom resource
// * Comp2 because it is a non-remote component resoruce
// * Comp3 and Cust5 because Comp3 is a child of a remote component resource
var transitivelyReachableResources =
getTransitivelyReferencedChildResourcesOfComponentResources(resources);

var transitivelyReachableCustomResources = transitivelyReachableResources.stream()
.filter(resource -> resource instanceof CustomResource)
.filter(resource -> {
if (resource instanceof CustomResource) {
return true;
}
if (resource instanceof ComponentResource) {
return resource.internalGetRemote();
}
return false; // Unreachable
})
.map(resource -> TypedInputOutput.cast(resource.getUrn()).view(InputOutputData::getValueNullable))
.collect(toImmutableSet());
return CompletableFutures.allOf(transitivelyReachableCustomResources)
Expand Down Expand Up @@ -1345,8 +1358,7 @@ static class DefaultRunner implements Runner {
* exiting once the set becomes empty.
*/
private final Map<CompletableFuture<Void>, List<String>> inFlightTasks = new ConcurrentHashMap<>();

private final List<Exception> swallowedExceptions = new ArrayList<>();
private final Queue<Exception> swallowedExceptions = new ConcurrentLinkedQueue<>();

public DefaultRunner(DeploymentState deployment, Logger standardLogger) {
this.engineLogger = Objects.requireNonNull(Objects.requireNonNull(deployment).logger);
Expand Down Expand Up @@ -1411,12 +1423,15 @@ public <T> void registerTask(String description, CompletableFuture<T> task) {
Objects.requireNonNull(task);
standardLogger.log(Level.FINEST, String.format("Registering task: '%s', %s", description, task));

// we don't need the result here, just the future itself
CompletableFuture<Void> key = task.thenApply(ignore -> null);
// TODO: should we enforce a timeout (with .orTimeout()) on the task (and make it configurable)?

// We may get several of the same tasks with different descriptions. That can
// happen when the runtime reuses cached tasks that it knows are value-identical
// (for example a completed future). In that case, we just store all the descriptions.
// We'll print them all out as done once this task actually finishes.
inFlightTasks.compute(
task.thenApply(ignore -> null), // TODO: should we enforce a timeout here (and make it configurable)?
inFlightTasks.compute(key,
(ignore, descriptions) -> {
if (descriptions == null) {
return Lists.newArrayList(description);
Expand Down Expand Up @@ -1455,7 +1470,11 @@ private CompletableFuture<Integer> whileRunningAsync() {

// Log the descriptions of completed tasks.
if (standardLogger.isLoggable(Level.FINEST)) {
List<String> descriptions = inFlightTasks.getOrDefault(task, List.of()); // FIXME: this should never return null, but it does for whatever reason
List<String> descriptions = inFlightTasks.getOrDefault(task, List.of());
// getOrDefault should never return null, but it does for whatever reason, so just to be sure
if (descriptions == null) {
descriptions = List.of();
}
standardLogger.log(Level.FINEST, String.format("Completed task: '%s', %s", String.join(",", descriptions), task));
}
} catch (Exception e) {
Expand Down Expand Up @@ -1502,7 +1521,8 @@ private CompletableFuture<Integer> handleExceptionAsync(Exception exception) {
};

if (exception instanceof LogException) {
// We got an error while logging itself. Nothing to do here but print some errors and fail entirely.
// We got an error while logging itself.
// Nothing to do here but print some errors and abort.
standardLogger.log(Level.SEVERE, String.format(
"Error occurred trying to send logging message to engine: %s", exception.getMessage()));
return CompletableFuture.supplyAsync(() -> {
Expand All @@ -1519,13 +1539,14 @@ private CompletableFuture<Integer> handleExceptionAsync(Exception exception) {
return handleExceptionAsync((Exception) exception.getCause());
}

// For the rest of the issue we encounter log the problem to the error stream. If we
// successfully do this, then return with a special error code stating as such so that
// our host doesn't print out another set of errors.
// For all other issues we encounter we log the
// problem to the error stream.
//
// Note: if these logging calls fail, they will just end up bubbling up an exception
// that will be caught by nothing. This will tear down the actual process with a
// non-zero error which our host will handle properly.
// Note: if these logging calls fail, they will just
// end up bubbling up an exception that will be caught
// by nothing. This will tear down the actual process
// with a non-zero error which our host will handle
// properly.
if (exception instanceof RunException) {
// Always hide the stack for RunErrors.
return engineLogger
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ public abstract class Resource {

protected final Set<Resource> childResources = Collections.synchronizedSet(new HashSet<>());

protected final boolean remote;

/**
* @see Resource#Resource(String, String, boolean, ResourceArgs, ResourceOptions, boolean, boolean)
*/
Expand Down Expand Up @@ -73,6 +75,8 @@ protected Resource(
ResourceArgs args, ResourceOptions options,
boolean remote, boolean dependency
) {
this.remote = remote;

if (dependency) {
// this.urn will be set using setter in the subtype constructor after this supertype constructor finishes
this.type = "";
Expand Down Expand Up @@ -239,6 +243,11 @@ public List<Input<String>> internalGetAliases() {
return this.aliases;
}

@Internal
public boolean internalGetRemote() {
return this.remote;
}

public String getResourceType() {
return type;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import com.google.gson.JsonNull;
import com.google.gson.stream.JsonWriter;
import com.google.protobuf.Value;
import io.pulumi.core.Archive;
import io.pulumi.core.Archive.InvalidArchive;
import io.pulumi.core.Asset.InvalidAsset;
import io.pulumi.core.AssetOrArchive;
import io.pulumi.core.Either;
import io.pulumi.core.internal.InputOutputData;
Expand Down Expand Up @@ -201,8 +204,20 @@ private static Object tryConvertObjectInner(
return value;
}

if (Archive.class.isAssignableFrom(targetType.getType())) {
try {
return tryEnsureType(context, value, targetType);
} catch (UnsupportedOperationException ex) {
return tryEnsureType(context, new InvalidArchive(), targetType);
}
}

if (AssetOrArchive.class.isAssignableFrom(targetType.getType())) {
return tryEnsureType(context, value, targetType);
try {
return tryEnsureType(context, value, targetType);
} catch (UnsupportedOperationException ex) {
return tryEnsureType(context, new InvalidAsset(), targetType);
}
}

if (JsonElement.class.isAssignableFrom(targetType.getType())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@
import com.google.protobuf.Value;
import io.grpc.Internal;
import io.pulumi.Log;
import io.pulumi.core.Archive.InvalidArchive;
import io.pulumi.core.Asset.InvalidAsset;
import io.pulumi.core.AssetOrArchive;
import io.pulumi.core.Either;
import io.pulumi.core.InputOutput;
Expand Down Expand Up @@ -347,6 +349,13 @@ private Object serializeJson(String ctx, JsonElement element) {
Log.debug(String.format("Serialize property[%s]: asset/archive=%s", ctx, assetOrArchive.getClass().getSimpleName()));
}

if (assetOrArchive instanceof InvalidAsset) {
throw new UnsupportedOperationException("Cannot serialize invalid asset");
}
if (assetOrArchive instanceof InvalidArchive) {
throw new UnsupportedOperationException("Cannot serialize invalid archive");
}

var propName = assetOrArchive.getPropName();
return serializeAsync(ctx + "." + propName, assetOrArchive.getValue(), keepResources).thenApply(
/* @Nullable */ value -> {
Expand Down
Loading

0 comments on commit b80a26e

Please sign in to comment.