Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions src/Runner.Common/JobServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,7 @@ public interface IJobServer : IRunnerService, IAsyncDisposable
Task<TaskLog> AppendLogContentAsync(Guid scopeIdentifier, string hubName, Guid planId, int logId, Stream uploadStream, CancellationToken cancellationToken);
Task AppendTimelineRecordFeedAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, Guid stepId, IList<string> lines, long? startLine, CancellationToken cancellationToken);
Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, Guid timelineRecordId, String type, String name, Stream uploadStream, CancellationToken cancellationToken);
Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken);
Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken);
Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken);
Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken);
Task<Timeline> CreateTimelineAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, CancellationToken cancellationToken);
Task<List<TimelineRecord>> UpdateTimelineRecordsAsync(Guid scopeIdentifier, string hubName, Guid planId, Guid timelineId, IEnumerable<TimelineRecord> records, CancellationToken cancellationToken);
Expand Down Expand Up @@ -317,7 +316,7 @@ public Task<TaskAttachment> CreateAttachmentAsync(Guid scopeIdentifier, string h
return _taskClient.CreateAttachmentAsync(scopeIdentifier, hubName, planId, timelineId, timelineRecordId, type, name, uploadStream, cancellationToken: cancellationToken);
}

public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, string file, CancellationToken cancellationToken)
public Task CreateStepSymmaryAsync(string planId, string jobId, string stepId, string file, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
Expand All @@ -326,15 +325,6 @@ public Task CreateStepSummaryAsync(string planId, string jobId, Guid stepId, str
throw new InvalidOperationException("Results client is not initialized.");
}

public Task CreateResultsStepLogAsync(string planId, string jobId, Guid stepId, string file, bool finalize, bool firstBlock, long lineCount, CancellationToken cancellationToken)
{
if (_resultsClient != null)
{
return _resultsClient.UploadResultsStepLogAsync(planId, jobId, stepId, file, finalize, firstBlock, lineCount, cancellationToken: cancellationToken);
}
throw new InvalidOperationException("Results client is not initialized.");
}


public Task<TaskLog> CreateLogAsync(Guid scopeIdentifier, string hubName, Guid planId, TaskLog log, CancellationToken cancellationToken)
{
Expand Down
116 changes: 33 additions & 83 deletions src/Runner.Common/JobServerQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ public interface IJobServerQueue : IRunnerService, IThrottlingReporter
void Start(Pipelines.AgentJobRequestMessage jobRequest);
void QueueWebConsoleLine(Guid stepRecordId, string line, long? lineNumber = null);
void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type, string name, string path, bool deleteSource);
void QueueResultsUpload(Guid timelineRecordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines = 0);
void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource);
void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord);
}

Expand All @@ -31,7 +31,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
private static readonly TimeSpan _delayForWebConsoleLineDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForTimelineUpdateDequeue = TimeSpan.FromMilliseconds(500);
private static readonly TimeSpan _delayForFileUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForResultsUploadDequeue = TimeSpan.FromMilliseconds(1000);
private static readonly TimeSpan _delayForSummaryUploadDequeue = TimeSpan.FromMilliseconds(1000);

// Job message information
private Guid _scopeIdentifier;
Expand All @@ -46,7 +46,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
// queue for file upload (log file or attachment)
private readonly ConcurrentQueue<UploadFileInfo> _fileUploadQueue = new();

private readonly ConcurrentQueue<ResultsUploadFileInfo> _resultsFileUploadQueue = new();
private readonly ConcurrentQueue<SummaryUploadFileInfo> _summaryFileUploadQueue = new();

// queue for timeline or timeline record update (one queue per timeline)
private readonly ConcurrentDictionary<Guid, ConcurrentQueue<TimelineRecord>> _timelineUpdateQueue = new();
Expand All @@ -60,7 +60,7 @@ public sealed class JobServerQueue : RunnerService, IJobServerQueue
// Task for each queue's dequeue process
private Task _webConsoleLineDequeueTask;
private Task _fileUploadDequeueTask;
private Task _resultsUploadDequeueTask;
private Task _summaryUploadDequeueTask;
private Task _timelineUpdateDequeueTask;

