Upload unfinished job/step logs from runner on cancellation timeout #255

Merged: 1 commit, Jan 6, 2020
4 changes: 1 addition & 3 deletions src/Runner.Common/Logging.cs
@@ -24,7 +24,6 @@ public class PagingLogger : RunnerService, IPagingLogger

private Guid _timelineId;
private Guid _timelineRecordId;
private string _pageId;
private FileStream _pageData;
private StreamWriter _pageWriter;
private int _byteCount;
@@ -40,7 +39,6 @@ public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
_totalLines = 0;
_pageId = Guid.NewGuid().ToString();
Member Author:
We don't really need a new GUID for each page; we can leverage the timeline record ID for each step, since those are already unique per step.

_pagesFolder = Path.Combine(hostContext.GetDirectory(WellKnownDirectory.Diag), PagingFolder);
_jobServerQueue = HostContext.GetService<IJobServerQueue>();
Directory.CreateDirectory(_pagesFolder);
@@ -102,7 +100,7 @@ private void NewPage()
{
EndPage();
_byteCount = 0;
_dataFileName = Path.Combine(_pagesFolder, $"{_pageId}_{++_pageCount}.log");
_dataFileName = Path.Combine(_pagesFolder, $"{_timelineRecordId}_{++_pageCount}.log");
_pageData = new FileStream(_dataFileName, FileMode.CreateNew);
_pageWriter = new StreamWriter(_pageData, System.Text.Encoding.UTF8);
}
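With this change, each log page file is named after the step's timeline record ID plus a page counter. A minimal sketch of the resulting path (folder and values are hypothetical; the real folder lives under the runner's _diag directory):

```csharp
using System;
using System.IO;

// Hypothetical illustration of the "GUID_INT" page naming that NewPage() now produces.
var timelineRecordId = Guid.NewGuid();   // stands in for the step's timeline record id
var pageCount = 0;
var pagesFolder = Path.Combine("_diag", "pages");
var dataFileName = Path.Combine(pagesFolder, $"{timelineRecordId}_{++pageCount}.log");
Console.WriteLine(dataFileName);         // e.g. _diag/pages/<guid>_1.log
```

This is the convention the listener-side changes below parse back into a record ID and a page number.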
114 changes: 114 additions & 0 deletions src/Runner.Listener/JobDispatcher.cs
@@ -568,6 +568,10 @@ await processChannel.SendAsync(
{
Trace.Info("worker process has been killed.");
}

// When the worker doesn't exit within the cancellation timeout, the runner kills the worker process, so the worker won't finish uploading the job logs.
// The runner will try to upload these logs at this point.
await TryUploadUnfinishedLogs(message);
}

Trace.Info($"finish job request for job {message.JobId} with result: {resultOnAbandonOrCancel}");
@@ -712,6 +716,116 @@ public async Task RenewJobRequestAsync(int poolId, long requestId, Guid lockToke
}
}

// Best-effort upload of any logs for this job.
private async Task TryUploadUnfinishedLogs(Pipelines.AgentJobRequestMessage message)
{
Trace.Entering();

var logFolder = Path.Combine(HostContext.GetDirectory(WellKnownDirectory.Diag), PagingLogger.PagingFolder);
if (!Directory.Exists(logFolder))
{
return;
}

var logs = Directory.GetFiles(logFolder);
if (logs.Length == 0)
{
return;
}

try
{
var systemConnection = message.Resources.Endpoints.SingleOrDefault(x => string.Equals(x.Name, WellKnownServiceEndpointNames.SystemVssConnection));
Member Author:
Pull the credentials from the job message; we will use them to talk to the Actions service and upload the logs.

ArgUtil.NotNull(systemConnection, nameof(systemConnection));

var jobServer = HostContext.GetService<IJobServer>();
VssCredentials jobServerCredential = VssUtil.GetVssCredential(systemConnection);
VssConnection jobConnection = VssUtil.CreateConnection(systemConnection.Url, jobServerCredential);

await jobServer.ConnectAsync(jobConnection);

var timeline = await jobServer.GetTimelineAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, CancellationToken.None);

var updatedRecords = new List<TimelineRecord>();
var logPages = new Dictionary<Guid, Dictionary<int, string>>();
var logRecords = new Dictionary<Guid, TimelineRecord>();
foreach (var log in logs)
{
var logName = Path.GetFileNameWithoutExtension(log);
var logPageSeperator = logName.IndexOf('_');
var logRecordId = Guid.Empty;
var pageNumber = 0;
if (logPageSeperator < 0)
{
Trace.Warning($"log file '{log}' doesn't follow naming convention 'GUID_INT'.");
continue;
}
else
{
if (!Guid.TryParse(logName.Substring(0, logPageSeperator), out logRecordId))
{
Trace.Warning($"log file '{log}' doesn't follow naming convention 'GUID_INT'.");
continue;
}

if (!int.TryParse(logName.Substring(logPageSeperator + 1), out pageNumber))
{
Trace.Warning($"log file '{log}' doesn't follow naming convention 'GUID_INT'.");
continue;
}
}

var record = timeline.Records.FirstOrDefault(x => x.Id == logRecordId);
if (record != null)
Member Author:
We only upload log files that correspond to timeline records of this job.

{
if (!logPages.ContainsKey(record.Id))
{
logPages[record.Id] = new Dictionary<int, string>();
logRecords[record.Id] = record;
}

logPages[record.Id][pageNumber] = log;
}
}

foreach (var pages in logPages)
{
var record = logRecords[pages.Key];
if (record.Log == null)
{
// Create the log
record.Log = await jobServer.CreateLogAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, new TaskLog(String.Format(@"logs\{0:D}", record.Id)), default(CancellationToken));

// Need to post timeline record updates to reflect the log creation
updatedRecords.Add(record.Clone());
}

for (var i = 1; i <= pages.Value.Count; i++)
{
var logFile = pages.Value[i];
// Upload the contents
using (FileStream fs = File.Open(logFile, FileMode.Open, FileAccess.Read, FileShare.ReadWrite))
{
var logUploaded = await jobServer.AppendLogContentAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, record.Log.Id, fs, default(CancellationToken));
}

Trace.Info($"Uploaded unfinished log '{logFile}' for current job.");
IOUtil.DeleteFile(logFile);
}
}

if (updatedRecords.Count > 0)
{
await jobServer.UpdateTimelineRecordsAsync(message.Plan.ScopeIdentifier, message.Plan.PlanType, message.Plan.PlanId, message.Timeline.Id, updatedRecords, CancellationToken.None);
}
}
catch (Exception ex)
{
// Ignore any error during log upload since it's best effort
Trace.Error(ex);
}
}

// TODO: We need to send detailInfo back to DT in order to add an issue for the job
private async Task CompleteJobRequestAsync(int poolId, Pipelines.AgentJobRequestMessage message, Guid lockToken, TaskResult result, string detailInfo = null)
{
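For reference, a small standalone sketch of how that 'GUID_INT' file name round-trips on the listener side. The helper and class names are hypothetical; the parsing mirrors what TryUploadUnfinishedLogs does above:

```csharp
using System;
using System.IO;

static class LogPageNameSketch
{
    // Hypothetical helper mirroring the parsing above: file names are "<timelineRecordId>_<pageNumber>.log".
    public static bool TryParsePage(string path, out Guid recordId, out int pageNumber)
    {
        recordId = Guid.Empty;
        pageNumber = 0;

        var name = Path.GetFileNameWithoutExtension(path);
        var separator = name.IndexOf('_');
        if (separator < 0)
        {
            return false;
        }

        return Guid.TryParse(name.Substring(0, separator), out recordId) &&
               int.TryParse(name.Substring(separator + 1), out pageNumber);
    }

    static void Main()
    {
        // A page file as written by PagingLogger (values hypothetical).
        var recordId = Guid.NewGuid();
        var path = Path.Combine("_diag", "pages", $"{recordId}_1.log");

        if (TryParsePage(path, out var parsedId, out var page))
        {
            // parsedId matches the step's timeline record, so the page can be attached to the right log.
            Console.WriteLine($"record={parsedId} page={page}");
        }
    }
}
```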