Skip to content
38 changes: 19 additions & 19 deletions LocalTesting/LocalTesting.FlinkSqlAppHost/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@
return;
}

// Custom network creation removed - Aspire doesn't automatically use custom networks with Podman
// Containers will use default network and rely on port mapping for connectivity
// EnsureDockerNetworkExists("aspire-flink-network"); // Disabled - not effective with Aspire + Podman
// Ensure Flink/Kafka network exists for container communication
EnsureFlinkKafkaNetwork();

LogConfiguredPorts();
SetupEnvironment();
Expand Down Expand Up @@ -38,8 +37,8 @@
// Configure Kafka with FIXED external port 9093
// Both tests and Flink jobs connect to localhost:9093 (mapped to container port 9092)
#pragma warning disable S1481 // Kafka resource is created but not directly referenced - used via connection string
var kafka = builder.AddKafka("kafka")
.WithLifetime(ContainerLifetime.Persistent);
var kafka = builder.AddKafka("kafka");
// .WithLifetime(ContainerLifetime.Persistent); // Commented out for testing without persistence
#pragma warning restore S1481

// Flink JobManager with named HTTP endpoint for service references
Expand Down Expand Up @@ -77,8 +76,8 @@
.WithEnvironment("JAVA_TOOL_OPTIONS", JavaOpenOptions)
.WithBindMount(Path.Combine(connectorsDir, "flink-sql-connector-kafka-4.0.1-2.0.jar"), "/opt/flink/lib/flink-sql-connector-kafka-4.0.1-2.0.jar", isReadOnly: true)
.WithBindMount(Path.Combine(connectorsDir, "flink-json-2.1.0.jar"), "/opt/flink/lib/flink-json-2.1.0.jar", isReadOnly: true)
.WithArgs("jobmanager")
.WithLifetime(ContainerLifetime.Persistent);
.WithArgs("jobmanager");
// .WithLifetime(ContainerLifetime.Persistent); // Commented out for testing without persistence

// Flink TaskManager with increased slots for parallel test execution (10 tests)
// All ports are hardcoded - no WaitFor dependencies needed for parallel startup
Expand All @@ -102,8 +101,8 @@
.WithBindMount(Path.Combine(connectorsDir, "flink-sql-connector-kafka-4.0.1-2.0.jar"), "/opt/flink/lib/flink-sql-connector-kafka-4.0.1-2.0.jar", isReadOnly: true)
.WithBindMount(Path.Combine(connectorsDir, "flink-json-2.1.0.jar"), "/opt/flink/lib/flink-json-2.1.0.jar", isReadOnly: true)
.WithReference(kafka)
.WithArgs("taskmanager")
.WithLifetime(ContainerLifetime.Persistent);
.WithArgs("taskmanager");
// .WithLifetime(ContainerLifetime.Persistent); // Commented out for testing without persistence

// Flink SQL Gateway - Enables SQL Gateway REST API for direct SQL submission
// SQL Gateway provides /v1/statements endpoint for executing SQL without JAR submission
Expand Down Expand Up @@ -138,8 +137,8 @@
.WithEnvironment("JAVA_TOOL_OPTIONS", JavaOpenOptions)
.WithBindMount(Path.Combine(connectorsDir, "flink-sql-connector-kafka-4.0.1-2.0.jar"), "/opt/flink/lib/flink-sql-connector-kafka-4.0.1-2.0.jar", isReadOnly: true)
.WithBindMount(Path.Combine(connectorsDir, "flink-json-2.1.0.jar"), "/opt/flink/lib/flink-json-2.1.0.jar", isReadOnly: true)
.WithArgs("/opt/flink/bin/sql-gateway.sh", "start-foreground")
.WithLifetime(ContainerLifetime.Persistent);
.WithArgs("/opt/flink/bin/sql-gateway.sh", "start-foreground");
// .WithLifetime(ContainerLifetime.Persistent); // Commented out for testing without persistence

// Flink.JobGateway - Add Flink Job Gateway
// IMPORTANT: Gateway needs container network address since it submits jobs to Flink containers
Expand Down Expand Up @@ -423,11 +422,12 @@ static void SetPodmanDockerHost()
}
}

static void EnsureDockerNetworkExists(string networkName)
static void EnsureFlinkKafkaNetwork()
{
const string networkName = "aspire-flink-network";

try
{
// Detect which container runtime to use (docker or podman)
var containerRuntime = GetContainerRuntimeCommand();

// Check if network already exists
Expand Down Expand Up @@ -468,29 +468,29 @@ static void EnsureDockerNetworkExists(string networkName)
using var createProcess = Process.Start(createPsi);
if (createProcess != null)
{
createProcess.StandardOutput.ReadToEnd(); // Consume output
createProcess.StandardOutput.ReadToEnd();
var error = createProcess.StandardError.ReadToEnd();
createProcess.WaitForExit(5000);

if (createProcess.ExitCode == 0)
{
Console.WriteLine($"✅ Created {containerRuntime} network '{networkName}' for DNS resolution");
Console.WriteLine($"✅ Created {containerRuntime} network '{networkName}' for Flink-Kafka communication");
}
else
{
Console.WriteLine($"⚠️ Failed to create network '{networkName}' with {containerRuntime}: {error}");
Console.WriteLine($"⚠️ Failed to create network '{networkName}': {error}");
}
}
}
catch (Exception ex)
{
Console.WriteLine($"⚠️ Error managing container network: {ex.Message}");
Console.WriteLine($"⚠️ Error managing Flink-Kafka network: {ex.Message}");
}
}

