diff --git a/src/Runner.Listener/JobDispatcher.cs b/src/Runner.Listener/JobDispatcher.cs index 10076441eff..f5a1311c7e7 100644 --- a/src/Runner.Listener/JobDispatcher.cs +++ b/src/Runner.Listener/JobDispatcher.cs @@ -246,61 +246,17 @@ private async Task EnsureDispatchFinished(WorkerDispatcher jobDispatch, bool can jobDispatch.WorkerCancellationTokenSource.Cancel(); // wait for worker process exit then return. await jobDispatch.WorkerDispatch; - - return; } - - if (this._isRunServiceJob) + else if (this._isRunServiceJob) { Trace.Error($"We are not yet checking the state of jobrequest {jobDispatch.JobId} status. Cancel running worker right away."); jobDispatch.WorkerCancellationTokenSource.Cancel(); - return; - } - // based on the current design, server will only send one job for a given runner at a time. - // if the runner received a new job request while a previous job request is still running, this typically indicates two situations - // 1. a runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest - // properly but thinks it still owns the job reqest, however the server has already abandoned the jobrequest. - // 2. a server bug or design change that allowed the server to send more than one job request to an given runner that hasn't finished - //. a previous job request. - var runnerServer = HostContext.GetService(); - TaskAgentJobRequest request = null; - try - { - request = await runnerServer.GetAgentRequestAsync(_poolId, jobDispatch.RequestId, CancellationToken.None); - } - catch (TaskAgentJobNotFoundException ex) - { - Trace.Error($"Catch job-not-found exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away."); - Trace.Error(ex); - jobDispatch.WorkerCancellationTokenSource.Cancel(); - // make sure worker process exits before we return, otherwise we might leave an orphan worker process behind. 
- await jobDispatch.WorkerDispatch; - return; - } - catch (Exception ex) - { - // we can't even query for the jobrequest from server, something totally busted, stop runner/worker. - Trace.Error($"Catch exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away."); - Trace.Error(ex); - - jobDispatch.WorkerCancellationTokenSource.Cancel(); - // make sure the worker process exits before we rethrow, otherwise we might leave orphan worker process behind. - await jobDispatch.WorkerDispatch; - - // rethrow original exception - throw; - } - - if (request.Result != null) - { - // job request has been finished, the server already has the result. - // this means the runner is busted since it is still running that request. - // cancel the zombie worker, run next job request. - Trace.Error($"Received job request while previous job {jobDispatch.JobId} still running on worker. Cancel the previous job since the job request have been finished on server side with result: {request.Result.Value}."); - jobDispatch.WorkerCancellationTokenSource.Cancel(); - - // wait 45 sec for worker to finish. + // Wait for the previous worker process to fully exit (including + // TempDirectoryManager cleanup) before letting the new worker + // start. Without this wait the exiting worker's _temp cleanup + // races with the new worker and deletes files such as + // _runner_file_commands/* out from under the active job. (#4357) Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45))); if (completedTask != jobDispatch.WorkerDispatch) { @@ -311,9 +267,64 @@ private async Task EnsureDispatchFinished(WorkerDispatcher jobDispatch, bool can } else { - // something seriously wrong on server side. stop runner from continue running. - // no need to localize the exception string should never happen. 
- throw new InvalidOperationException($"Server send a new job request while the previous job request {jobDispatch.JobId} haven't finished."); + // based on the current design, server will only send one job for a given runner at a time. + // if the runner received a new job request while a previous job request is still running, this typically indicates two situations + // 1. a runner bug caused a server and runner mismatch on the state of the job request, e.g. the runner didn't renew the jobrequest + // properly but thinks it still owns the job request, however the server has already abandoned the jobrequest. + // 2. a server bug or design change that allowed the server to send more than one job request to a given runner that hasn't finished + // a previous job request. + var runnerServer = HostContext.GetService(); + TaskAgentJobRequest request = null; + try + { + request = await runnerServer.GetAgentRequestAsync(_poolId, jobDispatch.RequestId, CancellationToken.None); + } + catch (TaskAgentJobNotFoundException ex) + { + Trace.Error($"Catch job-not-found exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away."); + Trace.Error(ex); + jobDispatch.WorkerCancellationTokenSource.Cancel(); + // make sure worker process exits before we return, otherwise we might leave an orphan worker process behind. + await jobDispatch.WorkerDispatch; + return; + } + catch (Exception ex) + { + // we can't even query for the jobrequest from server, something totally busted, stop runner/worker. + Trace.Error($"Catch exception while checking jobrequest {jobDispatch.JobId} status. Cancel running worker right away."); + Trace.Error(ex); + + jobDispatch.WorkerCancellationTokenSource.Cancel(); + // make sure the worker process exits before we rethrow, otherwise we might leave orphan worker process behind. 
+ await jobDispatch.WorkerDispatch; + + // rethrow original exception + throw; + } + + if (request.Result != null) + { + // job request has been finished, the server already has the result. + // this means the runner is busted since it is still running that request. + // cancel the zombie worker, run next job request. + Trace.Error($"Received job request while previous job {jobDispatch.JobId} still running on worker. Cancel the previous job since the job request have been finished on server side with result: {request.Result.Value}."); + jobDispatch.WorkerCancellationTokenSource.Cancel(); + + // wait 45 sec for worker to finish. + Task completedTask = await Task.WhenAny(jobDispatch.WorkerDispatch, Task.Delay(TimeSpan.FromSeconds(45))); + if (completedTask != jobDispatch.WorkerDispatch) + { + // at this point, the job execution might encounter some dead lock and even not able to be cancelled. + // no need to localize the exception string should never happen. + throw new InvalidOperationException($"Job dispatch process for {jobDispatch.JobId} has encountered unexpected error, the dispatch task is not able to be cancelled within 45 seconds."); + } + } + else + { + // something seriously wrong on server side. stop runner from continue running. + // no need to localize the exception string should never happen. + throw new InvalidOperationException($"Server send a new job request while the previous job request {jobDispatch.JobId} haven't finished."); + } } } diff --git a/src/Test/L0/Listener/JobDispatcherL0.cs b/src/Test/L0/Listener/JobDispatcherL0.cs index a160ffba2c0..c13e09889c8 100644 --- a/src/Test/L0/Listener/JobDispatcherL0.cs +++ b/src/Test/L0/Listener/JobDispatcherL0.cs @@ -745,6 +745,89 @@ public async void DispatchesOneTimeJobRequest() } } + // Regression test for https://github.com/actions/runner/issues/4357. 
+ // + // For run-service jobs, the previous EnsureDispatchFinished implementation + // cancelled the running worker and immediately returned without awaiting + // the worker process exit. This let the new Worker spawn while the old + // Worker was still running TempDirectoryManager.CleanupTempDirectory(), + // which then wiped _runner_file_commands/* out from under the new job. + // + // EnsureDispatchFinished must now await the previous WorkerDispatch task + // before returning so the new dispatch only proceeds once the previous + // worker (and its temp cleanup) has finished. + [Fact] + [Trait("Level", "L0")] + [Trait("Category", "Runner")] + public async void EnsureDispatchFinishedAwaitsPreviousWorkerForRunServiceJob() + { + using (var hc = new TestHostContext(this)) + { + hc.SetSingleton(_configurationStore.Object); + hc.SetSingleton(_runnerServer.Object); + _configurationStore.Setup(x => x.GetSettings()).Returns(new RunnerSettings { PoolId = 1 }); + + var jobDispatcher = new JobDispatcher(); + jobDispatcher.Initialize(hc); + EnableRunServiceJobForJobDispatcher(jobDispatcher); + + // Build a previous-job WorkerDispatcher whose worker process simulates + // a slow shutdown (representing TempDirectoryManager.CleanupTempDirectory + // running). We control completion via a TaskCompletionSource. 
+ var workerDispatcherType = typeof(JobDispatcher).GetNestedType("WorkerDispatcher", BindingFlags.NonPublic); + Assert.NotNull(workerDispatcherType); + var ctor = workerDispatcherType.GetConstructor( + BindingFlags.NonPublic | BindingFlags.Public | BindingFlags.Instance, + null, + new[] { typeof(Guid), typeof(long) }, + null); + Assert.NotNull(ctor); + object prev = null; + try + { + prev = ctor.Invoke(new object[] { Guid.NewGuid(), (long)42 }); + + var workerExitTcs = new TaskCompletionSource(); + workerDispatcherType.GetProperty("WorkerDispatch").SetValue(prev, workerExitTcs.Task); + + var ensureDispatchFinishedMethod = typeof(JobDispatcher).GetMethod( + "EnsureDispatchFinished", + BindingFlags.NonPublic | BindingFlags.Instance); + Assert.NotNull(ensureDispatchFinishedMethod); + + var ensureTask = (Task)ensureDispatchFinishedMethod.Invoke(jobDispatcher, new object[] { prev, false }); + + // Cancellation should already have been requested on the previous worker. + var cts = (CancellationTokenSource)workerDispatcherType + .GetProperty("WorkerCancellationTokenSource") + .GetValue(prev); + Assert.True(cts.IsCancellationRequested, + "EnsureDispatchFinished should cancel the previous worker for run-service jobs."); + + // EnsureDispatchFinished MUST NOT complete until the previous worker exits. + // Give the continuation a chance to run, then assert it's still pending. + await Task.Delay(50); + Assert.False(ensureTask.IsCompleted, + "EnsureDispatchFinished must wait for the previous worker process to exit (including temp cleanup) before returning."); + + // Simulate the previous worker (and its TempDirectoryManager cleanup) finishing. + workerExitTcs.SetResult(0); + + // Now EnsureDispatchFinished should complete in a timely fashion. 
+ var completed = await Task.WhenAny(ensureTask, Task.Delay(TimeSpan.FromSeconds(10))); + Assert.Same(ensureTask, completed); + await ensureTask; + } + finally + { + if (prev is IDisposable d) + { + d.Dispose(); + } + } + } + } + private static void EnableRunServiceJobForJobDispatcher(JobDispatcher jobDispatcher) { // Set the value of the _isRunServiceJob field to true