From b5a4ceff9dac74a367bb5407546ae62676189663 Mon Sep 17 00:00:00 2001 From: Tingluo Huang Date: Mon, 6 Jan 2020 13:04:23 -0500 Subject: [PATCH] upload log on runner force kill worker. (#255) --- src/Runner.Common/Logging.cs | 4 +- src/Runner.Listener/JobDispatcher.cs | 114 +++++++++++++++++++++++++++ 2 files changed, 115 insertions(+), 3 deletions(-) diff --git a/src/Runner.Common/Logging.cs b/src/Runner.Common/Logging.cs index 26a25d64482..2abf9b54835 100644 --- a/src/Runner.Common/Logging.cs +++ b/src/Runner.Common/Logging.cs @@ -24,7 +24,6 @@ public class PagingLogger : RunnerService, IPagingLogger private Guid _timelineId; private Guid _timelineRecordId; - private string _pageId; private FileStream _pageData; private StreamWriter _pageWriter; private int _byteCount; @@ -40,7 +39,6 @@ public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); _totalLines = 0; - _pageId = Guid.NewGuid().ToString(); _pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder); _jobServerQueue = HostContext.GetService(); Directory.CreateDirectory(_pagesFolder); @@ -102,7 +100,7 @@ private void NewPage() { EndPage(); _byteCount = 0; - _dataFileName = Path.Combine(_pagesFolder, $"{_pageId}_{++_pageCount}.log"); + _dataFileName = Path.Combine(_pagesFolder, $"{_timelineRecordId}_{++_pageCount}.log"); _pageData = new FileStream(_dataFileName, FileMode.CreateNew); _pageWriter = new StreamWriter(_pageData, System.Text.Encoding.UTF8); } diff --git a/src/Runner.Listener/JobDispatcher.cs b/src/Runner.Listener/JobDispatcher.cs index 867fffb3ba0..414246383de 100644 --- a/src/Runner.Listener/JobDispatcher.cs +++ b/src/Runner.Listener/JobDispatcher.cs @@ -568,6 +568,10 @@ await processChannel.SendAsync( { Trace.Info("worker process has been killed."); } + + // When worker doesn't exit within cancel timeout, the runner will kill the worker process and worker won't finish upload job logs. + // The runner will try to upload these logs at this time. + await TryUploadUnfinishedLogs(message); } Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}"); @@ -712,6 +716,116 @@ public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToke } } + // Best effort upload any logs for this job. + private async Task TryUploadUnfinishedLogs(Pipelines.AgentJobRequestMessage message) + { + Trace.Entering(); + + var logFolder = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Diag), PagingLogger.PagingFolder); + if (!Directory.Exists(logFolder)) + { + return; + } + + var logs = Directory.GetFiles(logFolder); + if (logs.Length == 0) + { + return; + } + + try + { + var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection)); + ArgUtil.NotNull(systemConnection, nameof(systemConnection)); + + var jobServer = HostContext.GetService(); + VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection); + VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential); + + await jobServer.ConnectAsync(jobConnection); + + var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None); + + var updatedRecords = new List(); + var logPages = new Dictionary>(); + var logRecords = new Dictionary(); + foreach (var log in logs) + { + var logName = Path.GetFileNameWithoutExtension(log); + var logPageSeperator = logName.IndexOf('_'); + var logRecordId = Guid.Empty; + var pageNumber = 0; + if (logPageSeperator < 0) + { + Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_INT'."); + continue; + } + else + { + if (!Guid.TryParse(logName.Substring(0, logPageSeperator), out logRecordId)) + { + Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_INT'."); + continue; + } + + if (!int.TryParse(logName.Substring(logPageSeperator + 1), out pageNumber)) + { + Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_INT'."); + continue; + } + } + + var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId); + if (record != null) + { + if (!logPages.ContainsKey(record.Id)) + { + logPages[record.Id] = new Dictionary(); + logRecords[record.Id] = record; + } + + logPages[record.Id][pageNumber] = log; + } + } + + foreach (var pages in logPages) + { + var record = logRecords[pages.Key]; + if (record.Log == null) + { + // Create the log + record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken)); + + // Need to post timeline record updates to reflect the log creation + updatedRecords.Add(record.Clone()); + } + + for (var i = 1; i <= pages.Value.Count; i++) + { + var logFile = pages.Value[i]; + // Upload the contents + using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite)) + { + var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken)); + } + + Trace.Info($"Uploaded unfinished log '{logFile}' for current job."); + IOUtil.DeleteFile(logFile); + } + } + + if (updatedRecords.Count > 0) + { + await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None); + } + } + catch (Exception ex) + { + // Ignore any error during log upload since it's best effort + Trace.Error(ex); + } + } + // TODO: We need send detailInfo back to DT in order to add an issue for the job private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequestMessage message, Guid lockToken, TaskResult result, string detailInfo = null) {