diff --git a/build.sc b/build.sc index a645d6a5626..fd19d6796d7 100644 --- a/build.sc +++ b/build.sc @@ -762,6 +762,9 @@ object main extends MillStableScalaModule with BuildInfo { } } + object server extends MillPublishScalaModule { + def moduleDeps = Seq(client, api) + } object graphviz extends MillPublishScalaModule { def moduleDeps = Seq(main, scalalib) def ivyDeps = Agg(Deps.graphvizJava, Deps.jgraphtCore) @@ -1434,7 +1437,7 @@ def launcherScript( cmdClassPath: Agg[String] ) = { - val millMainClass = "mill.main.client.MillClientMain" + val millMainClass = "mill.runner.client.MillClientMain" Jvm.universalScript( shellCommands = { @@ -1545,7 +1548,14 @@ def launcherScript( } object runner extends MillPublishScalaModule { - def moduleDeps = Seq(scalalib, scalajslib, scalanativelib, bsp, linenumbers, main.codesig) + object client extends MillPublishJavaModule{ + def buildInfoPackageName = "mill.runner.client" + def moduleDeps = Seq(main.client) + } + + def moduleDeps = Seq( + scalalib, scalajslib, scalanativelib, bsp, linenumbers, main.codesig, main.server, client + ) def skipPreviousVersions: T[Seq[String]] = Seq("0.11.0-M7") object linenumbers extends MillPublishScalaModule { diff --git a/main/client/src/mill/main/client/FileToStreamTailer.java b/main/client/src/mill/main/client/FileToStreamTailer.java index f401f062ecb..9847f5db6d0 100644 --- a/main/client/src/mill/main/client/FileToStreamTailer.java +++ b/main/client/src/mill/main/client/FileToStreamTailer.java @@ -99,6 +99,7 @@ public void flush() { @Override public void close() throws Exception { + flush(); interrupt(); } } diff --git a/main/client/src/mill/main/client/InputPumper.java b/main/client/src/mill/main/client/InputPumper.java index cbd5ecf460d..1bddc0bc656 100644 --- a/main/client/src/mill/main/client/InputPumper.java +++ b/main/client/src/mill/main/client/InputPumper.java @@ -38,7 +38,12 @@ public void run() { } else if (checkAvailable && src.available() == 0) Thread.sleep(2); else { - int n = src.read(buffer); + int n; + try{ + n = src.read(buffer); + } catch (Exception e){ + n = -1; + } if (n == -1) { running = false; } diff --git a/main/client/src/mill/main/client/MillServerCouldNotBeStarted.java b/main/client/src/mill/main/client/MillServerCouldNotBeStarted.java deleted file mode 100644 index b5b3bcfb6bb..00000000000 --- a/main/client/src/mill/main/client/MillServerCouldNotBeStarted.java +++ /dev/null @@ -1,7 +0,0 @@ -package mill.main.client; - -public class MillServerCouldNotBeStarted extends Exception { - public MillServerCouldNotBeStarted(String msg) { - super(msg); - } -} diff --git a/main/client/src/mill/main/client/MillServerLauncher.java b/main/client/src/mill/main/client/MillServerLauncher.java deleted file mode 100644 index a788844e079..00000000000 --- a/main/client/src/mill/main/client/MillServerLauncher.java +++ /dev/null @@ -1,174 +0,0 @@ -package mill.main.client; - -import mill.main.client.lock.Locks; -import mill.main.client.lock.TryLocked; -import static mill.main.client.OutFiles.*; - -import org.newsclub.net.unix.AFUNIXSocket; -import org.newsclub.net.unix.AFUNIXSocketAddress; - -import java.io.File; -import java.io.FileOutputStream; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.file.Files; -import java.nio.file.Paths; -import java.util.Map; - -public class MillServerLauncher { - final static int tailerRefreshIntervalMillis = 2; - final static int maxLockAttempts = 3; - public static int runMain(String[] args) throws Exception { - - final boolean setJnaNoSys = System.getProperty("jna.nosys") == null; - if (setJnaNoSys) { - System.setProperty("jna.nosys", "true"); - } - - final String versionAndJvmHomeEncoding = Util.sha1Hash(BuildInfo.millVersion + System.getProperty("java.home")); - final int serverProcessesLimit = getServerProcessesLimit(versionAndJvmHomeEncoding); - - int serverIndex = 0; - while (serverIndex < serverProcessesLimit) { // Try each possible server process (-1 to -5) - serverIndex++; - final String lockBase = out + "/" + millWorker + versionAndJvmHomeEncoding + "-" + serverIndex; - java.io.File lockBaseFile = new java.io.File(lockBase); - lockBaseFile.mkdirs(); - - int lockAttempts = 0; - while (lockAttempts < maxLockAttempts) { // Try to lock a particular server - try ( - Locks locks = Locks.files(lockBase); - TryLocked clientLock = locks.clientLock.tryLock() - ) { - if (clientLock != null) { - return runMillServer(args, lockBase, setJnaNoSys, locks); - } - } catch (Exception e) { - for (File file : lockBaseFile.listFiles()) file.delete(); - } finally { - lockAttempts++; - } - } - } - throw new MillServerCouldNotBeStarted("Reached max server processes limit: " + serverProcessesLimit); - } - - static int runMillServer(String[] args, - String lockBase, - boolean setJnaNoSys, - Locks locks) throws Exception { - final File stdout = new java.io.File(lockBase + "/" + ServerFiles.stdout); - final File stderr = new java.io.File(lockBase + "/" + ServerFiles.stderr); - - try( - final FileToStreamTailer stdoutTailer = new FileToStreamTailer(stdout, System.out, tailerRefreshIntervalMillis); - final FileToStreamTailer stderrTailer = new FileToStreamTailer(stderr, System.err, tailerRefreshIntervalMillis); - ) { - stdoutTailer.start(); - stderrTailer.start(); - final int exitCode = run( - lockBase, - () -> { - try { - MillLauncher.launchMillServer(lockBase, setJnaNoSys); - } catch (Exception e) { - throw new RuntimeException(e); - } - }, - locks, - System.in, - System.out, - System.err, - args, - System.getenv()); - - // Here, we ensure we process the tails of the output files before interrupting - // the threads - stdoutTailer.flush(); - stderrTailer.flush(); - - return exitCode; - } - } - - // 5 processes max - private static int getServerProcessesLimit(String jvmHomeEncoding) { - File outFolder = new File(out); - String[] totalProcesses = outFolder.list((dir, name) -> name.startsWith(millWorker)); - String[] thisJdkProcesses = outFolder.list((dir, name) -> name.startsWith(millWorker + jvmHomeEncoding)); - - int processLimit = 5; - - if (thisJdkProcesses != null) { - processLimit -= Math.min(totalProcesses.length - thisJdkProcesses.length, 5); - } else if (totalProcesses != null) { - processLimit -= Math.min(totalProcesses.length, 5); - } - - return processLimit; - } - public static int run( - String lockBase, - Runnable initServer, - Locks locks, - InputStream stdin, - OutputStream stdout, - OutputStream stderr, - String[] args, - Map env) throws Exception { - - try (FileOutputStream f = new FileOutputStream(lockBase + "/" + ServerFiles.runArgs)) { - f.write(System.console() != null ? 1 : 0); - Util.writeString(f, BuildInfo.millVersion); - Util.writeArgs(args, f); - Util.writeMap(env, f); - } - - if (locks.processLock.probe()) initServer.run(); - - while (locks.processLock.probe()) Thread.sleep(3); - - String socketName = ServerFiles.pipe(lockBase); - AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName)); - - long retryStart = System.currentTimeMillis(); - Socket ioSocket = null; - Throwable socketThrowable = null; - while (ioSocket == null && System.currentTimeMillis() - retryStart < 1000) { - try { - ioSocket = AFUNIXSocket.connectTo(addr); - } catch (Throwable e) { - socketThrowable = e; - Thread.sleep(1); - } - } - - if (ioSocket == null) { - throw new Exception("Failed to connect to server", socketThrowable); - } - - InputStream outErr = ioSocket.getInputStream(); - OutputStream in = ioSocket.getOutputStream(); - ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr); - InputPumper inPump = new InputPumper(() -> stdin, () -> in, true); - Thread outPumperThread = new Thread(outPumper, "outPump"); - outPumperThread.setDaemon(true); - Thread inThread = new Thread(inPump, "inPump"); - inThread.setDaemon(true); - outPumperThread.start(); - inThread.start(); - - outPumperThread.join(); - - try { - return Integer.parseInt(Files.readAllLines(Paths.get(lockBase + "/" + ServerFiles.exitCode)).get(0)); - } catch (Throwable e) { - return Util.ExitClientCodeCannotReadFromExitCodeFile(); - } finally { - ioSocket.close(); - } - } - -} diff --git a/main/client/src/mill/main/client/ServerCouldNotBeStarted.java b/main/client/src/mill/main/client/ServerCouldNotBeStarted.java new file mode 100644 index 00000000000..908dbce62f3 --- /dev/null +++ b/main/client/src/mill/main/client/ServerCouldNotBeStarted.java @@ -0,0 +1,7 @@ +package mill.main.client; + +public class ServerCouldNotBeStarted extends Exception { + public ServerCouldNotBeStarted(String msg) { + super(msg); + } +} diff --git a/main/client/src/mill/main/client/ServerFiles.java b/main/client/src/mill/main/client/ServerFiles.java index d4ee7d11d48..208237ae6e6 100644 --- a/main/client/src/mill/main/client/ServerFiles.java +++ b/main/client/src/mill/main/client/ServerFiles.java @@ -5,6 +5,7 @@ * and documentation about what they do */ public class ServerFiles { + final public static String serverId = "serverId"; final public static String sandbox = "sandbox"; /** @@ -30,7 +31,7 @@ public class ServerFiles { */ public static String pipe(String base) { try { - return base + "/mill-" + Util.md5hex(new java.io.File(base).getCanonicalPath()) + "-io"; + return base + "/mill-" + Util.md5hex(new java.io.File(base).getCanonicalPath()).substring(0, 8) + "-io"; }catch (Exception e){ throw new RuntimeException(e); } diff --git a/main/client/src/mill/main/client/ServerLauncher.java b/main/client/src/mill/main/client/ServerLauncher.java new file mode 100644 index 00000000000..58fbb2ee1e2 --- /dev/null +++ b/main/client/src/mill/main/client/ServerLauncher.java @@ -0,0 +1,189 @@ +package mill.main.client; + +import mill.main.client.lock.Locks; +import mill.main.client.lock.TryLocked; +import static mill.main.client.OutFiles.*; + +import org.newsclub.net.unix.AFUNIXSocket; +import org.newsclub.net.unix.AFUNIXSocketAddress; + +import java.io.File; +import java.io.FileOutputStream; +import java.io.InputStream; +import java.io.OutputStream; +import java.io.PrintStream; +import java.net.Socket; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.util.Map; +import java.util.function.BiConsumer; + +/** + * Client side code that interacts with `Server.scala` in order to launch a generic + * long lived background server. + * + * The protocol is as follows: + * + * - Client: + * - Take clientLock + * - If processLock is not yet taken, it means server is not running, so spawn a server + * - Wait for server socket to be available for connection + * - Server: + * - Take processLock. + * - If already taken, it means another server was running + * (e.g. spawned by a different client) so exit immediately + * - Server: loop: + * - Listen for incoming client requests on serverSocket + * - Execute client request + * - If clientLock is released during execution, terminate server (otherwise + * we have no safe way of termianting the in-process request, so the server + * may continue running for arbitrarily long with no client attached) + * - Send `ProxyStream.END` packet and call `clientSocket.close()` + * - Client: + * - Wait for `ProxyStream.END` packet or `clientSocket.close()`, + * indicating server has finished execution and all data has been received + */ +public abstract class ServerLauncher { + public static class Result{ + public int exitCode; + public String serverDir; + } + final static int tailerRefreshIntervalMillis = 2; + final int serverProcessesLimit = 5; + final int serverInitWaitMillis = 10000; + public abstract void initServer(String serverDir, boolean b, Locks locks) throws Exception; + InputStream stdin; + PrintStream stdout; + PrintStream stderr; + Map env; + String[] args; + Locks[] memoryLocks; + int forceFailureForTestingMillisDelay; + public ServerLauncher(InputStream stdin, + PrintStream stdout, + PrintStream stderr, + Map env, + String[] args, + Locks[] memoryLocks, + int forceFailureForTestingMillisDelay){ + this.stdin = stdin; + this.stdout = stdout; + this.stderr = stderr; + this.env = env; + this.args = args; + + // For testing in memory, we need to pass in the locks separately, so that the + // locks can be shared between the different instances of `ServerLauncher` the + // same way file locks are shared between different Mill client/secrer processes + this.memoryLocks = memoryLocks; + + this.forceFailureForTestingMillisDelay = forceFailureForTestingMillisDelay; + } + public Result acquireLocksAndRun(String outDir) throws Exception { + + final boolean setJnaNoSys = System.getProperty("jna.nosys") == null; + if (setJnaNoSys) { + System.setProperty("jna.nosys", "true"); + } + + final String versionAndJvmHomeEncoding = Util.sha1Hash(BuildInfo.millVersion + System.getProperty("java.home")); + + + int serverIndex = 0; + while (serverIndex < serverProcessesLimit) { // Try each possible server process (-1 to -5) + serverIndex++; + final String serverDir = outDir + "/" + millWorker + versionAndJvmHomeEncoding + "-" + serverIndex; + java.io.File serverDirFile = new java.io.File(serverDir); + serverDirFile.mkdirs(); + + try ( + Locks locks = memoryLocks != null ? memoryLocks[serverIndex-1] : Locks.files(serverDir); + TryLocked clientLocked = locks.clientLock.tryLock() + ) { + if (clientLocked.isLocked()) { + Result result = new Result(); + result.exitCode = run(serverDir, setJnaNoSys, locks); + result.serverDir = serverDir; + return result; + } + } + } + throw new ServerCouldNotBeStarted("Reached max server processes limit: " + serverProcessesLimit); + } + + int run(String serverDir, boolean setJnaNoSys, Locks locks) throws Exception { + try( + final FileToStreamTailer stdoutTailer = new FileToStreamTailer( + new java.io.File(serverDir + "/" + ServerFiles.stdout), + stdout, + tailerRefreshIntervalMillis + ); + final FileToStreamTailer stderrTailer = new FileToStreamTailer( + new java.io.File(serverDir + "/" + ServerFiles.stderr), + stderr, + tailerRefreshIntervalMillis + ); + ) { + stdoutTailer.start(); + stderrTailer.start(); + try (FileOutputStream f = new FileOutputStream(serverDir + "/" + ServerFiles.runArgs)) { + f.write(System.console() != null ? 1 : 0); + Util.writeString(f, BuildInfo.millVersion); + Util.writeArgs(args, f); + Util.writeMap(env, f); + } + + if (locks.processLock.probe()) { + initServer(serverDir, setJnaNoSys, locks); + } + + while (locks.processLock.probe()) Thread.sleep(3); + + String socketName = ServerFiles.pipe(serverDir); + AFUNIXSocketAddress addr = AFUNIXSocketAddress.of(new File(socketName)); + + long retryStart = System.currentTimeMillis(); + Socket ioSocket = null; + Throwable socketThrowable = null; + while (ioSocket == null && System.currentTimeMillis() - retryStart < serverInitWaitMillis) { + try { + ioSocket = AFUNIXSocket.connectTo(addr); + } catch (Throwable e) { + socketThrowable = e; + Thread.sleep(10); + } + } + + if (ioSocket == null) { + throw new Exception("Failed to connect to server", socketThrowable); + } + + InputStream outErr = ioSocket.getInputStream(); + OutputStream in = ioSocket.getOutputStream(); + ProxyStream.Pumper outPumper = new ProxyStream.Pumper(outErr, stdout, stderr); + InputPumper inPump = new InputPumper(() -> stdin, () -> in, true); + Thread outPumperThread = new Thread(outPumper, "outPump"); + outPumperThread.setDaemon(true); + Thread inThread = new Thread(inPump, "inPump"); + inThread.setDaemon(true); + outPumperThread.start(); + inThread.start(); + + if (forceFailureForTestingMillisDelay > 0){ + Thread.sleep(forceFailureForTestingMillisDelay); + throw new Exception("Force failure for testing: " + serverDir); + } + outPumperThread.join(); + + try { + return Integer.parseInt(Files.readAllLines(Paths.get(serverDir + "/" + ServerFiles.exitCode)).get(0)); + } catch (Throwable e) { + return Util.ExitClientCodeCannotReadFromExitCodeFile(); + } finally { + ioSocket.close(); + } + } + + } + +} diff --git a/main/client/src/mill/main/client/Util.java b/main/client/src/mill/main/client/Util.java index 2dcb69c514f..c432d70df21 100644 --- a/main/client/src/mill/main/client/Util.java +++ b/main/client/src/mill/main/client/Util.java @@ -3,7 +3,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; -import java.io.UnsupportedEncodingException; +import java.io.File; +import java.io.FileNotFoundException; import java.math.BigInteger; import java.nio.charset.Charset; import java.nio.charset.StandardCharsets; @@ -12,6 +13,9 @@ import java.util.Base64; import java.util.HashMap; import java.util.Map; +import java.util.List; +import java.util.LinkedList; +import java.util.Scanner; public class Util { // use methods instead of constants to avoid inlining by compiler @@ -128,4 +132,27 @@ static String sha1Hash(String path) throws NoSuchAlgorithmException { return Base64.getEncoder().encodeToString(digest); } + /** + * Reads a file, ignoring empty or comment lines + * + * @return The non-empty lines of the files or an empty list, if the file does not exists + */ + public static List readOptsFileLines(final File file) { + final List vmOptions = new LinkedList<>(); + try ( + final Scanner sc = new Scanner(file) + ) { + while (sc.hasNextLine()) { + String arg = sc.nextLine(); + String trimmed = arg.trim(); + if (!trimmed.isEmpty() && !trimmed.startsWith("#")) { + vmOptions.add(arg); + } + } + } catch (FileNotFoundException e) { + // ignored + } + return vmOptions; + } + } diff --git a/main/client/src/mill/main/client/lock/FileLock.java b/main/client/src/mill/main/client/lock/FileLock.java index a405bc03c13..454b8794bd7 100644 --- a/main/client/src/mill/main/client/lock/FileLock.java +++ b/main/client/src/mill/main/client/lock/FileLock.java @@ -35,4 +35,6 @@ public void close() throws Exception { chan.close(); raf.close(); } + + public void delete() throws Exception { close(); } } diff --git a/main/client/src/mill/main/client/lock/Lock.java b/main/client/src/mill/main/client/lock/Lock.java index 72674cb42c8..6d729c0ebd6 100644 --- a/main/client/src/mill/main/client/lock/Lock.java +++ b/main/client/src/mill/main/client/lock/Lock.java @@ -14,4 +14,5 @@ public void await() throws Exception { * Returns `true` if the lock is *available for taking* */ public abstract boolean probe() throws Exception; + public void delete() throws Exception {} } diff --git a/main/client/src/mill/main/client/lock/Locks.java b/main/client/src/mill/main/client/lock/Locks.java index 86f55ed8496..b026fc17393 100644 --- a/main/client/src/mill/main/client/lock/Locks.java +++ b/main/client/src/mill/main/client/lock/Locks.java @@ -2,29 +2,7 @@ import mill.main.client.ServerFiles; -/** - * The locks used to manage the relationship of Mill between Mill's clients and servers. - * The protocol is as follows: - * - * - Client: - * - Take clientLock - * - If processLock is not yet taken, it means server is not running, so spawn a server - * - Wait for server socket to be available for connection - * - Server: - * - Take processLock. - * - If already taken, it means another server was running - * (e.g. spawned by a different client) so exit immediately - * - Server: loop: - * - Listen for incoming client requests on serverSocket - * - Execute client request - * - If clientLock is released during execution, terminate server (otherwise - * we have no safe way of termianting the in-process request, so the server - * may continue running for arbitrarily long with no client attached) - * - Send `ProxyStream.END` packet and call `clientSocket.close()` - * - Client: - * - Wait for `ProxyStream.END` packet or `clientSocket.close()`, - * indicating server has finished execution and all data has been received - */ + final public class Locks implements AutoCloseable { final public Lock clientLock; @@ -35,10 +13,10 @@ public Locks(Lock clientLock, Lock processLock){ this.processLock = processLock; } - public static Locks files(String lockBase) throws Exception { + public static Locks files(String serverDir) throws Exception { return new Locks( - new FileLock(lockBase + "/" + ServerFiles.clientLock), - new FileLock(lockBase + "/" + ServerFiles.processLock) + new FileLock(serverDir + "/" + ServerFiles.clientLock), + new FileLock(serverDir + "/" + ServerFiles.processLock) ); } @@ -51,7 +29,7 @@ public static Locks memory() { @Override public void close() throws Exception { - clientLock.close(); - processLock.close(); + clientLock.delete(); + processLock.delete(); } } diff --git a/main/client/test/src/mill/main/client/MillEnvTests.java b/main/client/test/src/mill/main/client/MillEnvTests.java index f4af16e3966..bcb19366142 100644 --- a/main/client/test/src/mill/main/client/MillEnvTests.java +++ b/main/client/test/src/mill/main/client/MillEnvTests.java @@ -17,7 +17,7 @@ protected void initTests() { File file = new File( getClass().getClassLoader().getResource("file-wo-final-newline.txt").toURI() ); - List lines = MillLauncher.readOptsFileLines(file); + List lines = Util.readOptsFileLines(file); expectEquals( lines, Arrays.asList( diff --git a/main/server/src/mill/main/server/Server.scala b/main/server/src/mill/main/server/Server.scala new file mode 100644 index 00000000000..588b1533dac --- /dev/null +++ b/main/server/src/mill/main/server/Server.scala @@ -0,0 +1,267 @@ +package mill.main.server + +import java.io._ +import java.net.Socket +import scala.jdk.CollectionConverters._ +import org.newsclub.net.unix.AFUNIXServerSocket +import org.newsclub.net.unix.AFUNIXSocketAddress +import mill.main.client._ +import mill.api.SystemStreams +import mill.main.client.ProxyStream.Output +import mill.main.client.lock.{Lock, Locks} + +import scala.util.Try + +/** + * Models a long-lived server that receives requests from a client and calls a [[main0]] + * method to run the commands in-process. Provides the command args, env variables, + * JVM properties, wrapped input/output streams, and other metadata related to the + * client command + */ +abstract class Server[T]( + serverDir: os.Path, + acceptTimeoutMillis: Int, + locks: Locks +) { + + @volatile var running = true + def exitServer(): Unit = running = false + var stateCache = stateCache0 + def stateCache0: T + + val serverId: String = java.lang.Long.toHexString(scala.util.Random.nextLong()) + def serverLog0(s: String): Unit = { + if (running) { + os.write.append(serverDir / ServerFiles.serverLog, s"$s\n", createFolders = true) + } + } + + def serverLog(s: String): Unit = serverLog0(s"$serverId $s") + + def run(): Unit = { + serverLog("running server in " + serverDir) + val initialSystemProperties = sys.props.toMap + + try Server.tryLockBlock(locks.processLock) { + watchServerIdFile() + + while ( + running && { + val serverSocket = bindSocket() + try + interruptWithTimeout(() => serverSocket.close(), () => serverSocket.accept()) match { + case None => false + case Some(sock) => + serverLog("handling run") + try handleRun(sock, initialSystemProperties) + catch { + case e: Throwable => serverLog(e + "\n" + e.getStackTrace.mkString("\n")) + } finally sock.close(); + true + } + finally serverSocket.close() + } + ) () + + }.getOrElse(throw new Exception("Mill server process already present")) + finally exitServer() + } + + def bindSocket(): AFUNIXServerSocket = { + val socketPath = os.Path(ServerFiles.pipe(serverDir.toString())) + os.remove.all(socketPath) + + val relFile = socketPath.relativeTo(os.pwd).toNIO.toFile + serverLog("listening on socket " + relFile) + // Use relative path because otherwise the full path might be too long for the socket API + val addr = AFUNIXSocketAddress.of(relFile) + AFUNIXServerSocket.bindOn(addr) + } + + def proxyInputStreamThroughPumper(in: InputStream): PipedInputStream = { + val pipedInput = new PipedInputStream() + val pipedOutput = new PipedOutputStream() + pipedOutput.connect(pipedInput) + val pumper = new InputPumper(() => in, () => pipedOutput, false) + val pumperThread = new Thread(pumper, "proxyInputStreamThroughPumper") + pumperThread.setDaemon(true) + pumperThread.start() + pipedInput + } + + def watchServerIdFile(): Unit = { + os.write.over(serverDir / ServerFiles.serverId, serverId) + val serverIdThread = new Thread( + () => + while ( + running && { + Thread.sleep(100) + Try(os.read(serverDir / ServerFiles.serverId)).toOption match { + case None => + serverLog("serverId file missing") + exitServer() + false + case Some(s) => + if (s == serverId) true + else { + serverLog(s"serverId file contents $s does not match serverId $serverId") + exitServer() + false + } + } + } + ) (), + "Server ID Checker Thread" + ) + serverIdThread.start() + } + + def interruptWithTimeout[T](close: () => Unit, t: () => T): Option[T] = { + @volatile var interrupt = true + @volatile var interrupted = false + val thread = new Thread( + () => { + try Thread.sleep(acceptTimeoutMillis) + catch { case t: InterruptedException => /* Do Nothing */ } + if (interrupt) { + interrupted = true + serverLog(s"Interrupting after ${acceptTimeoutMillis}ms") + close() + } + }, + "MillSocketTimeoutInterruptThread" + ) + + thread.start() + try { + val res = + try Some(t()) + catch { case e: Throwable => None } + + if (interrupted) None + else res + + } finally { + thread.interrupt() + interrupt = false + } + } + + def handleRun(clientSocket: Socket, initialSystemProperties: Map[String, String]): Unit = { + + val currentOutErr = clientSocket.getOutputStream + try { + val stdout = new PrintStream(new Output(currentOutErr, ProxyStream.OUT), true) + val stderr = new PrintStream(new Output(currentOutErr, ProxyStream.ERR), true) + + // Proxy the input stream through a pair of Piped**putStream via a pumper, + // as the `UnixDomainSocketInputStream` we get directly from the socket does + // not properly implement `available(): Int` and thus messes up polling logic + // that relies on that method + val proxiedSocketInput = proxyInputStreamThroughPumper(clientSocket.getInputStream) + + val argStream = os.read.inputStream(serverDir / ServerFiles.runArgs) + val interactive = argStream.read() != 0 + val clientMillVersion = Util.readString(argStream) + val serverMillVersion = BuildInfo.millVersion + if (clientMillVersion != serverMillVersion) { + stderr.println( + s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server" + ) + os.write( + serverDir / ServerFiles.exitCode, + Util.ExitServerCodeWhenVersionMismatch().toString.getBytes() + ) + System.exit(Util.ExitServerCodeWhenVersionMismatch()) + } + val args = Util.parseArgs(argStream) + val env = Util.parseMap(argStream) + serverLog("args " + upickle.default.write(args)) + serverLog("env " + upickle.default.write(env.asScala)) + val userSpecifiedProperties = Util.parseMap(argStream) + argStream.close() + + @volatile var done = false + @volatile var idle = false + val t = new Thread( + () => + try { + val (result, newStateCache) = main0( + args, + stateCache, + interactive, + new SystemStreams(stdout, stderr, proxiedSocketInput), + env.asScala.toMap, + idle = _, + userSpecifiedProperties.asScala.toMap, + initialSystemProperties + ) + + stateCache = newStateCache + serverLog("exitCode " + ServerFiles.exitCode) + os.write.over(serverDir / ServerFiles.exitCode, if (result) "0" else "1") + } finally { + done = true + idle = true + }, + "MillServerActionRunner" + ) + t.start() + // We cannot simply use Lock#await here, because the filesystem doesn't + // realize the clientLock/serverLock are held by different threads in the + // two processes and gives a spurious deadlock error + while (!done && !locks.clientLock.probe()) Thread.sleep(3) + + if (!idle) { + serverLog("client interrupted while server was executing command") + exitServer() + } + + t.interrupt() + // Try to give thread a moment to stop before we kill it for real + Thread.sleep(5) + try t.stop() + catch { + case e: UnsupportedOperationException => + // nothing we can do about, removed in Java 20 + case e: java.lang.Error if e.getMessage.contains("Cleaner terminated abnormally") => + // ignore this error and do nothing; seems benign + } + + // flush before closing the socket + System.out.flush() + System.err.flush() + + } finally ProxyStream.sendEnd(currentOutErr) // Send a termination + } + + def main0( + args: Array[String], + stateCache: T, + mainInteractive: Boolean, + streams: SystemStreams, + env: Map[String, String], + setIdle: Boolean => Unit, + userSpecifiedProperties: Map[String, String], + initialSystemProperties: Map[String, String] + ): (Boolean, T) + +} + +object Server { + + def lockBlock[T](lock: Lock)(t: => T): T = { + val l = lock.lock() + try t + finally l.release() + } + + def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = { + lock.tryLock() match { + case null => None + case l => + try Some(t) + finally l.release() + } + } +} diff --git a/main/server/test/src/mill/main/server/ClientServerTests.scala b/main/server/test/src/mill/main/server/ClientServerTests.scala new file mode 100644 index 00000000000..b36215f5a7b --- /dev/null +++ b/main/server/test/src/mill/main/server/ClientServerTests.scala @@ -0,0 +1,269 @@ +package mill.main.server + +import java.io._ +import mill.main.client.Util +import mill.main.client.ServerFiles +import mill.main.client.lock.Locks +import mill.api.SystemStreams + +import scala.jdk.CollectionConverters._ +import utest._ + +/** + * Exercises the client-server logic in memory, using in-memory locks + * and in-memory clients and servers + */ +object ClientServerTests extends TestSuite { + + val ENDL = System.lineSeparator() + class EchoServer(override val serverId: String, serverDir: os.Path, locks: Locks) + extends Server[Option[Int]](serverDir, 1000, locks) with Runnable { + override def exitServer() = { + serverLog("exiting server") + super.exitServer() + } + def stateCache0 = None + + override def serverLog0(s: String) = { + println(s) + super.serverLog0(s) + } + + def main0( + args: Array[String], + stateCache: Option[Int], + mainInteractive: Boolean, + streams: SystemStreams, + env: Map[String, String], + setIdle: Boolean => Unit, + systemProperties: Map[String, String], + initialSystemProperties: Map[String, String] + ) = { + + val reader = new BufferedReader(new InputStreamReader(streams.in)) + val str = reader.readLine() + Thread.sleep(200) + if (args.nonEmpty) { + streams.out.println(str + args(0)) + } + env.toSeq.sortBy(_._1).foreach { + case (key, value) => streams.out.println(s"$key=$value") + } + systemProperties.toSeq.sortBy(_._1).foreach { + case (key, value) => streams.out.println(s"$key=$value") + } + if (args.nonEmpty) { + streams.err.println(str.toUpperCase + args(0)) + } + streams.out.flush() + streams.err.flush() + (true, None) + } + } + + class Tester { + + var nextServerId: Int = 0 + val terminatedServers = collection.mutable.Set.empty[String] + val dest = os.pwd / "out" + os.makeDir.all(dest) + val outDir = os.temp.dir(dest, deleteOnExit = false) + + val memoryLocks = Array.fill(10)(Locks.memory()); + + def apply( + env: Map[String, String] = Map(), + args: Array[String] = Array(), + forceFailureForTestingMillisDelay: Int = -1 + ) = { + val in = new ByteArrayInputStream(s"hello$ENDL".getBytes()) + val out = new ByteArrayOutputStream() + val err = new ByteArrayOutputStream() + val result = new mill.main.client.ServerLauncher( + in, + new PrintStream(out), + new PrintStream(err), + env.asJava, + args, + memoryLocks, + forceFailureForTestingMillisDelay + ) { + def initServer(serverDir: String, b: Boolean, locks: Locks) = { + val serverId = "server-" + nextServerId + nextServerId += 1 + new Thread(new EchoServer(serverId, os.Path(serverDir, os.pwd), locks)).start() + } + }.acquireLocksAndRun(outDir.relativeTo(os.pwd).toString) + + ClientResult( + result.exitCode, + os.Path(result.serverDir, os.pwd), + outDir, + out.toString, + err.toString + ) + } + + } + + case class ClientResult( + exitCode: Int, + serverDir: os.Path, + outDir: os.Path, + out: String, + err: String + ) { + def logsFor(suffix: String) = { + os.read + .lines(serverDir / ServerFiles.serverLog) + .collect { case s if s.endsWith(" " + suffix) => s.dropRight(1 + suffix.length) } + } + } + + def tests = Tests { + val tester = new Tester + "hello" - { + + val res1 = tester(args = Array("world")) + + assert( + res1.out == s"helloworld$ENDL", + res1.err == s"HELLOworld$ENDL" + ) + + // A second client in sequence connect to the same server + val res2 = tester(args = Array(" WORLD")) + + assert( + res2.out == s"hello WORLD$ENDL", + res2.err == s"HELLO WORLD$ENDL" + ) + + if (!Util.isWindows) { + // Make sure the server times out of not used for a while + Thread.sleep(2000) + + assert(res2.logsFor("Interrupting after 1000ms") == Seq("server-0")) + assert(res2.logsFor("exiting server") == Seq("server-0")) + + // Have a third client spawn/connect-to a new server at the same path + val res3 = tester(args = Array(" World")) + assert( + res3.out == s"hello World$ENDL", + res3.err == s"HELLO World$ENDL" + ) + + // Make sure if we delete the out dir, the server notices and exits + os.remove.all(res3.outDir) + Thread.sleep(500) + + assert(res3.logsFor("serverId file missing") == Seq("server-1")) + assert(res3.logsFor("exiting server") == Seq("server-1")) + } + } + + "concurrency" - { + // Make sure concurrently running client commands results in multiple processes + // being spawned, running in different folders + import concurrent._ + import concurrent.ExecutionContext.Implicits.global + val f1 = Future(tester(args = Array(" World"))) + val f2 = Future(tester(args = Array(" WORLD"))) + val f3 = Future(tester(args = Array(" wOrLd"))) + val resF1 = Await.result(f1, duration.Duration.Inf) + val resF2 = Await.result(f2, duration.Duration.Inf) + val resF3 = Await.result(f3, duration.Duration.Inf) + + // Mutiple server processes live in same out folder + assert(resF1.outDir == resF2.outDir) + assert(resF2.outDir == resF3.outDir) + // but the serverDir is placed in different subfolders + assert(resF1.serverDir != resF2.serverDir) + assert(resF2.serverDir != resF3.serverDir) + + assert(resF1.out == s"hello World$ENDL") + assert(resF2.out == s"hello WORLD$ENDL") + assert(resF3.out == s"hello wOrLd$ENDL") + } + + "clientLockReleasedOnFailure" - { + // When the client gets interrupted via Ctrl-C, we exit the server immediately. This + // is because Mill ends up executing arbitrary JVM code, and there is no generic way + // to interrupt such an execution. The two options are to leave the server running + // for an unbounded duration, or kill the server process and take a performance hit + // on the next cold startup. Mill chooses the second option. + import concurrent._ + import concurrent.ExecutionContext.Implicits.global + val res1 = intercept[Exception] { + tester.apply(args = Array(" World"), forceFailureForTestingMillisDelay = 100) + } + + val s"Force failure for testing: $pathStr" = res1.getMessage + val logLines = os.read.lines(os.Path(pathStr, os.pwd) / "server.log") + + assert( + logLines.takeRight(2) == + Seq( + "server-0 client interrupted while server was executing command", + "server-0 exiting server" + ) + ) + } + + "envVars" - retry(3) { + // Make sure the simple "have the client start a server and + // exchange one message" workflow works from end to end. + + def longString(s: String) = Array.fill(1000)(s).mkString + val b1000 = longString("b") + val c1000 = longString("c") + val a1000 = longString("a") + + val env = Map( + "a" -> a1000, + "b" -> b1000, + "c" -> c1000 + ) + + val res1 = tester(env = env) + val expected = s"a=$a1000${ENDL}b=$b1000${ENDL}c=$c1000$ENDL" + + assert( + res1.out == expected, + res1.err == "" + ) + + val path = List( + "/Users/foo/Library/Haskell/bin", + "/usr/local/git/bin", + "/sw/bin/", + "/usr/local/bin", + "/usr/local/", + "/usr/local/sbin", + "/usr/local/mysql/bin", + "/usr/local/bin", + "/usr/bin", + "/bin", + "/usr/sbin", + "/sbin", + "/opt/X11/bin", + "/usr/local/MacGPG2/bin", + "/Library/TeX/texbin", + "/usr/local/bin/", + "/Users/foo/bin", + "/Users/foo/go/bin", + "~/.bloop" + ) + + val pathEnvVar = path.mkString(":") + val res2 = tester(env = Map("PATH" -> pathEnvVar)) + + val expected2 = s"PATH=$pathEnvVar$ENDL" + + assert( + res2.out == expected2, + res2.err == "" + ) + } + } +} diff --git a/main/client/src/mill/main/client/MillClientMain.java b/runner/client/src/mill/runner/client/MillClientMain.java similarity index 69% rename from main/client/src/mill/main/client/MillClientMain.java rename to runner/client/src/mill/runner/client/MillClientMain.java index ea0d2965602..d22b87a1274 100644 --- a/main/client/src/mill/main/client/MillClientMain.java +++ b/runner/client/src/mill/runner/client/MillClientMain.java @@ -1,16 +1,19 @@ -package mill.main.client; +package mill.runner.client; -import java.security.NoSuchAlgorithmException; import java.util.Arrays; +import java.util.function.BiConsumer; +import mill.main.client.ServerLauncher; +import mill.main.client.ServerFiles; +import mill.main.client.Util; +import mill.main.client.lock.Locks; +import mill.main.client.OutFiles; +import mill.main.client.ServerCouldNotBeStarted; /** * This is a Java implementation to speed up repetitive starts. * A Scala implementation would result in the JVM loading much more classes almost doubling the start-up times. */ public class MillClientMain { - - - public static void main(String[] args) throws Exception { boolean runNoServer = false; if (args.length > 0) { @@ -33,12 +36,17 @@ public static void main(String[] args) throws Exception { MillNoServerLauncher.runMain(args); } else try { // start in client-server mode - int exitCode = MillServerLauncher.runMain(args); + ServerLauncher launcher = new ServerLauncher(System.in, System.out, System.err, System.getenv(), args, null, -1){ + public void initServer(String serverDir, boolean setJnaNoSys, Locks locks) throws Exception{ + MillProcessLauncher.launchMillServer(serverDir, setJnaNoSys); + } + }; + int exitCode = launcher.acquireLocksAndRun(OutFiles.out).exitCode; if (exitCode == Util.ExitServerCodeWhenVersionMismatch()) { - exitCode = MillServerLauncher.runMain(args); + exitCode = launcher.acquireLocksAndRun(OutFiles.out).exitCode; } System.exit(exitCode); - } catch (MillServerCouldNotBeStarted e) { + } catch (ServerCouldNotBeStarted e) { // TODO: try to run in-process System.err.println("Could not start a Mill server process.\n" + "This could be caused by too many already running Mill instances " + @@ -53,4 +61,5 @@ public static void main(String[] args) throws Exception { } } } + } diff --git a/main/client/src/mill/main/client/MillNoServerLauncher.java b/runner/client/src/mill/runner/client/MillNoServerLauncher.java similarity index 94% rename from main/client/src/mill/main/client/MillNoServerLauncher.java rename to runner/client/src/mill/runner/client/MillNoServerLauncher.java index 99d69e3630e..b07655b8873 100644 --- a/main/client/src/mill/main/client/MillNoServerLauncher.java +++ b/runner/client/src/mill/runner/client/MillNoServerLauncher.java @@ -1,4 +1,4 @@ -package mill.main.client; +package mill.runner.client; import java.lang.reflect.Method; import java.util.Optional; @@ -44,7 +44,7 @@ public static LoadResult load() { public static void runMain(String[] args) throws Exception { LoadResult loadResult = load(); if (loadResult.millMainMethod.isPresent()) { - int exitVal = MillLauncher.launchMillNoServer(args); + int exitVal = MillProcessLauncher.launchMillNoServer(args); System.exit(exitVal); } else { throw new RuntimeException("Cannot load mill.runner.MillMain class"); diff --git a/main/client/src/mill/main/client/MillLauncher.java b/runner/client/src/mill/runner/client/MillProcessLauncher.java similarity index 77% rename from main/client/src/mill/main/client/MillLauncher.java rename to runner/client/src/mill/runner/client/MillProcessLauncher.java index 0364f1c9720..3245ee78a76 100644 --- a/main/client/src/mill/main/client/MillLauncher.java +++ b/runner/client/src/mill/runner/client/MillProcessLauncher.java @@ -1,4 +1,4 @@ -package mill.main.client; +package mill.runner.client; import static mill.main.client.OutFiles.*; import java.io.File; @@ -6,8 +6,11 @@ import java.io.FileNotFoundException; import java.io.IOException; import java.util.*; +import mill.main.client.Util; +import mill.main.client.ServerFiles; +import mill.main.client.ServerCouldNotBeStarted; -public class MillLauncher { +public class MillProcessLauncher { static int launchMillNoServer(String[] args) throws Exception { boolean setJnaNoSys = System.getProperty("jna.nosys") == null; @@ -23,27 +26,27 @@ static int launchMillNoServer(String[] args) throws Exception { return configureRunMillProcess(builder, out + "/" + millNoServer).waitFor(); } - static void launchMillServer(String lockBase, boolean setJnaNoSys) throws Exception { + static void launchMillServer(String serverDir, boolean setJnaNoSys) throws Exception { List l = new ArrayList<>(); l.addAll(millLaunchJvmCommand(setJnaNoSys)); l.add("mill.runner.MillServerMain"); - l.add(new File(lockBase).getCanonicalPath()); + l.add(new File(serverDir).getCanonicalPath()); - File stdout = new java.io.File(lockBase + "/" + ServerFiles.stdout); - File stderr = new java.io.File(lockBase + "/" + ServerFiles.stderr); + File stdout = new java.io.File(serverDir + "/" + ServerFiles.stdout); + File stderr = new java.io.File(serverDir + "/" + ServerFiles.stderr); ProcessBuilder builder = new ProcessBuilder() .command(l) .redirectOutput(stdout) .redirectError(stderr); - configureRunMillProcess(builder, lockBase + "/" + ServerFiles.sandbox); + configureRunMillProcess(builder, serverDir + "/" + ServerFiles.sandbox); } static Process configureRunMillProcess(ProcessBuilder builder, - String lockBase) throws Exception { + String serverDir) throws Exception { builder.environment().put("MILL_WORKSPACE_ROOT", new File("").getCanonicalPath()); - File sandbox = new java.io.File(lockBase + "/" + ServerFiles.sandbox); + File sandbox = new java.io.File(serverDir + "/" + ServerFiles.sandbox); sandbox.mkdirs(); // builder.directory(sandbox); return builder.start(); @@ -146,7 +149,7 @@ static List millLaunchJvmCommand(boolean setJnaNoSys) { // extra opts File millJvmOptsFile = millJvmOptsFile(); if (millJvmOptsFile.exists()) { - vmOptions.addAll(readOptsFileLines(millJvmOptsFile)); + vmOptions.addAll(Util.readOptsFileLines(millJvmOptsFile)); } vmOptions.add("-cp"); @@ -156,30 +159,6 @@ static List millLaunchJvmCommand(boolean setJnaNoSys) { } static List readMillJvmOpts() { - return readOptsFileLines(millJvmOptsFile()); + return Util.readOptsFileLines(millJvmOptsFile()); } - - /** - * Reads a file, ignoring empty or comment lines - * - * @return The non-empty lines of the files or an empty list, if the file does not exists - */ - static List readOptsFileLines(final File file) { - final List vmOptions = new LinkedList<>(); - try ( - final Scanner sc = new Scanner(file) - ) { - while (sc.hasNextLine()) { - String arg = sc.nextLine(); - String trimmed = arg.trim(); - if (!trimmed.isEmpty() && !trimmed.startsWith("#")) { - vmOptions.add(arg); - } - } - } catch (FileNotFoundException e) { - // ignored - } - return vmOptions; - } - } diff --git a/runner/src/mill/runner/MillServerMain.scala b/runner/src/mill/runner/MillServerMain.scala index 103558e74eb..164a3eb3b0a 100644 --- a/runner/src/mill/runner/MillServerMain.scala +++ b/runner/src/mill/runner/MillServerMain.scala @@ -2,38 +2,12 @@ package mill.runner import sun.misc.{Signal, SignalHandler} -import java.io._ -import java.net.Socket -import scala.jdk.CollectionConverters._ -import org.newsclub.net.unix.AFUNIXServerSocket -import org.newsclub.net.unix.AFUNIXSocketAddress -import mill.main.BuildInfo import mill.main.client._ -import mill.api.{SystemStreams, internal} -import mill.main.client.ProxyStream.Output -import mill.main.client.lock.{Lock, Locks} - +import mill.api.SystemStreams +import mill.main.client.lock.Locks import scala.util.Try -@internal -trait MillServerMain[T] { - def stateCache0: T - var stateCache = stateCache0 - def main0( - args: Array[String], - stateCache: T, - mainInteractive: Boolean, - streams: SystemStreams, - env: Map[String, String], - setIdle: Boolean => Unit, - systemProperties: Map[String, String], - initialSystemProperties: Map[String, String] - ): (Boolean, T) -} - -@internal -object MillServerMain extends MillServerMain[RunnerState] { - def stateCache0 = RunnerState.empty +object MillServerMain { def main(args0: Array[String]): Unit = { // Disable SIGINT interrupt signal in the Mill server. // @@ -52,14 +26,27 @@ object MillServerMain extends MillServerMain[RunnerState] { val acceptTimeoutMillis = Try(System.getProperty("mill.server_timeout").toInt).getOrElse(5 * 60 * 1000) // 5 minutes - new Server( - lockBase = os.Path(args0(0)), - this, - () => System.exit(Util.ExitServerCodeWhenIdle()), + new MillServerMain( + serverDir = os.Path(args0(0)), acceptTimeoutMillis = acceptTimeoutMillis, Locks.files(args0(0)) ).run() } +} +class MillServerMain( + serverDir: os.Path, + acceptTimeoutMillis: Int, + locks: Locks +) extends mill.main.server.Server[RunnerState]( + serverDir, + acceptTimeoutMillis, + locks + ) { + + override def exitServer(): Unit = { + super.exitServer(); System.exit(Util.ExitServerCodeWhenIdle()) + } + def stateCache0 = RunnerState.empty def main0( args: Array[String], @@ -85,192 +72,3 @@ object MillServerMain extends MillServerMain[RunnerState] { catch MillMain.handleMillException(streams.err, stateCache) } } - -class Server[T]( - lockBase: os.Path, - sm: MillServerMain[T], - interruptServer: () => Unit, - acceptTimeoutMillis: Int, - locks: Locks -) { - - val originalStdout = System.out - def run(): Unit = { - val initialSystemProperties = sys.props.toMap - Server.tryLockBlock(locks.processLock) { - var running = true - while (running) { - - val socketPath = os.Path(ServerFiles.pipe(lockBase.toString())) - - os.remove.all(socketPath) - - // Use relative path because otherwise the full path might be too long for the socket API - val addr = - AFUNIXSocketAddress.of(socketPath.relativeTo(os.pwd).toNIO.toFile) - val serverSocket = AFUNIXServerSocket.bindOn(addr) - val socketClose = () => serverSocket.close() - - val sockOpt = Server.interruptWith( - "MillSocketTimeoutInterruptThread", - acceptTimeoutMillis, - socketClose(), - serverSocket.accept() - ) - - sockOpt match { - case None => running = false - case Some(sock) => - try { - try handleRun(sock, initialSystemProperties) - catch { case e: Throwable => e.printStackTrace(originalStdout) } - finally sock.close(); - } finally serverSocket.close() - } - } - }.getOrElse(throw new Exception("PID already present")) - } - - def proxyInputStreamThroughPumper(in: InputStream): PipedInputStream = { - val pipedInput = new PipedInputStream() - val pipedOutput = new PipedOutputStream() - pipedOutput.connect(pipedInput) - val pumper = new InputPumper(() => in, () => pipedOutput, false) - val pumperThread = new Thread(pumper, "proxyInputStreamThroughPumper") - pumperThread.setDaemon(true) - pumperThread.start() - pipedInput - } - def handleRun(clientSocket: Socket, initialSystemProperties: Map[String, String]): Unit = { - - val currentOutErr = clientSocket.getOutputStream - try { - val stdout = new PrintStream(new Output(currentOutErr, ProxyStream.OUT), true) - val stderr = new PrintStream(new Output(currentOutErr, ProxyStream.ERR), true) - - // Proxy the input stream through a pair of Piped**putStream via a pumper, - // as the `UnixDomainSocketInputStream` we get directly from the socket does - // not properly implement `available(): Int` and thus messes up polling logic - // that relies on that method - val proxiedSocketInput = proxyInputStreamThroughPumper(clientSocket.getInputStream) - - val argStream = os.read.inputStream(lockBase / ServerFiles.runArgs) - val interactive = argStream.read() != 0 - val clientMillVersion = Util.readString(argStream) - val serverMillVersion = BuildInfo.millVersion - if (clientMillVersion != serverMillVersion) { - stderr.println( - s"Mill version changed ($serverMillVersion -> $clientMillVersion), re-starting server" - ) - os.write( - lockBase / ServerFiles.exitCode, - Util.ExitServerCodeWhenVersionMismatch().toString.getBytes() - ) - System.exit(Util.ExitServerCodeWhenVersionMismatch()) - } - val args = Util.parseArgs(argStream) - val env = Util.parseMap(argStream) - val userSpecifiedProperties = Util.parseMap(argStream) - argStream.close() - - @volatile var done = false - @volatile var idle = false - val t = new Thread( - () => - try { - val (result, newStateCache) = sm.main0( - args, - sm.stateCache, - interactive, - new SystemStreams(stdout, stderr, proxiedSocketInput), - env.asScala.toMap, - idle = _, - userSpecifiedProperties.asScala.toMap, - initialSystemProperties - ) - - sm.stateCache = newStateCache - os.write.over( - lockBase / ServerFiles.exitCode, - (if (result) 0 else 1).toString.getBytes() - ) - } finally { - done = true - idle = true - }, - "MillServerActionRunner" - ) - t.start() - // We cannot simply use Lock#await here, because the filesystem doesn't - // realize the clientLock/serverLock are held by different threads in the - // two processes and gives a spurious deadlock error - while (!done && !locks.clientLock.probe()) Thread.sleep(3) - - if (!idle) interruptServer() - - t.interrupt() - // Try to give thread a moment to stop before we kill it for real - Thread.sleep(5) - try t.stop() - catch { - case e: UnsupportedOperationException => - // nothing we can do about, removed in Java 20 - case e: java.lang.Error if e.getMessage.contains("Cleaner terminated abnormally") => - // ignore this error and do nothing; seems benign - } - - // flush before closing the socket - System.out.flush() - System.err.flush() - - } finally ProxyStream.sendEnd(currentOutErr) // Send a termination - } -} - -object Server { - - def lockBlock[T](lock: Lock)(t: => T): T = { - val l = lock.lock() - try t - finally l.release() - } - - def tryLockBlock[T](lock: Lock)(t: => T): Option[T] = { - lock.tryLock() match { - case null => None - case l => - try Some(t) - finally l.release() - } - } - - def interruptWith[T](threadName: String, millis: Int, close: => Unit, t: => T): Option[T] = { - @volatile var interrupt = true - @volatile var interrupted = false - val thread = new Thread( - () => { - try Thread.sleep(millis) - catch { case t: InterruptedException => /* Do Nothing */ } - if (interrupt) { - interrupted = true - close - } - }, - threadName - ) - - thread.start() - try { - val res = - try Some(t) - catch { case e: Throwable => None } - - if (interrupted) None - else res - - } finally { - thread.interrupt() - interrupt = false - } - } -} diff --git a/runner/test/src/mill/runner/ClientServerTests.scala b/runner/test/src/mill/runner/ClientServerTests.scala deleted file mode 100644 index 5eb24f6a765..00000000000 --- a/runner/test/src/mill/runner/ClientServerTests.scala +++ /dev/null @@ -1,222 +0,0 @@ -package mill.runner - -import java.io._ -import mill.main.client.Util -import mill.main.client.lock.Locks -import mill.api.SystemStreams - -import scala.jdk.CollectionConverters._ -import utest._ - -class EchoServer extends MillServerMain[Option[Int]] { - def stateCache0 = None - def main0( - args: Array[String], - stateCache: Option[Int], - mainInteractive: Boolean, - streams: SystemStreams, - env: Map[String, String], - setIdle: Boolean => Unit, - systemProperties: Map[String, String], - initialSystemProperties: Map[String, String] - ) = { - - val reader = new BufferedReader(new InputStreamReader(streams.in)) - val str = reader.readLine() - if (args.nonEmpty) { - streams.out.println(str + args(0)) - } - env.toSeq.sortBy(_._1).foreach { - case (key, value) => streams.out.println(s"$key=$value") - } - systemProperties.toSeq.sortBy(_._1).foreach { - case (key, value) => streams.out.println(s"$key=$value") - } - if (args.nonEmpty) { - streams.err.println(str.toUpperCase + args(0)) - } - streams.out.flush() - streams.err.flush() - (true, None) - } -} - -object ClientServerTests extends TestSuite { - - val ENDL = System.lineSeparator() - - def initStreams() = { - val in = new ByteArrayInputStream(s"hello${ENDL}".getBytes()) - val out = new ByteArrayOutputStream() - val err = new ByteArrayOutputStream() - (in, out, err) - } - def init() = { - val tmpDir = os.temp.dir() - val locks = Locks.memory() - - (tmpDir, locks) - } - - def spawnEchoServer(tmpDir: os.Path, locks: Locks): Unit = { - new Thread(() => - new Server( - tmpDir, - new EchoServer(), - () => (), - 1000, - locks - ).run() - ).start() - } - - def runClientAux( - tmpDir: os.Path, - locks: Locks - )(env: Map[String, String], args: Array[String]) = { - val (in, out, err) = initStreams() - Server.lockBlock(locks.clientLock) { - mill.main.client.MillServerLauncher.run( - tmpDir.toString, - () => spawnEchoServer(tmpDir, locks), - locks, - in, - out, - err, - args, - env.asJava - ) - Thread.sleep(200) - (new String(out.toByteArray), new String(err.toByteArray)) - } - } - - def tests = Tests { - "hello" - { - val (tmpDir, locks) = init() - def runClient(s: String) = runClientAux(tmpDir, locks)(Map.empty, Array(s)) - - // Make sure the simple "have the client start a server and - // exchange one message" workflow works from end to end. - - assert( - locks.clientLock.probe(), - locks.processLock.probe() - ) - - val (out1, err1) = runClient("world") - - assert( - out1 == s"helloworld${ENDL}", - err1 == s"HELLOworld${ENDL}" - ) - - // Give a bit of time for the server to release the lock and - // re-acquire it to signal to the client that it"s" done - Thread.sleep(100) - - assert( - locks.clientLock.probe(), - !locks.processLock.probe() - ) - - // A seecond client in sequence connect to the same server - val (out2, err2) = runClient(" WORLD") - - assert( - out2 == s"hello WORLD${ENDL}", - err2 == s"HELLO WORLD${ENDL}" - ) - - if (!Util.isWindows) { - // Make sure the server times out of not used for a while - Thread.sleep(2000) - assert( - locks.clientLock.probe(), - locks.processLock.probe() - ) - - // Have a third client spawn/connect-to a new server at the same path - val (out3, err3) = runClient(" World") - assert( - out3 == s"hello World${ENDL}", - err3 == s"HELLO World${ENDL}" - ) - } - - "envVars" - retry(3) { - val (tmpDir, locks) = init() - - def runClient(env: Map[String, String]) = runClientAux(tmpDir, locks)(env, Array()) - - // Make sure the simple "have the client start a server and - // exchange one message" workflow works from end to end. - - assert( - locks.clientLock.probe(), - locks.processLock.probe() - ) - - def longString(s: String) = Array.fill(1000)(s).mkString - val b1000 = longString("b") - val c1000 = longString("c") - val a1000 = longString("a") - - val env = Map( - "a" -> a1000, - "b" -> b1000, - "c" -> c1000 - ) - - val (out1, err1) = runClient(env) - val expected = s"a=$a1000${ENDL}b=$b1000${ENDL}c=$c1000${ENDL}" - - assert( - out1 == expected, - err1 == "" - ) - - // Give a bit of time for the server to release the lock and - // re-acquire it to signal to the client that it's done - Thread.sleep(100) - - assert( - locks.clientLock.probe(), - !locks.processLock.probe() - ) - - val path = List( - "/Users/foo/Library/Haskell/bin", - "/usr/local/git/bin", - "/sw/bin/", - "/usr/local/bin", - "/usr/local/", - "/usr/local/sbin", - "/usr/local/mysql/bin", - "/usr/local/bin", - "/usr/bin", - "/bin", - "/usr/sbin", - "/sbin", - "/opt/X11/bin", - "/usr/local/MacGPG2/bin", - "/Library/TeX/texbin", - "/usr/local/bin/", - "/Users/foo/bin", - "/Users/foo/go/bin", - "~/.bloop" - ) - - val pathEnvVar = path.mkString(":") - val (out2, err2) = runClient(Map("PATH" -> pathEnvVar)) - - val expected2 = s"PATH=$pathEnvVar${ENDL}" - - assert( - out2 == expected2, - err2 == "" - ) - } - } - } -}