// common
Expand Down Expand Up @@ -140,12 +140,12 @@ public void Start(Pipelines.AgentJobRequestMessage jobRequest)
_fileUploadDequeueTask = ProcessFilesUploadQueueAsync();

Trace.Info("Start results file upload queue.");
_resultsUploadDequeueTask = ProcessResultsUploadQueueAsync();
_summaryUploadDequeueTask = ProcessSummaryUploadQueueAsync();

Trace.Info("Start process timeline update queue.");
_timelineUpdateDequeueTask = ProcessTimelinesUpdateQueueAsync();

_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _resultsUploadDequeueTask };
_allDequeueTasks = new Task[] { _webConsoleLineDequeueTask, _fileUploadDequeueTask, _timelineUpdateDequeueTask, _summaryUploadDequeueTask };
_queueInProcess = true;
}

Expand Down Expand Up @@ -176,9 +176,9 @@ public async Task ShutdownAsync()
await ProcessFilesUploadQueueAsync(runOnce: true);
Trace.Info("File upload queue drained.");

Trace.Verbose("Draining results upload queue.");
await ProcessResultsUploadQueueAsync(runOnce: true);
Trace.Info("Results upload queue drained.");
Trace.Verbose("Draining results summary upload queue.");
await ProcessSummaryUploadQueueAsync(runOnce: true);
Trace.Info("Results summary upload queue drained.");

// ProcessTimelinesUpdateQueueAsync() will throw exception during shutdown
// if there is any timeline records that failed to update contains output variabls.
Expand Down Expand Up @@ -230,31 +230,21 @@ public void QueueFileUpload(Guid timelineId, Guid timelineRecordId, string type,
_fileUploadQueue.Enqueue(newFile);
}

public void QueueResultsUpload(Guid recordId, string name, string path, string type, bool deleteSource, bool finalize, bool firstBlock, long totalLines)
public void QueueSummaryUpload(Guid stepRecordId, string name, string path, bool deleteSource)
{
if (recordId == _jobTimelineRecordId)
{
Trace.Verbose("Skipping job log {0} for record {1}", path, recordId);
return;
}

// all parameter not null, file path exist.
var newFile = new ResultsUploadFileInfo()
var newFile = new SummaryUploadFileInfo()
{
Name = name,
Path = path,
Type = type,
PlanId = _planId.ToString(),
JobId = _jobTimelineRecordId.ToString(),
RecordId = recordId,
DeleteSource = deleteSource,
Finalize = finalize,
FirstBlock = firstBlock,
TotalLines = totalLines,
StepId = stepRecordId.ToString(),
DeleteSource = deleteSource
};

Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, recordId);
_resultsFileUploadQueue.Enqueue(newFile);
Trace.Verbose("Enqueue results file upload queue: file '{0}' attach to job {1} step {2}", newFile.Path, _jobTimelineRecordId, stepRecordId);
_summaryFileUploadQueue.Enqueue(newFile);
}

public void QueueTimelineRecordUpdate(Guid timelineId, TimelineRecord timelineRecord)
Expand Down Expand Up @@ -447,18 +437,18 @@ private async Task ProcessFilesUploadQueueAsync(bool runOnce = false)
}
}

