Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions src/Runner.Common/ConfigurationStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ public interface IConfigurationStore : IRunnerService
bool IsConfigured();
bool IsServiceConfigured();
bool HasCredentials();
bool IsMigratedConfigured();
CredentialData GetCredentials();
CredentialData GetMigratedCredentials();
RunnerSettings GetSettings();
Expand Down Expand Up @@ -198,6 +199,14 @@ public bool IsServiceConfigured()
return serviceConfigured;
}

public bool IsMigratedConfigured()
{
Trace.Info("IsMigratedConfigured()");
bool configured = new FileInfo(_migratedConfigFilePath).Exists;
Trace.Info("IsMigratedConfigured: {0}", configured);
return configured;
}

public CredentialData GetCredentials()
{
if (_creds == null)
Expand Down
4 changes: 4 additions & 0 deletions src/Runner.Common/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,10 @@ public static class ReturnCode
public const int RunnerUpdating = 3;
public const int RunOnceRunnerUpdating = 4;
public const int SessionConflict = 5;
// Temporary error code to indicate that the runner configuration has been refreshed
// and the runner should be restarted. This is a temporary code and will be removed in the future after
// the runner is migrated to runner admin.
public const int RunnerConfigurationRefreshed = 6;
}

public static class Features
Expand Down
45 changes: 42 additions & 3 deletions src/Runner.Listener/BrokerMessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,19 @@ public sealed class BrokerMessageListener : RunnerService, IMessageListener
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private bool _needRefreshCredsV2 = false;
private bool _handlerInitialized = false;
private bool _isMigratedSettings = false;
private const int _maxMigratedSettingsRetries = 3;
private int _migratedSettingsRetryCount = 0;

public BrokerMessageListener()
{
}

public BrokerMessageListener(RunnerSettings settings, bool isMigratedSettings = false)
{
_settings = settings;
_isMigratedSettings = isMigratedSettings;
}

public override void Initialize(IHostContext hostContext)
{
Expand All @@ -53,9 +66,22 @@ public async Task<CreateSessionResult> CreateSessionAsync(CancellationToken toke
{
Trace.Entering();

// Settings
var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings();
// Load settings if not provided through constructor
if (_settings == null)
{
var configManager = HostContext.GetService<IConfigurationManager>();
_settings = configManager.LoadSettings();
Trace.Info("Settings loaded from config manager");
}
else
{
Trace.Info("Using provided settings");
if (_isMigratedSettings)
{
Trace.Info("Using migrated settings from .runner_migrated");
}
}

var serverUrlV2 = _settings.ServerUrlV2;
var serverUrl = _settings.ServerUrl;
Trace.Info(_settings);
Expand Down Expand Up @@ -141,6 +167,19 @@ public async Task<CreateSessionResult> CreateSessionAsync(CancellationToken toke
Trace.Error("Catch exception during create session.");
Trace.Error(ex);

// If using migrated settings, limit the number of retries before returning failure
if (_isMigratedSettings)
{
_migratedSettingsRetryCount++;
Trace.Warning($"Migrated settings retry {_migratedSettingsRetryCount} of {_maxMigratedSettingsRetries}");

if (_migratedSettingsRetryCount >= _maxMigratedSettingsRetries)
{
Trace.Warning("Reached maximum retry attempts for migrated settings. Returning failure to try default settings.");
return CreateSessionResult.Failure;
}
}

if (!HostContext.AllowAuthMigration &&
ex is VssOAuthTokenRequestException vssOAuthEx &&
_credsV2.Federated is VssOAuthCredential vssOAuthCred)
Expand Down
17 changes: 17 additions & 0 deletions src/Runner.Listener/Configuration/ConfigurationManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ public interface IConfigurationManager : IRunnerService
Task UnconfigureAsync(CommandSettings command);
void DeleteLocalRunnerConfig();
RunnerSettings LoadSettings();
RunnerSettings LoadMigratedSettings();
}

public sealed class ConfigurationManager : RunnerService, IConfigurationManager
Expand Down Expand Up @@ -66,6 +67,22 @@ public RunnerSettings LoadSettings()
return settings;
}

public RunnerSettings LoadMigratedSettings()
{
Trace.Info(nameof(LoadMigratedSettings));

// Check if migrated settings file exists
if (!_store.IsMigratedConfigured())
{
throw new NonRetryableException("No migrated configuration found.");
}

RunnerSettings settings = _store.GetMigratedSettings();
Trace.Info("Migrated Settings Loaded");

return settings;
}

public async Task ConfigureAsync(CommandSettings command)
{
_term.WriteLine();
Expand Down
120 changes: 110 additions & 10 deletions src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ public async Task<int> ExecuteCommand(CommandSettings command)
}

// Run the runner interactively or as service
return await RunAsync(settings, command.RunOnce || settings.Ephemeral);
return await ExecuteRunnerAsync(settings, command.RunOnce || settings.Ephemeral);
}
else
{
Expand Down Expand Up @@ -387,12 +387,12 @@ private void CtrlCHandler(object sender, EventArgs e)
}
}