static string GetContainerRuntimeCommand()
{
// Check if Podman is available and preferred
// Check if Podman is configured as the runtime
if (Environment.GetEnvironmentVariable("ASPIRE_CONTAINER_RUNTIME") == "podman")
{
return "podman";
Expand All @@ -508,6 +508,6 @@ static string GetContainerRuntimeCommand()
return "podman";
}

// Default to docker (will fail gracefully if not available)
// Default to docker
return "docker";
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
// Tests will reuse the shared GlobalTestInfrastructure (Kafka + Flink + Gateway)
// Each test uses unique topics via TestContext.CurrentContext.Test.ID to avoid conflicts
[assembly: Parallelizable(ParallelScope.All)]
[assembly: LevelOfParallelism(7)] // Run up to 7 tests in parallel
[assembly: LevelOfParallelism(10)] // Run up to 10 tests in parallel (more than test count for max parallelism)
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ namespace LocalTesting.IntegrationTests;
public class GatewayAllPatternsTests : LocalTestingTestBase
{
private static readonly TimeSpan TestTimeout = TimeSpan.FromMinutes(2);
private static readonly TimeSpan JobRunTimeout = TimeSpan.FromSeconds(60);
private static readonly TimeSpan MessageTimeout = TimeSpan.FromSeconds(45);
private static readonly TimeSpan JobRunTimeout = TimeSpan.FromSeconds(30);
private static readonly TimeSpan MessageTimeout = TimeSpan.FromSeconds(30);

[Test]
public async Task Gateway_Pattern1_Uppercase_ShouldWork()
Expand Down Expand Up @@ -147,12 +147,8 @@ private async Task RunGatewayPatternTest(

try
{
// Quick health check - global setup already validated everything
TestContext.WriteLine("⏳ Quick infrastructure health check...");
await WaitForFullInfrastructureAsync(includeGateway: true, lightweightMode: true, ct);
TestContext.WriteLine("✅ Infrastructure ready");

// Create topics
// Skip health check - global setup already validated everything
// Create topics immediately
TestContext.WriteLine($"📝 Creating topics: {inputTopic} -> {outputTopic}");
await CreateTopicAsync(inputTopic, 1);
await CreateTopicAsync(outputTopic, 1);
Expand Down Expand Up @@ -193,8 +189,8 @@ private async Task RunGatewayPatternTest(
TestContext.WriteLine($"📤 Producing {inputMessages.Length} messages...");
await ProduceMessagesAsync(inputTopic, inputMessages, ct, usesJson);

// Consume and verify
var consumeTimeout = allowLongerProcessing ? TimeSpan.FromSeconds(75) : MessageTimeout;
// Consume and verify (reduced timeout for faster tests)
var consumeTimeout = allowLongerProcessing ? TimeSpan.FromSeconds(45) : MessageTimeout;
var consumed = await ConsumeMessagesAsync(outputTopic, expectedOutputCount, consumeTimeout, ct);

TestContext.WriteLine($"📊 Consumed {consumed.Count} messages (expected: {expectedOutputCount})");
Expand Down Expand Up @@ -347,7 +343,7 @@ private static async Task WaitForJobRunningViaGatewayAsync(string gatewayBaseUrl
TestContext.WriteLine($" ⏳ Attempt {attempt}: Request failed - {ex.Message}");
}

await Task.Delay(1000, ct);
await Task.Delay(500, ct); // Reduced from 1000ms to 500ms
}

throw new TimeoutException($"Job {jobId} did not reach RUNNING state within {timeout.TotalSeconds:F0}s");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -178,38 +178,31 @@ await app.ResourceNotifications
[OneTimeTearDown]
public async Task GlobalTearDown()
{
Console.WriteLine("🌍 ========================================");
Console.WriteLine("🌍 GLOBAL TEST INFRASTRUCTURE TEARDOWN START");
Console.WriteLine("🌍 ========================================");
Console.WriteLine("🌍 TEARDOWN: Cleaning up test infrastructure...");

if (AppHost != null)
{
try
{
Console.WriteLine("🔧 Stopping AppHost...");
await AppHost.StopAsync();
Console.WriteLine("✅ AppHost stopped");
}
catch (Exception ex)
{
Console.WriteLine($"⚠️ Error stopping AppHost: {ex.Message}");
}

try
{
Console.WriteLine("🔧 Disposing AppHost...");
await AppHost.DisposeAsync();
Console.WriteLine("✅ AppHost disposed");
// Aggressive cleanup with minimal timeout
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(2));

try
{
await AppHost.StopAsync(cts.Token);
await AppHost.DisposeAsync();
Console.WriteLine("✅ Infrastructure cleaned up");
}
catch (OperationCanceledException)
{
Console.WriteLine("✅ Cleanup timed out - runtime will handle remaining resources");
}
}
catch (Exception ex)
{
Console.WriteLine($"⚠️ Error disposing AppHost: {ex.Message}");
Console.WriteLine($"✅ Cleanup completed with: {ex.Message}");
}
}

Console.WriteLine("🌍 ========================================");
Console.WriteLine("🌍 GLOBAL INFRASTRUCTURE TEARDOWN COMPLETE");
Console.WriteLine("🌍 ========================================");
}

private static void ConfigureGatewayJarPath()
Expand Down
Loading