Skip to content

Commit

Permalink
upload log on runner force kill worker. (actions#255)
Browse files Browse the repository at this point in the history
  • Loading branch information
TingluoHuang committed Jan 6, 2020
1 parent 6d9256d commit b5a4cef
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 3 deletions.
4 changes: 1 addition & 3 deletions src/Runner.Common/Logging.cs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ public class PagingLogger : RunnerService, IPagingLogger

private Guid _timelineId;
private Guid _timelineRecordId;
private string _pageId;
private FileStream _pageData;
private StreamWriter _pageWriter;
private int _byteCount;
Expand All @@ -40,7 +39,6 @@ public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
_totalLines = 0;
_pageId = Guid.NewGuid().ToString();
_pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder);
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
Directory.CreateDirectory(_pagesFolder);
Expand Down Expand Up @@ -102,7 +100,7 @@ private void NewPage()
{
EndPage();
_byteCount = 0;
_dataFileName = Path.Combine(_pagesFolder, $"{_pageId}_{++_pageCount}.log");
_dataFileName = Path.Combine(_pagesFolder, $"{_timelineRecordId}_{++_pageCount}.log");
_pageData = new FileStream(_dataFileName, FileMode.CreateNew);
_pageWriter = new StreamWriter(_pageData, System.Text.Encoding.UTF8);
}
Expand Down
114 changes: 114 additions & 0 deletions src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -568,6 +568,10 @@ await processChannel.SendAsync(
{
Trace.Info("worker process has been killed.");
}

// When the worker doesn't exit within the cancel timeout, the runner kills the worker process, so the worker never finishes uploading the job logs.
// The runner makes a best-effort attempt to upload those leftover logs here.
await TryUploadUnfinishedLogs(message);
}

Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
Expand Down Expand Up @@ -712,6 +716,116 @@ public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToke
}
}

/// <summary>
/// Best-effort upload of any paging-log files still on disk for this job.
/// </summary>
/// <remarks>
/// Scans the paging folder under the Diag directory for files named
/// <c>{timelineRecordId}_{pageNumber}.log</c>, matches each file to a record in the job's
/// timeline, creates the server-side log for a record when it has none, appends every page
/// in ascending page order, and deletes each file once it has been uploaded.
/// Any exception is traced and swallowed; this method never throws to its caller.
/// </remarks>
/// <param name="message">The job request whose plan, timeline and system connection endpoint are used for the upload.</param>
private async Task TryUploadUnfinishedLogs(Pipelines.AgentJobRequestMessage message)
{
    Trace.Entering();

    try
    {
        // The file system probes live inside the try block so that an IO or permission
        // failure is traced like every other error instead of escaping a best-effort method.
        var logFolder = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Diag), PagingLogger.PagingFolder);
        if (!Directory.Exists(logFolder))
        {
            return;
        }

        var logs = Directory.GetFiles(logFolder);
        if (logs.Length == 0)
        {
            return;
        }

        var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
        ArgUtil.NotNull(systemConnection, nameof(systemConnection));

        var jobServer = HostContext.GetService<IJobServer>();
        VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
        VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);

        await jobServer.ConnectAsync(jobConnection);

        var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);

        var updatedRecords = new List<TimelineRecord>();
        var logPages = new Dictionary<Guid, Dictionary<int, string>>();
        var logRecords = new Dictionary<Guid, TimelineRecord>();
        foreach (var log in logs)
        {
            // Expected file name (without extension): "<record GUID>_<page number>".
            var logName = Path.GetFileNameWithoutExtension(log);
            var separatorIndex = logName.IndexOf('_');
            var logRecordId = Guid.Empty;
            var pageNumber = 0;
            if (separatorIndex < 0 ||
                !Guid.TryParse(logName.Substring(0, separatorIndex), out logRecordId) ||
                !int.TryParse(logName.Substring(separatorIndex + 1), out pageNumber))
            {
                Trace.Warning($"log file '{log}' doesn't follow naming convension 'GUID_INT'.");
                continue;
            }

            // Files whose GUID does not match a record of this job's timeline are left untouched.
            var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
            if (record == null)
            {
                continue;
            }

            Dictionary<int, string> recordPages;
            if (!logPages.TryGetValue(record.Id, out recordPages))
            {
                recordPages = new Dictionary<int, string>();
                logPages[record.Id] = recordPages;
                logRecords[record.Id] = record;
            }

            recordPages[pageNumber] = log;
        }

        foreach (var pages in logPages)
        {
            var record = logRecords[pages.Key];
            if (record.Log == null)
            {
                // Create the log
                record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));

                // Need to post timeline record updates to reflect the log creation
                updatedRecords.Add(record.Clone());
            }

            // Upload the pages that actually exist, in ascending page order. The page numbers
            // found on disk are not guaranteed to start at 1 or to be contiguous, so indexing
            // 1..Count would throw KeyNotFoundException and abort every remaining upload.
            foreach (var page in pages.Value.OrderBy(x => x.Key))
            {
                var logFile = page.Value;

                // Upload the contents
                using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
                {
                    await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
                }

                Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
                IOUtil.DeleteFile(logFile);
            }
        }

        if (updatedRecords.Count > 0)
        {
            await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None);
        }
    }
    catch (Exception ex)
    {
        // Ignore any error during log upload since it's best effort
        Trace.Error(ex);
    }
}

// TODO: We need send detailInfo back to DT in order to add an issue for the job
private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequestMessage message, Guid lockToken, TaskResult result, string detailInfo = null)
{
Expand Down

0 comments on commit b5a4cef

Please sign in to comment.