Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 64 additions & 53 deletions src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,61 +246,17 @@ private async Task EnsureDispatchFinished(WorkerDispatcher jobDispatch, bool can
jobDispatch.WorkerCancellationTokenSource.Cancel();
// wait for worker process exit then return.
await jobDispatch.WorkerDispatch;

return;
}

if (this._isRunServiceJob)
else if (this._isRunServiceJob)
{
Trace.Error($"We are not yet checking the state of jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
jobDispatch.WorkerCancellationTokenSource.Cancel();
return;
}

// Based on the current design, the server will only send one job to a given runner at a time.
// If the runner receives a new job request while a previous job request is still running, this typically indicates one of two situations:
// 1. A runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the job request
// properly but thinks it still owns the job request, however the server has already abandoned the job request.
// 2. A server bug or design change that allowed the server to send more than one job request to a given runner that hasn't finished
// a previous job request.
var runnerServer = HostContext.GetService<IRunnerServer>();
TaskAgentJobRequest request = null;
try
{
request = await runnerServer.GetAgentRequestAsync(_poolId, jobDispatch.RequestId, CancellationToken.None);
}
catch (TaskAgentJobNotFoundException ex)
{
Trace.Error($"Catch job-not-found exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
Trace.Error(ex);
jobDispatch.WorkerCancellationTokenSource.Cancel();
// make sure worker process exits before we return, otherwise we might leave an orphan worker process behind.
await jobDispatch.WorkerDispatch;
return;
}
catch (Exception ex)
{
// we can't even query for the jobrequest from server, something totally busted, stop runner/worker.
Trace.Error($"Catch exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
Trace.Error(ex);

jobDispatch.WorkerCancellationTokenSource.Cancel();
// make sure the worker process exits before we rethrow, otherwise we might leave orphan worker process behind.
await jobDispatch.WorkerDispatch;

// rethrow original exception
throw;
}

if (request.Result != null)
{
// job request has been finished, the server already has the result.
// this means the runner is busted since it is still running that request.
// cancel the zombie worker, run next job request.
Trace.Error($"Received job request while previous job {jobDispatch.JobId} still running on worker. Cancel the previous job since the job request have been finished on server side with result: {request.Result.Value}.");
jobDispatch.WorkerCancellationTokenSource.Cancel();

// wait 45 sec for worker to finish.
// Wait for the previous worker process to fully exit (including
// TempDirectoryManager cleanup) before letting the new worker
// start. Without this wait the exiting worker's _temp cleanup
// races with the new worker and deletes files such as
// _runner_file_commands/* out from under the active job. (#4357)
Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45)));
if (completedTask != jobDispatch.WorkerDispatch)
{
Expand All @@ -311,9 +267,64 @@ private async Task EnsureDispatchFinished(WorkerDispatcher jobDispatch, bool can
}
else
{
// something seriously wrong on server side. stop runner from continue running.
// no need to localize the exception string should never happen.
throw new InvalidOperationException($"Server send a new job request while the previous job request {jobDispatch.JobId} haven't finished.");
// Based on the current design, the server will only send one job to a given runner at a time.
// If the runner receives a new job request while a previous job request is still running, this typically indicates one of two situations:
// 1. A runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the job request
// properly but thinks it still owns the job request, however the server has already abandoned the job request.
// 2. A server bug or design change that allowed the server to send more than one job request to a given runner that hasn't finished
// a previous job request.
var runnerServer = HostContext.GetService<IRunnerServer>();
TaskAgentJobRequest request = null;
try
{
request = await runnerServer.GetAgentRequestAsync(_poolId, jobDispatch.RequestId, CancellationToken.None);
}
catch (TaskAgentJobNotFoundException ex)
{
Trace.Error($"Catch job-not-found exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
Trace.Error(ex);
jobDispatch.WorkerCancellationTokenSource.Cancel();
// make sure worker process exits before we return, otherwise we might leave an orphan worker process behind.
await jobDispatch.WorkerDispatch;
return;
}
catch (Exception ex)
{
// we can't even query for the jobrequest from server, something totally busted, stop runner/worker.
Trace.Error($"Catch exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away.");
Trace.Error(ex);

jobDispatch.WorkerCancellationTokenSource.Cancel();
// make sure the worker process exits before we rethrow, otherwise we might leave orphan worker process behind.
await jobDispatch.WorkerDispatch;

// rethrow original exception
throw;
}

if (request.Result != null)
{
// job request has been finished, the server already has the result.
// this means the runner is busted since it is still running that request.
// cancel the zombie worker, run next job request.
Trace.Error($"Received job request while previous job {jobDispatch.JobId} still running on worker. Cancel the previous job since the job request have been finished on server side with result: {request.Result.Value}.");
jobDispatch.WorkerCancellationTokenSource.Cancel();

// wait 45 sec for worker to finish.
Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45)));
if (completedTask != jobDispatch.WorkerDispatch)
{
// at this point, the job execution might encounter some dead lock and even not able to be cancelled.
// no need to localize the exception string should never happen.
throw new InvalidOperationException($"Job dispatch process for {jobDispatch.JobId} has encountered unexpected error, the dispatch task is not able to be cancelled within 45 seconds.");
}
}
else
{
// something seriously wrong on server side. stop runner from continue running.
// no need to localize the exception string should never happen.
throw new InvalidOperationException($"Server send a new job request while the previous job request {jobDispatch.JobId} haven't finished.");
}
}
}

