Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions src/Runner.Common/RunServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ public interface IRunServer : IRunnerService
{
Task ConnectAsync(Uri serverUrl, VssCredentials credentials);

Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken token);
Task<AgentJobRequestMessage> GetJobMessageAsync(string id, string billingOwnerId, CancellationToken token);

Task CompleteJobAsync(
Guid planId,
Expand All @@ -29,6 +29,7 @@ Task CompleteJobAsync(
IList<Annotation> jobAnnotations,
string environmentUrl,
IList<Telemetry> telemetry,
string billingOwnerId,
CancellationToken token);

Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken token);
Expand Down Expand Up @@ -58,11 +59,11 @@ private void CheckConnection()
}
}

public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, CancellationToken cancellationToken)
public Task<AgentJobRequestMessage> GetJobMessageAsync(string id, string billingOwnerId, CancellationToken cancellationToken)
{
CheckConnection();
return RetryRequest<AgentJobRequestMessage>(
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, VarUtil.OS, cancellationToken), cancellationToken,
async () => await _runServiceHttpClient.GetJobMessageAsync(requestUri, id, VarUtil.OS, billingOwnerId, cancellationToken), cancellationToken,
shouldRetry: ex =>
ex is not TaskOrchestrationJobNotFoundException && // HTTP status 404
ex is not TaskOrchestrationJobAlreadyAcquiredException && // HTTP status 409
Expand All @@ -78,11 +79,12 @@ public Task CompleteJobAsync(
IList<Annotation> jobAnnotations,
string environmentUrl,
IList<Telemetry> telemetry,
string billingOwnerId,
CancellationToken cancellationToken)
{
CheckConnection();
return RetryRequest(
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, jobAnnotations, environmentUrl, telemetry, cancellationToken), cancellationToken);
async () => await _runServiceHttpClient.CompleteJobAsync(requestUri, planId, jobId, result, outputs, stepResults, jobAnnotations, environmentUrl, telemetry, billingOwnerId, cancellationToken), cancellationToken);
}

public Task<RenewJobResponse> RenewJobAsync(Guid planId, Guid jobId, CancellationToken cancellationToken)
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1206,7 +1206,7 @@ private async Task ForceFailJob(IRunServer runServer, Pipelines.AgentJobRequestM
jobAnnotations.Add(annotation.Value);
}

await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, telemetry: null, CancellationToken.None);
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, TaskResult.Failed, outputs: null, stepResults: null, jobAnnotations: jobAnnotations, environmentUrl: null, telemetry: null, billingOwnerId: message.BillingOwnerId, CancellationToken.None);
}
catch (Exception ex)
{
Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -584,7 +584,7 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
await runServer.ConnectAsync(new Uri(messageRef.RunServiceUrl), creds);
try
{
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageQueueLoopTokenSource.Token);
jobRequestMessage = await runServer.GetJobMessageAsync(messageRef.RunnerRequestId, messageRef.BillingOwnerId, messageQueueLoopTokenSource.Token);
_acquireJobThrottler.Reset();
}
catch (Exception ex) when (
Expand Down
5 changes: 5 additions & 0 deletions src/Runner.Listener/RunnerJobRequestRef.cs
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,14 @@ public sealed class RunnerJobRequestRef
{
[DataMember(Name = "id")]
public string Id { get; set; }

[DataMember(Name = "runner_request_id")]
public string RunnerRequestId { get; set; }

[DataMember(Name = "run_service_url")]
public string RunServiceUrl { get; set; }

[DataMember(Name = "billing_owner_id")]
public string BillingOwnerId { get; set; }
}
}
2 changes: 1 addition & 1 deletion src/Runner.Worker/JobRunner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ private async Task<TaskResult> CompleteJobAsync(IRunServer runServer, IExecution
{
try
{
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, jobContext.JobOutputs, jobContext.Global.StepsResult, jobContext.Global.JobAnnotations, environmentUrl, telemetry, default);
await runServer.CompleteJobAsync(message.Plan.PlanId, message.JobId, result, jobContext.JobOutputs, jobContext.Global.StepsResult, jobContext.Global.JobAnnotations, environmentUrl, telemetry, billingOwnerId: message.BillingOwnerId, default);
return result;
}
catch (Exception ex)
Expand Down
7 changes: 7 additions & 0 deletions src/Sdk/DTPipelines/Pipelines/AgentJobRequestMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,6 +246,13 @@ public TemplateToken Snapshot
set;
}

