diff --git a/src/Build.UnitTests/BackEnd/NodeEndpointInProc_Tests.cs b/src/Build.UnitTests/BackEnd/NodeEndpointInProc_Tests.cs index 5a1eb10715b..fd8178dcadd 100644 --- a/src/Build.UnitTests/BackEnd/NodeEndpointInProc_Tests.cs +++ b/src/Build.UnitTests/BackEnd/NodeEndpointInProc_Tests.cs @@ -101,7 +101,7 @@ public void UnregisterPacketHandler(NodePacketType packetType) throw new NotImplementedException(); } - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { throw new NotImplementedException(); } diff --git a/src/Build/BackEnd/Client/MSBuildClient.cs b/src/Build/BackEnd/Client/MSBuildClient.cs index 0abf86a2aa2..5cceeb22d66 100644 --- a/src/Build/BackEnd/Client/MSBuildClient.cs +++ b/src/Build/BackEnd/Client/MSBuildClient.cs @@ -7,7 +7,6 @@ using System.Diagnostics; using System.Globalization; using System.IO; -using System.IO.Pipes; using System.Threading; using Microsoft.Build.BackEnd; using Microsoft.Build.BackEnd.Client; @@ -75,19 +74,9 @@ public sealed class MSBuildClient private readonly string _pipeName; /// - /// The named pipe stream for client-server communication. + /// The named pipe client for client-server communication. /// - private NamedPipeClientStream _nodeStream = null!; - - /// - /// A way to cache a byte array when writing out packets - /// - private readonly MemoryStream _packetMemoryStream; - - /// - /// A binary writer to help write into - /// - private readonly BinaryWriter _binaryWriter; + private NodePipeClient _pipeClient = null!; /// /// Used to estimate the size of the build with an ETW trace. @@ -130,26 +119,14 @@ public MSBuildClient( // Client <-> Server communication stream _handshake = GetHandshake(); _pipeName = OutOfProcServerNode.GetPipeName(_handshake); - _packetMemoryStream = new MemoryStream(); - _binaryWriter = new BinaryWriter(_packetMemoryStream); CreateNodePipeStream(); } private void CreateNodePipeStream() { -#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter - _nodeStream = new NamedPipeClientStream( - serverName: ".", - _pipeName, - PipeDirection.InOut, - PipeOptions.Asynchronous -#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY - | PipeOptions.CurrentUserOnly -#endif - ); -#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter - _packetPump = new MSBuildClientPacketPump(_nodeStream); + _pipeClient = new NodePipeClient(_pipeName, _handshake); + _packetPump = new MSBuildClientPacketPump(_pipeClient); } /// @@ -423,7 +400,7 @@ private bool TrySendPacket(Func packetResolver) try { packet = packetResolver(); - WritePacket(_nodeStream, packet); + _pipeClient.WritePacket(packet); CommunicationsUtilities.Trace("Command packet of type '{0}' sent...", packet.Type); } catch (Exception ex) @@ -621,7 +598,7 @@ private bool TryConnectToServer(int timeoutMilliseconds) tryAgain = false; try { - NodeProviderOutOfProcBase.ConnectToPipeStream(_nodeStream, _pipeName, _handshake, Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds)); + _pipeClient.ConnectToServer(Math.Max(1, timeoutMilliseconds - (int)sw.ElapsedMilliseconds)); } catch (Exception ex) { @@ -644,30 +621,5 @@ private bool TryConnectToServer(int timeoutMilliseconds) return true; } - - private void WritePacket(Stream nodeStream, INodePacket packet) - { - MemoryStream memoryStream = _packetMemoryStream; - memoryStream.SetLength(0); - - ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(memoryStream); - - // Write header - memoryStream.WriteByte((byte)packet.Type); - - // Pad for packet length - _binaryWriter.Write(0); - - // Reset the position in the write buffer. - packet.Translate(writeTranslator); - - int packetStreamLength = (int)memoryStream.Position; - - // Now write in the actual packet length - memoryStream.Position = 1; - _binaryWriter.Write(packetStreamLength - 5); - - nodeStream.Write(memoryStream.GetBuffer(), 0, packetStreamLength); - } } } diff --git a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs index a7234030b5c..34ec9d28df1 100644 --- a/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs +++ b/src/Build/BackEnd/Client/MSBuildClientPacketPump.cs @@ -2,15 +2,11 @@ // The .NET Foundation licenses this file to you under the MIT license. using System; -using System.Buffers.Binary; using System.Collections.Concurrent; -using System.IO; using System.Threading; using Microsoft.Build.Internal; using Microsoft.Build.Shared; -#if !FEATURE_APM using System.Threading.Tasks; -#endif namespace Microsoft.Build.BackEnd.Client { @@ -46,25 +42,15 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF /// private readonly NodePacketFactory _packetFactory; - /// - /// The memory stream for a read buffer. - /// - private readonly MemoryStream _readBufferMemoryStream; - /// /// The thread which runs the asynchronous packet pump /// private Thread? _packetPumpThread; /// - /// The stream from where to read packets. - /// - private readonly Stream _stream; - - /// - /// The binary translator for reading packets. + /// The pipe client from where to read packets. /// - private readonly ITranslator _binaryReadTranslator; + private readonly NodePipeClient _pipeClient; /// /// True if this side is gracefully disconnecting. @@ -73,11 +59,12 @@ internal sealed class MSBuildClientPacketPump : INodePacketHandler, INodePacketF /// private bool _isServerDisconnecting; - public MSBuildClientPacketPump(Stream stream) + public MSBuildClientPacketPump(NodePipeClient pipeClient) { - ErrorUtilities.VerifyThrowArgumentNull(stream); + ErrorUtilities.VerifyThrowArgumentNull(pipeClient); - _stream = stream; + _pipeClient = pipeClient; + _pipeClient.RegisterPacketFactory(this); _isServerDisconnecting = false; _packetFactory = new NodePacketFactory(); @@ -85,9 +72,6 @@ public MSBuildClientPacketPump(Stream stream) PacketReceivedEvent = new AutoResetEvent(false); PacketPumpCompleted = new ManualResetEvent(false); _packetPumpShutdownEvent = new ManualResetEvent(false); - - _readBufferMemoryStream = new MemoryStream(); - _binaryReadTranslator = BinaryTranslator.GetReadTranslator(_readBufferMemoryStream, InterningBinaryReader.CreateSharedBuffer()); } #region INodePacketFactory Members @@ -113,14 +97,13 @@ public void UnregisterPacketHandler(NodePacketType packetType) } /// - /// Deserializes and routes a packer to the appropriate handler. + /// Deserializes a packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator to use as a source for packet data. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); + return _packetFactory.DeserializePacket(packetType, translator); } /// @@ -182,21 +165,16 @@ public void Stop() /// private void PacketPumpProc() { - RunReadLoop(_stream, _packetPumpShutdownEvent); + RunReadLoop(_pipeClient, _packetPumpShutdownEvent); } - private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShutdownEvent) + private void RunReadLoop(NodePipeClient pipeClient, ManualResetEvent localPacketPumpShutdownEvent) { CommunicationsUtilities.Trace("Entering read loop."); try { - byte[] headerByte = new byte[5]; -#if FEATURE_APM - IAsyncResult result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null); -#else - Task readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask(); -#endif + Task readTask = pipeClient.ReadPacketAsync(); bool continueReading = true; do @@ -208,11 +186,7 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu WaitHandle[] handles = [ localPacketPumpShutdownEvent, -#if FEATURE_APM - result.AsyncWaitHandle -#else ((IAsyncResult)readTask).AsyncWaitHandle -#endif ]; int waitId = WaitHandle.WaitAny(handles); switch (waitId) @@ -224,80 +198,27 @@ private void RunReadLoop(Stream localStream, ManualResetEvent localPacketPumpShu break; case 1: + INodePacket packet = readTask.GetAwaiter().GetResult(); + + if (packet.Type == NodePacketType.NodeShutdown) { - // Client recieved a packet header. Read the rest of it. - int headerBytesRead = 0; -#if FEATURE_APM - headerBytesRead = localStream.EndRead(result); -#else - headerBytesRead = readTask.Result; -#endif - - if ((headerBytesRead != headerByte.Length) && !localPacketPumpShutdownEvent.WaitOne(0)) + if (!_isServerDisconnecting) { - // Incomplete read. Abort. - if (headerBytesRead == 0) - { - if (_isServerDisconnecting) - { - continueReading = false; - break; - } - - ErrorUtilities.ThrowInternalError("Server disconnected abruptly"); - } - else - { - ErrorUtilities.ThrowInternalError("Incomplete header read. {0} of {1} bytes read", headerBytesRead, headerByte.Length); - } + ErrorUtilities.ThrowInternalError("Server disconnected abruptly."); } - NodePacketType packetType = (NodePacketType)Enum.ToObject(typeof(NodePacketType), headerByte[0]); - - int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(headerByte, 1, 4)); - int packetBytesRead = 0; - - _readBufferMemoryStream.Position = 0; - _readBufferMemoryStream.SetLength(packetLength); - byte[] packetData = _readBufferMemoryStream.GetBuffer(); - - while (packetBytesRead < packetLength) - { - int bytesRead = localStream.Read(packetData, packetBytesRead, packetLength - packetBytesRead); - if (bytesRead == 0) - { - // Incomplete read. Abort. - ErrorUtilities.ThrowInternalError("Incomplete packet read. {0} of {1} bytes read", packetBytesRead, packetLength); - } - - packetBytesRead += bytesRead; - } + continueReading = false; + break; + } - try - { - _packetFactory.DeserializeAndRoutePacket(0, packetType, _binaryReadTranslator); - } - catch - { - // Error while deserializing or handling packet. Logging additional info. - CommunicationsUtilities.Trace("Packet factory failed to receive package. Exception while deserializing packet {0}.", packetType); - throw; - } + _packetFactory.RoutePacket(0, packet); - if (packetType == NodePacketType.ServerNodeBuildResult) - { - continueReading = false; - } - else - { - // Start reading the next package header. -#if FEATURE_APM - result = localStream.BeginRead(headerByte, 0, headerByte.Length, null, null); -#else - readTask = CommunicationsUtilities.ReadAsync(localStream, headerByte, headerByte.Length).AsTask(); -#endif - } + continueReading = packet.Type != NodePacketType.ServerNodeBuildResult; + if (continueReading) + { + readTask = pipeClient.ReadPacketAsync(); } + break; default: diff --git a/src/Build/BackEnd/Components/Communications/NodeManager.cs b/src/Build/BackEnd/Components/Communications/NodeManager.cs index b0031746031..96cb184f1c2 100644 --- a/src/Build/BackEnd/Components/Communications/NodeManager.cs +++ b/src/Build/BackEnd/Components/Communications/NodeManager.cs @@ -240,19 +240,13 @@ public void UnregisterPacketHandler(NodePacketType packetType) } /// - /// Takes a serializer, deserializes the packet and routes it to the appropriate handler. + /// Takes a serializer and deserializes the packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator containing the data from which the packet should be reconstructed. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { - if (packetType == NodePacketType.NodeShutdown) - { - RemoveNodeFromMapping(nodeId); - } - - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); + return _packetFactory.DeserializePacket(packetType, translator); } /// diff --git a/src/Build/BackEnd/Components/Communications/NodeProviderInProc.cs b/src/Build/BackEnd/Components/Communications/NodeProviderInProc.cs index 61ff495b01f..a2709281af4 100644 --- a/src/Build/BackEnd/Components/Communications/NodeProviderInProc.cs +++ b/src/Build/BackEnd/Components/Communications/NodeProviderInProc.cs @@ -285,10 +285,11 @@ public void UnregisterPacketHandler(NodePacketType packetType) /// /// Deserializes and routes a packet. Not used in the in-proc node. /// - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { // Not used ErrorUtilities.ThrowInternalErrorUnreachable(); + return null; } /// diff --git a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs index b1e380b5fa2..d346a6f3fe6 100644 --- a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs +++ b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcBase.cs @@ -1,32 +1,22 @@ // Licensed to the .NET Foundation under one or more agreements. // The .NET Foundation licenses this file to you under the MIT license. +#nullable disable + using System; -using System.Buffers.Binary; -using System.Collections.Generic; using System.Collections.Concurrent; +using System.Collections.Generic; using System.Globalization; using System.IO; -using System.IO.Pipes; using System.Diagnostics; using System.Linq; using System.Threading; using System.Threading.Tasks; -#if FEATURE_PIPE_SECURITY -using System.Security.Principal; -#endif - -#if FEATURE_APM -using Microsoft.Build.Eventing; -#endif using Microsoft.Build.Internal; using Microsoft.Build.Shared; -using Task = System.Threading.Tasks.Task; using Microsoft.Build.Framework; using Microsoft.Build.BackEnd.Logging; -#nullable disable - namespace Microsoft.Build.BackEnd { /// @@ -35,11 +25,6 @@ namespace Microsoft.Build.BackEnd /// internal abstract class NodeProviderOutOfProcBase { - /// - /// The maximum number of bytes to write - /// - private const int MaxPacketWriteSize = 1048576; - /// /// The number of times to retry creating an out-of-proc node. /// @@ -55,9 +40,6 @@ internal abstract class NodeProviderOutOfProcBase /// private const int TimeoutForWaitForExit = 30000; -#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY - private static readonly WindowsIdentity s_currentWindowsIdentity = WindowsIdentity.GetCurrent(); -#endif /// /// The build component host. /// @@ -163,21 +145,18 @@ protected void ShutdownAllNodes(bool nodeReuse, NodeContextTerminateDelegate ter int timeout = 30; // Attempt to connect to the process with the handshake without low priority. - Stream nodeStream = TryConnectToProcess(nodeProcess.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, false)); + NodePipeClient pipeClient = TryConnectToProcess(nodeProcess.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, false)); - if (nodeStream == null) - { - // If we couldn't connect attempt to connect to the process with the handshake including low priority. - nodeStream = TryConnectToProcess(nodeProcess.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, true)); - } + // If we couldn't connect attempt to connect to the process with the handshake including low priority. + pipeClient ??= TryConnectToProcess(nodeProcess.Id, timeout, NodeProviderOutOfProc.GetHandshake(nodeReuse, true)); - if (nodeStream != null) + if (pipeClient != null) { // If we're able to connect to such a process, send a packet requesting its termination CommunicationsUtilities.Trace("Shutting down node with pid = {0}", nodeProcess.Id); - NodeContext nodeContext = new NodeContext(0, nodeProcess, nodeStream, factory, terminateNode); + NodeContext nodeContext = new(0, nodeProcess, pipeClient, factory, terminateNode); nodeContext.SendData(new NodeBuildComplete(false /* no node reuse */)); - nodeStream.Dispose(); + pipeClient.Dispose(); } } } @@ -284,8 +263,8 @@ bool TryReuseAnyFromPossibleRunningNodes(int currentProcessId, int nodeId) _processesToIgnore.TryAdd(nodeLookupKey, default); // Attempt to connect to each process in turn. - Stream nodeStream = TryConnectToProcess(nodeToReuse.Id, 0 /* poll, don't wait for connections */, hostHandshake); - if (nodeStream != null) + NodePipeClient pipeClient = TryConnectToProcess(nodeToReuse.Id, 0 /* poll, don't wait for connections */, hostHandshake); + if (pipeClient != null) { // Connection successful, use this node. CommunicationsUtilities.Trace("Successfully connected to existed node {0} which is PID {1}", nodeId, nodeToReuse.Id); @@ -295,7 +274,7 @@ bool TryReuseAnyFromPossibleRunningNodes(int currentProcessId, int nodeId) BuildEventContext = new BuildEventContext(nodeId, BuildEventContext.InvalidTargetId, BuildEventContext.InvalidProjectContextId, BuildEventContext.InvalidTaskId) }); - CreateNodeContext(nodeId, nodeToReuse, nodeStream); + CreateNodeContext(nodeId, nodeToReuse, pipeClient); return true; } } @@ -344,13 +323,13 @@ bool StartNewNode(int nodeId) // to the debugger process. Instead, use MSBUILDDEBUGONSTART=1 // Now try to connect to it. - Stream nodeStream = TryConnectToProcess(msbuildProcess.Id, TimeoutForNewNodeCreation, hostHandshake); - if (nodeStream != null) + NodePipeClient pipeClient = TryConnectToProcess(msbuildProcess.Id, TimeoutForNewNodeCreation, hostHandshake); + if (pipeClient != null) { // Connection successful, use this node. CommunicationsUtilities.Trace("Successfully connected to created node {0} which is PID {1}", nodeId, msbuildProcess.Id); - CreateNodeContext(nodeId, msbuildProcess, nodeStream); + CreateNodeContext(nodeId, msbuildProcess, pipeClient); return true; } @@ -379,9 +358,9 @@ bool StartNewNode(int nodeId) return false; } - void CreateNodeContext(int nodeId, Process nodeToReuse, Stream nodeStream) + void CreateNodeContext(int nodeId, Process nodeToReuse, NodePipeClient pipeClient) { - NodeContext nodeContext = new(nodeId, nodeToReuse, nodeStream, factory, terminateNode); + NodeContext nodeContext = new(nodeId, nodeToReuse, pipeClient, factory, terminateNode); nodeContexts.Enqueue(nodeContext); createNode(nodeContext); } @@ -423,52 +402,22 @@ private string GetProcessesToIgnoreKey(Handshake hostHandshake, int nodeProcessI #endif } -#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY - // This code needs to be in a separate method so that we don't try (and fail) to load the Windows-only APIs when JIT-ing the code - // on non-Windows operating systems - private static void ValidateRemotePipeSecurityOnWindows(NamedPipeClientStream nodeStream) - { - SecurityIdentifier identifier = s_currentWindowsIdentity.Owner; -#if FEATURE_PIPE_SECURITY - PipeSecurity remoteSecurity = nodeStream.GetAccessControl(); -#else - var remoteSecurity = new PipeSecurity(nodeStream.SafePipeHandle, System.Security.AccessControl.AccessControlSections.Access | - System.Security.AccessControl.AccessControlSections.Owner | System.Security.AccessControl.AccessControlSections.Group); -#endif - IdentityReference remoteOwner = remoteSecurity.GetOwner(typeof(SecurityIdentifier)); - if (remoteOwner != identifier) - { - CommunicationsUtilities.Trace("The remote pipe owner {0} does not match {1}", remoteOwner.Value, identifier.Value); - throw new UnauthorizedAccessException(); - } - } -#endif - /// /// Attempts to connect to the specified process. /// - private Stream TryConnectToProcess(int nodeProcessId, int timeout, Handshake handshake) + private NodePipeClient TryConnectToProcess(int nodeProcessId, int timeout, Handshake handshake) { // Try and connect to the process. string pipeName = NamedPipeUtil.GetPlatformSpecificPipeName(nodeProcessId); -#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter - NamedPipeClientStream nodeStream = new NamedPipeClientStream( - serverName: ".", - pipeName, - PipeDirection.InOut, - PipeOptions.Asynchronous -#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY - | PipeOptions.CurrentUserOnly -#endif - ); -#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter - CommunicationsUtilities.Trace("Attempting connect to PID {0} with pipe {1} with timeout {2} ms", nodeProcessId, pipeName, timeout); + NodePipeClient pipeClient = new(pipeName, handshake); + + CommunicationsUtilities.Trace("Attempting connect to PID {0}", nodeProcessId); try { - ConnectToPipeStream(nodeStream, pipeName, handshake, timeout); - return nodeStream; + pipeClient.ConnectToServer(timeout); + return pipeClient; } catch (Exception e) when (!ExceptionHandling.IsCriticalException(e)) { @@ -480,56 +429,12 @@ private Stream TryConnectToProcess(int nodeProcessId, int timeout, Handshake han CommunicationsUtilities.Trace("Failed to connect to pipe {0}. {1}", pipeName, e.Message.TrimEnd()); // If we don't close any stream, we might hang up the child - nodeStream?.Dispose(); + pipeClient?.Dispose(); } return null; } - /// - /// Connect to named pipe stream and ensure validate handshake and security. - /// - /// - /// Reused by MSBuild server client . - /// - internal static void ConnectToPipeStream(NamedPipeClientStream nodeStream, string pipeName, Handshake handshake, int timeout) - { - nodeStream.Connect(timeout); - -#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY - if (NativeMethodsShared.IsWindows) - { - // Verify that the owner of the pipe is us. This prevents a security hole where a remote node has - // been faked up with ACLs that would let us attach to it. It could then issue fake build requests back to - // us, potentially causing us to execute builds that do harmful or unexpected things. The pipe owner can - // only be set to the user's own SID by a normal, unprivileged process. The conditions where a faked up - // remote node could set the owner to something else would also let it change owners on other objects, so - // this would be a security flaw upstream of us. - ValidateRemotePipeSecurityOnWindows(nodeStream); - } -#endif - - int[] handshakeComponents = handshake.RetrieveHandshakeComponents(); - for (int i = 0; i < handshakeComponents.Length; i++) - { - CommunicationsUtilities.Trace("Writing handshake part {0} ({1}) to pipe {2}", i, handshakeComponents[i], pipeName); - nodeStream.WriteIntForHandshake(handshakeComponents[i]); - } - - // This indicates that we have finished all the parts of our handshake; hopefully the endpoint has as well. - nodeStream.WriteEndOfHandshakeSignal(); - - CommunicationsUtilities.Trace("Reading handshake from pipe {0}", pipeName); - -#if NETCOREAPP2_1_OR_GREATER - nodeStream.ReadEndOfHandshakeSignal(true, timeout); -#else - nodeStream.ReadEndOfHandshakeSignal(true); -#endif - // We got a connection. - CommunicationsUtilities.Trace("Successfully connected to pipe {0}...!", pipeName); - } - /// /// Class which wraps up the communications infrastructure for a given node. /// @@ -542,14 +447,13 @@ private enum ExitPacketState ExitPacketSent } - // The pipe(s) used to communicate with the node. - private Stream _clientToServerStream; - private Stream _serverToClientStream; + // The pipe client used to communicate with the node. + private readonly NodePipeClient _pipeClient; /// /// The factory used to create packets from data read off the pipe. /// - private INodePacketFactory _packetFactory; + private readonly INodePacketFactory _packetFactory; /// /// The node id assigned by the node provider. @@ -563,23 +467,6 @@ private enum ExitPacketState internal Process Process { get { return _process; } } - /// - /// An array used to store the header byte for each packet when read. - /// - private byte[] _headerByte; - - /// - /// A buffer typically big enough to handle a packet body. - /// We use this as a convenient way to manage and cache a byte[] that's resized - /// automatically to fit our payload. - /// - private MemoryStream _readBufferMemoryStream; - - /// - /// A reusable buffer for writing packets. - /// - private MemoryStream _writeBufferMemoryStream; - /// /// A queue used for enqueuing packets to write to the stream asynchronously. /// @@ -602,28 +489,19 @@ private enum ExitPacketState /// private ExitPacketState _exitPacketState; - /// - /// Per node read buffers - /// - private BinaryReaderFactory _binaryReaderFactory; - /// /// Constructor. /// public NodeContext(int nodeId, Process process, - Stream nodePipe, + NodePipeClient pipeClient, INodePacketFactory factory, NodeContextTerminateDelegate terminateDelegate) { _nodeId = nodeId; _process = process; - _clientToServerStream = nodePipe; - _serverToClientStream = nodePipe; + _pipeClient = pipeClient; + _pipeClient.RegisterPacketFactory(factory); _packetFactory = factory; - _headerByte = new byte[5]; // 1 for the packet type, 4 for the body length - _readBufferMemoryStream = new MemoryStream(); - _writeBufferMemoryStream = new MemoryStream(); _terminateDelegate = terminateDelegate; - _binaryReaderFactory = InterningBinaryReader.CreateSharedBuffer(); } /// @@ -636,73 +514,49 @@ public NodeContext(int nodeId, Process process, /// public void BeginAsyncPacketRead() { -#if FEATURE_APM - _clientToServerStream.BeginRead(_headerByte, 0, _headerByte.Length, HeaderReadComplete, this); -#else - ThreadPool.QueueUserWorkItem(delegate - { - var ignored = RunPacketReadLoopAsync(); - }); -#endif + _ = ThreadPool.QueueUserWorkItem(_ => _ = RunPacketReadLoopAsync()); } -#if !FEATURE_APM public async Task RunPacketReadLoopAsync() { - while (true) + INodePacket packet = null; + + while (packet?.Type != NodePacketType.NodeShutdown) { try { - int bytesRead = await CommunicationsUtilities.ReadAsync(_clientToServerStream, _headerByte, _headerByte.Length); - if (!ProcessHeaderBytesRead(bytesRead)) - { - return; - } + packet = await _pipeClient.ReadPacketAsync().ConfigureAwait(false); } catch (IOException e) { - CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync: {0}", e); - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return; + CommunicationsUtilities.Trace(_nodeId, "COMMUNICATIONS ERROR (HRC) Node: {0} Process: {1} Exception: {2}", _nodeId, _process.Id, e.Message); + packet = new NodeShutdown(NodeShutdownReason.ConnectionFailed); } - NodePacketType packetType = (NodePacketType)_headerByte[0]; - int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(_headerByte, 1, 4)); - - _readBufferMemoryStream.SetLength(packetLength); - byte[] packetData = _readBufferMemoryStream.GetBuffer(); - - try + if (packet.Type == NodePacketType.NodeShutdown && (packet as NodeShutdown).Reason == NodeShutdownReason.ConnectionFailed) { - int bytesRead = await CommunicationsUtilities.ReadAsync(_clientToServerStream, packetData, packetLength); - if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType)) + try { - return; + if (_process.HasExited) + { + CommunicationsUtilities.Trace(_nodeId, " Child Process {0} has exited.", _process.Id); + } + else + { + CommunicationsUtilities.Trace(_nodeId, " Child Process {0} is still running.", _process.Id); + } + } + catch (Exception e) when (!ExceptionHandling.IsCriticalException(e)) + { + CommunicationsUtilities.Trace(_nodeId, "Unable to retrieve remote process information. {0}", e); } - } - catch (IOException e) - { - CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in RunPacketReadLoopAsync (Reading): {0}", e); - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return; - } - - // Read and route the packet. - if (!ReadAndRoutePacket(packetType, packetData, packetLength)) - { - return; } - if (packetType == NodePacketType.NodeShutdown) - { - Close(); - return; - } + _packetFactory.RoutePacket(_nodeId, packet); } + + Close(); } -#endif /// /// Sends the specified packet to this node asynchronously. @@ -748,37 +602,11 @@ private void DrainPacketQueue() static async Task SendDataCoreAsync(Task _, object state) { NodeContext context = (NodeContext)state; - while (context._packetWriteQueue.TryDequeue(out var packet)) + while (context._packetWriteQueue.TryDequeue(out INodePacket packet)) { - MemoryStream writeStream = context._writeBufferMemoryStream; - - // clear the buffer but keep the underlying capacity to avoid reallocations - writeStream.SetLength(0); - - ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(writeStream); try { - writeStream.WriteByte((byte)packet.Type); - - // Pad for the packet length - WriteInt32(writeStream, 0); - packet.Translate(writeTranslator); - - int writeStreamLength = (int)writeStream.Position; - - // Now plug in the real packet length - writeStream.Position = 1; - WriteInt32(writeStream, writeStreamLength - 5); - - byte[] writeStreamBuffer = writeStream.GetBuffer(); - - for (int i = 0; i < writeStreamLength; i += MaxPacketWriteSize) - { - int lengthToWrite = Math.Min(writeStreamLength - i, MaxPacketWriteSize); -#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' - await context._serverToClientStream.WriteAsync(writeStreamBuffer, i, lengthToWrite, CancellationToken.None); -#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' - } + await context._pipeClient.WritePacketAsync(packet).ConfigureAwait(false); if (IsExitPacket(packet)) { @@ -804,27 +632,12 @@ private static bool IsExitPacket(INodePacket packet) return packet is NodeBuildComplete buildCompletePacket && !buildCompletePacket.PrepareForReuse; } - /// - /// Avoid having a BinaryWriter just to write a 4-byte int - /// - private static void WriteInt32(MemoryStream stream, int value) - { - stream.WriteByte((byte)value); - stream.WriteByte((byte)(value >> 8)); - stream.WriteByte((byte)(value >> 16)); - stream.WriteByte((byte)(value >> 24)); - } - /// /// Closes the node's context, disconnecting it from the node. /// private void Close() { - _clientToServerStream.Dispose(); - if (!object.ReferenceEquals(_clientToServerStream, _serverToClientStream)) - { - _serverToClientStream.Dispose(); - } + _pipeClient.Dispose(); _terminateDelegate(_nodeId); } @@ -882,191 +695,6 @@ public async Task WaitForExitAsync(ILoggingService loggingService) _process.KillTree(timeoutMilliseconds: 5000); } - -#if FEATURE_APM - /// - /// Completes the asynchronous packet write to the node. - /// - private void PacketWriteComplete(IAsyncResult result) - { - try - { - _serverToClientStream.EndWrite(result); - } - catch (IOException) - { - // Do nothing here because any exception will be caught by the async read handler - } - } -#endif - - private bool ProcessHeaderBytesRead(int bytesRead) - { - if (bytesRead != _headerByte.Length) - { - CommunicationsUtilities.Trace(_nodeId, "COMMUNICATIONS ERROR (HRC) Node: {0} Process: {1} Bytes Read: {2} Expected: {3}", _nodeId, _process.Id, bytesRead, _headerByte.Length); - try - { - if (_process.HasExited) - { - CommunicationsUtilities.Trace(_nodeId, " Child Process {0} has exited.", _process.Id); - } - else - { - CommunicationsUtilities.Trace(_nodeId, " Child Process {0} is still running.", _process.Id); - } - } - catch (Exception e) when (!ExceptionHandling.IsCriticalException(e)) - { - CommunicationsUtilities.Trace(_nodeId, "Unable to retrieve remote process information. {0}", e); - } - - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return false; - } - - return true; - } - -#if FEATURE_APM - /// - /// Callback invoked by the completion of a read of a header byte on one of the named pipes. - /// - private void HeaderReadComplete(IAsyncResult result) - { - int bytesRead; - try - { - try - { - bytesRead = _clientToServerStream.EndRead(result); - } - - // Workaround for CLR stress bug; it sporadically calls us twice on the same async - // result, and EndRead will throw on the second one. Pretend the second one never happened. - catch (ArgumentException) - { - CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring"); - return; - } - - if (!ProcessHeaderBytesRead(bytesRead)) - { - return; - } - } - catch (IOException e) - { - CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in HeaderReadComplete: {0}", e); - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return; - } - - int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(_headerByte, 1, 4)); - MSBuildEventSource.Log.PacketReadSize(packetLength); - - // Ensures the buffer is at least this length. - // It avoids reallocations if the buffer is already large enough. - _readBufferMemoryStream.SetLength(packetLength); - byte[] packetData = _readBufferMemoryStream.GetBuffer(); - - _clientToServerStream.BeginRead(packetData, 0, packetLength, BodyReadComplete, new Tuple(packetData, packetLength)); - } -#endif - - private bool ProcessBodyBytesRead(int bytesRead, int packetLength, NodePacketType packetType) - { - if (bytesRead != packetLength) - { - CommunicationsUtilities.Trace(_nodeId, "Bad packet read for packet {0} - Expected {1} bytes, got {2}", packetType, packetLength, bytesRead); - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return false; - } - return true; - } - - private bool ReadAndRoutePacket(NodePacketType packetType, byte[] packetData, int packetLength) - { - try - { - // The buffer is publicly visible so that InterningBinaryReader doesn't have to copy to an intermediate buffer. - // Since the buffer is publicly visible dispose right away to discourage outsiders from holding a reference to it. - using (var packetStream = new MemoryStream(packetData, 0, packetLength, /*writeable*/ false, /*bufferIsPubliclyVisible*/ true)) - { - ITranslator readTranslator = BinaryTranslator.GetReadTranslator(packetStream, _binaryReaderFactory); - _packetFactory.DeserializeAndRoutePacket(_nodeId, packetType, readTranslator); - } - } - catch (IOException e) - { - CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in ReadAndRoutPacket: {0}", e); - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return false; - } - return true; - } - -#if FEATURE_APM - /// - /// Method called when the body of a packet has been read. - /// - private void BodyReadComplete(IAsyncResult result) - { - NodePacketType packetType = (NodePacketType)_headerByte[0]; - var state = (Tuple)result.AsyncState; - byte[] packetData = state.Item1; - int packetLength = state.Item2; - int bytesRead; - - try - { - try - { - bytesRead = _clientToServerStream.EndRead(result); - } - - // Workaround for CLR stress bug; it sporadically calls us twice on the same async - // result, and EndRead will throw on the second one. Pretend the second one never happened. - catch (ArgumentException) - { - CommunicationsUtilities.Trace(_nodeId, "Hit CLR bug #825607: called back twice on same async result; ignoring"); - return; - } - - if (!ProcessBodyBytesRead(bytesRead, packetLength, packetType)) - { - return; - } - } - catch (IOException e) - { - CommunicationsUtilities.Trace(_nodeId, "EXCEPTION in BodyReadComplete (Reading): {0}", e); - _packetFactory.RoutePacket(_nodeId, new NodeShutdown(NodeShutdownReason.ConnectionFailed)); - Close(); - return; - } - - // Read and route the packet. - if (!ReadAndRoutePacket(packetType, packetData, packetLength)) - { - return; - } - - if (packetType != NodePacketType.NodeShutdown) - { - // Read the next packet. - BeginAsyncPacketRead(); - } - else - { - Close(); - } - } -#endif } } } diff --git a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcTaskHost.cs b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcTaskHost.cs index 1d0f0f525d3..d7885edb750 100644 --- a/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcTaskHost.cs +++ b/src/Build/BackEnd/Components/Communications/NodeProviderOutOfProcTaskHost.cs @@ -270,21 +270,13 @@ public void UnregisterPacketHandler(NodePacketType packetType) } /// - /// Takes a serializer, deserializes the packet and routes it to the appropriate handler. + /// Takes a serializer and deserializes the packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator containing the data from which the packet should be reconstructed. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { - if (_nodeIdToPacketFactory.TryGetValue(nodeId, out INodePacketFactory nodePacketFactory)) - { - nodePacketFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); - } - else - { - _localPacketFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); - } + return _localPacketFactory.DeserializePacket(packetType, translator); } /// diff --git a/src/Build/BackEnd/Components/Communications/TaskHostNodeManager.cs b/src/Build/BackEnd/Components/Communications/TaskHostNodeManager.cs index e7e66d6b886..cafc95a4f22 100644 --- a/src/Build/BackEnd/Components/Communications/TaskHostNodeManager.cs +++ b/src/Build/BackEnd/Components/Communications/TaskHostNodeManager.cs @@ -141,10 +141,9 @@ public void UnregisterPacketHandler(NodePacketType packetType) /// /// Takes a serializer, deserializes the packet and routes it to the appropriate handler. /// - /// The node from which the packet was received. /// The packet type. /// The translator containing the data from which the packet should be reconstructed. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { throw new NotSupportedException("not used"); } diff --git a/src/Build/BackEnd/Node/InProcNode.cs b/src/Build/BackEnd/Node/InProcNode.cs index 7b4049f8905..6ac34aabdd4 100644 --- a/src/Build/BackEnd/Node/InProcNode.cs +++ b/src/Build/BackEnd/Node/InProcNode.cs @@ -216,10 +216,11 @@ public void UnregisterPacketHandler(NodePacketType packetType) /// /// Not necessary for in-proc node - we don't serialize. /// - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { // The in-proc endpoint shouldn't be serializing, just routing. ErrorUtilities.ThrowInternalError("Unexpected call to DeserializeAndRoutePacket on the in-proc node."); + return null; } /// diff --git a/src/Build/BackEnd/Node/OutOfProcNode.cs b/src/Build/BackEnd/Node/OutOfProcNode.cs index f092add506b..bcd97d15400 100644 --- a/src/Build/BackEnd/Node/OutOfProcNode.cs +++ b/src/Build/BackEnd/Node/OutOfProcNode.cs @@ -331,14 +331,13 @@ void INodePacketFactory.UnregisterPacketHandler(NodePacketType packetType) } /// - /// Deserializes and routes a packer to the appropriate handler. + /// Deserializes a packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator to use as a source for packet data. - void INodePacketFactory.DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + INodePacket INodePacketFactory.DeserializePacket(NodePacketType packetType, ITranslator translator) { - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); + return _packetFactory.DeserializePacket(packetType, translator); } /// diff --git a/src/Build/BackEnd/Node/OutOfProcServerNode.cs b/src/Build/BackEnd/Node/OutOfProcServerNode.cs index bda79d588cd..6af7462d159 100644 --- a/src/Build/BackEnd/Node/OutOfProcServerNode.cs +++ b/src/Build/BackEnd/Node/OutOfProcServerNode.cs @@ -202,14 +202,13 @@ void INodePacketFactory.UnregisterPacketHandler(NodePacketType packetType) } /// - /// Deserializes and routes a packer to the appropriate handler. + /// Deserializes a packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator to use as a source for packet data. - void INodePacketFactory.DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + INodePacket INodePacketFactory.DeserializePacket(NodePacketType packetType, ITranslator translator) { - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); + return _packetFactory.DeserializePacket(packetType, translator); } /// diff --git a/src/Build/Instance/TaskFactories/TaskHostTask.cs b/src/Build/Instance/TaskFactories/TaskHostTask.cs index 784b67b200c..f0afd81d0a8 100644 --- a/src/Build/Instance/TaskFactories/TaskHostTask.cs +++ b/src/Build/Instance/TaskFactories/TaskHostTask.cs @@ -362,14 +362,13 @@ public void UnregisterPacketHandler(NodePacketType packetType) } /// - /// Takes a serializer, deserializes the packet and routes it to the appropriate handler. + /// Takes a serializer and deserializes the packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator containing the data from which the packet should be reconstructed. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); + return _packetFactory.DeserializePacket(packetType, translator); } /// diff --git a/src/Build/Microsoft.Build.csproj b/src/Build/Microsoft.Build.csproj index e6fc5f4cccd..a7881e04a2f 100644 --- a/src/Build/Microsoft.Build.csproj +++ b/src/Build/Microsoft.Build.csproj @@ -95,7 +95,6 @@ Collections\ReadOnlyEmptyCollection.cs - @@ -105,6 +104,9 @@ + + + diff --git a/src/MSBuild/MSBuild.csproj b/src/MSBuild/MSBuild.csproj index 2edca8c339b..a7dc889b270 100644 --- a/src/MSBuild/MSBuild.csproj +++ b/src/MSBuild/MSBuild.csproj @@ -88,7 +88,6 @@ - @@ -103,6 +102,9 @@ + + + diff --git a/src/MSBuild/OutOfProcTaskHostNode.cs b/src/MSBuild/OutOfProcTaskHostNode.cs index f862ae2adca..8aafef70b62 100644 --- a/src/MSBuild/OutOfProcTaskHostNode.cs +++ b/src/MSBuild/OutOfProcTaskHostNode.cs @@ -582,14 +582,13 @@ public void UnregisterPacketHandler(NodePacketType packetType) } /// - /// Takes a serializer, deserializes the packet and routes it to the appropriate handler. + /// Takes a serializer and deserializes the packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator containing the data from which the packet should be reconstructed. - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { - _packetFactory.DeserializeAndRoutePacket(nodeId, packetType, translator); + return _packetFactory.DeserializePacket(packetType, translator); } /// diff --git a/src/MSBuildTaskHost/MSBuildTaskHost.csproj b/src/MSBuildTaskHost/MSBuildTaskHost.csproj index a189f58567a..d0ad4122b8d 100644 --- a/src/MSBuildTaskHost/MSBuildTaskHost.csproj +++ b/src/MSBuildTaskHost/MSBuildTaskHost.csproj @@ -64,7 +64,6 @@ - CopyOnWriteDictionary.cs @@ -140,6 +139,15 @@ NodeBuildComplete.cs + + NodeComponentBase.cs + + + NodeComponentBase.cs + + + NodeComponentBase.cs + NodeEndpointOutOfProcBase.cs diff --git a/src/Shared/BufferedReadStream.cs b/src/Shared/BufferedReadStream.cs deleted file mode 100644 index 55bba5986f8..00000000000 --- a/src/Shared/BufferedReadStream.cs +++ /dev/null @@ -1,210 +0,0 @@ -// Licensed to the .NET Foundation under one or more agreements. -// The .NET Foundation licenses this file to you under the MIT license. - -using System; -using System.IO; -using System.IO.Pipes; -using System.Threading; - -#if NET451_OR_GREATER || NETCOREAPP -using System.Threading.Tasks; -#endif - -#nullable disable - -namespace Microsoft.Build.BackEnd -{ - internal class BufferedReadStream : Stream - { - private const int BUFFER_SIZE = 1024; - private NamedPipeServerStream _innerStream; - private byte[] _buffer; - - // The number of bytes in the buffer that have been read from the underlying stream but not read by consumers of this stream - private int _currentlyBufferedByteCount; - private int _currentIndexInBuffer; - - public BufferedReadStream(NamedPipeServerStream innerStream) - { - _innerStream = innerStream; - _buffer = new byte[BUFFER_SIZE]; - - _currentlyBufferedByteCount = 0; - } - - public override bool CanRead { get { return _innerStream.CanRead; } } - - public override bool CanSeek { get { return false; } } - - public override bool CanWrite { get { return _innerStream.CanWrite; } } - - public override long Length { get { return _innerStream.Length; } } - - public override long Position - { - get { throw new NotSupportedException(); } - set { throw new NotSupportedException(); } - } - - public override void Flush() - { - _innerStream.Flush(); - } - - public override int ReadByte() - { - if (_currentlyBufferedByteCount > 0) - { - int ret = _buffer[_currentIndexInBuffer]; - _currentIndexInBuffer++; - _currentlyBufferedByteCount--; - return ret; - } - else - { - // Let the base class handle it, which will end up calling the Read() method - return base.ReadByte(); - } - } - - public override int Read(byte[] buffer, int offset, int count) - { - if (count > BUFFER_SIZE) - { - // Trying to read more data than the buffer can hold - int alreadyCopied = 0; - if (_currentlyBufferedByteCount > 0) - { - Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, _currentlyBufferedByteCount); - alreadyCopied = _currentlyBufferedByteCount; - _currentIndexInBuffer = 0; - _currentlyBufferedByteCount = 0; - } - int innerReadCount = _innerStream.Read(buffer, offset + alreadyCopied, count - alreadyCopied); - return innerReadCount + alreadyCopied; - } - else if (count <= _currentlyBufferedByteCount) - { - // Enough data buffered to satisfy read request - Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, count); - _currentIndexInBuffer += count; - _currentlyBufferedByteCount -= count; - return count; - } - else - { - // Need to read more data - int alreadyCopied = 0; - if (_currentlyBufferedByteCount > 0) - { - Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, _currentlyBufferedByteCount); - alreadyCopied = _currentlyBufferedByteCount; - _currentIndexInBuffer = 0; - _currentlyBufferedByteCount = 0; - } - - int innerReadCount = _innerStream.Read(_buffer, 0, BUFFER_SIZE); - _currentIndexInBuffer = 0; - _currentlyBufferedByteCount = innerReadCount; - - int remainingCopyCount; - - if (alreadyCopied + innerReadCount >= count) - { - remainingCopyCount = count - alreadyCopied; - } - else - { - remainingCopyCount = innerReadCount; - } - - Array.Copy(_buffer, 0, buffer, offset + alreadyCopied, remainingCopyCount); - _currentIndexInBuffer += remainingCopyCount; - _currentlyBufferedByteCount -= remainingCopyCount; - - return alreadyCopied + remainingCopyCount; - } - } - -#if NET451_OR_GREATER || NETCOREAPP - public override async Task ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken) - { - if (count > BUFFER_SIZE) - { - // Trying to read more data than the buffer can hold - int alreadyCopied = CopyToBuffer(buffer, offset); - -#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' - int innerReadCount = await _innerStream.ReadAsync(buffer, offset + alreadyCopied, count - alreadyCopied, cancellationToken); -#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' - return innerReadCount + alreadyCopied; - } - else if (count <= _currentlyBufferedByteCount) - { - // Enough data buffered to satisfy read request - Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, count); - _currentIndexInBuffer += count; - _currentlyBufferedByteCount -= count; - return count; - } - else - { - // Need to read more data - int alreadyCopied = CopyToBuffer(buffer, offset); - -#pragma warning disable CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' - int innerReadCount = await _innerStream.ReadAsync(_buffer, 0, BUFFER_SIZE, cancellationToken); -#pragma warning restore CA1835 // Prefer the 'Memory'-based overloads for 'ReadAsync' and 'WriteAsync' - _currentIndexInBuffer = 0; - _currentlyBufferedByteCount = innerReadCount; - - int remainingCopyCount = alreadyCopied + innerReadCount >= count ? count - alreadyCopied : innerReadCount; - Array.Copy(_buffer, 0, buffer, offset + alreadyCopied, remainingCopyCount); - _currentIndexInBuffer += remainingCopyCount; - _currentlyBufferedByteCount -= remainingCopyCount; - - return alreadyCopied + remainingCopyCount; - } - - int CopyToBuffer(byte[] buffer, int offset) - { - int alreadyCopied = 0; - if (_currentlyBufferedByteCount > 0) - { - Array.Copy(_buffer, _currentIndexInBuffer, buffer, offset, _currentlyBufferedByteCount); - alreadyCopied = _currentlyBufferedByteCount; - _currentIndexInBuffer = 0; - _currentlyBufferedByteCount = 0; - } - - return alreadyCopied; - } - } -#endif - - public override long Seek(long offset, SeekOrigin origin) - { - throw new NotSupportedException(); - } - - public override void SetLength(long value) - { - throw new NotSupportedException(); - } - - public override void Write(byte[] buffer, int offset, int count) - { - _innerStream.Write(buffer, offset, count); - } - - protected override void Dispose(bool disposing) - { - if (disposing) - { - _innerStream.Dispose(); - } - - base.Dispose(disposing); - } - } -} diff --git a/src/Shared/CommunicationsUtilities.cs b/src/Shared/CommunicationsUtilities.cs index cb1177fce58..4b48317081c 100644 --- a/src/Shared/CommunicationsUtilities.cs +++ b/src/Shared/CommunicationsUtilities.cs @@ -22,9 +22,6 @@ #if !CLR2COMPATIBILITY using Microsoft.Build.Shared.Debugging; #endif -#if !FEATURE_APM -using System.Threading.Tasks; -#endif #nullable disable @@ -472,25 +469,9 @@ internal static void WriteIntForHandshake(this PipeStream stream, int value) stream.Write(bytes, 0, bytes.Length); } -#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter - internal static void ReadEndOfHandshakeSignal( - this PipeStream stream, - bool isProvider -#if NETCOREAPP2_1_OR_GREATER - , int timeout -#endif - ) -#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter + internal static void ReadEndOfHandshakeSignal(this PipeStream stream, bool isProvider, int timeout) { - // Accept only the first byte of the EndOfHandshakeSignal -#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter - int valueRead = stream.ReadIntForHandshake( - byteToAccept: null -#if NETCOREAPP2_1_OR_GREATER - , timeout -#endif - ); -#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter + int valueRead = stream.ReadIntForHandshake(byteToAccept: null, timeout); if (valueRead != EndOfHandshakeSignal) { @@ -506,17 +487,11 @@ bool isProvider } } -#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter /// /// Extension method to read a series of bytes from a stream. /// If specified, leading byte matches one in the supplied array if any, returns rejection byte and throws IOException. /// - internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAccept -#if NETCOREAPP2_1_OR_GREATER - , int timeout -#endif - ) -#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter + internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAccept, int timeout) { byte[] bytes = new byte[4]; @@ -586,23 +561,6 @@ internal static int ReadIntForHandshake(this PipeStream stream, byte? byteToAcce } #nullable disable -#if !FEATURE_APM - internal static async ValueTask ReadAsync(Stream stream, byte[] buffer, int bytesToRead) - { - int totalBytesRead = 0; - while (totalBytesRead < bytesToRead) - { - int bytesRead = await stream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead), CancellationToken.None); - if (bytesRead == 0) - { - return totalBytesRead; - } - totalBytesRead += bytesRead; - } - return totalBytesRead; - } -#endif - /// /// Given the appropriate information, return the equivalent HandshakeOptions. /// diff --git a/src/Shared/INodePacketFactory.cs b/src/Shared/INodePacketFactory.cs index c972e0408b5..b0fd06bbbca 100644 --- a/src/Shared/INodePacketFactory.cs +++ b/src/Shared/INodePacketFactory.cs @@ -35,12 +35,11 @@ internal interface INodePacketFactory void UnregisterPacketHandler(NodePacketType packetType); /// - /// Takes a serializer, deserializes the packet and routes it to the appropriate handler. + /// Takes a serializer and deserializes the packet. /// - /// The node from which the packet was received. /// The packet type. /// The translator containing the data from which the packet should be reconstructed. - void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator); + INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator); /// /// Routes the specified packet diff --git a/src/Shared/NodeEndpointOutOfProcBase.cs b/src/Shared/NodeEndpointOutOfProcBase.cs index 846f7716ec4..5a5d1db3eb5 100644 --- a/src/Shared/NodeEndpointOutOfProcBase.cs +++ b/src/Shared/NodeEndpointOutOfProcBase.cs @@ -3,25 +3,16 @@ using System; using System.Diagnostics.CodeAnalysis; -#if CLR2COMPATIBILITY +#if TASKHOST using Microsoft.Build.Shared.Concurrent; #else using System.Collections.Concurrent; +using System.Threading.Tasks; #endif using System.IO; -using System.IO.Pipes; using System.Threading; using Microsoft.Build.Internal; using Microsoft.Build.Shared; -#if FEATURE_SECURITY_PERMISSIONS || FEATURE_PIPE_SECURITY -using System.Security.AccessControl; -#endif -#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR -using System.Security.Principal; -#endif -#if NET451_OR_GREATER || NETCOREAPP -using System.Threading.Tasks; -#endif #nullable disable @@ -35,18 +26,6 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint { #region Private Data -#if NETCOREAPP2_1_OR_GREATER - /// - /// The amount of time to wait for the client to connect to the host. - /// - private const int ClientConnectTimeout = 60000; -#endif // NETCOREAPP2_1 - - /// - /// The size of the buffers to use for named pipes - /// - private const int PipeBufferSize = 131072; - /// /// The current communication status of the node. /// @@ -55,7 +34,7 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint /// /// The pipe client used by the nodes. /// - private NamedPipeServerStream _pipeServer; + private NodePipeServer _pipeServer; // The following private data fields are used only when the endpoint is in ASYNCHRONOUS mode. @@ -100,21 +79,6 @@ internal abstract class NodeEndpointOutOfProcBase : INodeEndpoint /// private ConcurrentQueue _packetQueue; - /// - /// Per-node shared read buffer. - /// - private BinaryReaderFactory _sharedReadBuffer; - - /// - /// A way to cache a byte array when writing out packets - /// - private MemoryStream _packetStream; - - /// - /// A binary writer to help write into - /// - private BinaryWriter _binaryWriter; - #endregion #region INodeEndpoint Events @@ -153,6 +117,7 @@ public void Listen(INodePacketFactory factory) ErrorUtilities.VerifyThrow(_status == LinkStatus.Inactive, "Link not inactive. Status is {0}", _status); ErrorUtilities.VerifyThrowArgumentNull(factory, nameof(factory)); _packetFactory = factory; + _pipeServer.RegisterPacketFactory(factory); InitializeAsyncPacketThread(); } @@ -206,54 +171,9 @@ internal void InternalConstruct(string pipeName = null) { _status = LinkStatus.Inactive; _asyncDataMonitor = new object(); - _sharedReadBuffer = InterningBinaryReader.CreateSharedBuffer(); - - _packetStream = new MemoryStream(); - _binaryWriter = new BinaryWriter(_packetStream); pipeName ??= NamedPipeUtil.GetPlatformSpecificPipeName(); - -#if FEATURE_PIPE_SECURITY && FEATURE_NAMED_PIPE_SECURITY_CONSTRUCTOR - SecurityIdentifier identifier = WindowsIdentity.GetCurrent().Owner; - PipeSecurity security = new PipeSecurity(); - - // Restrict access to just this account. We set the owner specifically here, and on the - // pipe client side they will check the owner against this one - they must have identical - // SIDs or the client will reject this server. This is used to avoid attacks where a - // hacked server creates a less restricted pipe in an attempt to lure us into using it and - // then sending build requests to the real pipe client (which is the MSBuild Build Manager.) - PipeAccessRule rule = new PipeAccessRule(identifier, PipeAccessRights.ReadWrite, AccessControlType.Allow); - security.AddAccessRule(rule); - security.SetOwner(identifier); - - _pipeServer = new NamedPipeServerStream( - pipeName, - PipeDirection.InOut, - 1, // Only allow one connection at a time. - PipeTransmissionMode.Byte, - PipeOptions.Asynchronous | PipeOptions.WriteThrough -#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY - | PipeOptions.CurrentUserOnly -#endif - , - PipeBufferSize, // Default input buffer - PipeBufferSize, // Default output buffer - security, - HandleInheritability.None); -#else - _pipeServer = new NamedPipeServerStream( - pipeName, - PipeDirection.InOut, - 1, // Only allow one connection at a time. - PipeTransmissionMode.Byte, - PipeOptions.Asynchronous | PipeOptions.WriteThrough -#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY - | PipeOptions.CurrentUserOnly -#endif - , - PipeBufferSize, // Default input buffer - PipeBufferSize); // Default output buffer -#endif + _pipeServer = new NodePipeServer(pipeName, GetHandshake()); } #endregion @@ -295,7 +215,7 @@ private void InternalDisconnect() ErrorUtilities.VerifyThrow(_packetPump.ManagedThreadId != Thread.CurrentThread.ManagedThreadId, "Can't join on the same thread."); _terminatePacketPump.Set(); _packetPump.Join(); -#if CLR2COMPATIBILITY +#if TASKHOST _terminatePacketPump.Close(); #else _terminatePacketPump.Dispose(); @@ -345,172 +265,25 @@ private void InitializeAsyncPacketThread() /// private void PacketPumpProc() { - NamedPipeServerStream localPipeServer = _pipeServer; + NodePipeServer localPipeServer = _pipeServer; AutoResetEvent localPacketAvailable = _packetAvailable; AutoResetEvent localTerminatePacketPump = _terminatePacketPump; ConcurrentQueue localPacketQueue = _packetQueue; - DateTime originalWaitStartTime = DateTime.UtcNow; - bool gotValidConnection = false; - while (!gotValidConnection) + ChangeLinkStatus(localPipeServer.WaitForConnection()); + if (_status != LinkStatus.Active) { - gotValidConnection = true; - DateTime restartWaitTime = DateTime.UtcNow; - - // We only wait to wait the difference between now and the last original start time, in case we have multiple hosts attempting - // to attach. This prevents each attempt from resetting the timer. - TimeSpan usedWaitTime = restartWaitTime - originalWaitStartTime; - int waitTimeRemaining = Math.Max(0, CommunicationsUtilities.NodeConnectionTimeout - (int)usedWaitTime.TotalMilliseconds); - - try - { - // Wait for a connection -#if FEATURE_APM - IAsyncResult resultForConnection = localPipeServer.BeginWaitForConnection(null, null); - CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining); - bool connected = resultForConnection.AsyncWaitHandle.WaitOne(waitTimeRemaining, false); -#else - Task connectionTask = localPipeServer.WaitForConnectionAsync(); - CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining); - bool connected = connectionTask.Wait(waitTimeRemaining); -#endif - if (!connected) - { - CommunicationsUtilities.Trace("Connection timed out waiting a host to contact us. Exiting comm thread."); - ChangeLinkStatus(LinkStatus.ConnectionFailed); - return; - } - - CommunicationsUtilities.Trace("Parent started connecting. Reading handshake from parent"); -#if FEATURE_APM - localPipeServer.EndWaitForConnection(resultForConnection); -#endif - - // The handshake protocol is a series of int exchanges. The host sends us a each component, and we - // verify it. Afterwards, the host sends an "End of Handshake" signal, to which we respond in kind. - // Once the handshake is complete, both sides can be assured the other is ready to accept data. - Handshake handshake = GetHandshake(); - try - { - int[] handshakeComponents = handshake.RetrieveHandshakeComponents(); - for (int i = 0; i < handshakeComponents.Length; i++) - { -#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter - int handshakePart = _pipeServer.ReadIntForHandshake( - byteToAccept: i == 0 ? (byte?)CommunicationsUtilities.handshakeVersion : null /* this will disconnect a < 16.8 host; it expects leading 00 or F5 or 06. 0x00 is a wildcard */ -#if NETCOREAPP2_1_OR_GREATER - , ClientConnectTimeout /* wait a long time for the handshake from this side */ -#endif - ); -#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter - - if (handshakePart != handshakeComponents[i]) - { - CommunicationsUtilities.Trace("Handshake failed. Received {0} from host not {1}. Probably the host is a different MSBuild build.", handshakePart, handshakeComponents[i]); - _pipeServer.WriteIntForHandshake(i + 1); - gotValidConnection = false; - break; - } - } - - if (gotValidConnection) - { - // To ensure that our handshake and theirs have the same number of bytes, receive and send a magic number indicating EOS. -#if NETCOREAPP2_1_OR_GREATER - _pipeServer.ReadEndOfHandshakeSignal(false, ClientConnectTimeout); /* wait a long time for the handshake from this side */ -#else - _pipeServer.ReadEndOfHandshakeSignal(false); -#endif - CommunicationsUtilities.Trace("Successfully connected to parent."); - _pipeServer.WriteEndOfHandshakeSignal(); - -#if FEATURE_SECURITY_PERMISSIONS - // We will only talk to a host that was started by the same user as us. Even though the pipe access is set to only allow this user, we want to ensure they - // haven't attempted to change those permissions out from under us. This ensures that the only way they can truly gain access is to be impersonating the - // user we were started by. - WindowsIdentity currentIdentity = WindowsIdentity.GetCurrent(); - WindowsIdentity clientIdentity = null; - localPipeServer.RunAsClient(delegate () { clientIdentity = WindowsIdentity.GetCurrent(true); }); - - if (clientIdentity == null || !String.Equals(clientIdentity.Name, currentIdentity.Name, StringComparison.OrdinalIgnoreCase)) - { - CommunicationsUtilities.Trace("Handshake failed. Host user is {0} but we were created by {1}.", (clientIdentity == null) ? "" : clientIdentity.Name, currentIdentity.Name); - gotValidConnection = false; - continue; - } -#endif - } - } - catch (IOException e) - { - // We will get here when: - // 1. The host (OOP main node) connects to us, it immediately checks for user privileges - // and if they don't match it disconnects immediately leaving us still trying to read the blank handshake - // 2. The host is too old sending us bits we automatically reject in the handshake - // 3. We expected to read the EndOfHandshake signal, but we received something else - CommunicationsUtilities.Trace("Client connection failed but we will wait for another connection. Exception: {0}", e.Message); - - gotValidConnection = false; - } - catch (InvalidOperationException) - { - gotValidConnection = false; - } - - if (!gotValidConnection) - { - if (localPipeServer.IsConnected) - { - localPipeServer.Disconnect(); - } - continue; - } - - ChangeLinkStatus(LinkStatus.Active); - } - catch (Exception e) when (!ExceptionHandling.IsCriticalException(e)) - { - CommunicationsUtilities.Trace("Client connection failed. Exiting comm thread. {0}", e); - if (localPipeServer.IsConnected) - { - localPipeServer.Disconnect(); - } - - ExceptionHandling.DumpExceptionToFile(e); - ChangeLinkStatus(LinkStatus.Failed); - return; - } + return; } - RunReadLoop( - new BufferedReadStream(_pipeServer), - _pipeServer, - localPacketQueue, localPacketAvailable, localTerminatePacketPump); + RunReadLoop(localPipeServer, localPacketQueue, localPacketAvailable, localTerminatePacketPump); CommunicationsUtilities.Trace("Ending read loop"); - - try - { - if (localPipeServer.IsConnected) - { -#if NET // OperatingSystem.IsWindows() is new in .NET 5.0 - if (OperatingSystem.IsWindows()) -#endif - { - localPipeServer.WaitForPipeDrain(); - } - - localPipeServer.Disconnect(); - } - } - catch (Exception) - { - // We don't really care if Disconnect somehow fails, but it gives us a chance to do the right thing. - } + localPipeServer.Disconnect(); } - private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream localWritePipe, + private void RunReadLoop(NodePipeServer localPipeServer, ConcurrentQueue localPacketQueue, AutoResetEvent localPacketAvailable, AutoResetEvent localTerminatePacketPump) { // Ordering of the wait handles is important. The first signalled wait handle in the array @@ -518,13 +291,11 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream // terminate event triggered so that we cannot get into a situation where packets are being // spammed to the endpoint and it never gets an opportunity to shutdown. CommunicationsUtilities.Trace("Entering read loop."); - byte[] headerByte = new byte[5]; -#if NET451_OR_GREATER - Task readTask = localReadPipe.ReadAsync(headerByte, 0, headerByte.Length, CancellationToken.None); -#elif NETCOREAPP - Task readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length).AsTask(); +#if TASKHOST + Func readPacketFunc = localPipeServer.ReadPacket; + IAsyncResult result = readPacketFunc.BeginInvoke(null, null); #else - IAsyncResult result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null); + Task readTask = localPipeServer.ReadPacketAsync(); #endif // Ordering is important. We want packetAvailable to supercede terminate otherwise we will not properly wait for all @@ -548,36 +319,25 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream { case 0: { - int bytesRead = 0; + INodePacket packet = null; + try { -#if NET451_OR_GREATER || NETCOREAPP - bytesRead = readTask.Result; +#if TASKHOST + packet = readPacketFunc.EndInvoke(result); #else - bytesRead = localReadPipe.EndRead(result); + packet = readTask.GetAwaiter().GetResult(); #endif - } - catch (Exception e) - { - // Lost communications. Abort (but allow node reuse) - CommunicationsUtilities.Trace("Exception reading from server. {0}", e); - ExceptionHandling.DumpExceptionToFile(e); - ChangeLinkStatus(LinkStatus.Inactive); - exitLoop = true; - break; - } - - if (bytesRead != headerByte.Length) - { - // Incomplete read. Abort. - if (bytesRead == 0) + if (packet.Type == NodePacketType.NodeShutdown) { if (_isClientDisconnecting) { - CommunicationsUtilities.Trace("Parent disconnected gracefully."); + // Lost communications. Abort (but allow node reuse). // Do not change link status to failed as this could make node think connection has failed // and recycle node, while this is perfectly expected and handled race condition // (both client and node is about to close pipe and client can be faster). + CommunicationsUtilities.Trace("Parent disconnected gracefully."); + ChangeLinkStatus(LinkStatus.Inactive); } else { @@ -587,43 +347,35 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream } else { - CommunicationsUtilities.Trace("Incomplete header read from server. {0} of {1} bytes read", bytesRead, headerByte.Length); - ChangeLinkStatus(LinkStatus.Failed); + _packetFactory.RoutePacket(0, packet); } - - exitLoop = true; - break; - } - - NodePacketType packetType = (NodePacketType)headerByte[0]; - - try - { - _packetFactory.DeserializeAndRoutePacket(0, packetType, BinaryTranslator.GetReadTranslator(localReadPipe, _sharedReadBuffer)); } catch (Exception e) { - // Error while deserializing or handling packet. Abort. - CommunicationsUtilities.Trace("Exception while deserializing packet {0}: {1}", packetType, e); + if (packet == null) + { + CommunicationsUtilities.Trace("Exception while reading packet from server: {0}", e); + } + else + { + CommunicationsUtilities.Trace("Exception while deserializing or handling packet {0}: {1}", packet.Type, e); + } + ExceptionHandling.DumpExceptionToFile(e); ChangeLinkStatus(LinkStatus.Failed); - exitLoop = true; - break; } -#if NET451_OR_GREATER - readTask = localReadPipe.ReadAsync(headerByte, 0, headerByte.Length, CancellationToken.None); -#elif NETCOREAPP - readTask = CommunicationsUtilities.ReadAsync(localReadPipe, headerByte, headerByte.Length).AsTask(); -#else - result = localReadPipe.BeginRead(headerByte, 0, headerByte.Length, null, null); -#endif - -#if NET451_OR_GREATER || NETCOREAPP - handles[0] = ((IAsyncResult)readTask).AsyncWaitHandle; + exitLoop = _status != LinkStatus.Active; + if (!exitLoop) + { +#if TASKHOST + result = readPacketFunc.BeginInvoke(null, null); + handles[0] = result.AsyncWaitHandle; #else - handles[0] = result.AsyncWaitHandle; + readTask = localPipeServer.ReadPacketAsync(); + handles[0] = ((IAsyncResult)readTask).AsyncWaitHandle; #endif + } } break; @@ -633,29 +385,9 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream try { // Write out all the queued packets. - INodePacket packet; - while (localPacketQueue.TryDequeue(out packet)) + while (localPacketQueue.TryDequeue(out INodePacket packet)) { - var packetStream = _packetStream; - packetStream.SetLength(0); - - ITranslator writeTranslator = BinaryTranslator.GetWriteTranslator(packetStream); - - packetStream.WriteByte((byte)packet.Type); - - // Pad for packet length - _binaryWriter.Write(0); - - // Reset the position in the write buffer. - packet.Translate(writeTranslator); - - int packetStreamLength = (int)packetStream.Position; - - // Now write in the actual packet length - packetStream.Position = 1; - _binaryWriter.Write(packetStreamLength - 5); - - localWritePipe.Write(packetStream.GetBuffer(), 0, packetStreamLength); + localPipeServer.WritePacket(packet); } } catch (Exception e) @@ -685,8 +417,8 @@ private void RunReadLoop(BufferedReadStream localReadPipe, NamedPipeServerStream while (!exitLoop); } -#endregion + #endregion -#endregion + #endregion } } diff --git a/src/Shared/NodePacketFactory.cs b/src/Shared/NodePacketFactory.cs index 214ddfa20f9..51cbee08655 100644 --- a/src/Shared/NodePacketFactory.cs +++ b/src/Shared/NodePacketFactory.cs @@ -45,9 +45,9 @@ public void UnregisterPacketHandler(NodePacketType packetType) } /// - /// Creates and routes a packet with data from a binary stream. + /// Creates a packet with data from a binary stream. /// - public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITranslator translator) + public INodePacket DeserializePacket(NodePacketType packetType, ITranslator translator) { // PERF: Not using VerifyThrow to avoid boxing of packetType in the non-error case if (!_packetFactories.TryGetValue(packetType, out PacketFactoryRecord record)) @@ -55,7 +55,7 @@ public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITr ErrorUtilities.ThrowInternalError("No packet handler for type {0}", packetType); } - record.DeserializeAndRoutePacket(nodeId, translator); + return record.DeserializePacket(translator); } /// @@ -63,7 +63,12 @@ public void DeserializeAndRoutePacket(int nodeId, NodePacketType packetType, ITr /// public void RoutePacket(int nodeId, INodePacket packet) { - PacketFactoryRecord record = _packetFactories[packet.Type]; + // PERF: Not using VerifyThrow to avoid boxing of packetType in the non-error case + if (!_packetFactories.TryGetValue(packet.Type, out PacketFactoryRecord record)) + { + ErrorUtilities.ThrowInternalError("No packet handler for type {0}", packet.Type); + } + record.RoutePacket(nodeId, packet); } @@ -94,13 +99,9 @@ public PacketFactoryRecord(INodePacketHandler handler, NodePacketFactoryMethod f } /// - /// Creates a packet from a binary stream and sends it to the registered handler. + /// Creates a packet from a binary stream. /// - public void DeserializeAndRoutePacket(int nodeId, ITranslator translator) - { - INodePacket packet = _factoryMethod(translator); - RoutePacket(nodeId, packet); - } + public INodePacket DeserializePacket(ITranslator translator) => _factoryMethod(translator); /// /// Routes the packet to the correct destination. diff --git a/src/Shared/NodePipeBase.cs b/src/Shared/NodePipeBase.cs new file mode 100644 index 00000000000..a9c9692a880 --- /dev/null +++ b/src/Shared/NodePipeBase.cs @@ -0,0 +1,272 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.IO; +using System.IO.Pipes; +using System.Threading; +using Microsoft.Build.BackEnd; +using Microsoft.Build.Framework; +using Microsoft.Build.Shared; + +#if !TASKHOST +using System.Buffers.Binary; +using System.Threading.Tasks; +using Microsoft.Build.Eventing; +#endif + +namespace Microsoft.Build.Internal +{ + internal abstract class NodePipeBase : IDisposable + { + /// + /// A packet header consists of 1 byte (enum) for the packet type + 4 bytes (int32) for the packet length. + /// + private const int HeaderLength = 5; + + /// + /// The size of the intermediate in-memory buffers. + /// + private const int InitialBufferSize = 131_072; + + /// + /// The maximum number of bytes to write in a single operation. + /// + private const int MaxPacketWriteSize = 104_8576; + + /// + /// A reusable buffer for reading the packet header. + /// + private readonly byte[] _headerData = new byte[HeaderLength]; + + /// + /// A buffer typically big enough to handle a packet body. + /// We use this as a convenient way to manage and cache a byte[] that's resized + /// automatically to fit our payload. + /// + private readonly MemoryStream _readBuffer = new(InitialBufferSize); + + /// + /// A buffer typically big enough to handle a packet body. + /// We use this as a convenient way to manage and cache a byte[] that's resized + /// automatically to fit our payload. + /// + private readonly MemoryStream _writeBuffer = new(InitialBufferSize); + + private readonly ITranslator _readTranslator; + + private readonly ITranslator _writeTranslator; + + /// + /// The packet factory to be used for deserialization, as packet types may have custom factory logic. + /// + private INodePacketFactory? _packetFactory; + + protected NodePipeBase(string pipeName, Handshake handshake) + { + PipeName = pipeName; + HandshakeComponents = handshake.RetrieveHandshakeComponents(); + _readTranslator = BinaryTranslator.GetReadTranslator(_readBuffer, InterningBinaryReader.CreateSharedBuffer()); + _writeTranslator = BinaryTranslator.GetWriteTranslator(_writeBuffer); + } + + protected abstract PipeStream NodeStream { get; } + + protected string PipeName { get; } + + protected int[] HandshakeComponents { get; } + + public void Dispose() + { + _readBuffer.Dispose(); + _writeBuffer.Dispose(); + _readTranslator.Dispose(); + _writeTranslator.Dispose(); + NodeStream.Dispose(); + } + + internal void RegisterPacketFactory(INodePacketFactory packetFactory) => _packetFactory = packetFactory; + + internal void WritePacket(INodePacket packet) + { + int messageLength = WritePacketToBuffer(packet); + byte[] buffer = _writeBuffer.GetBuffer(); + + for (int i = 0; i < messageLength; i += MaxPacketWriteSize) + { + int lengthToWrite = Math.Min(messageLength - i, MaxPacketWriteSize); + NodeStream.Write(buffer, i, lengthToWrite); + } + } + + internal INodePacket ReadPacket() + { + // Read the header. + int headerBytesRead = Read(_headerData, HeaderLength); + + // When an active connection is broken, any pending read will return 0 bytes before the pipe transitions to + // the broken state. As this is expected behavior, don't throw an exception if no packet is pending, A node + // may disconnect without waiting on the other end to gracefully cancel, and the caller can decide whether + // this was intentional. + if (headerBytesRead == 0) + { + return new NodeShutdown(NodeShutdownReason.ConnectionFailed); + } + else if (headerBytesRead != HeaderLength) + { + throw new IOException($"Incomplete header read. {headerBytesRead} of {HeaderLength} bytes read."); + } + +#if TASKHOST + int packetLength = BitConverter.ToInt32(_headerData, 1); +#else + int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(_headerData, 1, 4)); + MSBuildEventSource.Log.PacketReadSize(packetLength); +#endif + + // Read the packet. Set the buffer length now to avoid additional resizing during the read. + _readBuffer.Position = 0; + _readBuffer.SetLength(packetLength); + int packetBytesRead = Read(_readBuffer.GetBuffer(), packetLength); + + if (packetBytesRead < packetLength) + { + throw new IOException($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read."); + } + + return DeserializePacket(); + } + +#if !TASKHOST + internal async Task WritePacketAsync(INodePacket packet, CancellationToken cancellationToken = default) + { + int messageLength = WritePacketToBuffer(packet); + byte[] buffer = _writeBuffer.GetBuffer(); + + for (int i = 0; i < messageLength; i += MaxPacketWriteSize) + { + int lengthToWrite = Math.Min(messageLength - i, MaxPacketWriteSize); +#if NET + await NodeStream.WriteAsync(buffer.AsMemory(i, lengthToWrite), cancellationToken).ConfigureAwait(false); +#else + await NodeStream.WriteAsync(buffer, i, lengthToWrite, cancellationToken).ConfigureAwait(false); +#endif + } + } + + internal async Task ReadPacketAsync(CancellationToken cancellationToken = default) + { + // Read the header. + int headerBytesRead = await ReadAsync(_headerData, HeaderLength, cancellationToken).ConfigureAwait(false); + + // When an active connection is broken, any pending read will return 0 bytes before the pipe transitions to + // the broken state. As this is expected behavior, don't throw an exception if no packet is pending, A node + // may disconnect without waiting on the other end to gracefully cancel, and the caller can decide whether + // this was intentional. + if (headerBytesRead == 0) + { + return new NodeShutdown(NodeShutdownReason.ConnectionFailed); + } + else if (headerBytesRead != HeaderLength) + { + throw new IOException($"Incomplete header read. {headerBytesRead} of {HeaderLength} bytes read."); + } + + int packetLength = BinaryPrimitives.ReadInt32LittleEndian(new Span(_headerData, 1, 4)); + MSBuildEventSource.Log.PacketReadSize(packetLength); + + // Read the packet. Set the buffer length now to avoid additional resizing during the read. + _readBuffer.Position = 0; + _readBuffer.SetLength(packetLength); + int packetBytesRead = await ReadAsync(_readBuffer.GetBuffer(), packetLength, cancellationToken).ConfigureAwait(false); + + if (packetBytesRead < packetLength) + { + throw new IOException($"Incomplete packet read. {packetBytesRead} of {packetLength} bytes read."); + } + + return DeserializePacket(); + } +#endif + + private int WritePacketToBuffer(INodePacket packet) + { + // Clear the buffer but keep the underlying capacity to avoid reallocations. + _writeBuffer.SetLength(HeaderLength); + _writeBuffer.Position = HeaderLength; + + // Serialize and write the packet to the buffer. + packet.Translate(_writeTranslator); + + // Write the header to the buffer. + _writeBuffer.Position = 0; + _writeBuffer.WriteByte((byte)packet.Type); + int messageLength = (int)_writeBuffer.Length; + _writeTranslator.Writer.Write(messageLength - HeaderLength); + + return messageLength; + } + + private int Read(byte[] buffer, int bytesToRead) + { + int totalBytesRead = 0; + while (totalBytesRead < bytesToRead) + { + int bytesRead = NodeStream.Read(buffer, totalBytesRead, bytesToRead - totalBytesRead); + + // 0 byte read will occur if the pipe disconnects. + if (bytesRead == 0) + { + break; + } + + totalBytesRead += bytesRead; + } + + return totalBytesRead; + } + +#if !TASKHOST + private async ValueTask ReadAsync(byte[] buffer, int bytesToRead, CancellationToken cancellationToken) + { + int totalBytesRead = 0; + while (totalBytesRead < bytesToRead) + { +#if NET + int bytesRead = await NodeStream.ReadAsync(buffer.AsMemory(totalBytesRead, bytesToRead - totalBytesRead), cancellationToken).ConfigureAwait(false); +#else + int bytesRead = await NodeStream.ReadAsync(buffer, totalBytesRead, bytesToRead - totalBytesRead, cancellationToken).ConfigureAwait(false); +#endif + + // 0 byte read will occur if the pipe disconnects. + if (bytesRead == 0) + { + break; + } + + totalBytesRead += bytesRead; + } + + return totalBytesRead; + } +#endif + + private INodePacket DeserializePacket() + { + if (_packetFactory == null) + { + throw new InternalErrorException("No packet factory is registered for deserialization."); + } + + NodePacketType packetType = (NodePacketType)_headerData[0]; + try + { + return _packetFactory.DeserializePacket(packetType, _readTranslator); + } + catch (Exception e) when (e is not InternalErrorException) + { + throw new InternalErrorException($"Exception while deserializing packet {packetType}: {e}"); + } + } + } +} diff --git a/src/Shared/NodePipeClient.cs b/src/Shared/NodePipeClient.cs new file mode 100644 index 00000000000..6be1e0e422b --- /dev/null +++ b/src/Shared/NodePipeClient.cs @@ -0,0 +1,90 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.IO; +using System.IO.Pipes; +using System.Security.Principal; +using Microsoft.Build.BackEnd; + +namespace Microsoft.Build.Internal +{ + internal sealed class NodePipeClient : NodePipeBase + { + /// + /// If true, sets a timeout for the handshake. This is only used on Unix-like socket implementations, because the + /// timeout on the PipeStream connection is ignore. + /// + private static readonly bool s_useHandhakeTimeout = !NativeMethodsShared.IsWindows; + + private readonly NamedPipeClientStream _pipeClient; + + internal NodePipeClient(string pipeName, Handshake handshake) + : base(pipeName, handshake) => +#pragma warning disable SA1111, SA1009 // Closing parenthesis should be on line of last parameter + _pipeClient = new( + serverName: ".", + pipeName, + PipeDirection.InOut, + PipeOptions.Asynchronous +#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY + | PipeOptions.CurrentUserOnly +#endif + ); +#pragma warning restore SA1111, SA1009 // Closing parenthesis should be on line of last parameter + + protected override PipeStream NodeStream => _pipeClient; + + internal void ConnectToServer(int timeout) + { + CommunicationsUtilities.Trace("Attempting connect to pipe {0} with timeout {1} ms", PipeName, timeout); + _pipeClient.Connect(timeout); +#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY + // Verify that the owner of the pipe is us. This prevents a security hole where a remote node has + // been faked up with ACLs that would let us attach to it. It could then issue fake build requests back to + // us, potentially causing us to execute builds that do harmful or unexpected things. The pipe owner can + // only be set to the user's own SID by a normal, unprivileged process. The conditions where a faked up + // remote node could set the owner to something else would also let it change owners on other objects, so + // this would be a security flaw upstream of us. + ValidateRemotePipeOwner(); +#endif + PerformHandshake(s_useHandhakeTimeout ? timeout : 0); + CommunicationsUtilities.Trace("Successfully connected to pipe {0}...!", PipeName); + } + +#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY + // This code needs to be in a separate method so that we don't try (and fail) to load the Windows-only APIs when JIT-ing the code + // on non-Windows operating systems + private void ValidateRemotePipeOwner() + { + SecurityIdentifier identifier = WindowsIdentity.GetCurrent().Owner; + PipeSecurity remoteSecurity = _pipeClient.GetAccessControl(); + IdentityReference remoteOwner = remoteSecurity.GetOwner(typeof(SecurityIdentifier)); + + if (remoteOwner != identifier) + { + CommunicationsUtilities.Trace("The remote pipe owner {0} does not match {1}", remoteOwner.Value, identifier.Value); + throw new UnauthorizedAccessException(); + } + } +#endif + + /// + /// Connect to named pipe stream and ensure validate handshake and security. + /// + private void PerformHandshake(int timeout) + { + for (int i = 0; i < HandshakeComponents.Length; i++) + { + CommunicationsUtilities.Trace("Writing handshake part {0} ({1}) to pipe {2}", i, HandshakeComponents[i], PipeName); + _pipeClient.WriteIntForHandshake(HandshakeComponents[i]); + } + + // This indicates that we have finished all the parts of our handshake; hopefully the endpoint has as well. + _pipeClient.WriteEndOfHandshakeSignal(); + + CommunicationsUtilities.Trace("Reading handshake from pipe {0}", PipeName); + _pipeClient.ReadEndOfHandshakeSignal(true, timeout); + } + } +} diff --git a/src/Shared/NodePipeServer.cs b/src/Shared/NodePipeServer.cs new file mode 100644 index 00000000000..91fba144c52 --- /dev/null +++ b/src/Shared/NodePipeServer.cs @@ -0,0 +1,220 @@ +// Licensed to the .NET Foundation under one or more agreements. +// The .NET Foundation licenses this file to you under the MIT license. + +using System; +using System.IO; +using System.IO.Pipes; +using System.Security.AccessControl; +using System.Security.Principal; +using Microsoft.Build.BackEnd; +using Microsoft.Build.Shared; + +#if !TASKHOST +using System.Threading.Tasks; +#endif + +namespace Microsoft.Build.Internal +{ + internal sealed class NodePipeServer : NodePipeBase + { + /// + /// The size of kernel-level buffers used by the named pipe. If the total size of pending reads or write requests exceed + /// this amount (known as the quota), IO will block until either pending operations complete, or the OS increases the quota. + /// + private const int PipeBufferSize = 131_072; + + /// + /// A timeout for the handshake. This is only used on Unix-like socket implementations, because the + /// timeout on the PipeStream connection is ignore. + /// + private static readonly int s_handshakeTimeout = NativeMethodsShared.IsWindows ? 0 : 60_000; + + private readonly NamedPipeServerStream _pipeServer; + + internal NodePipeServer(string pipeName, Handshake handshake, int maxNumberOfServerInstances = 1) + : base(pipeName, handshake) + { + PipeOptions pipeOptions = PipeOptions.Asynchronous; +#if FEATURE_PIPEOPTIONS_CURRENTUSERONLY + pipeOptions |= PipeOptions.CurrentUserOnly; +#else + // Restrict access to just this account. We set the owner specifically here, and on the + // pipe client side they will check the owner against this one - they must have identical + // SIDs or the client will reject this server. This is used to avoid attacks where a + // hacked server creates a less restricted pipe in an attempt to lure us into using it and + // then sending build requests to the real pipe client (which is the MSBuild Build Manager.) + PipeAccessRule rule = new(WindowsIdentity.GetCurrent().Owner, PipeAccessRights.ReadWrite, AccessControlType.Allow); + PipeSecurity security = new(); + security.AddAccessRule(rule); + security.SetOwner(rule.IdentityReference); +#endif + + _pipeServer = new NamedPipeServerStream( + pipeName, + PipeDirection.InOut, + maxNumberOfServerInstances, + PipeTransmissionMode.Byte, + pipeOptions, + inBufferSize: PipeBufferSize, + outBufferSize: PipeBufferSize +#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY + , security, + HandleInheritability.None +#endif +#pragma warning disable SA1111 // Closing parenthesis should be on line of last parameter + ); +#pragma warning restore SA1111 // Closing parenthesis should be on line of last parameter + } + + protected override PipeStream NodeStream => _pipeServer; + + internal LinkStatus WaitForConnection() + { + DateTime originalWaitStartTime = DateTime.UtcNow; + bool gotValidConnection = false; + + while (!gotValidConnection) + { + gotValidConnection = true; + DateTime restartWaitTime = DateTime.UtcNow; + + // We only wait to wait the difference between now and the last original start time, in case we have multiple hosts attempting + // to attach. This prevents each attempt from resetting the timer. + TimeSpan usedWaitTime = restartWaitTime - originalWaitStartTime; + int waitTimeRemaining = Math.Max(0, CommunicationsUtilities.NodeConnectionTimeout - (int)usedWaitTime.TotalMilliseconds); + + try + { + // Wait for a connection +#if TASKHOST + IAsyncResult resultForConnection = _pipeServer.BeginWaitForConnection(null, null); + CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining); + bool connected = resultForConnection.AsyncWaitHandle.WaitOne(waitTimeRemaining, false); + _pipeServer.EndWaitForConnection(resultForConnection); +#else + Task connectionTask = _pipeServer.WaitForConnectionAsync(); + CommunicationsUtilities.Trace("Waiting for connection {0} ms...", waitTimeRemaining); + bool connected = connectionTask.Wait(waitTimeRemaining); +#endif + if (!connected) + { + CommunicationsUtilities.Trace("Connection timed out waiting a host to contact us. Exiting comm thread."); + return LinkStatus.ConnectionFailed; + } + + CommunicationsUtilities.Trace("Parent started connecting. Reading handshake from parent"); + + // The handshake protocol is a series of int exchanges. The host sends us a each component, and we + // verify it. Afterwards, the host sends an "End of Handshake" signal, to which we respond in kind. + // Once the handshake is complete, both sides can be assured the other is ready to accept data. + try + { + gotValidConnection = ValidateHandshake(); +#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY + gotValidConnection &= ValidateClientIdentity(); +#endif + } + catch (IOException e) + { + // We will get here when: + // 1. The host (OOP main node) connects to us, it immediately checks for user privileges + // and if they don't match it disconnects immediately leaving us still trying to read the blank handshake + // 2. The host is too old sending us bits we automatically reject in the handshake + // 3. We expected to read the EndOfHandshake signal, but we received something else + CommunicationsUtilities.Trace("Client connection failed but we will wait for another connection. Exception: {0}", e.Message); + gotValidConnection = false; + } + catch (InvalidOperationException) + { + gotValidConnection = false; + } + + if (!gotValidConnection && _pipeServer.IsConnected) + { + _pipeServer.Disconnect(); + } + } + catch (Exception e) when (!ExceptionHandling.IsCriticalException(e)) + { + CommunicationsUtilities.Trace("Client connection failed. Exiting comm thread. {0}", e); + if (_pipeServer.IsConnected) + { + _pipeServer.Disconnect(); + } + + ExceptionHandling.DumpExceptionToFile(e); + return LinkStatus.Failed; + } + } + + return LinkStatus.Active; + } + + internal void Disconnect() + { + try + { + if (_pipeServer.IsConnected) + { +#if NET // OperatingSystem.IsWindows() is new in .NET 5.0 + if (OperatingSystem.IsWindows()) +#endif + { + _pipeServer.WaitForPipeDrain(); + } + + _pipeServer.Disconnect(); + } + } + catch (Exception) + { + // We don't really care if Disconnect somehow fails, but it gives us a chance to do the right thing. + } + } + + private bool ValidateHandshake() + { + for (int i = 0; i < HandshakeComponents.Length; i++) + { + // This will disconnect a < 16.8 host; it expects leading 00 or F5 or 06. 0x00 is a wildcard. + int handshakePart = _pipeServer.ReadIntForHandshake(byteToAccept: i == 0 ? CommunicationsUtilities.handshakeVersion : null, s_handshakeTimeout); + + if (handshakePart != HandshakeComponents[i]) + { + CommunicationsUtilities.Trace("Handshake failed. Received {0} from host not {1}. Probably the host is a different MSBuild build.", handshakePart, HandshakeComponents[i]); + _pipeServer.WriteIntForHandshake(i + 1); + return false; + } + } + + // To ensure that our handshake and theirs have the same number of bytes, receive and send a magic number indicating EOS. + _pipeServer.ReadEndOfHandshakeSignal(false, s_handshakeTimeout); + + CommunicationsUtilities.Trace("Successfully connected to parent."); + _pipeServer.WriteEndOfHandshakeSignal(); + + return true; + } + +#if !FEATURE_PIPEOPTIONS_CURRENTUSERONLY + private bool ValidateClientIdentity() + { + // We will only talk to a host that was started by the same user as us. Even though the pipe access is set to only allow this user, we want to ensure they + // haven't attempted to change those permissions out from under us. This ensures that the only way they can truly gain access is to be impersonating the + // user we were started by. + WindowsIdentity currentIdentity = WindowsIdentity.GetCurrent(); + WindowsIdentity? clientIdentity = null; + _pipeServer.RunAsClient(() => { clientIdentity = WindowsIdentity.GetCurrent(true); }); + + if (clientIdentity == null || !string.Equals(clientIdentity.Name, currentIdentity.Name, StringComparison.OrdinalIgnoreCase)) + { + CommunicationsUtilities.Trace("Handshake failed. Host user is {0} but we were created by {1}.", (clientIdentity == null) ? "" : clientIdentity.Name, currentIdentity.Name); + return false; + } + + return true; + } +#endif + + } +}