Skip to content

Commit

Permalink
Make sure the IpcClient recreates the server if the context creation …
Browse files Browse the repository at this point in the history
…fails, fixes #446
  • Loading branch information
gnodet committed Jul 6, 2021
1 parent fb1af1d commit f960351
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 57 deletions.
108 changes: 61 additions & 47 deletions sync/src/main/java/org/mvndaemon/mvnd/sync/IpcClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package org.mvndaemon.mvnd.sync;

import java.io.Closeable;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
Expand Down Expand Up @@ -111,43 +112,51 @@ Socket createClient() throws IOException {
int tmpport = ss.getLocalPort();
int rand = new Random().nextInt();

List<String> args = new ArrayList<>();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
javaHome = System.getProperty("java.home");
}
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
String noFork = System.getProperty(IpcServer.NO_FORK_PROP);
Closeable close;
if (Boolean.parseBoolean(noFork)) {
IpcServer server = IpcServer.runServer(tmpport, rand);
close = server::close;
} else {
throw new IllegalStateException();
}
args.add("-cp");
args.add(classpath);
String timeout = System.getProperty(IpcServer.IDLE_TIMEOUT_PROP);
if (timeout != null) {
args.add("-D" + IpcServer.IDLE_TIMEOUT_PROP + "=" + timeout);
List<String> args = new ArrayList<>();
String javaHome = System.getenv("JAVA_HOME");
if (javaHome == null) {
javaHome = System.getProperty("java.home");
}
boolean win = System.getProperty("os.name").toLowerCase(Locale.ROOT).contains("win");
String javaCmd = win ? "bin\\java.exe" : "bin/java";
String java = Paths.get(javaHome).resolve(javaCmd).toAbsolutePath().toString();
args.add(java);
String classpath;
String className = getClass().getName().replace('.', '/') + ".class";
String url = getClass().getClassLoader().getResource(className).toString();
if (url.startsWith("jar:")) {
classpath = url.substring("jar:".length(), url.indexOf("!/"));
} else if (url.startsWith("file:")) {
classpath = url.substring("file:".length(), url.indexOf(className));
} else {
throw new IllegalStateException();
}
args.add("-cp");
args.add(classpath);
String timeout = System.getProperty(IpcServer.IDLE_TIMEOUT_PROP);
if (timeout != null) {
args.add("-D" + IpcServer.IDLE_TIMEOUT_PROP + "=" + timeout);
}
args.add(IpcServer.class.getName());
args.add(Integer.toString(tmpport));
args.add(Integer.toString(rand));
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
Process process = processBuilder
.directory(lockFile.getParent().toFile())
.command(args)
.redirectOutput(discard)
.redirectError(discard)
.start();
close = process::destroyForcibly;
}
args.add(IpcServer.class.getName());
args.add(Integer.toString(tmpport));
args.add(Integer.toString(rand));
ProcessBuilder processBuilder = new ProcessBuilder();
ProcessBuilder.Redirect discard = ProcessBuilder.Redirect.to(new File(win ? "NUL" : "/dev/null"));
discard = ProcessBuilder.Redirect.INHERIT;
Process process = processBuilder
.directory(lockFile.getParent().toFile())
.command(args)
.redirectOutput(discard)
.redirectError(discard)
.start();