[DataMember(EmitDefaultValue = false)]
public String BillingOwnerId
{
get;
set;
}

/// <summary>
/// Gets the collection of variables associated with the current context.
/// </summary>
Expand Down
3 changes: 3 additions & 0 deletions src/Sdk/RSWebApi/Contracts/AcquireJobRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ public class AcquireJobRequest

[DataMember(Name = "runnerOS", EmitDefaultValue = false)]
public string RunnerOS { get; set; }

[DataMember(Name = "billingOwnerId", EmitDefaultValue = false)]
public string BillingOwnerId { get; set; }
}
}
3 changes: 3 additions & 0 deletions src/Sdk/RSWebApi/Contracts/CompleteJobRequest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,8 @@ public class CompleteJobRequest

[DataMember(Name = "environmentUrl", EmitDefaultValue = false)]
public string EnvironmentUrl { get; set; }

[DataMember(Name = "billingOwnerId", EmitDefaultValue = false)]
public string BillingOwnerId { get; set; }
}
}
6 changes: 5 additions & 1 deletion src/Sdk/RSWebApi/RunServiceHttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -71,13 +71,15 @@ public async Task<AgentJobRequestMessage> GetJobMessageAsync(
Uri requestUri,
string messageId,
string runnerOS,
string billingOwnerId,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("POST");
var payload = new AcquireJobRequest
{
JobMessageId = messageId,
RunnerOS = runnerOS
RunnerOS = runnerOS,
BillingOwnerId = billingOwnerId,
};

requestUri = new Uri(requestUri, "acquirejob");
Expand Down Expand Up @@ -128,6 +130,7 @@ public async Task CompleteJobAsync(
IList<Annotation> jobAnnotations,
string environmentUrl,
IList<Telemetry> telemetry,
string billingOwnerId,
CancellationToken cancellationToken = default)
{
HttpMethod httpMethod = new HttpMethod("POST");
Expand All @@ -141,6 +144,7 @@ public async Task CompleteJobAsync(
Annotations = jobAnnotations,
EnvironmentUrl = environmentUrl,
Telemetry = telemetry,
BillingOwnerId = billingOwnerId,
};

requestUri = new Uri(requestUri, "completejob");
Expand Down
11 changes: 7 additions & 4 deletions src/Test/L0/Listener/JobDispatcherL0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,23 @@ public JobDispatcherL0()
_configurationStore = new Mock<IConfigurationStore>();
}

private Pipelines.AgentJobRequestMessage CreateJobRequestMessage()
private Pipelines.AgentJobRequestMessage CreateJobRequestMessage(string billingOwnerId = null)
{
TaskOrchestrationPlanReference plan = new();
TimelineReference timeline = null;
Guid jobId = Guid.NewGuid();
var result = new Pipelines.AgentJobRequestMessage(plan, timeline, jobId, "someJob", "someJob", null, null, null, new Dictionary<string, VariableValue>(), new List<MaskHint>(), new Pipelines.JobResources(), new Pipelines.ContextData.DictionaryContextData(), new Pipelines.WorkspaceOptions(), new List<Pipelines.ActionStep>(), null, null, null, null, null);
result.ContextData["github"] = new Pipelines.ContextData.DictionaryContextData();
result.BillingOwnerId = billingOwnerId;
return result;
}

[Fact]
[Theory]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async void DispatchesJobRequest()
[InlineData(null)]
[InlineData("billingOwnerId")]
public async void DispatchesJobRequest(string billingOwnerId)
{
//Arrange
using (var hc = new TestHostContext(this))
Expand All @@ -65,7 +68,7 @@ public async void DispatchesJobRequest()
jobDispatcher.Initialize(hc);

var ts = new CancellationTokenSource();
Pipelines.AgentJobRequestMessage message = CreateJobRequestMessage();
Pipelines.AgentJobRequestMessage message = CreateJobRequestMessage(billingOwnerId);
string strMessage = JsonUtility.ToString(message);

_processInvoker.Setup(x => x.ExecuteAsync(It.IsAny<String>(), It.IsAny<String>(), "spawnclient 1 2", null, It.IsAny<CancellationToken>()))
Expand Down
Loading