Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
# Azure Functions local settings (may contain endpoints/secrets)
local.settings.json

## Ignore Visual Studio temporary files, build results, and
## files generated by popular Visual Studio add-ons.
##
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net10.0</TargetFramework>
<AzureFunctionsVersion>v4</AzureFunctionsVersion>
<OutputType>Exe</OutputType>
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
</PropertyGroup>
<ItemGroup>
<FrameworkReference Include="Microsoft.AspNetCore.App" />
<PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.1.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask" Version="1.2.2" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.3.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.1.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.7" />
<PackageReference Include="Microsoft.ApplicationInsights.WorkerService" Version="2.22.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="1.2.0" />
<PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.DurableTask.AzureManaged" Version="0.4.1-alpha" />
<PackageReference Include="Contrib.Grpc.Core.M1" Version="2.41.0" />
<PackageReference Include="Grpc.Net.Client" Version="2.71.0" />
<PackageReference Include="Azure.Identity" Version="1.17.1" />
<PackageReference Include="Azure.AI.OpenAI" Version="2.3.0-beta.1" />
<PackageReference Include="Microsoft.Extensions.AI.OpenAI" Version="10.3.0" />
<PackageReference Include="StackExchange.Redis" Version="2.8.41" />
</ItemGroup>
<ItemGroup>
<None Update="host.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
</None>
<None Update="local.settings.json">
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
<CopyToPublishDirectory>Never</CopyToPublishDirectory>
</None>
</ItemGroup>
<ItemGroup>
<Using Include="System.Threading.ExecutionContext" Alias="ExecutionContext" />
</ItemGroup>

<Target Name="CopyGrpcNativeAssetsToOutDir" AfterTargets="Build">
<ItemGroup>
<NativeAssetToCopy Condition="$([MSBuild]::IsOSPlatform('OSX'))" Include="$(OutDir)runtimes/osx-arm64/native/*" />
</ItemGroup>
<Copy SourceFiles="@(NativeAssetToCopy)" DestinationFolder="$(OutDir).azurefunctions/runtimes/osx-arm64/native" />
</Target>
</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,257 @@
// ============================================================================
// Durable Agent Chat — powered by Durable Entities + Redis Streaming
//
// Each chat session is a durable entity that holds the full conversation
// history. When you send a message, the entity calls the LLM, executes any
// tool calls, and streams response chunks to Redis pub/sub in real-time.
// The HTTP layer subscribes to the Redis channel and forwards chunks as
// Server-Sent Events (SSE) to the client.
//
// Architecture:
// HTTP POST /api/chat/{sessionId}
// → subscribe to Redis channel "chat:{sessionId}:{correlationId}"
// → signal entity (fire-and-forget)
// → stream SSE from Redis subscription
// Entity receives signal:
// → runs agent loop (LLM ←→ tool execution)
// → publishes response chunks to Redis
// → publishes [DONE] when complete
// ============================================================================

using System.Text.Json;
using System.Threading.Channels;
using Microsoft.AspNetCore.Http;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.Functions.Worker;
using Microsoft.DurableTask.Client;
using Microsoft.DurableTask.Client.Entities;
using Microsoft.DurableTask.Entities;
using Microsoft.Extensions.AI;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace AgentDirectedWorkflows;

// ─── The Agent Entity ───
// Each entity instance is one chat session. The Durable Task framework
// automatically persists the entity's State after each operation.

