Skip to content

Commit 9dbb5e1

Browse files
committed
Fix async queue processing issues
1 parent f5df2f8 commit 9dbb5e1

7 files changed

Lines changed: 28 additions & 34 deletions

File tree

src/InEngine.Core/Commands/AlwaysFail.cs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System;
2+
using System.Threading.Tasks;
23
using InEngine.Core.Exceptions;
34

45
namespace InEngine.Core.Commands;
@@ -8,7 +9,7 @@ namespace InEngine.Core.Commands;
89
/// </summary>
910
public class AlwaysFail : AbstractCommand
1011
{
11-
public override void Run()
12+
public override async Task RunAsync()
1213
{
1314
throw new CommandFailedException("This command always fails.");
1415
}

src/InEngine.Core/Commands/Sleep.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ namespace InEngine.Core.Commands;
88
public class Sleep : AbstractCommand
99
{
1010
[Option("duration", HelpText = "The number of seconds to sleep.")]
11-
public int DurationInSeconds { get; set; }
11+
public int DurationInSeconds { get; set; } = 3;
1212

1313
public override async Task RunAsync()
1414
{

src/InEngine.Core/InEngine.Core.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
<PackageReference Include="Microsoft.Extensions.Configuration.Json" Version="2.0.0" />
2828
<PackageReference Include="Microsoft.Extensions.Configuration.Binder" Version="2.0.0" />
2929
<PackageReference Include="Serilog" Version="2.11.0" />
30-
<PackageReference Include="StackExchange.Redis" Version="1.2.4" />
30+
<PackageReference Include="StackExchange.Redis" Version="2.6.48" />
3131
<PackageReference Include="Goblinfactory.Konsole" Version="6.2.2" />
3232
<PackageReference Include="RestSharp" Version="106.12.0" />
3333
<PackageReference Include="MailKit" Version="1.22.0" />

src/InEngine.Core/Queuing/Clients/FileClient.cs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -64,8 +64,7 @@ public async Task Consume(CancellationToken cancellationToken)
6464
{
6565
try
6666
{
67-
if (await Consume() == null)
68-
Thread.Sleep(5000);
67+
await Consume();
6968
}
7069
catch (Exception exception)
7170
{
@@ -108,7 +107,7 @@ public async Task<ICommandEnvelope> Consume()
108107

109108
ConsumeLock.ReleaseMutex();
110109

111-
var commandEnvelope = File.ReadAllText(inProgressFilePath).DeserializeFromJson<CommandEnvelope>();
110+
var commandEnvelope = (await File.ReadAllTextAsync(inProgressFilePath)).DeserializeFromJson<CommandEnvelope>();
112111
var command = commandEnvelope.GetCommandInstanceAndIncrementRetry(() =>
113112
{
114113
File.Move(inProgressFilePath, Path.Combine(FailedQueuePath, fileInfo.Name));

src/InEngine.Core/Queuing/Clients/RedisClient.cs

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Collections.Generic;
33
using System.Linq;
44
using System.Threading;
5+
using System.Threading.Channels;
56
using System.Threading.Tasks;
67
using InEngine.Core.Exceptions;
78
using InEngine.Core.IO;
@@ -29,17 +30,16 @@ public class RedisClient : IQueueClient
2930

3031
public string FailedQueueName => QueueBaseName + $":{QueueName}:{QueueNames.Failed}";
3132

32-
public static Lazy<ConnectionMultiplexer> lazyConnection = new Lazy<ConnectionMultiplexer>(() =>
33+
public static readonly Lazy<ConnectionMultiplexer> LazyConnection = new(() =>
3334
{
3435
var redisConfig = ConfigurationOptions.Parse($"{ClientSettings.Host}:{ClientSettings.Port}");
3536
redisConfig.Password = string.IsNullOrWhiteSpace(ClientSettings.Password) ? null : ClientSettings.Password;
3637
redisConfig.AbortOnConnectFail = false;
3738
return ConnectionMultiplexer.Connect(redisConfig);
3839
});
3940

40-
public static ConnectionMultiplexer Connection => lazyConnection.Value;
41+
public static ConnectionMultiplexer Connection => LazyConnection.Value;
4142

42-
public ConnectionMultiplexer _connectionMultiplexer;
4343
private bool isDisposed;
4444

4545
public IDatabase Redis => Connection.GetDatabase(ClientSettings.Database);
@@ -87,12 +87,11 @@ public async Task Consume(CancellationToken cancellationToken)
8787
try
8888
{
8989
InitChannel();
90-
Connection.GetSubscriber().Subscribe(RedisChannel,
91-
delegate
92-
{
93-
Task.Factory.StartNew(Consume, cancellationToken, TaskCreationOptions.LongRunning,
94-
TaskScheduler.Default);
95-
});
90+
var channelMessageQueue = await Connection.GetSubscriber().SubscribeAsync(RedisChannel);
91+
channelMessageQueue.OnMessage(async _ =>
92+
{
93+
await Consume();
94+
});
9695
}
9796
catch (OperationCanceledException exception)
9897
{
@@ -108,8 +107,7 @@ public async Task<ICommandEnvelope> Consume()
108107
{
109108
var rawRedisMessageValue = Redis.ListRightPopLeftPush(PendingQueueName, InProgressQueueName);
110109
var serializedMessage = rawRedisMessageValue.ToString();
111-
if (serializedMessage == null)
112-
return null;
110+
113111
var commandEnvelope = serializedMessage.DeserializeFromJson<CommandEnvelope>();
114112
if (commandEnvelope == null)
115113
throw new CommandFailedException("Could not deserialize the command.");
@@ -189,11 +187,10 @@ public void Dispose()
189187

190188
private void Dispose(bool disposing)
191189
{
192-
if (isDisposed)
190+
if (isDisposed)
193191
return;
194-
if (!disposing)
192+
if (!disposing)
195193
return;
196-
_connectionMultiplexer?.Dispose();
197194
isDisposed = true;
198195
}
199196
}

src/InEngine.Core/Queuing/Dequeue.cs

Lines changed: 10 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -25,29 +25,26 @@ public Dequeue()
2525

2626
public async Task StartAsync()
2727
{
28-
var allTasks = new List<Task>();
2928
Log.LogDebug("Start dequeue tasks for primary queue...");
30-
allTasks.AddRange(MakeTasks(true, QueueSettings.PrimaryQueueConsumers));
29+
await AddConsumers(false, QueueSettings.PrimaryQueueConsumers);
3130
Log.LogDebug("Start dequeue tasks for secondary queue...");
32-
allTasks.AddRange(MakeTasks(false, QueueSettings.SecondaryQueueConsumers));
33-
await Task.WhenAll(allTasks);
31+
await AddConsumers(true, QueueSettings.SecondaryQueueConsumers);
3432

3533
// Recover from restart, if necessary.
3634
QueueAdapter.Make(false, QueueSettings, MailSettings).Recover();
3735
QueueAdapter.Make(true, QueueSettings, MailSettings).Recover();
3836
}
3937

40-
IList<Task> MakeTasks(bool useSecondaryQueue = false, int numberOfTasks = 0)
38+
private async Task AddConsumers(bool useSecondaryQueue = false, int numberOfTasks = 0)
4139
{
42-
return Enumerable.Range(0, numberOfTasks).Select((i) => {
40+
for (var i = 0; i < numberOfTasks; i++)
41+
{
4342
Log.LogDebug("Registering Dequeuer {I}", i);
44-
return Task.Factory.StartNew(() => {
45-
var queue = QueueAdapter.Make(useSecondaryQueue, QueueSettings, MailSettings);
46-
queue.Id = i;
47-
queueAdapters.Add(queue);
48-
queue.Consume(CancellationTokenSource.Token);
49-
}, TaskCreationOptions.LongRunning);
50-
}).ToList();
43+
var queue = QueueAdapter.Make(useSecondaryQueue, QueueSettings, MailSettings);
44+
queue.Id = i;
45+
queueAdapters.Add(queue);
46+
await queue.Consume(CancellationTokenSource.Token);
47+
}
5148
}
5249

5350
public void Dispose()

src/InEngine.Core/Queuing/Message/CommandEnvelope.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ public AbstractCommand GetCommandInstanceAndIncrementRetry(Action actionOnFail =
2424
}
2525
catch (Exception exception)
2626
{
27-
actionOnFail.Invoke();
27+
actionOnFail?.Invoke();
2828
throw new CommandNotExtractableFromEnvelopeException(CommandClassName, exception);
2929
}
3030
}

0 commit comments

Comments
 (0)