From 3ee4136ef008967f24d501f1c6311e99d2148b5a Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Mon, 20 Apr 2026 11:53:21 -0700 Subject: [PATCH 1/4] Add agent-directed workflow samples for Durable Task SDK and Durable Functions (.NET) Demonstrates the entity-based agent loop pattern using durable entities with Redis pub/sub for real-time streaming. Both samples include: - ChatAgentEntity: durable entity holding conversation history - Agent loop with tool calling (get_weather) - SSE streaming and non-streaming (?stream=false) modes - Conversation history and reset endpoints - Echo fallback client when Azure OpenAI is not configured Durable Task SDK version: ASP.NET minimal API with direct DTS connection Durable Functions version: Azure Functions isolated worker with DTS backend Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../AgentDirectedWorkflows.csproj | 45 +++ .../AgentDirectedWorkflows/ChatAgentEntity.cs | 257 ++++++++++++++++++ .../AgentDirectedWorkflows/Models/Models.cs | 11 + .../dotnet/AgentDirectedWorkflows/Program.cs | 70 +++++ .../dotnet/AgentDirectedWorkflows/README.md | 207 ++++++++++++++ .../Tools/AgentTools.cs | 31 +++ .../dotnet/AgentDirectedWorkflows/host.json | 24 ++ .../AgentDirectedWorkflows.csproj | 19 ++ .../AgentDirectedWorkflows/ChatAgentEntity.cs | 120 ++++++++ .../AgentDirectedWorkflows/Models/Models.cs | 11 + .../Agents/AgentDirectedWorkflows/Program.cs | 196 +++++++++++++ .../Properties/launchSettings.json | 16 ++ .../Agents/AgentDirectedWorkflows/README.md | 207 ++++++++++++++ .../Tools/AgentTools.cs | 31 +++ 14 files changed, 1245 insertions(+) create mode 100644 samples/durable-functions/dotnet/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj create mode 100644 samples/durable-functions/dotnet/AgentDirectedWorkflows/ChatAgentEntity.cs create mode 100644 samples/durable-functions/dotnet/AgentDirectedWorkflows/Models/Models.cs create mode 100644 
samples/durable-functions/dotnet/AgentDirectedWorkflows/Program.cs create mode 100644 samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md create mode 100644 samples/durable-functions/dotnet/AgentDirectedWorkflows/Tools/AgentTools.cs create mode 100644 samples/durable-functions/dotnet/AgentDirectedWorkflows/host.json create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/ChatAgentEntity.cs create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Models/Models.cs create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Program.cs create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md create mode 100644 samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Tools/AgentTools.cs diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj b/samples/durable-functions/dotnet/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj new file mode 100644 index 0000000..006d0f3 --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj @@ -0,0 +1,45 @@ + + + net10.0 + v4 + Exe + enable + enable + + + + + + + + + + + + + + + + + + + + + PreserveNewest + + + PreserveNewest + Never + + + + + + + + + + + + + diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/ChatAgentEntity.cs b/samples/durable-functions/dotnet/AgentDirectedWorkflows/ChatAgentEntity.cs new file mode 100644 index 0000000..b92add2 --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/ChatAgentEntity.cs @@ -0,0 +1,257 @@ +// ============================================================================ +// Durable Agent Chat — powered by Durable 
Entities + Redis Streaming +// +// Each chat session is a durable entity that holds the full conversation +// history. When you send a message, the entity calls the LLM, executes any +// tool calls, and streams response chunks to Redis pub/sub in real-time. +// The HTTP layer subscribes to the Redis channel and forwards chunks as +// Server-Sent Events (SSE) to the client. +// +// Architecture: +// HTTP POST /api/chat/{sessionId} +// → subscribe to Redis channel "chat:{sessionId}:{correlationId}" +// → signal entity (fire-and-forget) +// → stream SSE from Redis subscription +// Entity receives signal: +// → runs agent loop (LLM ←→ tool execution) +// → publishes response chunks to Redis +// → publishes [DONE] when complete +// ============================================================================ + +using System.Text.Json; +using System.Threading.Channels; +using Microsoft.AspNetCore.Http; +using Microsoft.AspNetCore.Mvc; +using Microsoft.Azure.Functions.Worker; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.Entities; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; + +namespace AgentDirectedWorkflows; + +// ─── The Agent Entity ─── +// Each entity instance is one chat session. The Durable Task framework +// automatically persists the entity's State after each operation. + +public class ChatAgentEntity : TaskEntity +{ + private readonly IChatClient _chatClient; + private readonly IConnectionMultiplexer _redis; + private readonly ILogger _logger; + + public ChatAgentEntity(IChatClient chatClient, IConnectionMultiplexer redis, ILogger logger) + { + _chatClient = chatClient; + _redis = redis; + _logger = logger; + } + + /// + /// The core agent loop: send user message to LLM, execute any tool calls, + /// repeat until the LLM gives a final text reply. Streams response chunks + /// to Redis pub/sub for real-time delivery to the client. 
+ /// + public async Task Message(ChatRequest request) + { + var channel = RedisChannel.Literal($"chat:{Context.Id.Key}:{request.CorrelationId}"); + var pub = _redis.GetSubscriber(); + + try + { + State.Messages.Add(new ChatMsg("user", request.Message)); + + var messages = new List { new(ChatRole.System, "You are a helpful assistant.") }; + foreach (var m in State.Messages) + messages.Add(new ChatMessage(m.Role == "assistant" ? ChatRole.Assistant : ChatRole.User, m.Content)); + + var options = new ChatOptions { Tools = AgentTools.AsAITools() }; + + // Agent loop: stream from LLM → publish chunks or handle tool calls + while (true) + { + var fullText = new System.Text.StringBuilder(); + var toolCalls = new List(); + + await foreach (var update in _chatClient.GetStreamingResponseAsync(messages, options)) + { + foreach (var content in update.Contents.OfType()) + toolCalls.Add(content); + + if (!string.IsNullOrEmpty(update.Text)) + { + fullText.Append(update.Text); + var json = JsonSerializer.Serialize(new { type = "chunk", content = update.Text }); + await pub.PublishAsync(channel, json); + } + } + + if (toolCalls.Count > 0) + { + _logger.LogInformation("Executing tools: {Tools}", string.Join(", ", toolCalls.Select(t => t.Name))); + messages.Add(new ChatMessage(ChatRole.Assistant, + toolCalls.Select(tc => (AIContent)tc).ToList())); + + foreach (var tc in toolCalls) + { + var result = AgentTools.Execute(tc.Name, tc.Arguments); + messages.Add(new ChatMessage(ChatRole.Tool, [new FunctionResultContent(tc.CallId, result)])); + } + continue; + } + + var reply = fullText.ToString(); + State.Messages.Add(new ChatMsg("assistant", reply)); + await pub.PublishAsync(channel, JsonSerializer.Serialize(new { type = "done" })); + return; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Agent loop failed"); + var errorJson = JsonSerializer.Serialize(new { type = "error", content = ex.Message }); + await pub.PublishAsync(channel, errorJson); + } + } + + public List 
GetHistory() => State.Messages; + + public void Reset() => State.Messages.Clear(); + + [Function(nameof(ChatAgentEntity))] + public static Task Dispatch([EntityTrigger] TaskEntityDispatcher dispatcher) + => dispatcher.DispatchAsync(); +} + +// ─── HTTP Endpoints ─── + +public class ChatEndpoints +{ + private readonly IConnectionMultiplexer _redis; + + public ChatEndpoints(IConnectionMultiplexer redis) + { + _redis = redis; + } + + /// + /// Send a message to the agent. Streams SSE by default; add ?stream=false for a simple JSON response. + /// + [Function("SendMessage")] + public async Task SendMessage( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "chat/{sessionId}")] HttpRequest req, + string sessionId, + [DurableClient] DurableTaskClient client) + { + bool stream = !string.Equals(req.Query["stream"], "false", StringComparison.OrdinalIgnoreCase); + var body = await req.ReadFromJsonAsync(); + var message = body?.Message ?? "Hello"; + var correlationId = Guid.NewGuid().ToString("N"); + var channel = RedisChannel.Literal($"chat:{sessionId}:{correlationId}"); + + // Subscribe to Redis BEFORE signaling the entity to avoid missing messages + var sub = _redis.GetSubscriber(); + var queue = Channel.CreateUnbounded(); + await sub.SubscribeAsync(channel, (_, msg) => queue.Writer.TryWrite(msg!)); + + // Signal the entity (fire-and-forget) + var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId); + await client.Entities.SignalEntityAsync(entityId, "Message", + new ChatRequest(sessionId, message, correlationId)); + + var ct = req.HttpContext.RequestAborted; + using var timeout = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, timeout.Token); + + try + { + if (stream) + { + // Streaming mode: forward chunks as Server-Sent Events + req.HttpContext.Response.ContentType = "text/event-stream"; + req.HttpContext.Response.Headers.CacheControl = "no-cache"; + + await foreach 
(var chunk in queue.Reader.ReadAllAsync(linked.Token)) + { + await req.HttpContext.Response.WriteAsync($"data: {chunk}\n\n", linked.Token); + await req.HttpContext.Response.Body.FlushAsync(linked.Token); + + try + { + var doc = JsonDocument.Parse(chunk); + var type = doc.RootElement.GetProperty("type").GetString(); + if (type is "done" or "error") break; + } + catch { /* not JSON or missing type — keep streaming */ } + } + } + else + { + // Non-streaming mode: collect all chunks, return complete JSON response + var fullResponse = new System.Text.StringBuilder(); + await foreach (var chunk in queue.Reader.ReadAllAsync(linked.Token)) + { + try + { + var doc = JsonDocument.Parse(chunk); + var type = doc.RootElement.GetProperty("type").GetString(); + if (type == "chunk") + fullResponse.Append(doc.RootElement.GetProperty("content").GetString()); + if (type is "done" or "error") + { + if (type == "error") + { + req.HttpContext.Response.StatusCode = 500; + await req.HttpContext.Response.WriteAsJsonAsync(new + { + sessionId, + error = doc.RootElement.GetProperty("content").GetString() + }, linked.Token); + return; + } + break; + } + } + catch { /* keep reading */ } + } + + await req.HttpContext.Response.WriteAsJsonAsync(new + { + sessionId, + message = fullResponse.ToString() + }, linked.Token); + } + } + finally + { + await sub.UnsubscribeAsync(channel); + } + } + + /// Get the full conversation history for a session. + [Function("GetHistory")] + public async Task GetHistory( + [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = "chat/{sessionId}/history")] HttpRequest req, + string sessionId, + [DurableClient] DurableTaskClient client) + { + var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId); + var entity = await client.Entities.GetEntityAsync(entityId); + if (entity is null) return new NotFoundResult(); + return new OkObjectResult(new { sessionId, history = entity.State.Messages }); + } + + /// Reset a session's conversation history. 
+ [Function("ResetSession")] + public async Task Reset( + [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "chat/{sessionId}/reset")] HttpRequest req, + string sessionId, + [DurableClient] DurableTaskClient client) + { + var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId); + await client.Entities.SignalEntityAsync(entityId, "Reset"); + return new OkObjectResult(new { sessionId, status = "reset" }); + } +} diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/Models/Models.cs b/samples/durable-functions/dotnet/AgentDirectedWorkflows/Models/Models.cs new file mode 100644 index 0000000..e2b9296 --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/Models/Models.cs @@ -0,0 +1,11 @@ +namespace AgentDirectedWorkflows; + +public record ChatMsg(string Role, string Content); +public record ChatRequest(string SessionId, string Message, string? CorrelationId = null); + +public class ChatAgentState +{ + public List Messages { get; set; } = []; +} + +public record ChatTool(string Name, string Description); diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/Program.cs b/samples/durable-functions/dotnet/AgentDirectedWorkflows/Program.cs new file mode 100644 index 0000000..a2a5e2f --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/Program.cs @@ -0,0 +1,70 @@ +using System.Runtime.CompilerServices; +using Microsoft.Azure.Functions.Worker; +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.AI; +using StackExchange.Redis; + +var host = new HostBuilder() + .ConfigureFunctionsWebApplication() + .ConfigureServices((context, services) => + { + services.AddApplicationInsightsTelemetryWorkerService(); + services.ConfigureFunctionsApplicationInsights(); + + // Register IChatClient backed by Azure OpenAI. + // Set AZURE_OPENAI_ENDPOINT and AZURE_OPENAI_DEPLOYMENT in local.settings.json. + string? 
aiEndpoint = context.Configuration["AZURE_OPENAI_ENDPOINT"]; + string? aiDeployment = context.Configuration["AZURE_OPENAI_DEPLOYMENT"]; + + if (!string.IsNullOrEmpty(aiEndpoint) && !string.IsNullOrEmpty(aiDeployment)) + { + services.AddSingleton(_ => + new Azure.AI.OpenAI.AzureOpenAIClient( + new Uri(aiEndpoint), new Azure.Identity.DefaultAzureCredential()) + .GetChatClient(aiDeployment) + .AsIChatClient()); + } + else + { + // Simple echo client for local dev without an Azure OpenAI deployment + services.AddSingleton(new EchoChatClient()); + } + + // Register Redis for streaming response chunks from entities to HTTP endpoints + string redisConnection = context.Configuration["REDIS_CONNECTION_STRING"] ?? "localhost:6379"; + services.AddSingleton(_ => ConnectionMultiplexer.Connect(redisConnection)); + }) + .Build(); + +host.Run(); + +/// +/// Trivial IChatClient that echoes back the user message with simulated streaming. +/// Set AZURE_OPENAI_ENDPOINT + AZURE_OPENAI_DEPLOYMENT env vars to use a real LLM. +/// +class EchoChatClient : IChatClient +{ + public Task GetResponseAsync( + IEnumerable messages, ChatOptions? options = null, CancellationToken ct = default) + { + var last = messages.LastOrDefault(m => m.Role == ChatRole.User)?.Text ?? "Hello"; + var reply = new ChatMessage(ChatRole.Assistant, $"Echo: {last}"); + return Task.FromResult(new ChatResponse([reply])); + } + + public async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken ct = default) + { + var last = messages.LastOrDefault(m => m.Role == ChatRole.User)?.Text ?? "Hello"; + foreach (var word in $"Echo: {last}".Split(' ')) + { + yield return new ChatResponseUpdate(ChatRole.Assistant, word + " "); + await Task.Delay(50, ct); + } + } + + public void Dispose() { } + public object? GetService(Type serviceType, object? 
serviceKey = null) => null; +} diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md b/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md new file mode 100644 index 0000000..e1b1b91 --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md @@ -0,0 +1,207 @@ +# Agent-Directed Workflows — Durable Functions (.NET) + +This sample demonstrates how to build an **agent-directed workflow** (agent loop) using [durable entities](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-entities) in [Azure Durable Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview) with the [Durable Task Scheduler](https://learn.microsoft.com/azure/durable-task/overview). + +It corresponds to the **"Build your own using orchestrations and entities"** approach described in the [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) comparison table, using the **entity-based agent loop** pattern from [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns#entity-based-agent-loops). + +## What Is an Agent-Directed Workflow? + +In a typical deterministic workflow, *your code* controls the execution path — you define the sequence of steps ahead of time. In an **agent-directed workflow** (also called an agent loop), the **LLM drives the control flow**. You provide tools and instructions, but the agent decides which tools to call, in what order, and when the task is complete. The execution path isn't known until runtime. + +This pattern is well suited for conversational agents with tool-calling capabilities, open-ended research tasks, and any scenario where the number and order of steps can't be predicted. + +## How This Sample Works + +### The Core Idea + +Each chat session is a **durable entity** — a long-lived, stateful actor managed by the Durable Task Scheduler. 
The entity holds the full conversation history in its state. When you send it a message, it runs the agent loop internally and **streams the LLM's response in real-time** via Redis pub/sub: + +``` +User: "What's the weather in Seattle?" + │ + ▼ +┌──────────────────────────────────────────────────────┐ +│ HTTP POST /api/chat/{sessionId} │ +│ │ +│ 1. Subscribe to Redis channel │ +│ 2. Signal entity (fire-and-forget) │ +│ 3. Stream SSE events from Redis to client │ +└────────────────────┬─────────────────────────────────┘ + │ (signal) + ▼ +┌──────────────────────────────────────────────────────┐ +│ ChatAgentEntity (durable entity) │ +│ │ +│ State = conversation history ← auto-persisted │ +│ │ +│ 1. Add user message to state │ +│ 2. Stream LLM response │ +│ 3. LLM returns tool calls? │ +│ ├─ YES → execute tools, go to 2 │ +│ └─ NO → publish chunks to Redis as they arrive │ +│ 4. Save full reply to state │ +│ 5. Publish "done" event to Redis │ +└──────────────────────────────────────────────────────┘ +``` + +**Key properties:** +- **Durable state**: The entity's conversation history survives restarts, crashes, and scaling events. +- **Real-time streaming**: LLM response tokens are published to Redis and forwarded to the client as SSE events as they're generated. +- **No orchestration bridge**: The HTTP layer signals the entity (fire-and-forget) and reads the response from Redis — no intermediate orchestration needed. + +### Why Entities? + +Durable entities are a natural fit for AI agents because: + +- **Long-lived state**: An entity persists for as long as you need it. A chat session can last minutes, days, or weeks. +- **Automatic persistence**: You just read and write `State` — the framework handles serialization and storage. +- **Actor model**: Each entity processes one operation at a time, so there are no concurrency issues with the conversation history. 
+- **Addressable**: Each entity has a unique ID (the session ID), making it easy to route messages to the right agent. + +### Streaming via Redis + +Instead of request-response (which blocks until the full LLM reply is generated), this sample uses **Redis pub/sub** to stream response chunks in real-time: + +1. The HTTP handler subscribes to a Redis channel **before** signaling the entity +2. The entity runs the agent loop and publishes each token to Redis as the LLM generates it +3. The HTTP handler forwards each chunk to the client as a Server-Sent Event (SSE) +4. When the entity finishes, it publishes a `done` event + +Each request uses a unique `correlationId` in the channel name to isolate concurrent conversations. + +### Project Structure + +``` +AgentDirectedWorkflows/ +├── ChatAgentEntity.cs # The durable entity (agent) and HTTP endpoints +├── Program.cs # Host setup: IChatClient + Redis registration +├── Models/ +│ └── Models.cs # Data types: ChatMsg, ChatRequest, ChatAgentState, ChatTool +├── Tools/ +│ └── AgentTools.cs # Tool definitions and execution logic +├── host.json # Durable Functions config (storage provider, connection string) +└── local.settings.json # Connection strings, Redis, and Azure OpenAI settings +``` + +### Key Files Explained + +**`ChatAgentEntity.cs`** — Contains two things: +- `ChatAgentEntity` — The durable entity. Its `Message()` method is the agent loop: it streams from the LLM via `GetStreamingResponseAsync`, publishes text chunks to Redis, checks for tool calls, and repeats until the LLM gives a final text reply. State (conversation history) is persisted automatically. +- `ChatEndpoints` — HTTP triggers that expose the agent as a streaming API. The `SendMessage` endpoint sets up an SSE response, subscribes to the Redis channel, signals the entity, and streams chunks back. + +**`Tools/AgentTools.cs`** — Defines what tools the LLM can call. Currently includes `get_weather` (with a hardcoded response). 
To add new capabilities, add a definition and an execution case. The LLM discovers available tools automatically. + +**`Program.cs`** — Registers the `IChatClient` implementation via [Microsoft.Extensions.AI](https://learn.microsoft.com/dotnet/ai/microsoft-extensions-ai) and the Redis `IConnectionMultiplexer`. If Azure OpenAI environment variables are set, it uses a real LLM. Otherwise, it falls back to a simple echo client for local development. + +## Prerequisites + +1. [.NET 10 SDK](https://dotnet.microsoft.com/download/dotnet/10.0) or later +2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator and Redis) +3. [Azure Functions Core Tools](https://learn.microsoft.com/azure/azure-functions/functions-run-local) v4 + +## Running the Sample + +### 1. Start the Durable Task Scheduler emulator and Redis + +```bash +# Durable Task Scheduler emulator +docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + +# Redis +docker run --name redis -d -p 6379:6379 redis:latest +``` + +### 2. (Optional) Configure Azure OpenAI + +Update `local.settings.json` to use a real LLM instead of the echo fallback: + +```json +{ + "Values": { + "AZURE_OPENAI_ENDPOINT": "https://.openai.azure.com", + "AZURE_OPENAI_DEPLOYMENT": "gpt-4o" + } +} +``` + +The sample uses `DefaultAzureCredential` for authentication — make sure you're signed in via `az login`. + +### 3. Run the function app + +```bash +func start +``` + +### 4. 
Chat with the agent + +```bash +# Streaming (default) — response streams as Server-Sent Events +curl -N -X POST http://localhost:7071/api/chat/session1 \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# SSE output: +# data: {"type":"chunk","content":"It's"} +# data: {"type":"chunk","content":" 72°F"} +# data: {"type":"chunk","content":" and sunny"} +# data: {"type":"chunk","content":" in Seattle."} +# data: {"type":"done"} + +# Non-streaming — waits for the full response and returns JSON +curl -X POST "http://localhost:7071/api/chat/session1?stream=false" \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# JSON output: +# {"sessionId":"session1","message":"It's 72°F and sunny in Seattle."} + +# View the full conversation history +curl http://localhost:7071/api/chat/session1/history + +# Reset the conversation +curl -X POST http://localhost:7071/api/chat/session1/reset +``` + +> **Tip:** Use `curl -N` (no buffering) to see SSE events in real-time when streaming. + +## API Reference + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/chat/{sessionId}` | Send a message and stream the reply as SSE (default). Add `?stream=false` for a JSON response. Body: `{"message": "..."}` | +| GET | `/api/chat/{sessionId}/history` | Get the full conversation history for a session | +| POST | `/api/chat/{sessionId}/reset` | Clear a session's conversation history | + +### SSE Event Format + +Each SSE event is a JSON object with a `type` field: + +| Type | Description | Example | +|------|-------------|---------| +| `chunk` | A token from the LLM response | `{"type":"chunk","content":"Hello"}` | +| `done` | The response is complete | `{"type":"done"}` | +| `error` | An error occurred | `{"type":"error","content":"..."}` | + +## Adding New Tools + +To give the agent new capabilities, edit `Tools/AgentTools.cs`: + +1. 
Add a tool definition to the `Definitions` array +2. Add a case to the `Execute()` switch + +```csharp +// In Definitions: +new("search_web", "Search the web for information"), + +// In Execute(): +"search_web" => SearchTheWeb(args), +``` + +The LLM will automatically discover the new tool and call it when appropriate. + +## Learn More + +- [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) — Overview of how durable execution solves production challenges for AI agents +- [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns) — All supported patterns (deterministic workflows, agent loops, orchestration-based vs entity-based) +- [Durable Entities](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-entities) — Deep dive on the entity programming model +- [Microsoft.Extensions.AI](https://learn.microsoft.com/dotnet/ai/microsoft-extensions-ai) — The AI abstraction layer used for LLM integration diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/Tools/AgentTools.cs b/samples/durable-functions/dotnet/AgentDirectedWorkflows/Tools/AgentTools.cs new file mode 100644 index 0000000..b60718d --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/Tools/AgentTools.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.AI; + +namespace AgentDirectedWorkflows; + +/// +/// Tools that the agent can use. Add new tools here to give the agent new capabilities. +/// Each tool needs: a definition (name + description for the LLM) and an implementation. +/// +public static class AgentTools +{ + // Tool definitions — these are sent to the LLM so it knows what it can call. + public static readonly ChatTool[] Definitions = [ + new("get_weather", "Get current weather for a location"), + ]; + + /// Builds the AITool list that the LLM understands. 
+ public static List AsAITools() => + Definitions.Select(t => + AIFunctionFactory.Create((string location) => "", t.Name, t.Description) as AITool).ToList(); + + /// Executes a tool by name and returns the result string. + public static string Execute(string name, IDictionary? args) + { + var location = args?.Values.FirstOrDefault()?.ToString() ?? "unknown"; + return name switch + { + "get_weather" => $"72°F and sunny in {location}", + _ => $"Unknown tool: {name}", + }; + } +} diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/host.json b/samples/durable-functions/dotnet/AgentDirectedWorkflows/host.json new file mode 100644 index 0000000..c7132fa --- /dev/null +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + }, + "enableLiveMetricsFilters": true + }, + "logLevel": { + "DurableTask.AzureManagedBackend": "Information" + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": "DurableTaskSchedulerConnection" + }, + "hubName": "%TASKHUB_NAME%" + } + } +} \ No newline at end of file diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj new file mode 100644 index 0000000..a1f8b49 --- /dev/null +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/AgentDirectedWorkflows.csproj @@ -0,0 +1,19 @@ + + + + net10.0 + enable + enable + + + + + + + + + + + + + diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/ChatAgentEntity.cs b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/ChatAgentEntity.cs new file mode 100644 index 0000000..60947db --- /dev/null +++ 
b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/ChatAgentEntity.cs @@ -0,0 +1,120 @@ +// ============================================================================ +// Durable Agent Chat — powered by Durable Entities + Redis Streaming +// +// Each chat session is a durable entity that holds the full conversation +// history. When you send a message, the entity calls the LLM, executes any +// tool calls, and streams response chunks to Redis pub/sub in real-time. +// The HTTP layer subscribes to the Redis channel and forwards chunks as +// Server-Sent Events (SSE) to the client. +// +// Architecture: +// HTTP POST /chat/{sessionId} +// → subscribe to Redis channel "chat:{sessionId}:{correlationId}" +// → signal entity (fire-and-forget) +// → stream SSE from Redis subscription +// Entity receives signal: +// → runs agent loop (LLM ←→ tool execution) +// → publishes response chunks to Redis +// → publishes [DONE] when complete +// ============================================================================ + +using System.Text.Json; +using Microsoft.DurableTask.Entities; +using Microsoft.Extensions.AI; +using Microsoft.Extensions.Logging; +using StackExchange.Redis; + +namespace AgentDirectedWorkflows; + +// ─── The Agent Entity ─── +// Each entity instance is one chat session. The Durable Task framework +// automatically persists the entity's State after each operation. + +public class ChatAgentEntity : TaskEntity +{ + private readonly IChatClient _chatClient; + private readonly IConnectionMultiplexer _redis; + private readonly ILogger _logger; + + public ChatAgentEntity(IChatClient chatClient, IConnectionMultiplexer redis, ILogger logger) + { + _chatClient = chatClient; + _redis = redis; + _logger = logger; + } + + /// + /// The core agent loop: send user message to LLM, execute any tool calls, + /// repeat until the LLM gives a final text reply. Streams response chunks + /// to Redis pub/sub for real-time delivery to the client. 
+ /// + public async Task Message(ChatRequest request) + { + var channel = RedisChannel.Literal($"chat:{Context.Id.Key}:{request.CorrelationId}"); + var pub = _redis.GetSubscriber(); + + try + { + State.Messages.Add(new ChatMsg("user", request.Message)); + + var messages = new List { new(ChatRole.System, "You are a helpful assistant.") }; + foreach (var m in State.Messages) + messages.Add(new ChatMessage(m.Role == "assistant" ? ChatRole.Assistant : ChatRole.User, m.Content)); + + var options = new ChatOptions { Tools = AgentTools.AsAITools() }; + + // Agent loop: stream from LLM → publish chunks or handle tool calls + while (true) + { + var fullText = new System.Text.StringBuilder(); + var toolCalls = new List(); + + await foreach (var update in _chatClient.GetStreamingResponseAsync(messages, options)) + { + // Accumulate tool calls + foreach (var content in update.Contents.OfType()) + toolCalls.Add(content); + + // Stream text chunks to Redis immediately + if (!string.IsNullOrEmpty(update.Text)) + { + fullText.Append(update.Text); + var json = JsonSerializer.Serialize(new { type = "chunk", content = update.Text }); + await pub.PublishAsync(channel, json); + } + } + + if (toolCalls.Count > 0) + { + // LLM wants to call tools — execute and loop + _logger.LogInformation("Executing tools: {Tools}", string.Join(", ", toolCalls.Select(t => t.Name))); + messages.Add(new ChatMessage(ChatRole.Assistant, + toolCalls.Select(tc => (AIContent)tc).ToList())); + + foreach (var tc in toolCalls) + { + var result = AgentTools.Execute(tc.Name, tc.Arguments); + messages.Add(new ChatMessage(ChatRole.Tool, [new FunctionResultContent(tc.CallId, result)])); + } + continue; + } + + // Final text reply — save to durable state + var reply = fullText.ToString(); + State.Messages.Add(new ChatMsg("assistant", reply)); + await pub.PublishAsync(channel, JsonSerializer.Serialize(new { type = "done" })); + return; + } + } + catch (Exception ex) + { + _logger.LogError(ex, "Agent loop failed"); + 
var errorJson = JsonSerializer.Serialize(new { type = "error", content = ex.Message }); + await pub.PublishAsync(channel, errorJson); + } + } + + public List GetHistory() => State.Messages; + + public void Reset() => State.Messages.Clear(); +} diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Models/Models.cs b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Models/Models.cs new file mode 100644 index 0000000..e2b9296 --- /dev/null +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Models/Models.cs @@ -0,0 +1,11 @@ +namespace AgentDirectedWorkflows; + +public record ChatMsg(string Role, string Content); +public record ChatRequest(string SessionId, string Message, string? CorrelationId = null); + +public class ChatAgentState +{ + public List Messages { get; set; } = []; +} + +public record ChatTool(string Name, string Description); diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Program.cs b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Program.cs new file mode 100644 index 0000000..8106ab2 --- /dev/null +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Program.cs @@ -0,0 +1,196 @@ +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Threading.Channels; +using AgentDirectedWorkflows; +using Microsoft.DurableTask.Client; +using Microsoft.DurableTask.Client.AzureManaged; +using Microsoft.DurableTask.Entities; +using Microsoft.DurableTask.Worker; +using Microsoft.DurableTask.Worker.AzureManaged; +using Microsoft.Extensions.AI; +using StackExchange.Redis; + +var builder = WebApplication.CreateBuilder(args); + +string connectionString = builder.Configuration["DURABLE_TASK_SCHEDULER_CONNECTION_STRING"] + ?? "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None"; + +string redisConnection = builder.Configuration["REDIS_CONNECTION_STRING"] ?? 
"localhost:6379"; + +// Configure Azure OpenAI if available, otherwise use a simple echo client for demo purposes. +string? aiEndpoint = builder.Configuration["AZURE_OPENAI_ENDPOINT"]; +string? aiDeployment = builder.Configuration["AZURE_OPENAI_DEPLOYMENT"]; + +if (!string.IsNullOrEmpty(aiEndpoint) && !string.IsNullOrEmpty(aiDeployment)) +{ + builder.Services.AddSingleton(_ => + new Azure.AI.OpenAI.AzureOpenAIClient(new Uri(aiEndpoint), new Azure.Identity.DefaultAzureCredential()) + .GetChatClient(aiDeployment) + .AsIChatClient()); +} +else +{ + builder.Services.AddSingleton(new EchoChatClient()); +} + +// Register Redis +builder.Services.AddSingleton(_ => ConnectionMultiplexer.Connect(redisConnection)); + +// Register Durable Task worker (entity only — no orchestration bridge needed) +builder.Services.AddDurableTaskWorker(b => +{ + b.AddTasks(r => + { + r.AddEntity(nameof(ChatAgentEntity), sp => + ActivatorUtilities.CreateInstance(sp)); + }); + b.UseDurableTaskScheduler(connectionString); +}); +builder.Services.AddDurableTaskClient(b => b.UseDurableTaskScheduler(connectionString)); + +builder.Services.AddLogging(l => l.AddSimpleConsole(o => +{ + o.SingleLine = true; + o.UseUtcTimestamp = true; + o.TimestampFormat = "yyyy-MM-ddTHH:mm:ss.fffZ "; +}).SetMinimumLevel(LogLevel.Warning).AddFilter("AgentDirectedWorkflows", LogLevel.Information)); + +var app = builder.Build(); + +// ─── HTTP Endpoints ─── +// Each sessionId maps to a durable entity instance — a persistent agent with durable state. + +// Send a message to the agent. Streams SSE by default; add ?stream=false for a simple JSON response. 
+app.MapPost("/chat/{sessionId}", async (HttpContext httpContext, DurableTaskClient client, + IConnectionMultiplexer redis, string sessionId, ChatRequest req) => +{ + bool stream = !string.Equals(httpContext.Request.Query["stream"], "false", StringComparison.OrdinalIgnoreCase); + var correlationId = Guid.NewGuid().ToString("N"); + var channel = RedisChannel.Literal($"chat:{sessionId}:{correlationId}"); + + // Subscribe to Redis BEFORE signaling the entity to avoid missing messages + var sub = redis.GetSubscriber(); + var queue = Channel.CreateUnbounded(); + await sub.SubscribeAsync(channel, (_, message) => queue.Writer.TryWrite(message!)); + + // Signal the entity (fire-and-forget) — it will publish chunks to Redis + var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId); + await client.Entities.SignalEntityAsync(entityId, "Message", + new ChatRequest(sessionId, req.Message, correlationId)); + + var ct = httpContext.RequestAborted; + using var timeout = new CancellationTokenSource(TimeSpan.FromMinutes(2)); + using var linked = CancellationTokenSource.CreateLinkedTokenSource(ct, timeout.Token); + + try + { + if (stream) + { + // Streaming mode: forward chunks as Server-Sent Events + httpContext.Response.ContentType = "text/event-stream"; + httpContext.Response.Headers.CacheControl = "no-cache"; + + await foreach (var message in queue.Reader.ReadAllAsync(linked.Token)) + { + await httpContext.Response.WriteAsync($"data: {message}\n\n", linked.Token); + await httpContext.Response.Body.FlushAsync(linked.Token); + + try + { + var doc = JsonDocument.Parse(message); + var type = doc.RootElement.GetProperty("type").GetString(); + if (type is "done" or "error") break; + } + catch { /* not JSON or no type field — keep streaming */ } + } + } + else + { + // Non-streaming mode: collect all chunks, return complete JSON response + var fullResponse = new System.Text.StringBuilder(); + await foreach (var message in queue.Reader.ReadAllAsync(linked.Token)) + { + 
try + { + var doc = JsonDocument.Parse(message); + var type = doc.RootElement.GetProperty("type").GetString(); + if (type == "chunk") + fullResponse.Append(doc.RootElement.GetProperty("content").GetString()); + if (type is "done" or "error") + { + if (type == "error") + { + httpContext.Response.StatusCode = 500; + await httpContext.Response.WriteAsJsonAsync(new + { + sessionId, + error = doc.RootElement.GetProperty("content").GetString() + }, linked.Token); + return; + } + break; + } + } + catch { /* keep reading */ } + } + + await httpContext.Response.WriteAsJsonAsync(new + { + sessionId, + message = fullResponse.ToString() + }, linked.Token); + } + } + finally + { + await sub.UnsubscribeAsync(channel); + } +}); + +// Get conversation history +app.MapGet("/chat/{sessionId}/history", async (DurableTaskClient client, string sessionId) => +{ + var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId); + var entity = await client.Entities.GetEntityAsync(entityId); + if (entity is null) return Results.NotFound(); + return Results.Ok(new { sessionId, history = entity.State.Messages }); +}); + +// Reset a conversation +app.MapPost("/chat/{sessionId}/reset", async (DurableTaskClient client, string sessionId) => +{ + var entityId = new EntityInstanceId(nameof(ChatAgentEntity), sessionId); + await client.Entities.SignalEntityAsync(entityId, "Reset"); + return Results.Ok(new { sessionId, status = "reset" }); +}); + +app.Run(); + +/// +/// Trivial IChatClient that echoes back the user message with simulated streaming. +/// Set AZURE_OPENAI_ENDPOINT + AZURE_OPENAI_DEPLOYMENT env vars to use a real LLM. +/// +class EchoChatClient : IChatClient +{ + public Task GetResponseAsync( + IEnumerable messages, ChatOptions? options = null, CancellationToken ct = default) + { + var last = messages.LastOrDefault(m => m.Role == ChatRole.User)?.Text ?? 
"Hello"; + return Task.FromResult(new ChatResponse([new ChatMessage(ChatRole.Assistant, $"Echo: {last}")])); + } + + public async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken ct = default) + { + var last = messages.LastOrDefault(m => m.Role == ChatRole.User)?.Text ?? "Hello"; + foreach (var word in $"Echo: {last}".Split(' ')) + { + yield return new ChatResponseUpdate(ChatRole.Assistant, word + " "); + await Task.Delay(50, ct); + } + } + + public void Dispose() { } + public object? GetService(Type serviceType, object? serviceKey = null) => null; +} diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json new file mode 100644 index 0000000..2304e00 --- /dev/null +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json @@ -0,0 +1,16 @@ +{ + "$schema": "http://json.schemastore.org/launchsettings.json", + "profiles": { + "http": { + "commandName": "Project", + "dotnetRunMessages": true, + "launchBrowser": false, + "applicationUrl": "http://localhost:5000", + "environmentVariables": { + "ASPNETCORE_ENVIRONMENT": "Development", + "AZURE_OPENAI_ENDPOINT": "https://YOUR-RESOURCE.openai.azure.com/", + "AZURE_OPENAI_DEPLOYMENT": "gpt-4o" + } + } + } +} diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md new file mode 100644 index 0000000..2a2cafb --- /dev/null +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md @@ -0,0 +1,207 @@ +# Agent-Directed Workflows — Durable Task SDK (.NET) + +This sample demonstrates how to build an **agent-directed workflow** (agent loop) using [durable entities](https://learn.microsoft.com/azure/durable-task/concepts/entities) from 
the [Durable Task SDK for .NET](https://github.com/microsoft/durabletask-dotnet) with the [Durable Task Scheduler](https://learn.microsoft.com/azure/durable-task/overview). + +It corresponds to the **"Build your own using orchestrations and entities"** approach described in the [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) comparison table, using the **entity-based agent loop** pattern from [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns#entity-based-agent-loops). + +> **Note:** A [Durable Functions version](../../../durable-functions/dotnet/AgentDirectedWorkflows/) of this sample is also available. This version uses the Durable Task SDK directly (no Azure Functions dependency), which gives you full control over the web host and API surface. + +## What Is an Agent-Directed Workflow? + +In a typical deterministic workflow, *your code* controls the execution path — you define the sequence of steps ahead of time. In an **agent-directed workflow** (also called an agent loop), the **LLM drives the control flow**. You provide tools and instructions, but the agent decides which tools to call, in what order, and when the task is complete. The execution path isn't known until runtime. + +This pattern is well suited for conversational agents with tool-calling capabilities, open-ended research tasks, and any scenario where the number and order of steps can't be predicted. + +## How This Sample Works + +### The Core Idea + +Each chat session is a **durable entity** — a long-lived, stateful actor managed by the Durable Task Scheduler. The entity holds the full conversation history in its state. When you send it a message, it runs the agent loop internally and **streams the LLM's response in real-time** via Redis pub/sub: + +``` +User: "What's the weather in Seattle?" 
+ │ + ▼ +┌──────────────────────────────────────────────────────┐ +│ HTTP POST /chat/{sessionId} │ +│ │ +│ 1. Subscribe to Redis channel │ +│ 2. Signal entity (fire-and-forget) │ +│ 3. Stream SSE events from Redis to client │ +└────────────────────┬─────────────────────────────────┘ + │ (signal) + ▼ +┌──────────────────────────────────────────────────────┐ +│ ChatAgentEntity (durable entity) │ +│ │ +│ State = conversation history ← auto-persisted │ +│ │ +│ 1. Add user message to state │ +│ 2. Stream LLM response │ +│ 3. LLM returns tool calls? │ +│ ├─ YES → execute tools, go to 2 │ +│ └─ NO → publish chunks to Redis as they arrive │ +│ 4. Save full reply to state │ +│ 5. Publish "done" event to Redis │ +└──────────────────────────────────────────────────────┘ +``` + +**Key properties:** +- **Durable state**: The entity's conversation history survives restarts, crashes, and scaling events. +- **Real-time streaming**: LLM response tokens are published to Redis and forwarded to the client as SSE events as they're generated. +- **No orchestration bridge**: The HTTP layer signals the entity (fire-and-forget) and reads the response from Redis — no intermediate orchestration needed. + +### Why Entities? + +Durable entities are a natural fit for AI agents because: + +- **Long-lived state**: An entity persists for as long as you need it. A chat session can last minutes, days, or weeks. +- **Automatic persistence**: You just read and write `State` — the framework handles serialization and storage. +- **Actor model**: Each entity processes one operation at a time, so there are no concurrency issues with the conversation history. +- **Addressable**: Each entity has a unique ID (the session ID), making it easy to route messages to the right agent. + +### Streaming via Redis + +Instead of request-response (which blocks until the full LLM reply is generated), this sample uses **Redis pub/sub** to stream response chunks in real-time: + +1. 
The HTTP handler subscribes to a Redis channel **before** signaling the entity +2. The entity runs the agent loop and publishes each token to Redis as the LLM generates it +3. The HTTP handler forwards each chunk to the client as a Server-Sent Event (SSE) +4. When the entity finishes, it publishes a `done` event + +Each request uses a unique `correlationId` in the channel name to isolate concurrent conversations. + +### Project Structure + +``` +AgentDirectedWorkflows/ +├── ChatAgentEntity.cs # The durable entity (agent) +├── Program.cs # Web host, SSE endpoints, DI registration, DTS connection +├── Models/ +│ └── Models.cs # Data types: ChatMsg, ChatRequest, ChatAgentState, ChatTool +├── Tools/ +│ └── AgentTools.cs # Tool definitions and execution logic +└── appsettings.json # Default configuration +``` + +### Key Files Explained + +**`ChatAgentEntity.cs`** — The durable entity. Its `Message()` method is the agent loop: it streams from the LLM via `GetStreamingResponseAsync`, publishes text chunks to Redis, checks for tool calls, and repeats until the LLM gives a final text reply. State (conversation history) is persisted automatically. + +**`Program.cs`** — A standard ASP.NET minimal API that: +1. Registers the `IChatClient` (Azure OpenAI if configured, echo fallback otherwise) +2. Registers the Redis `IConnectionMultiplexer` for pub/sub streaming +3. Registers the Durable Task worker with the entity +4. Defines SSE streaming endpoints for interacting with agent sessions + +**`Tools/AgentTools.cs`** — Defines what tools the LLM can call. Currently includes `get_weather` (with a hardcoded response). To add new capabilities, add a definition and an execution case. The LLM discovers available tools automatically. + +## Prerequisites + +1. [.NET 10 SDK](https://dotnet.microsoft.com/download/dotnet/10.0) or later +2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator and Redis) + +## Running the Sample + +### 1. 
Start the Durable Task Scheduler emulator and Redis + +```bash +# Durable Task Scheduler emulator +docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + +# Redis +docker run --name redis -d -p 6379:6379 redis:latest +``` + +### 2. (Optional) Configure Azure OpenAI + +Set environment variables to use a real LLM instead of the echo fallback: + +```bash +export AZURE_OPENAI_ENDPOINT="https://.openai.azure.com" +export AZURE_OPENAI_DEPLOYMENT="gpt-4o" +``` + +The sample uses `DefaultAzureCredential` for authentication — make sure you're signed in via `az login`. + +### 3. Run the app + +```bash +dotnet run +``` + +The app connects to the local emulator at `http://localhost:8080` by default. + +### 4. Chat with the agent + +```bash +# Streaming (default) — response streams as Server-Sent Events +curl -N -X POST http://localhost:5000/chat/session1 \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# SSE output: +# data: {"type":"chunk","content":"It's"} +# data: {"type":"chunk","content":" 72°F"} +# data: {"type":"chunk","content":" and sunny"} +# data: {"type":"chunk","content":" in Seattle."} +# data: {"type":"done"} + +# Non-streaming — waits for the full response and returns JSON +curl -X POST "http://localhost:5000/chat/session1?stream=false" \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# JSON output: +# {"sessionId":"session1","message":"It's 72°F and sunny in Seattle."} + +# View the full conversation history +curl http://localhost:5000/chat/session1/history + +# Reset the conversation +curl -X POST http://localhost:5000/chat/session1/reset +``` + +> **Tip:** Use `curl -N` (no buffering) to see SSE events in real-time when streaming. + +## API Reference + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/chat/{sessionId}` | Send a message and stream the reply as SSE (default). 
Add `?stream=false` for a JSON response. Body: `{"message": "..."}` | +| GET | `/chat/{sessionId}/history` | Get the full conversation history for a session | +| POST | `/chat/{sessionId}/reset` | Clear a session's conversation history | + +### SSE Event Format + +Each SSE event is a JSON object with a `type` field: + +| Type | Description | Example | +|------|-------------|---------| +| `chunk` | A token from the LLM response | `{"type":"chunk","content":"Hello"}` | +| `done` | The response is complete | `{"type":"done"}` | +| `error` | An error occurred | `{"type":"error","content":"..."}` | + +## Adding New Tools + +To give the agent new capabilities, edit `Tools/AgentTools.cs`: + +1. Add a tool definition to the `Definitions` array +2. Add a case to the `Execute()` switch + +```csharp +// In Definitions: +new("search_web", "Search the web for information"), + +// In Execute(): +"search_web" => SearchTheWeb(args), +``` + +The LLM will automatically discover the new tool and call it when appropriate. 
+ +## Learn More + +- [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) — Overview of how durable execution solves production challenges for AI agents +- [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns) — All supported patterns (deterministic workflows, agent loops, orchestration-based vs entity-based) +- [Durable Entities](https://learn.microsoft.com/azure/durable-task/concepts/entities) — Deep dive on the entity programming model +- [Microsoft.Extensions.AI](https://learn.microsoft.com/dotnet/ai/microsoft-extensions-ai) — The AI abstraction layer used for LLM integration diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Tools/AgentTools.cs b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Tools/AgentTools.cs new file mode 100644 index 0000000..b60718d --- /dev/null +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Tools/AgentTools.cs @@ -0,0 +1,31 @@ +using Microsoft.Extensions.AI; + +namespace AgentDirectedWorkflows; + +/// +/// Tools that the agent can use. Add new tools here to give the agent new capabilities. +/// Each tool needs: a definition (name + description for the LLM) and an implementation. +/// +public static class AgentTools +{ + // Tool definitions — these are sent to the LLM so it knows what it can call. + public static readonly ChatTool[] Definitions = [ + new("get_weather", "Get current weather for a location"), + ]; + + /// Builds the AITool list that the LLM understands. + public static List AsAITools() => + Definitions.Select(t => + AIFunctionFactory.Create((string location) => "", t.Name, t.Description) as AITool).ToList(); + + /// Executes a tool by name and returns the result string. + public static string Execute(string name, IDictionary? args) + { + var location = args?.Values.FirstOrDefault()?.ToString() ?? 
"unknown"; + return name switch + { + "get_weather" => $"72°F and sunny in {location}", + _ => $"Unknown tool: {name}", + }; + } +} From 8ea0b8b594d5a0fb85fe825fcaed1d0fd2f5e7d9 Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Mon, 20 Apr 2026 11:55:21 -0700 Subject: [PATCH 2/4] Add local.settings.json to .gitignore Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .gitignore | 3 +++ 1 file changed, 3 insertions(+) diff --git a/.gitignore b/.gitignore index 2ae3bab..f0e2457 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,6 @@ +# Azure Functions local settings (may contain endpoints/secrets) +local.settings.json + ## Ignore Visual Studio temporary files, build results, and ## files generated by popular Visual Studio add-ons. ## From 42742bbfbab3dc922ce76cb1c7942352ccd356b2 Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Mon, 20 Apr 2026 11:58:13 -0700 Subject: [PATCH 3/4] Replace hardcoded model names with placeholders Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../durable-functions/dotnet/AgentDirectedWorkflows/README.md | 2 +- .../AgentDirectedWorkflows/Properties/launchSettings.json | 4 ++-- .../dotnet/Agents/AgentDirectedWorkflows/README.md | 2 +- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md b/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md index e1b1b91..3d4e30b 100644 --- a/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md +++ b/samples/durable-functions/dotnet/AgentDirectedWorkflows/README.md @@ -119,7 +119,7 @@ Update `local.settings.json` to use a real LLM instead of the echo fallback: { "Values": { "AZURE_OPENAI_ENDPOINT": "https://.openai.azure.com", - "AZURE_OPENAI_DEPLOYMENT": "gpt-4o" + "AZURE_OPENAI_DEPLOYMENT": "" } } ``` diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json 
b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json index 2304e00..c54d579 100644 --- a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/Properties/launchSettings.json @@ -8,8 +8,8 @@ "applicationUrl": "http://localhost:5000", "environmentVariables": { "ASPNETCORE_ENVIRONMENT": "Development", - "AZURE_OPENAI_ENDPOINT": "https://YOUR-RESOURCE.openai.azure.com/", - "AZURE_OPENAI_DEPLOYMENT": "gpt-4o" + "AZURE_OPENAI_ENDPOINT": "", + "AZURE_OPENAI_DEPLOYMENT": "" } } } diff --git a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md index 2a2cafb..49089ee 100644 --- a/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md +++ b/samples/durable-task-sdks/dotnet/Agents/AgentDirectedWorkflows/README.md @@ -119,7 +119,7 @@ Set environment variables to use a real LLM instead of the echo fallback: ```bash export AZURE_OPENAI_ENDPOINT="https://.openai.azure.com" -export AZURE_OPENAI_DEPLOYMENT="gpt-4o" +export AZURE_OPENAI_DEPLOYMENT="" ``` The sample uses `DefaultAzureCredential` for authentication — make sure you're signed in via `az login`. 
From ba752d9088c526cf3b4753a7c373af3eeca13c2d Mon Sep 17 00:00:00 2001 From: greenie-msft Date: Mon, 20 Apr 2026 14:22:38 -0700 Subject: [PATCH 4/4] Add Python agent-directed workflow samples for DT SDK and Durable Functions - Durable Task SDK sample: FastAPI app with durable entity-based chat agent, SSE streaming via Redis pub/sub, echo fallback when no Azure OpenAI configured - Durable Functions sample: Azure Functions v2 (Python) with DFApp, entity-based chat agent, async Redis pub/sub collection via asyncio.to_thread() - Both samples tested end-to-end with DTS emulator and Redis Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- .../python/agent-directed-workflows/README.md | 126 ++++++++ .../agent-directed-workflows/function_app.py | 295 ++++++++++++++++++ .../python/agent-directed-workflows/host.json | 24 ++ .../agent-directed-workflows/requirements.txt | 5 + .../python/agent-directed-workflows/tools.py | 28 ++ .../python/agent-directed-workflows/README.md | 195 ++++++++++++ .../python/agent-directed-workflows/app.py | 198 ++++++++++++ .../chat_agent_entity.py | 182 +++++++++++ .../agent-directed-workflows/requirements.txt | 7 + .../python/agent-directed-workflows/tools.py | 28 ++ 10 files changed, 1088 insertions(+) create mode 100644 samples/durable-functions/python/agent-directed-workflows/README.md create mode 100644 samples/durable-functions/python/agent-directed-workflows/function_app.py create mode 100644 samples/durable-functions/python/agent-directed-workflows/host.json create mode 100644 samples/durable-functions/python/agent-directed-workflows/requirements.txt create mode 100644 samples/durable-functions/python/agent-directed-workflows/tools.py create mode 100644 samples/durable-task-sdks/python/agent-directed-workflows/README.md create mode 100644 samples/durable-task-sdks/python/agent-directed-workflows/app.py create mode 100644 samples/durable-task-sdks/python/agent-directed-workflows/chat_agent_entity.py create mode 100644 
samples/durable-task-sdks/python/agent-directed-workflows/requirements.txt create mode 100644 samples/durable-task-sdks/python/agent-directed-workflows/tools.py diff --git a/samples/durable-functions/python/agent-directed-workflows/README.md b/samples/durable-functions/python/agent-directed-workflows/README.md new file mode 100644 index 0000000..ec02a48 --- /dev/null +++ b/samples/durable-functions/python/agent-directed-workflows/README.md @@ -0,0 +1,126 @@ +# Agent-Directed Workflows — Durable Functions (Python) + +This sample demonstrates how to build an **agent-directed workflow** (agent loop) using [durable entities](https://learn.microsoft.com/azure/durable-functions/durable-functions-entities) in [Azure Durable Functions for Python](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-overview?tabs=isolated-process%2Cnodejs-v3&pivots=python) backed by the [Durable Task Scheduler](https://learn.microsoft.com/azure/durable-task/overview). + +It corresponds to the **"Build your own using orchestrations and entities"** approach described in the [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) comparison table. + +> **Note:** A [Durable Task SDK version](../../../durable-task-sdks/python/agent-directed-workflows/) of this sample is also available. That version uses FastAPI with SSE streaming and gives you full control over the web host. This version uses the standard Azure Functions HTTP model (non-streaming responses). + +## What Is an Agent-Directed Workflow? + +In a typical deterministic workflow, *your code* controls the execution path. In an **agent-directed workflow**, the **LLM drives the control flow** — it decides which tools to call, in what order, and when the task is complete. The execution path isn't known until runtime. + +## How This Sample Works + +Each chat session is a **durable entity** that holds the full conversation history. 
When you send a message, the entity runs the agent loop (call LLM → execute tools → repeat) and streams response chunks to Redis pub/sub. The HTTP function subscribes to the Redis channel and collects the full response before returning it as JSON. + +### Project Structure + +``` +agent-directed-workflows/ +├── function_app.py # Entity, HTTP functions, and agent loop logic +├── tools.py # Tool definitions and execution logic +├── host.json # Functions host configuration (Durable Task Scheduler) +├── requirements.txt # Python dependencies +└── README.md +``` + +## Prerequisites + +1. [Python 3.10+](https://www.python.org/downloads/) +2. [Azure Functions Core Tools v4+](https://learn.microsoft.com/azure/azure-functions/functions-run-local) +3. [Docker](https://www.docker.com/products/docker-desktop/) (for the Durable Task Scheduler emulator and Redis) + +## Running the Sample + +### 1. Start the Durable Task Scheduler emulator and Redis + +```bash +# Durable Task Scheduler emulator +docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + +# Redis +docker run --name redis -d -p 6379:6379 redis:latest +``` + +### 2. Install dependencies + +```bash +pip install -r requirements.txt +``` + +### 3. Configure local settings + +Create a `local.settings.json` file: + +```json +{ + "IsEncrypted": false, + "Values": { + "AzureWebJobsStorage": "UseDevelopmentStorage=true", + "FUNCTIONS_WORKER_RUNTIME": "python", + "DTS_CONNECTION_STRING": "Endpoint=http://localhost:8080;Authentication=None", + "TASKHUB_NAME": "default", + "REDIS_CONNECTION_STRING": "localhost:6379" + } +} +``` + +### 4. (Optional) Configure Azure OpenAI + +Add these to your `local.settings.json` `Values` to use a real LLM instead of the echo fallback: + +```json +{ + "AZURE_OPENAI_ENDPOINT": "https://.openai.azure.com", + "AZURE_OPENAI_DEPLOYMENT": "" +} +``` + +The sample uses `DefaultAzureCredential` — make sure you're signed in via `az login`. + +### 5. 
Run the function app + +```bash +func start +``` + +### 6. Chat with the agent + +```bash +# Send a message (returns JSON with full response) +curl -X POST http://localhost:7071/api/chat/session1 \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# View conversation history +curl http://localhost:7071/api/chat/session1/history + +# Reset the conversation +curl -X POST http://localhost:7071/api/chat/session1/reset +``` + +## API Reference + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/api/chat/{sessionId}` | Send a message and get the full response as JSON. Body: `{"message": "..."}` | +| GET | `/api/chat/{sessionId}/history` | Get the full conversation history for a session | +| POST | `/api/chat/{sessionId}/reset` | Clear a session's conversation history | + +## Key Differences from the Durable Task SDK Version + +| Aspect | Durable Task SDK | Durable Functions | +|--------|-----------------|-------------------| +| **Web host** | FastAPI (full control) | Azure Functions HTTP triggers | +| **Streaming** | SSE via `StreamingResponse` | Non-streaming (JSON response) | +| **Entity registration** | `worker.add_entity()` | `@bp.entity_trigger` decorator | +| **Client** | `DurableTaskSchedulerClient` | `DurableOrchestrationClient` | +| **Configuration** | Environment variables | `host.json` + `local.settings.json` | + +## Learn More + +- [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) +- [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns) +- [Durable Entities in Azure Functions](https://learn.microsoft.com/azure/azure-functions/durable/durable-functions-entities) +- [Durable Task Scheduler](https://learn.microsoft.com/azure/durable-task/overview) diff --git a/samples/durable-functions/python/agent-directed-workflows/function_app.py 
b/samples/durable-functions/python/agent-directed-workflows/function_app.py new file mode 100644 index 0000000..c10fe57 --- /dev/null +++ b/samples/durable-functions/python/agent-directed-workflows/function_app.py @@ -0,0 +1,295 @@ +# ============================================================================ +# Agent-Directed Workflows — Durable Functions (Python) +# +# Each chat session is a durable entity that holds the full conversation +# history. When you send a message, the entity calls the LLM, executes any +# tool calls, and publishes response chunks to Redis pub/sub in real-time. +# The HTTP layer subscribes to the Redis channel and forwards chunks. +# ============================================================================ + +import asyncio +import json +import logging +import os +import uuid + +import azure.functions as func +import azure.durable_functions as df +import redis + +import tools + +app = df.DFApp(http_auth_level=func.AuthLevel.ANONYMOUS) + +logger = logging.getLogger("AgentDirectedWorkflows") + + +# ─── Helpers ─── + +def _get_chat_client(): + """Create an OpenAI client, or None if not configured.""" + endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT", "") + deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "") + if endpoint and deployment: + from openai import AzureOpenAI + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + token_provider = get_bearer_token_provider( + DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default" + ) + return AzureOpenAI( + azure_endpoint=endpoint, + azure_ad_token_provider=token_provider, + api_version="2024-10-21", + ), deployment + return None, None + + +def _get_redis(): + """Create a Redis connection.""" + conn_str = os.environ.get("REDIS_CONNECTION_STRING", "localhost:6379") + return redis.Redis.from_url(f"redis://{conn_str}") + + +def _echo_response(user_message: str): + """Simple echo fallback — yields words one at a time.""" + words = f"Echo: 
{user_message}".split(" ") + for word in words: + yield word + " " + + +def _run_agent_loop(state: dict, user_message: str, channel: str, r): + """The core agent loop: call LLM, handle tool calls, publish chunks to Redis.""" + messages = state.get("messages", []) + messages.append({"role": "user", "content": user_message}) + + client, deployment = _get_chat_client() + + llm_messages = [{"role": "system", "content": "You are a helpful assistant."}] + llm_messages.extend(messages) + + if client is None: + full_text = "" + for chunk in _echo_response(user_message): + full_text += chunk + r.publish(channel, json.dumps({"type": "chunk", "content": chunk})) + messages.append({"role": "assistant", "content": full_text.strip()}) + r.publish(channel, json.dumps({"type": "done"})) + return messages + + while True: + response = client.chat.completions.create( + model=deployment, + messages=llm_messages, + tools=tools.TOOL_DEFINITIONS if tools.TOOL_DEFINITIONS else None, + stream=True, + ) + + full_text = "" + tool_calls_acc = {} + + for chunk in response: + if not chunk.choices: + continue + delta = chunk.choices[0].delta + + if delta.tool_calls: + for tc in delta.tool_calls: + idx = tc.index + if idx not in tool_calls_acc: + tool_calls_acc[idx] = {"id": "", "name": "", "arguments": ""} + if tc.id: + tool_calls_acc[idx]["id"] = tc.id + if tc.function and tc.function.name: + tool_calls_acc[idx]["name"] = tc.function.name + if tc.function and tc.function.arguments: + tool_calls_acc[idx]["arguments"] += tc.function.arguments + + if delta.content: + full_text += delta.content + r.publish(channel, json.dumps({"type": "chunk", "content": delta.content})) + + if tool_calls_acc: + tool_names = [tc["name"] for tc in tool_calls_acc.values()] + logger.info("Executing tools: %s", ", ".join(tool_names)) + + llm_messages.append({ + "role": "assistant", + "tool_calls": [ + { + "id": tc["id"], + "type": "function", + "function": {"name": tc["name"], "arguments": tc["arguments"]}, + } + for 
tc in tool_calls_acc.values() + ], + }) + + for tc in tool_calls_acc.values(): + try: + args = json.loads(tc["arguments"]) if tc["arguments"] else {} + except json.JSONDecodeError: + args = {} + result = tools.execute(tc["name"], args) + llm_messages.append({ + "role": "tool", + "tool_call_id": tc["id"], + "content": result, + }) + continue + + messages.append({"role": "assistant", "content": full_text}) + r.publish(channel, json.dumps({"type": "done"})) + return messages + + +# ─── Entity ─── + +@app.entity_trigger(context_name="context") +def chat_agent_entity(context): + """Durable entity for chat agent sessions. + + Operations: + - message: Run the agent loop with a user message + - get_history: Return the conversation history + - reset: Clear the conversation + """ + state = context.get_state(lambda: {"messages": []}) + operation = context.operation_name + + if operation == "message": + request = context.get_input() + user_message = request.get("message", "Hello") + correlation_id = request.get("correlation_id", "unknown") + session_id = context.entity_key + + channel = f"chat:{session_id}:{correlation_id}" + r = _get_redis() + + try: + messages = _run_agent_loop(state, user_message, channel, r) + state["messages"] = messages + context.set_state(state) + except Exception as ex: + logger.error("Agent loop failed: %s", ex) + r.publish(channel, json.dumps({"type": "error", "content": str(ex)})) + + elif operation == "get_history": + context.set_result(state.get("messages", [])) + + elif operation == "reset": + context.set_state({"messages": []}) + + +# ─── HTTP Endpoints ─── + +@app.route(route="chat/{sessionId}", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def send_message(req: func.HttpRequest, client) -> func.HttpResponse: + """Send a message to the agent. 
Returns JSON with the full response.""" + session_id = req.route_params.get("sessionId", "default") + durable_client = client + + try: + body = req.get_json() + user_message = body.get("message", "Hello") + except ValueError: + user_message = "Hello" + + correlation_id = uuid.uuid4().hex + channel = f"chat:{session_id}:{correlation_id}" + + # Subscribe to Redis BEFORE signaling the entity + r = _get_redis() + pubsub = r.pubsub() + pubsub.subscribe(channel) + + # Signal the entity + entity_id = df.EntityId("chat_agent_entity", session_id) + await durable_client.signal_entity( + entity_id, + "message", + {"message": user_message, "correlation_id": correlation_id}, + ) + + # Collect response chunks (non-streaming for Functions HTTP model) + # NOTE: Redis pubsub uses blocking I/O, so we run it in a thread to + # avoid blocking the event loop (which would prevent entity dispatch). + def _collect_response(): + result = {"response": "", "error": None} + try: + while True: + msg = pubsub.get_message(ignore_subscribe_messages=True, timeout=120) + if msg and msg["type"] == "message": + data = msg["data"].decode("utf-8") if isinstance(msg["data"], bytes) else msg["data"] + try: + parsed = json.loads(data) + if parsed.get("type") == "chunk": + result["response"] += parsed.get("content", "") + if parsed.get("type") == "error": + result["error"] = parsed.get("content", "") + break + if parsed.get("type") == "done": + break + except json.JSONDecodeError: + pass + finally: + pubsub.unsubscribe(channel) + pubsub.close() + r.close() + return result + + collected = await asyncio.to_thread(_collect_response) + + if collected["error"]: + return func.HttpResponse( + json.dumps({"sessionId": session_id, "error": collected["error"]}), + mimetype="application/json", + status_code=500, + ) + + return func.HttpResponse( + json.dumps({"sessionId": session_id, "message": collected["response"]}), + mimetype="application/json", + ) + + +@app.route(route="chat/{sessionId}/history", 
methods=["GET"]) +@app.durable_client_input(client_name="client") +async def get_history(req: func.HttpRequest, client) -> func.HttpResponse: + """Get the full conversation history for a session.""" + session_id = req.route_params.get("sessionId", "default") + durable_client = client + + entity_id = df.EntityId("chat_agent_entity", session_id) + state = await durable_client.read_entity_state(entity_id) + + if not state.entity_exists: + return func.HttpResponse( + json.dumps({"error": "Session not found"}), + mimetype="application/json", + status_code=404, + ) + + entity_state = state.entity_state or {"messages": []} + if isinstance(entity_state, str): + entity_state = json.loads(entity_state) + return func.HttpResponse( + json.dumps({"sessionId": session_id, "history": entity_state.get("messages", [])}), + mimetype="application/json", + ) + + +@app.route(route="chat/{sessionId}/reset", methods=["POST"]) +@app.durable_client_input(client_name="client") +async def reset_session(req: func.HttpRequest, client) -> func.HttpResponse: + """Clear a session's conversation history.""" + session_id = req.route_params.get("sessionId", "default") + durable_client = client + + entity_id = df.EntityId("chat_agent_entity", session_id) + await durable_client.signal_entity(entity_id, "reset") + + return func.HttpResponse( + json.dumps({"sessionId": session_id, "status": "reset"}), + mimetype="application/json", + ) diff --git a/samples/durable-functions/python/agent-directed-workflows/host.json b/samples/durable-functions/python/agent-directed-workflows/host.json new file mode 100644 index 0000000..a1239c0 --- /dev/null +++ b/samples/durable-functions/python/agent-directed-workflows/host.json @@ -0,0 +1,24 @@ +{ + "version": "2.0", + "logging": { + "applicationInsights": { + "samplingSettings": { + "isEnabled": true, + "excludedTypes": "Request" + } + } + }, + "extensions": { + "durableTask": { + "storageProvider": { + "type": "azureManaged", + "connectionStringName": 
"DURABLE_TASK_SCHEDULER_CONNECTION_STRING" + }, + "hubName": "%TASKHUB_NAME%" + } + }, + "extensionBundle": { + "id": "Microsoft.Azure.Functions.ExtensionBundle.Preview", + "version": "[4.29.0, 5.0.0)" + } +} diff --git a/samples/durable-functions/python/agent-directed-workflows/requirements.txt b/samples/durable-functions/python/agent-directed-workflows/requirements.txt new file mode 100644 index 0000000..93eed88 --- /dev/null +++ b/samples/durable-functions/python/agent-directed-workflows/requirements.txt @@ -0,0 +1,5 @@ +azure-functions +azure-functions-durable +redis +openai>=1.0.0 +azure-identity>=1.15.0 diff --git a/samples/durable-functions/python/agent-directed-workflows/tools.py b/samples/durable-functions/python/agent-directed-workflows/tools.py new file mode 100644 index 0000000..bee4131 --- /dev/null +++ b/samples/durable-functions/python/agent-directed-workflows/tools.py @@ -0,0 +1,28 @@ +"""Tools that the agent can use. Add new tools here to give the agent new capabilities.""" + + +# Tool definitions — sent to the LLM so it knows what it can call. 
+TOOL_DEFINITIONS = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get current weather for a location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "City or location name"} + }, + "required": ["location"], + }, + }, + } +] + + +def execute(name: str, args: dict) -> str: + """Execute a tool by name and return the result string.""" + location = args.get("location", "unknown") + if name == "get_weather": + return f"72°F and sunny in {location}" + return f"Unknown tool: {name}" diff --git a/samples/durable-task-sdks/python/agent-directed-workflows/README.md b/samples/durable-task-sdks/python/agent-directed-workflows/README.md new file mode 100644 index 0000000..bf0e4f4 --- /dev/null +++ b/samples/durable-task-sdks/python/agent-directed-workflows/README.md @@ -0,0 +1,195 @@ +# Agent-Directed Workflows — Durable Task SDK (Python) + +This sample demonstrates how to build an **agent-directed workflow** (agent loop) using [durable entities](https://learn.microsoft.com/azure/durable-task/concepts/entities) from the [Durable Task SDK for Python](https://github.com/microsoft/durabletask-python) with the [Durable Task Scheduler](https://learn.microsoft.com/azure/durable-task/overview). + +It corresponds to the **"Build your own using orchestrations and entities"** approach described in the [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) comparison table, using the **entity-based agent loop** pattern from [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns#entity-based-agent-loops). + +> **Note:** A [Durable Functions version](../../../durable-functions/python/agent-directed-workflows/) of this sample is also available. This version uses the Durable Task SDK directly (no Azure Functions dependency), which gives you full control over the web host and API surface. 
+ +## What Is an Agent-Directed Workflow? + +In a typical deterministic workflow, *your code* controls the execution path — you define the sequence of steps ahead of time. In an **agent-directed workflow** (also called an agent loop), the **LLM drives the control flow**. You provide tools and instructions, but the agent decides which tools to call, in what order, and when the task is complete. The execution path isn't known until runtime. + +This pattern is well suited for conversational agents with tool-calling capabilities, open-ended research tasks, and any scenario where the number and order of steps can't be predicted. + +## How This Sample Works + +### The Core Idea + +Each chat session is a **durable entity** — a long-lived, stateful actor managed by the Durable Task Scheduler. The entity holds the full conversation history in its state. When you send it a message, it runs the agent loop internally and **streams the LLM's response in real-time** via Redis pub/sub: + +``` +User: "What's the weather in Seattle?" + │ + ▼ +┌──────────────────────────────────────────────────────┐ +│ HTTP POST /chat/{sessionId} │ +│ │ +│ 1. Subscribe to Redis channel │ +│ 2. Signal entity (fire-and-forget) │ +│ 3. Stream SSE events from Redis to client │ +└────────────────────┬─────────────────────────────────┘ + │ (signal) + ▼ +┌──────────────────────────────────────────────────────┐ +│ chat_agent_entity (durable entity) │ +│ │ +│ State = conversation history ← auto-persisted │ +│ │ +│ 1. Add user message to state │ +│ 2. Stream LLM response │ +│ 3. LLM returns tool calls? │ +│ ├─ YES → execute tools, go to 2 │ +│ └─ NO → publish chunks to Redis as they arrive │ +│ 4. Save full reply to state │ +│ 5. Publish "done" event to Redis │ +└──────────────────────────────────────────────────────┘ +``` + +**Key properties:** +- **Durable state**: The entity's conversation history survives restarts, crashes, and scaling events. 
+- **Real-time streaming**: LLM response tokens are published to Redis and forwarded to the client as SSE events as they're generated. +- **No orchestration bridge**: The HTTP layer signals the entity (fire-and-forget) and reads the response from Redis — no intermediate orchestration needed. + +### Why Entities? + +Durable entities are a natural fit for AI agents because: + +- **Long-lived state**: An entity persists for as long as you need it. A chat session can last minutes, days, or weeks. +- **Automatic persistence**: You just read and write state — the framework handles serialization and storage. +- **Actor model**: Each entity processes one operation at a time, so there are no concurrency issues with the conversation history. +- **Addressable**: Each entity has a unique ID (the session ID), making it easy to route messages to the right agent. + +### Project Structure + +``` +agent-directed-workflows/ +├── app.py # FastAPI web host, SSE endpoints, DTS worker lifecycle +├── chat_agent_entity.py # The durable entity (agent loop + Redis streaming) +├── tools.py # Tool definitions and execution logic +├── requirements.txt # Python dependencies +└── README.md +``` + +## Prerequisites + +1. [Python 3.10+](https://www.python.org/downloads/) +2. [Docker](https://www.docker.com/products/docker-desktop/) (for running the Durable Task Scheduler emulator and Redis) + +## Running the Sample + +### 1. Start the Durable Task Scheduler emulator and Redis + +```bash +# Durable Task Scheduler emulator +docker run --name dtsemulator -d -p 8080:8080 -p 8082:8082 mcr.microsoft.com/dts/dts-emulator:latest + +# Redis +docker run --name redis -d -p 6379:6379 redis:latest +``` + +### 2. Install dependencies + +```bash +pip install -r requirements.txt +``` + +### 3. 
(Optional) Configure Azure OpenAI + +Set environment variables to use a real LLM instead of the echo fallback: + +```bash +export AZURE_OPENAI_ENDPOINT="https://<your-resource>.openai.azure.com" +export AZURE_OPENAI_DEPLOYMENT="<deployment-name>" +``` + +The sample uses `DefaultAzureCredential` for authentication — make sure you're signed in via `az login`. + +### 4. Run the app + +```bash +python app.py +``` + +The app connects to the local emulator at `http://localhost:8080` by default. + +### 5. Chat with the agent + +```bash +# Streaming (default) — response streams as Server-Sent Events +curl -N -X POST http://localhost:5000/chat/session1 \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# SSE output: +# data: {"type":"chunk","content":"Echo: "} +# data: {"type":"chunk","content":"What "} +# data: {"type":"chunk","content":"is "} +# ... +# data: {"type":"done"} + +# Non-streaming — waits for the full response and returns JSON +curl -X POST "http://localhost:5000/chat/session1?stream=false" \ + -H "Content-Type: application/json" \ + -d '{"message":"What is the weather in Seattle?"}' + +# JSON output: +# {"sessionId":"session1","message":"Echo: What is the weather in Seattle? "} + +# View the full conversation history +curl http://localhost:5000/chat/session1/history + +# Reset the conversation +curl -X POST http://localhost:5000/chat/session1/reset +``` + +> **Tip:** Use `curl -N` (no buffering) to see SSE events in real-time when streaming. + +## API Reference + +| Method | Endpoint | Description | +|--------|----------|-------------| +| POST | `/chat/{sessionId}` | Send a message and stream the reply as SSE (default). Add `?stream=false` for a JSON response.
Body: `{"message": "..."}` | +| GET | `/chat/{sessionId}/history` | Get the full conversation history for a session | +| POST | `/chat/{sessionId}/reset` | Clear a session's conversation history | + +### SSE Event Format + +Each SSE event is a JSON object with a `type` field: + +| Type | Description | Example | +|------|-------------|---------| +| `chunk` | A token from the LLM response | `{"type":"chunk","content":"Hello"}` | +| `done` | The response is complete | `{"type":"done"}` | +| `error` | An error occurred | `{"type":"error","content":"..."}` | + +## Adding New Tools + +To give the agent new capabilities, edit `tools.py`: + +1. Add a tool definition to the `TOOL_DEFINITIONS` list +2. Add a case to the `execute()` function + +```python +# In TOOL_DEFINITIONS: +{ + "type": "function", + "function": { + "name": "search_web", + "description": "Search the web for information", + "parameters": { ... } + } +} + +# In execute(): +if name == "search_web": + return search_the_web(args) +``` + +The LLM will automatically discover the new tool and call it when appropriate. 
+ +## Learn More + +- [Durable Task for AI Agents](https://learn.microsoft.com/azure/durable-task/sdks/durable-task-for-ai-agents) — Overview of how durable execution solves production challenges for AI agents +- [Agentic Application Patterns](https://learn.microsoft.com/azure/durable-task/sdks/durable-agents-patterns) — All supported patterns (deterministic workflows, agent loops, orchestration-based vs entity-based) +- [Durable Entities](https://learn.microsoft.com/azure/durable-task/concepts/entities) — Deep dive on the entity programming model diff --git a/samples/durable-task-sdks/python/agent-directed-workflows/app.py b/samples/durable-task-sdks/python/agent-directed-workflows/app.py new file mode 100644 index 0000000..93fb878 --- /dev/null +++ b/samples/durable-task-sdks/python/agent-directed-workflows/app.py @@ -0,0 +1,198 @@ +# ============================================================================ +# Agent-Directed Workflows — Durable Task SDK (Python) +# +# FastAPI web app with a Durable Task worker running in-process. +# Each chat session is a durable entity — a persistent agent with durable state. 
+# ============================================================================ + +import asyncio +import json +import logging +import os +import threading +import uuid + +from contextlib import asynccontextmanager + +import redis as redis_lib +import uvicorn +from fastapi import FastAPI, Request +from fastapi.responses import JSONResponse, StreamingResponse +from pydantic import BaseModel + +from durabletask.azuremanaged.client import DurableTaskSchedulerClient +from durabletask.azuremanaged.worker import DurableTaskSchedulerWorker +from durabletask.entities import EntityInstanceId + +from chat_agent_entity import chat_agent_entity + +# ─── Configuration ─── + +CONNECTION_STRING = os.environ.get( + "DURABLE_TASK_SCHEDULER_CONNECTION_STRING", + "Endpoint=http://localhost:8080;TaskHub=default;Authentication=None", +) +REDIS_CONNECTION = os.environ.get("REDIS_CONNECTION_STRING", "localhost:6379") + +# Parse connection string +def _parse_connection_string(conn_str: str) -> dict: + parts = {} + for part in conn_str.split(";"): + if "=" in part: + key, _, value = part.partition("=") + # Handle values that contain '=' (like URLs with query params) + parts[key.strip()] = value.strip() if key.strip() != "Endpoint" else part[len("Endpoint="):].strip() + return parts + +_config = _parse_connection_string(CONNECTION_STRING) +_endpoint = _config.get("Endpoint", "http://localhost:8080") +_taskhub = _config.get("TaskHub", "default") +_is_local = "localhost" in _endpoint or "127.0.0.1" in _endpoint +_credential = None if _is_local else __import__("azure.identity", fromlist=["DefaultAzureCredential"]).DefaultAzureCredential() + +logging.basicConfig( + level=logging.WARNING, + format="%(asctime)s %(name)s %(levelname)s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%S", +) +logging.getLogger("AgentDirectedWorkflows").setLevel(logging.INFO) +logger = logging.getLogger("AgentDirectedWorkflows") + +# ─── Durable Task Worker ─── + +worker = DurableTaskSchedulerWorker( + 
host_address=_endpoint, + secure_channel=not _is_local, + taskhub=_taskhub, + token_credential=_credential, +) +worker.add_entity(chat_agent_entity) + +client = DurableTaskSchedulerClient( + host_address=_endpoint, + secure_channel=not _is_local, + taskhub=_taskhub, + token_credential=_credential, +) + +# ─── FastAPI App ─── + +@asynccontextmanager +async def lifespan(app): + """Start the Durable Task worker on startup, stop on shutdown.""" + worker.start() + logger.info("Durable Task worker started") + yield + worker.stop() + + +app = FastAPI(title="Agent-Directed Workflows", lifespan=lifespan) + + +class ChatRequest(BaseModel): + message: str + + +@app.post("/chat/{session_id}") +async def send_message(session_id: str, req: ChatRequest, request: Request): + """Send a message to the agent. Streams SSE by default; add ?stream=false for JSON.""" + stream = request.query_params.get("stream", "true").lower() != "false" + correlation_id = uuid.uuid4().hex + channel = f"chat:{session_id}:{correlation_id}" + + r = redis_lib.Redis.from_url(f"redis://{REDIS_CONNECTION}") + pubsub = r.pubsub() + pubsub.subscribe(channel) + + # Signal the entity (fire-and-forget) — run in thread since it's blocking gRPC + entity_id = EntityInstanceId("chat_agent_entity", session_id) + await asyncio.to_thread( + lambda: client.signal_entity( + entity_id, + "message", + input={"message": req.message, "correlation_id": correlation_id}, + ) + ) + + def _poll_next(): + """Block until we get a Redis message (runs in thread pool).""" + return pubsub.get_message(ignore_subscribe_messages=True, timeout=120) + + if stream: + async def event_stream(): + try: + while True: + msg = await asyncio.to_thread(_poll_next) + if msg and msg["type"] == "message": + data = msg["data"].decode("utf-8") if isinstance(msg["data"], bytes) else msg["data"] + yield f"data: {data}\n\n" + try: + parsed = json.loads(data) + if parsed.get("type") in ("done", "error"): + break + except json.JSONDecodeError: + pass + finally: 
+ pubsub.unsubscribe(channel) + pubsub.close() + r.close() + + return StreamingResponse(event_stream(), media_type="text/event-stream", + headers={"Cache-Control": "no-cache"}) + else: + # Non-streaming: collect all chunks and return JSON + full_response = "" + try: + while True: + msg = await asyncio.to_thread(_poll_next) + if msg and msg["type"] == "message": + data = msg["data"].decode("utf-8") if isinstance(msg["data"], bytes) else msg["data"] + try: + parsed = json.loads(data) + if parsed.get("type") == "chunk": + full_response += parsed.get("content", "") + if parsed.get("type") == "error": + return JSONResponse( + {"sessionId": session_id, "error": parsed.get("content", "")}, + status_code=500, + ) + if parsed.get("type") in ("done", "error"): + break + except json.JSONDecodeError: + pass + finally: + pubsub.unsubscribe(channel) + pubsub.close() + r.close() + + return JSONResponse({"sessionId": session_id, "message": full_response}) + + +@app.get("/chat/{session_id}/history") +async def get_history(session_id: str): + """Get the full conversation history for a session.""" + entity_id = EntityInstanceId("chat_agent_entity", session_id) + entity = await asyncio.to_thread(client.get_entity, entity_id) + if entity is None: + return JSONResponse({"error": "Session not found"}, status_code=404) + raw_state = entity.get_state() + if isinstance(raw_state, str): + import json as _json + state = _json.loads(raw_state) + elif isinstance(raw_state, dict): + state = raw_state + else: + state = {"messages": []} + return JSONResponse({"sessionId": session_id, "history": state.get("messages", [])}) + + +@app.post("/chat/{session_id}/reset") +async def reset_session(session_id: str): + """Clear a session's conversation history.""" + entity_id = EntityInstanceId("chat_agent_entity", session_id) + await asyncio.to_thread(client.signal_entity, entity_id, "reset") + return JSONResponse({"sessionId": session_id, "status": "reset"}) + + +if __name__ == "__main__": + 
uvicorn.run(app, host="0.0.0.0", port=5000, log_level="warning") diff --git a/samples/durable-task-sdks/python/agent-directed-workflows/chat_agent_entity.py b/samples/durable-task-sdks/python/agent-directed-workflows/chat_agent_entity.py new file mode 100644 index 0000000..2202f94 --- /dev/null +++ b/samples/durable-task-sdks/python/agent-directed-workflows/chat_agent_entity.py @@ -0,0 +1,182 @@ +# ============================================================================ +# Durable Agent Chat — powered by Durable Entities + Redis Streaming +# +# Each chat session is a durable entity that holds the full conversation +# history. When you send a message, the entity calls the LLM, executes any +# tool calls, and publishes response chunks to Redis pub/sub in real-time. +# The HTTP layer subscribes to the Redis channel and forwards chunks as +# Server-Sent Events (SSE) to the client. +# ============================================================================ + +import json +import logging +import os + +import redis + +from durabletask.entities import EntityContext + +import tools + +logger = logging.getLogger("AgentDirectedWorkflows") + + +def _get_chat_client(): + """Create an OpenAI client, or None if not configured.""" + endpoint = os.environ.get("AZURE_OPENAI_ENDPOINT", "") + deployment = os.environ.get("AZURE_OPENAI_DEPLOYMENT", "") + if endpoint and deployment: + from openai import AzureOpenAI + from azure.identity import DefaultAzureCredential, get_bearer_token_provider + + token_provider = get_bearer_token_provider( + DefaultAzureCredential(), "https://cognitiveservices.azure.com/.default" + ) + return AzureOpenAI( + azure_endpoint=endpoint, + azure_ad_token_provider=token_provider, + api_version="2024-10-21", + ), deployment + return None, None + + +def _get_redis(): + """Create a Redis connection.""" + conn_str = os.environ.get("REDIS_CONNECTION_STRING", "localhost:6379") + return redis.Redis.from_url(f"redis://{conn_str}") + + +def 
_echo_response(user_message: str): + """Simple echo fallback that simulates streaming word by word.""" + words = f"Echo: {user_message}".split(" ") + for word in words: + yield word + " " + + +def _run_agent_loop(state: dict, user_message: str, channel: str, r: redis.Redis): + """The core agent loop: call LLM, handle tool calls, publish chunks to Redis.""" + messages = state.get("messages", []) + messages.append({"role": "user", "content": user_message}) + + client, deployment = _get_chat_client() + + # Build the messages list for the LLM + llm_messages = [{"role": "system", "content": "You are a helpful assistant."}] + llm_messages.extend(messages) + + if client is None: + # Echo fallback — no Azure OpenAI configured + full_text = "" + for chunk in _echo_response(user_message): + full_text += chunk + r.publish(channel, json.dumps({"type": "chunk", "content": chunk})) + + messages.append({"role": "assistant", "content": full_text.strip()}) + r.publish(channel, json.dumps({"type": "done"})) + return messages + + # Real LLM loop with tool calling + while True: + response = client.chat.completions.create( + model=deployment, + messages=llm_messages, + tools=tools.TOOL_DEFINITIONS if tools.TOOL_DEFINITIONS else None, + stream=True, + ) + + full_text = "" + tool_calls_acc = {} # id -> {name, arguments} + + for chunk in response: + if not chunk.choices: + continue + delta = chunk.choices[0].delta + + # Accumulate tool calls + if delta.tool_calls: + for tc in delta.tool_calls: + idx = tc.index + if idx not in tool_calls_acc: + tool_calls_acc[idx] = {"id": "", "name": "", "arguments": ""} + if tc.id: + tool_calls_acc[idx]["id"] = tc.id + if tc.function and tc.function.name: + tool_calls_acc[idx]["name"] = tc.function.name + if tc.function and tc.function.arguments: + tool_calls_acc[idx]["arguments"] += tc.function.arguments + + # Stream text chunks to Redis + if delta.content: + full_text += delta.content + r.publish(channel, json.dumps({"type": "chunk", "content": 
delta.content})) + + if tool_calls_acc: + # LLM wants to call tools — execute and loop + tool_names = [tc["name"] for tc in tool_calls_acc.values()] + logger.info("Executing tools: %s", ", ".join(tool_names)) + + # Add assistant message with tool calls + llm_messages.append({ + "role": "assistant", + "tool_calls": [ + { + "id": tc["id"], + "type": "function", + "function": {"name": tc["name"], "arguments": tc["arguments"]}, + } + for tc in tool_calls_acc.values() + ], + }) + + # Execute each tool and add results + for tc in tool_calls_acc.values(): + try: + args = json.loads(tc["arguments"]) if tc["arguments"] else {} + except json.JSONDecodeError: + args = {} + result = tools.execute(tc["name"], args) + llm_messages.append({ + "role": "tool", + "tool_call_id": tc["id"], + "content": result, + }) + continue + + # Final text reply — save to durable state + messages.append({"role": "assistant", "content": full_text}) + r.publish(channel, json.dumps({"type": "done"})) + return messages + + +def chat_agent_entity(ctx: EntityContext, input): + """Function-based durable entity for the chat agent. 
+ + Operations: + - message: Run the agent loop with a user message + - get_history: Return the conversation history + - reset: Clear the conversation + """ + state = ctx.get_state(dict, {"messages": []}) + + if ctx.operation == "message": + request = input if isinstance(input, dict) else {} + user_message = request.get("message", "Hello") + correlation_id = request.get("correlation_id", "unknown") + session_id = ctx.entity_id.key + + channel = f"chat:{session_id}:{correlation_id}" + r = _get_redis() + + try: + messages = _run_agent_loop(state, user_message, channel, r) + state["messages"] = messages + ctx.set_state(state) + except Exception as ex: + logger.error("Agent loop failed: %s", ex) + r.publish(channel, json.dumps({"type": "error", "content": str(ex)})) + + elif ctx.operation == "get_history": + return state.get("messages", []) + + elif ctx.operation == "reset": + ctx.set_state({"messages": []}) diff --git a/samples/durable-task-sdks/python/agent-directed-workflows/requirements.txt b/samples/durable-task-sdks/python/agent-directed-workflows/requirements.txt new file mode 100644 index 0000000..778f7d0 --- /dev/null +++ b/samples/durable-task-sdks/python/agent-directed-workflows/requirements.txt @@ -0,0 +1,7 @@ +durabletask>=1.4.0 +durabletask-azuremanaged>=1.4.0 +fastapi>=0.110.0 +uvicorn>=0.27.0 +redis>=5.0.0 +openai>=1.0.0 +azure-identity>=1.15.0 diff --git a/samples/durable-task-sdks/python/agent-directed-workflows/tools.py b/samples/durable-task-sdks/python/agent-directed-workflows/tools.py new file mode 100644 index 0000000..bee4131 --- /dev/null +++ b/samples/durable-task-sdks/python/agent-directed-workflows/tools.py @@ -0,0 +1,28 @@ +"""Tools that the agent can use. Add new tools here to give the agent new capabilities.""" + + +# Tool definitions — sent to the LLM so it knows what it can call. 
+TOOL_DEFINITIONS = [ + { + "type": "function", + "function": { + "name": "get_weather", + "description": "Get current weather for a location", + "parameters": { + "type": "object", + "properties": { + "location": {"type": "string", "description": "City or location name"} + }, + "required": ["location"], + }, + }, + } +] + + +def execute(name: str, args: dict) -> str: + """Execute a tool by name and return the result string.""" + location = args.get("location", "unknown") + if name == "get_weather": + return f"72°F and sunny in {location}" + return f"Unknown tool: {name}"