public class ChatAgentEntity : TaskEntity<ChatAgentState>
{
private readonly IChatClient _chatClient;
private readonly IConnectionMultiplexer _redis;
private readonly ILogger<ChatAgentEntity> _logger;

public ChatAgentEntity(IChatClient chatClient, IConnectionMultiplexer redis, ILogger<ChatAgentEntity> logger)
{
_chatClient = chatClient;
_redis = redis;
_logger = logger;
}

/// <summary>
/// The core agent loop: send user message to LLM, execute any tool calls,
/// repeat until the LLM gives a final text reply. Streams response chunks
/// to Redis pub/sub for real-time delivery to the client.
/// </summary>
public async Task Message(ChatRequest request)
{
var channel = RedisChannel.Literal($"chat:{Context.Id.Key}:{request.CorrelationId}");
var pub = _redis.GetSubscriber();

try
{
State.Messages.Add(new ChatMsg("user", request.Message));

var messages = new List<ChatMessage> { new(ChatRole.System, "You are a helpful assistant.") };
foreach (var m in State.Messages)
messages.Add(new ChatMessage(m.Role == "assistant" ? ChatRole.Assistant : ChatRole.User, m.Content));

var options = new ChatOptions { Tools = AgentTools.AsAITools() };

// Agent loop: stream from LLM → publish chunks or handle tool calls
while (true)
{
var fullText = new System.Text.StringBuilder();
var toolCalls = new List<FunctionCallContent>();

await foreach (var update in _chatClient.GetStreamingResponseAsync(messages, options))
{
foreach (var content in update.Contents.OfType<FunctionCallContent>())
toolCalls.Add(content);

if (!string.IsNullOrEmpty(update.Text))
{
fullText.Append(update.Text);
var json = JsonSerializer.Serialize(new { type = "chunk", content = update.Text });
await pub.PublishAsync(channel, json);
}
}

if (toolCalls.Count > 0)
{
_logger.LogInformation("Executing tools: {Tools}", string.Join(", ", toolCalls.Select(t => t.Name)));
messages.Add(new ChatMessage(ChatRole.Assistant,
toolCalls.Select(tc => (AIContent)tc).ToList()));

foreach (var tc in toolCalls)
{
var result = AgentTools.Execute(tc.Name, tc.Arguments);
messages.Add(new ChatMessage(ChatRole.Tool, [new FunctionResultContent(tc.CallId, result)]));
}
continue;
}

var reply = fullText.ToString();
State.Messages.Add(new ChatMsg("assistant", reply));
await pub.PublishAsync(channel, JsonSerializer.Serialize(new { type = "done" }));
return;
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Agent loop failed");
var errorJson = JsonSerializer.Serialize(new { type = "error", content = ex.Message });
await pub.PublishAsync(channel, errorJson);
}
}

public List<ChatMsg> GetHistory() => State.Messages;

public void Reset() => State.Messages.Clear();

[Function(nameof(ChatAgentEntity))]
public static Task Dispatch([EntityTrigger] TaskEntityDispatcher dispatcher)
=> dispatcher.DispatchAsync<ChatAgentEntity>();
}

// ─── HTTP Endpoints ───

public class ChatEndpoints
{
private readonly IConnectionMultiplexer _redis;

public ChatEndpoints(IConnectionMultiplexer redis)
{
_redis = redis;
}

/// <summary>
/// Send a message to the agent. Streams SSE by default; add ?stream=false for a simple JSON response.
/// </summary>
[Function("SendMessage")]
public async Task SendMessage(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "chat/{sessionId}")] HttpRequest req,
string sessionId,
[DurableClient] DurableTaskClient client)
{
bool stream = !string.Equals(req.Query["stream"], "false", StringComparison.OrdinalIgnoreCase);
var body = await req.ReadFromJsonAsync<ChatRequest>();
var message = body?.Message ?? "Hello";
var correlationId = Guid.NewGuid().ToString("N");
var channel = RedisChannel.Literal($"chat:{sessionId}:{correlationId}");

// Subscribe to Redis BEFORE signaling the entity to avoid missing messages
var sub = _redis.GetSubscriber();
var queue = Channel.CreateUnbounded<string>();
await sub.SubscribeAsync(channel, (_, msg) => queue.Writer.TryWrite(msg!));

// Signal the entity (fire-and-forget)
var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId);
await client.Entities.SignalEntityAsync(entityId, "Message",
new ChatRequest(sessionId, message, correlationId));

var ct = req.HttpContext.RequestAborted;
using var timeout = new CancellationTokenSource(TimeSpan.FromMinutes(2));
using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, timeout.Token);

try
{
if (stream)
{
// Streaming mode: forward chunks as Server-Sent Events
req.HttpContext.Response.ContentType = "text/event-stream";
req.HttpContext.Response.Headers.CacheControl = "no-cache";

await foreach (var chunk in queue.Reader.ReadAllAsync(linked.Token))
{
await req.HttpContext.Response.WriteAsync($"data: {chunk}\n\n", linked.Token);
await req.HttpContext.Response.Body.FlushAsync(linked.Token);

try
{
var doc = JsonDocument.Parse(chunk);
var type = doc.RootElement.GetProperty("type").GetString();
if (type is "done" or "error") break;
}
catch { /* not JSON or missing type — keep streaming */ }
}
}
else
{
// Non-streaming mode: collect all chunks, return complete JSON response
var fullResponse = new System.Text.StringBuilder();
await foreach (var chunk in queue.Reader.ReadAllAsync(linked.Token))
{
try
{
var doc = JsonDocument.Parse(chunk);
var type = doc.RootElement.GetProperty("type").GetString();
if (type == "chunk")
fullResponse.Append(doc.RootElement.GetProperty("content").GetString());
if (type is "done" or "error")
{
if (type == "error")
{
req.HttpContext.Response.StatusCode = 500;
await req.HttpContext.Response.WriteAsJsonAsync(new
{
sessionId,
error = doc.RootElement.GetProperty("content").GetString()
}, linked.Token);
return;
}
break;
}
}
catch { /* keep reading */ }
}

await req.HttpContext.Response.WriteAsJsonAsync(new
{
sessionId,
message = fullResponse.ToString()
}, linked.Token);
}
}
finally
{
await sub.UnsubscribeAsync(channel);
}
}