Expand Down
83 changes: 83 additions & 0 deletions src/Test/L0/Listener/JobDispatcherL0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -745,6 +745,89 @@ public async void DispatchesOneTimeJobRequest()
}
}

// Regression test for https://github.com/actions/runner/issues/4357.
//
// For run-service jobs, the previous EnsureDispatchFinished implementation
// cancelled the running worker and immediately returned without awaiting
// the worker process exit. This let the new Worker spawn while the old
// Worker was still running TempDirectoryManager.CleanupTempDirectory(),
// which then wiped _runner_file_commands/* out from under the new job.
//
// EnsureDispatchFinished must now await the previous WorkerDispatch task
// before returning so the new dispatch only proceeds once the previous
// worker (and its temp cleanup) has finished.
//
// The test drives the private EnsureDispatchFinished method through
// reflection and stands in for the previous worker process with a
// TaskCompletionSource that the test completes on demand.
[Fact]
[Trait("Level", "L0")]
[Trait("Category", "Runner")]
public async void EnsureDispatchFinishedAwaitsPreviousWorkerForRunServiceJob()
{
    // Grace period that lets any (incorrect) early completion surface before we assert "still pending".
    TimeSpan pendingObservationWindow = TimeSpan.FromMilliseconds(50);
    // Upper bound for EnsureDispatchFinished to finish once the simulated worker has exited.
    TimeSpan completionTimeout = TimeSpan.FromSeconds(10);

    using (var hc = new TestHostContext(this))
    {
        hc.SetSingleton<IConfigurationStore>(_configurationStore.Object);
        hc.SetSingleton<IRunnerServer>(_runnerServer.Object);
        _configurationStore.Setup(x => x.GetSettings()).Returns(new RunnerSettings { PoolId = 1 });

        var jobDispatcher = new JobDispatcher();
        jobDispatcher.Initialize(hc);
        EnableRunServiceJobForJobDispatcher(jobDispatcher);

        // Resolve every reflected member up front and assert on each one, so that a rename
        // in JobDispatcher fails with a clear assertion instead of a NullReferenceException.
        var workerDispatcherType = typeof(JobDispatcher).GetNestedType("WorkerDispatcher", BindingFlags.NonPublic);
        Assert.NotNull(workerDispatcherType);

        var ctor = workerDispatcherType.GetConstructor(
            BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance,
            null,
            new[] { typeof(Guid), typeof(long) },
            null);
        Assert.NotNull(ctor);

        var workerDispatchProperty = workerDispatcherType.GetProperty("WorkerDispatch");
        Assert.NotNull(workerDispatchProperty);

        var cancellationSourceProperty = workerDispatcherType.GetProperty("WorkerCancellationTokenSource");
        Assert.NotNull(cancellationSourceProperty);

        var ensureDispatchFinishedMethod = typeof(JobDispatcher).GetMethod(
            "EnsureDispatchFinished",
            BindingFlags.NonPublic | BindingFlags.Instance);
        Assert.NotNull(ensureDispatchFinishedMethod);

        // Build a previous-job WorkerDispatcher whose worker process simulates
        // a slow shutdown (representing TempDirectoryManager.CleanupTempDirectory
        // running). Completion is controlled via this TaskCompletionSource.
        // RunContinuationsAsynchronously keeps SetResult from running the awaiting
        // continuation inline on the test's own call stack.
        var workerExitTcs = new TaskCompletionSource<int>(TaskCreationOptions.RunContinuationsAsynchronously);
        object prev = null;
        try
        {
            prev = ctor.Invoke(new object[] { Guid.NewGuid(), (long)42 });
            workerDispatchProperty.SetValue(prev, workerExitTcs.Task);

            var ensureTask = (Task)ensureDispatchFinishedMethod.Invoke(jobDispatcher, new object[] { prev, false });
            Assert.NotNull(ensureTask);

            // Cancellation should already have been requested on the previous worker.
            var cts = (CancellationTokenSource)cancellationSourceProperty.GetValue(prev);
            Assert.NotNull(cts);
            Assert.True(cts.IsCancellationRequested,
                "EnsureDispatchFinished should cancel the previous worker for run-service jobs.");

            // EnsureDispatchFinished MUST NOT complete until the previous worker exits.
            // Give the continuation a chance to run, then assert it's still pending.
            await Task.Delay(pendingObservationWindow);
            Assert.False(ensureTask.IsCompleted,
                "EnsureDispatchFinished must wait for the previous worker process to exit (including temp cleanup) before returning.");

            // Simulate the previous worker (and its TempDirectoryManager cleanup) finishing.
            workerExitTcs.SetResult(0);

            // Now EnsureDispatchFinished should complete in a timely fashion.
            var completed = await Task.WhenAny(ensureTask, Task.Delay(completionTimeout));
            Assert.Same(ensureTask, completed);
            await ensureTask;
        }
        finally
        {
            // If an assertion above failed before SetResult, complete the simulated worker anyway
            // so the in-flight EnsureDispatchFinished call is not left waiting on it.
            // NOTE(review): assumes nothing else completes WorkerDispatch in this test - confirm.
            workerExitTcs.TrySetResult(0);

            if (prev is IDisposable d)
            {
                d.Dispose();
            }
        }
    }
}

private static void EnableRunServiceJobForJobDispatcher(JobDispatcher jobDispatcher)
{
// Set the value of the _isRunServiceJob field to true
Expand Down