private IMessageListener GetMessageListener(RunnerSettings settings)
private IMessageListener GetMessageListener(RunnerSettings settings, bool isMigratedSettings = false)
{
if (settings.UseV2Flow)
{
Trace.Info($"Using BrokerMessageListener");
var brokerListener = new BrokerMessageListener();
var brokerListener = new BrokerMessageListener(settings, isMigratedSettings);
brokerListener.Initialize(HostContext);
return brokerListener;
}
Expand All @@ -406,15 +406,65 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
try
{
Trace.Info(nameof(RunAsync));
_listener = GetMessageListener(settings);
CreateSessionResult createSessionResult = await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken);
if (createSessionResult == CreateSessionResult.SessionConflict)

// First try using migrated settings if available
var configManager = HostContext.GetService<IConfigurationManager>();
RunnerSettings migratedSettings = null;

try
{
migratedSettings = configManager.LoadMigratedSettings();
Trace.Info("Loaded migrated settings from .runner_migrated file");
Trace.Info(migratedSettings);
}
catch (Exception ex)
{
return Constants.Runner.ReturnCode.SessionConflict;
// If migrated settings file doesn't exist or can't be loaded, we'll use the provided settings
Trace.Info($"Failed to load migrated settings: {ex.Message}");
}
else if (createSessionResult == CreateSessionResult.Failure)

bool usedMigratedSettings = false;

if (migratedSettings != null)
{
return Constants.Runner.ReturnCode.TerminatedError;
// Try to create session with migrated settings first
Trace.Info("Attempting to create session using migrated settings");
_listener = GetMessageListener(migratedSettings, isMigratedSettings: true);

try
{
CreateSessionResult createSessionResult = await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we want to report telemetry back to service in case of any failure, so we know customers are having issue with migration.

if (createSessionResult == CreateSessionResult.Success)
{
Trace.Info("Successfully created session with migrated settings");
settings = migratedSettings; // Use migrated settings for the rest of the process
usedMigratedSettings = true;
}
else
{
Trace.Warning($"Failed to create session with migrated settings: {createSessionResult}");
}
}
catch (Exception ex)
{
Trace.Error($"Exception when creating session with migrated settings: {ex}");
}
}

// If migrated settings weren't used or session creation failed, use original settings
if (!usedMigratedSettings)
{
Trace.Info("Falling back to original .runner settings");
_listener = GetMessageListener(settings);
CreateSessionResult createSessionResult = await _listener.CreateSessionAsync(HostContext.RunnerShutdownToken);
if (createSessionResult == CreateSessionResult.SessionConflict)
{
return Constants.Runner.ReturnCode.SessionConflict;
}
else if (createSessionResult == CreateSessionResult.Failure)
{
return Constants.Runner.ReturnCode.TerminatedError;
}
}

HostContext.WritePerfCounter("SessionCreated");
Expand All @@ -428,6 +478,8 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
// Should we try to cleanup ephemeral runners
bool runOnceJobCompleted = false;
bool skipSessionDeletion = false;
bool restartSession = false; // Flag to indicate session restart
bool restartSessionPending = false;
try
{
var notification = HostContext.GetService<IJobNotification>();
Expand All @@ -443,6 +495,15 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)

while (!HostContext.RunnerShutdownToken.IsCancellationRequested)
{
// Check if we need to restart the session and can do so (job dispatcher not busy)
if (restartSessionPending && !jobDispatcher.Busy)
{
Trace.Info("Pending session restart detected and job dispatcher is not busy. Restarting session now.");
messageQueueLoopTokenSource.Cancel();
restartSession = true;
break;
}

TaskAgentMessage message = null;
bool skipMessageDeletion = false;
try
Expand Down Expand Up @@ -679,6 +740,17 @@ await configUpdater.UpdateRunnerConfigAsync(
configType: runnerRefreshConfigMessage.ConfigType,
serviceType: runnerRefreshConfigMessage.ServiceType,
configRefreshUrl: runnerRefreshConfigMessage.ConfigRefreshUrl);

// Set flag to schedule session restart if ConfigType is "runner"
if (string.Equals(runnerRefreshConfigMessage.ConfigType, "runner", StringComparison.OrdinalIgnoreCase))
{
Trace.Info("Runner configuration was updated. Session restart has been scheduled");
restartSessionPending = true;
}
else
{
Trace.Info($"No session restart needed for config type: {runnerRefreshConfigMessage.ConfigType}");
}
}
else
{
Expand Down Expand Up @@ -733,10 +805,16 @@ await configUpdater.UpdateRunnerConfigAsync(

if (settings.Ephemeral && runOnceJobCompleted)
{
var configManager = HostContext.GetService<IConfigurationManager>();
configManager.DeleteLocalRunnerConfig();
}
}

// After cleanup, check if we need to restart the session
if (restartSession)
{
Trace.Info("Restarting runner session after config update...");
return Constants.Runner.ReturnCode.RunnerConfigurationRefreshed;
}
}
catch (TaskAgentAccessTokenExpiredException)
{
Expand All @@ -750,6 +828,28 @@ await configUpdater.UpdateRunnerConfigAsync(
return Constants.Runner.ReturnCode.Success;
}

private async Task<int> ExecuteRunnerAsync(RunnerSettings settings, bool runOnce)
{
int returnCode = Constants.Runner.ReturnCode.Success;
bool restart = false;
do
{
restart = false;
returnCode = await RunAsync(settings, runOnce);

if (returnCode == Constants.Runner.ReturnCode.RunnerConfigurationRefreshed)
{
Trace.Info("Runner configuration was refreshed, restarting session...");
// Reload settings in case they changed
var configManager = HostContext.GetService<IConfigurationManager>();
settings = configManager.LoadSettings();
restart = true;
}
} while (restart);

return returnCode;
}

private void HandleAuthMigrationChanged(object sender, AuthMigrationEventArgs e)
{
Trace.Verbose("Handle AuthMigrationChanged in Runner");
Expand Down
43 changes: 43 additions & 0 deletions src/Test/L0/Listener/BrokerMessageListenerL0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,49 @@ public async Task GetNextMessage_AuthMigrationFallback()
}
}

[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async Task CreatesSessionWithProvidedSettings()
{
using (TestHostContext tc = CreateTestContext())
using (var tokenSource = new CancellationTokenSource())
{
Tracing trace = tc.GetTrace();

// Arrange.
var expectedSession = new TaskAgentSession();
_brokerServer
.Setup(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token))
.Returns(Task.FromResult(expectedSession));

_credMgr.Setup(x => x.LoadCredentials(true)).Returns(new VssCredentials());

// Make sure the config is never called when settings are provided
_config.Setup(x => x.LoadSettings()).Throws(new InvalidOperationException("Should not be called"));

// Act.
// Use the constructor that accepts settings
BrokerMessageListener listener = new(_settings);
listener.Initialize(tc);

CreateSessionResult result = await listener.CreateSessionAsync(tokenSource.Token);
trace.Info("result: {0}", result);

// Assert.
Assert.Equal(CreateSessionResult.Success, result);
_brokerServer
.Verify(x => x.CreateSessionAsync(
It.Is<TaskAgentSession>(y => y != null),
tokenSource.Token), Times.Once());

// Verify LoadSettings was never called
_config.Verify(x => x.LoadSettings(), Times.Never());
}
}

private TestHostContext CreateTestContext([CallerMemberName] String testName = "")
{
TestHostContext tc = new(this, testName);
Expand Down