/// <summary>Get the full conversation history for a session.</summary>
[Function("GetHistory")]
public async Task<IActionResult> GetHistory(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "chat/{sessionId}/history")] HttpRequest req,
string sessionId,
[DurableClient] DurableTaskClient client)
{
var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId);
var entity = await client.Entities.GetEntityAsync<ChatAgentState>(entityId);
if (entity is null) return new NotFoundResult();
return new OkObjectResult(new { sessionId, history = entity.State.Messages });
}

/// <summary>Reset a session's conversation history.</summary>
[Function("ResetSession")]
public async Task<IActionResult> Reset(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "chat/{sessionId}/reset")] HttpRequest req,
string sessionId,
[DurableClient] DurableTaskClient client)
{
var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId);
await client.Entities.SignalEntityAsync(entityId, "Reset");
return new OkObjectResult(new { sessionId, status = "reset" });
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
namespace AgentDirectedWorkflows;

public record ChatMsg(string Role, string Content);
public record ChatRequest(string SessionId, string Message, string? CorrelationId = null);

public class ChatAgentState
{
public List<ChatMsg> Messages { get; set; } = [];
}

public record ChatTool(string Name, string Description);
70 changes: 70 additions & 0 deletions samples/durable-functions/dotnet/AgentDirectedWorkflows/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
using System.Runtime.CompilerServices;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.AI;
using StackExchange.Redis;

var host = new HostBuilder()
.ConfigureFunctionsWebApplication()
.ConfigureServices((context, services) =>
{
services.AddApplicationInsightsTelemetryWorkerService();
services.ConfigureFunctionsApplicationInsights();

// Register IChatClient backed by Azure OpenAI.
// Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_DEPLOYMENT in local.settings.json.
string? aiEndpoint = context.Configuration["AZURE_OPENAI_ENDPOINT"];
string? aiDeployment = context.Configuration["AZURE_OPENAI_DEPLOYMENT"];

if (!string.IsNullOrEmpty(aiEndpoint) && !string.IsNullOrEmpty(aiDeployment))
{
services.AddSingleton<IChatClient>(_ =>
new Azure.AI.OpenAI.AzureOpenAIClient(
new Uri(aiEndpoint), new Azure.Identity.DefaultAzureCredential())
.GetChatClient(aiDeployment)
.AsIChatClient());
}
else
{
// Simple echo client for local dev without an Azure OpenAI deployment
services.AddSingleton<IChatClient>(new EchoChatClient());
}

// Register Redis for streaming response chunks from entities to HTTP endpoints
string redisConnection = context.Configuration["REDIS_CONNECTION_STRING"] ?? "localhost:6379";
services.AddSingleton<IConnectionMultiplexer>(_ => ConnectionMultiplexer.Connect(redisConnection));
})
.Build();

host.Run();

/// <summary>
/// Trivial IChatClient that echoes back the user message with simulated streaming.
/// Set AZURE_OPENAI_ENDPOINT + AZURE_OPENAI_DEPLOYMENT env vars to use a real LLM.
/// </summary>
class EchoChatClient : IChatClient
{
public Task<ChatResponse> GetResponseAsync(
IEnumerable<ChatMessage> messages, ChatOptions? options = null, CancellationToken ct = default)
{
var last = messages.LastOrDefault(m => m.Role == ChatRole.User)?.Text ?? "Hello";
var reply = new ChatMessage(ChatRole.Assistant, $"Echo: {last}");
return Task.FromResult(new ChatResponse([reply]));
}

public async IAsyncEnumerable<ChatResponseUpdate> GetStreamingResponseAsync(
IEnumerable<ChatMessage> messages, ChatOptions? options = null,
[EnumeratorCancellation] CancellationToken ct = default)
{
var last = messages.LastOrDefault(m => m.Role == ChatRole.User)?.Text ?? "Hello";
foreach (var word in $"Echo: {last}".Split(' '))
{
yield return new ChatResponseUpdate(ChatRole.Assistant, word + " ");
await Task.Delay(50, ct);
}
}

public void Dispose() { }
public object? GetService(Type serviceType, object? serviceKey = null) => null;
}
Loading
Loading