Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix condition for KubePortManagerSingleton init twice #16213

Merged
merged 6 commits into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public static synchronized KubePortManagerSingleton getInstance() {
public static synchronized void init(final Set<Integer> ports) {
if (instance == null) {
instance = new KubePortManagerSingleton(ports);
} else if (Sets.intersection(instance.getAllPorts(), ports).size() != ports.size()) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

here is what I really fix

} else if (Sets.intersection(instance.getAllPorts(), ports).size() == ports.size()) {
LOGGER.info("Skipping initializing KubePortManagerSingleton since ports specified are the same.");
} else {
throw new RuntimeException("Cannot initialize twice with different ports!");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.when;
Expand Down Expand Up @@ -102,14 +103,31 @@ public void setup() throws Exception {
final WorkerConfigs workerConfigs = spy(new WorkerConfigs(new EnvConfigs()));
when(workerConfigs.getEnvMap()).thenReturn(Map.of("ENV_VAR_1", "ENV_VALUE_1"));

processFactory =
new KubeProcessFactory(
workerConfigs,
"default",
fabricClient,
heartbeatUrl,
getHost(),
false);
processFactory = new KubeProcessFactory(workerConfigs, "default", fabricClient, heartbeatUrl, getHost(), false);
}

@RetryingTest(3)
public void testInitKubePortManagerSingletonTwice() throws Exception {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@davinchia what I really add test here.

Others code change is coding-style formatter by google-style which recommends (https://github.com/airbytehq/airbyte/blob/master/docs/contributing-to-airbyte/code-style.md)

/**
* Test init KubePortManagerSingleton twice: 1. with same ports shoule succeed 2. with different
* port should fail
*
* Every test has been init firt times in BeforeAll with getOpenPorts(30)
*/

KubePortManagerSingleton originalKubePortManager = KubePortManagerSingleton.getInstance();

// init the second time with the same ports
final List<Integer> theSameOpenPorts = new ArrayList<>(getOpenPorts(30));
KubePortManagerSingleton.init(new HashSet<>(theSameOpenPorts.subList(1, theSameOpenPorts.size() - 1)));
assertEquals(originalKubePortManager, KubePortManagerSingleton.getInstance());

// init the second time with different ports
final List<Integer> differentOpenPorts = new ArrayList<>(getOpenPorts(32));
Exception exception = assertThrows(RuntimeException.class, () -> {
KubePortManagerSingleton.init(new HashSet<>(differentOpenPorts.subList(1, differentOpenPorts.size() - 1)));
});
assertTrue(exception.getMessage().contains("Cannot initialize twice with different ports!"));
}

/**
Expand Down Expand Up @@ -283,21 +301,14 @@ public void testDeletingPodImmediatelyAfterCompletion() throws Exception {
final var uuid = UUID.randomUUID();
final Process process = getProcess(Map.of("uuid", uuid.toString()), "sleep 1 && exit 10");

final var pod = fabricClient.pods().list().getItems().stream()
.filter(p -> p.getMetadata() != null && p.getMetadata().getLabels() != null)
final var pod = fabricClient.pods().list().getItems().stream().filter(p -> p.getMetadata() != null && p.getMetadata().getLabels() != null)
.filter(p -> p.getMetadata().getLabels().containsKey("uuid") && p.getMetadata().getLabels().get("uuid").equals(uuid.toString()))
.collect(Collectors.toList()).get(0);
final SharedIndexInformer<Pod> podInformer = fabricClient.pods()
.inNamespace(pod.getMetadata().getNamespace())
.withName(pod.getMetadata().getName())
.inform();
podInformer.addEventHandler(new ExitCodeWatcher(
pod.getMetadata().getName(),
pod.getMetadata().getNamespace(),
exitCode -> {
fabricClient.pods().delete(pod);
},
() -> {}));
final SharedIndexInformer<Pod> podInformer =
fabricClient.pods().inNamespace(pod.getMetadata().getNamespace()).withName(pod.getMetadata().getName()).inform();
podInformer.addEventHandler(new ExitCodeWatcher(pod.getMetadata().getName(), pod.getMetadata().getNamespace(), exitCode -> {
fabricClient.pods().delete(pod);
}, () -> {}));

process.waitFor();

Expand Down Expand Up @@ -342,14 +353,7 @@ public void testKillingWithoutHeartbeat() throws Exception {
final WorkerConfigs workerConfigs = spy(new WorkerConfigs(new EnvConfigs()));
when(workerConfigs.getEnvMap()).thenReturn(Map.of("ENV_VAR_1", "ENV_VALUE_1"));

processFactory =
new KubeProcessFactory(
workerConfigs,
"default",
fabricClient,
heartbeatUrl,
getHost(),
false);
processFactory = new KubeProcessFactory(workerConfigs, "default", fabricClient, heartbeatUrl, getHost(), false);

// start an infinite process
final var availablePortsBefore = KubePortManagerSingleton.getInstance().getNumAvailablePorts();
Expand Down Expand Up @@ -412,11 +416,7 @@ private static String getRandomFile(final int lines) {

private Process getProcess(final String entrypoint) throws WorkerException {
// these files aren't used for anything, it's just to check for exceptions when uploading
final var files = ImmutableMap.of(
"file0", "fixed str",
"file1", getRandomFile(1),
"file2", getRandomFile(100),
"file3", getRandomFile(1000));
final var files = ImmutableMap.of("file0", "fixed str", "file1", getRandomFile(1), "file2", getRandomFile(100), "file3", getRandomFile(1000));

return getProcess(entrypoint, files);
}
Expand All @@ -431,19 +431,8 @@ private Process getProcess(final String entrypoint, final Map<String, String> fi

private Process getProcess(final Map<String, String> customLabels, final String entrypoint, final Map<String, String> files)
throws WorkerException {
return processFactory.create(
"tester",
"some-id",
0,
Path.of("/tmp/job-root"),
"busybox:latest",
false,
files,
entrypoint,
DEFAULT_RESOURCE_REQUIREMENTS,
customLabels,
Collections.emptyMap(),
Collections.emptyMap());
return processFactory.create("tester", "some-id", 0, Path.of("/tmp/job-root"), "busybox:latest", false, files, entrypoint,
DEFAULT_RESOURCE_REQUIREMENTS, customLabels, Collections.emptyMap(), Collections.emptyMap());
}

private static Set<Integer> getOpenPorts(final int count) {
Expand Down