private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
private async Task ProcessSummaryUploadQueueAsync(bool runOnce = false)
{
Trace.Info("Starting results-based upload queue...");

while (!_jobCompletionSource.Task.IsCompleted || runOnce)
{
List<ResultsUploadFileInfo> filesToUpload = new();
ResultsUploadFileInfo dequeueFile;
while (_resultsFileUploadQueue.TryDequeue(out dequeueFile))
List<SummaryUploadFileInfo> filesToUpload = new();
SummaryUploadFileInfo dequeueFile;
while (_summaryFileUploadQueue.TryDequeue(out dequeueFile))
{
filesToUpload.Add(dequeueFile);
// process at most 10 file uploads.
// process at most 10 file upload.
if (!runOnce && filesToUpload.Count > 10)
{
break;
Expand All @@ -469,30 +459,19 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
{
if (runOnce)
{
Trace.Info($"Uploading {filesToUpload.Count} file(s) in one shot through results service.");
Trace.Info($"Uploading {filesToUpload.Count} summary files in one shot through results service.");
}

int errorCount = 0;
foreach (var file in filesToUpload)
{
try
{
if (String.Equals(file.Type, ChecksAttachmentType.StepSummary, StringComparison.OrdinalIgnoreCase))
{
await UploadSummaryFile(file);
}
else if (String.Equals(file.Type, CoreAttachmentType.ResultsLog, StringComparison.OrdinalIgnoreCase))
{
if (file.RecordId != _jobTimelineRecordId)
{
Trace.Info($"Got a step log file to send to results service.");
await UploadResultsStepLogFile(file);
}
}
await UploadSummaryFile(file);
}
catch (Exception ex)
{
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during file upload to results. {ex.Message}" };
var issue = new Issue() { Type = IssueType.Warning, Message = $"Caught exception during summary file upload to results. {ex.Message}" };
issue.Data[Constants.Runner.InternalTelemetryIssueDataKey] = Constants.Runner.ResultsUploadFailure;

var telemetryRecord = new TimelineRecord()
Expand All @@ -502,13 +481,16 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
telemetryRecord.Issues.Add(issue);
QueueTimelineRecordUpdate(_jobTimelineId, telemetryRecord);

Trace.Info("Catch exception during file upload to results, keep going since the process is best effort.");
Trace.Info("Catch exception during summary file upload to results, keep going since the process is best effort.");
Trace.Error(ex);
}
finally
{
errorCount++;
}
}

Trace.Info("Tried to upload {0} file(s) to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
Trace.Info("Tried to upload {0} summary files to results, success rate: {1}/{0}.", filesToUpload.Count, filesToUpload.Count - errorCount);
}

if (runOnce)
Expand All @@ -517,7 +499,7 @@ private async Task ProcessResultsUploadQueueAsync(bool runOnce = false)
}
else
{
await Task.Delay(_delayForResultsUploadDequeue);
await Task.Delay(_delayForSummaryUploadDequeue);
}
}
}
Expand Down Expand Up @@ -794,15 +776,15 @@ private async Task UploadFile(UploadFileInfo file)
}
}

private async Task UploadSummaryFile(ResultsUploadFileInfo file)
private async Task UploadSummaryFile(SummaryUploadFileInfo file)
{
bool uploadSucceed = false;
try
{
// Upload the step summary
Trace.Info($"Starting to upload summary file to results service {file.Name}, {file.Path}");
var cancellationTokenSource = new CancellationTokenSource();
await _jobServer.CreateStepSummaryAsync(file.PlanId, file.JobId, file.RecordId, file.Path, cancellationTokenSource.Token);
await _jobServer.CreateStepSymmaryAsync(file.PlanId, file.JobId, file.StepId, file.Path, cancellationTokenSource.Token);

uploadSucceed = true;
}
Expand All @@ -822,34 +804,6 @@ private async Task UploadSummaryFile(ResultsUploadFileInfo file)
}
}
}

private async Task UploadResultsStepLogFile(ResultsUploadFileInfo file)
{
bool uploadSucceed = false;
try
{
Trace.Info($"Starting upload of step log file to results service {file.Name}, {file.Path}");
var cancellationTokenSource = new CancellationTokenSource();
await _jobServer.CreateResultsStepLogAsync(file.PlanId, file.JobId, file.RecordId, file.Path, file.Finalize, file.FirstBlock, file.TotalLines, cancellationTokenSource.Token);

uploadSucceed = true;
}
finally
{
if (uploadSucceed && file.DeleteSource)
{
try
{
File.Delete(file.Path);
}
catch (Exception ex)
{
Trace.Info("Exception encountered during deletion of a temporary file that was already successfully uploaded to results.");
Trace.Error(ex);
}
}
}
}
}

internal class PendingTimelineRecord
Expand All @@ -868,18 +822,14 @@ internal class UploadFileInfo
public bool DeleteSource { get; set; }
}

internal class ResultsUploadFileInfo
internal class SummaryUploadFileInfo
{
public string Name { get; set; }
public string Type { get; set; }
public string Path { get; set; }
public string PlanId { get; set; }
public string JobId { get; set; }
public Guid RecordId { get; set; }
public string StepId { get; set; }
public bool DeleteSource { get; set; }
public bool Finalize { get; set; }
public bool FirstBlock { get; set; }
public long TotalLines { get; set; }
}


Expand Down
Loading