ExecutorService es = Executors.newSingleThreadExecutor();
Future<int[]> future = es.submit(() -> {
Expand All @@ -161,14 +170,14 @@ Socket createClient() throws IOException {
try {
res = future.get(5, TimeUnit.SECONDS);
} catch (Exception e) {
process.destroyForcibly();
close.close();
throw e;
} finally {
es.shutdownNow();
ss.close();
}
if (rand != res[0]) {
process.destroyForcibly();
close.close();
throw new IllegalStateException("IpcServer did not respond with the correct random");
}

Expand Down Expand Up @@ -255,17 +264,21 @@ synchronized void close(Throwable e) {
}

String newContext(boolean shared) {
try {
List<String> response = send(Arrays.asList(
REQUEST_CONTEXT, Boolean.toString(shared)));
if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
throw new IOException("Unexpected response: " + response);
RuntimeException error = new RuntimeException("Unable to create new sync context");
for (int i = 0; i < 2; i++) {
try {
List<String> response = send(Arrays.asList(
REQUEST_CONTEXT, Boolean.toString(shared)));
if (response.size() != 2 || !RESPONSE_CONTEXT.equals(response.get(0))) {
throw new IOException("Unexpected response: " + response);
}
return response.get(1);
} catch (Exception e) {
close(e);
error.addSuppressed(e);
}
return response.get(1);
} catch (Exception e) {
close(e);
throw new RuntimeException("Unable to create new sync context", e);
}
throw error;
}

void lock(String contextId, Collection<String> keys) {
Expand Down Expand Up @@ -299,7 +312,8 @@ void unlock(String contextId) {
@Override
public String toString() {
return "IpcClient{"
+ "repository=" + repository
+ "repository=" + repository + ','
+ "port=" + (socket != null ? socket.getPort() : 0)
+ '}';
}
}
51 changes: 44 additions & 7 deletions sync/src/main/java/org/mvndaemon/mvnd/sync/IpcServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import java.net.ServerSocket;
import java.net.Socket;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Locale;
Expand All @@ -46,12 +47,14 @@
*/
public class IpcServer {

public static final String NO_FORK_PROP = "ipcsync.nofork";

public static final String IDLE_TIMEOUT_PROP = "ipcsync.idle.timeout";

static final long IDLE_TIMEOUT = TimeUnit.SECONDS.toNanos(60);

private final ServerSocket serverSocket;
private final AtomicInteger clients = new AtomicInteger();
private final Map<Socket, Thread> clients = new HashMap<>();
private final AtomicInteger counter = new AtomicInteger();
private final Map<String, Lock> locks = new ConcurrentHashMap<>();
private final Map<String, Context> contexts = new ConcurrentHashMap<>();
Expand All @@ -66,8 +69,13 @@ public IpcServer() throws IOException {
String str = System.getProperty(IDLE_TIMEOUT_PROP);
if (str != null) {
try {
TimeUnit unit = TimeUnit.SECONDS;
if (str.endsWith("ms")) {
unit = TimeUnit.MILLISECONDS;
str = str.substring(0, str.length() - 2);
}
long dur = Long.parseLong(str);
timeout = TimeUnit.SECONDS.toNanos(dur);
timeout = unit.toNanos(dur);
} catch (NumberFormatException e) {
error("Property " + IDLE_TIMEOUT_PROP + " specified with invalid value: " + str, e);
}
Expand Down Expand Up @@ -96,6 +104,10 @@ public static void main(String[] args) throws Exception {
int tmpPort = Integer.parseInt(args[0]);
int rand = Integer.parseInt(args[1]);

runServer(tmpPort, rand);
}

static IpcServer runServer(int tmpPort, int rand) throws IOException {
IpcServer server = new IpcServer();
run(server::run);
int port = server.getPort();
Expand All @@ -108,6 +120,8 @@ public static void main(String[] args) throws Exception {
dos.flush();
}
}

return server;
}

private static void debug(String msg, Object... args) {
Expand Down Expand Up @@ -149,7 +163,11 @@ public void run() {
}

private void client(Socket socket) {
int c = clients.incrementAndGet();
int c;
synchronized (clients) {
clients.put(socket, Thread.currentThread());
c = clients.size();
}
info("New client connected (%d connected)", c);
use();
Map<String, Context> clientContexts = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -240,9 +258,13 @@ private void client(Socket socket) {
}
}
} catch (Throwable t) {
error("Error processing request", t);
if (!closing) {
error("Error processing request", t);
}
} finally {
info("Client disconnecting...");
if (!closing) {
info("Client disconnecting...");
}
clientContexts.values().forEach(context -> {
contexts.remove(context.id);
context.unlock();
Expand All @@ -252,7 +274,13 @@ private void client(Socket socket) {
} catch (IOException ioException) {
// ignore
}
info("%d clients left", clients.decrementAndGet());
synchronized (clients) {
clients.remove(socket);
c = clients.size();
}
if (!closing) {
info("%d clients left", c);
}
}
}

Expand All @@ -264,19 +292,28 @@ private void expirationCheck() {
while (true) {
long current = System.nanoTime();
if (current - lastUsed > idleTimeout) {
info("IpcServer expired, closing");
close();
break;
}
}
}

private void close() {
void close() {
closing = true;
try {
serverSocket.close();
} catch (IOException e) {
error("Error closing server socket", e);
}
clients.forEach((s, t) -> {
try {
s.close();
} catch (IOException e) {
// ignore
}
t.interrupt();
});
}

static class Waiter {
Expand Down
40 changes: 37 additions & 3 deletions sync/src/test/java/org/mvndaemon/mvnd/sync/IpcSyncContextTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class IpcSyncContextTest {

private static final Logger LOGGER = LoggerFactory.getLogger(IpcSyncContextTest.class);

@BeforeAll
static void setup() {
System.setProperty(IpcServer.IDLE_TIMEOUT_PROP, "5");
Expand Down Expand Up @@ -73,15 +77,15 @@ public void testContext() throws Exception {
for (int i = 0; i < threads.length; i++) {
threads[i] = new Thread(() -> {
try (SyncContext context = factory.newInstance(session, false)) {
System.out.println("Trying to lock from " + context);
LOGGER.info("Trying to lock from {}", context);
context.acquire(Collections.singleton(artifact), null);
System.out.println("Lock acquired from " + context);
LOGGER.info("Lock acquired from {}", context);
try {
Thread.sleep(50);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Unlock from " + context);
LOGGER.info("Unlock from {}", context);
}
});
threads[i].start();
Expand All @@ -91,4 +95,34 @@ public void testContext() throws Exception {
thread.join();
}
}

@Test
void testTimeoutAndConnect() throws Exception {
System.setProperty(IpcServer.IDLE_TIMEOUT_PROP, "50ms");
System.setProperty(IpcServer.NO_FORK_PROP, "true");
try {

SyncContextFactory factory = new IpcSyncContextFactory();

DefaultRepositorySystemSession session = new DefaultRepositorySystemSession();
LocalRepository repository = new LocalRepository(new File("target/test-repo"));
LocalRepositoryManager localRepositoryManager = new SimpleLocalRepositoryManagerFactory()
.newInstance(session, repository);
session.setLocalRepositoryManager(localRepositoryManager);
Artifact artifact = new DefaultArtifact("myGroup", "myArtifact", "jar", "0.1");

for (int i = 0; i < 10; i++) {
LOGGER.info("[client] Creating sync context");
try (SyncContext context = factory.newInstance(session, false)) {
LOGGER.info("[client] Sync context created: {}", context.toString());
context.acquire(Collections.singleton(artifact), null);
}
LOGGER.info("[client] Sync context closed");
Thread.sleep(100);
}
} finally {
System.clearProperty(IpcServer.IDLE_TIMEOUT_PROP);
System.clearProperty(IpcServer.NO_FORK_PROP);
}
}
}

0 comments on commit f960351

Please sign in to comment.