Skip to content

Commit 9167395

Browse files
feat: a2a & extension
1 parent a67f3bf commit 9167395

File tree

28 files changed

+3975
-181
lines changed

28 files changed

+3975
-181
lines changed

a2a/client/README.md

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
# A2A Client (Go)
2+
3+
This package provides a Go client implementation for the Agent-to-Agent (A2A) communication protocol.
4+
5+
## Features
6+
7+
- JSON-RPC 2.0 compliant client
8+
- Supports core A2A methods:
9+
- `tasks/send`: Send a new task
10+
- `tasks/get`: Get task status
11+
- `tasks/cancel`: Cancel a task
12+
- Streaming task updates with Server-Sent Events (SSE)
13+
- Error handling with A2A error codes
14+
- Type-safe request/response handling
15+
16+
## Usage
17+
18+
```go
19+
package main
20+
21+
import (
22+
"log"
23+
"a2a/client"
24+
"a2a/models"
25+
)
26+
27+
func main() {
28+
// Create a new client
29+
a2aClient := client.NewClient("http://localhost:8080")
30+
31+
// Create a task message
32+
message := models.Message{
33+
Role: "user",
34+
Parts: []models.Part{
35+
{
36+
Type: stringPtr("text"),
37+
Text: stringPtr("Hello, A2A agent!"),
38+
},
39+
},
40+
}
41+
42+
// Send a task
43+
response, err := a2aClient.SendTask(models.TaskSendParams{
44+
ID: "task-1",
45+
Message: message,
46+
})
47+
if err != nil {
48+
log.Fatalf("Failed to send task: %v", err)
49+
}
50+
51+
// Get the task from the response
52+
task, ok := response.Result.(*models.Task)
53+
if !ok {
54+
log.Fatalf("Expected result to be a Task")
55+
}
56+
57+
// Use the task...
58+
}
59+
```
60+
61+
## API
62+
63+
### NewClient
64+
65+
```go
66+
func NewClient(baseURL string) *Client
67+
```
68+
69+
Creates a new A2A client instance with the specified base URL.
70+
71+
### Client Methods
72+
73+
#### SendTask
74+
75+
```go
76+
func (c *Client) SendTask(params models.TaskSendParams) (*models.JSONRPCResponse, error)
77+
```
78+
79+
Sends a new task to the agent. Returns a JSON-RPC response containing the task or an error.
80+
81+
#### GetTask
82+
83+
```go
84+
func (c *Client) GetTask(params models.TaskQueryParams) (*models.JSONRPCResponse, error)
85+
```
86+
87+
Gets the status of a task. Returns a JSON-RPC response containing the task or an error.
88+
89+
#### CancelTask
90+
91+
```go
92+
func (c *Client) CancelTask(params models.TaskIDParams) (*models.JSONRPCResponse, error)
93+
```
94+
95+
Cancels a task. Returns a JSON-RPC response containing the task or an error.
96+
97+
## Streaming Support
98+
99+
The client supports streaming task updates using Server-Sent Events (SSE). To use streaming:
100+
101+
1. Set the `Accept` header to `text/event-stream` in your request
102+
2. The server will respond with a stream of task status updates
103+
3. Each update will be a JSON object containing:
104+
- Task ID
105+
- Current status
106+
- Whether it's the final update
107+
108+
Example streaming usage:
109+
```go
110+
// Create a task with streaming
111+
message := models.Message{
112+
Role: "user",
113+
Parts: []models.Part{
114+
{
115+
Type: stringPtr("text"),
116+
Text: stringPtr("Hello, A2A agent!"),
117+
},
118+
},
119+
}
120+
121+
// Send a task with streaming enabled
122+
response, err := a2aClient.SendTaskWithStreaming(models.TaskSendParams{
123+
ID: "task-1",
124+
Message: message,
125+
})
126+
if err != nil {
127+
log.Fatalf("Failed to send task: %v", err)
128+
}
129+
130+
// Process streaming updates
131+
for update := range response.Updates {
132+
if update.Error != nil {
133+
log.Printf("Error: %v", update.Error)
134+
continue
135+
}
136+
137+
// Process the update
138+
log.Printf("Task %s: %s", update.Result.ID, update.Result.Status.State)
139+
}
140+
```
141+
142+
## Testing
143+
144+
Run the tests with:
145+
146+
```bash
147+
go test ./...
148+
```
149+
150+
The test suite includes examples of:
151+
- Sending tasks
152+
- Getting task status
153+
- Canceling tasks
154+
- Streaming task updates
155+
- Error handling

a2a/client/client.go

Lines changed: 92 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
package client
2+
3+
import (
4+
"context"
5+
"errors"
6+
7+
"github.com/cloudwego/eino-ext/a2a/models"
8+
"github.com/cloudwego/eino-ext/a2a/transport"
9+
)
10+
11+
// A2AClient represents an A2A protocol client
12+
type A2AClient struct {
13+
cli transport.ClientTransport
14+
}
15+
16+
type Config struct {
17+
Transport transport.ClientTransport
18+
}
19+
20+
// NewA2AClient creates a new A2A client
21+
func NewA2AClient(ctx context.Context, config *Config) (*A2AClient, error) {
22+
if config == nil {
23+
return nil, errors.New("config is required")
24+
}
25+
26+
return &A2AClient{cli: config.Transport}, nil
27+
}
28+
29+
func (c *A2AClient) AgentCard(ctx context.Context) (*models.AgentCard, error) {
30+
return c.cli.AgentCard(ctx)
31+
}
32+
33+
func (c *A2AClient) SendMessage(ctx context.Context, params *models.MessageSendParams) (*models.SendMessageResponseUnion, error) {
34+
if params != nil {
35+
(&params.Message).EnsureRequiredFields()
36+
}
37+
return c.cli.SendMessage(ctx, params)
38+
}
39+
40+
func (c *A2AClient) SendMessageStreaming(ctx context.Context, params *models.MessageSendParams) (*ServerStreamingWrapper, error) {
41+
if params != nil {
42+
(&params.Message).EnsureRequiredFields()
43+
}
44+
stream, err := c.cli.SendMessageStreaming(ctx, params)
45+
if err != nil {
46+
return nil, err
47+
}
48+
return &ServerStreamingWrapper{stream}, nil
49+
}
50+
51+
func (c *A2AClient) GetTask(ctx context.Context, params *models.TaskQueryParams) (*models.Task, error) {
52+
return c.cli.GetTask(ctx, params)
53+
}
54+
55+
func (c *A2AClient) CancelTask(ctx context.Context, params *models.TaskIDParams) (*models.Task, error) {
56+
return c.cli.CancelTask(ctx, params)
57+
}
58+
59+
func (c *A2AClient) ResubscribeTask(ctx context.Context, params *models.TaskIDParams) (*ServerStreamingWrapper, error) {
60+
stream, err := c.cli.ResubscribeTask(ctx, params)
61+
if err != nil {
62+
return nil, err
63+
}
64+
return &ServerStreamingWrapper{stream}, nil
65+
}
66+
67+
func (c *A2AClient) SetPushNotificationConfig(ctx context.Context, params *models.TaskPushNotificationConfig) (*models.TaskPushNotificationConfig, error) {
68+
return c.cli.SetPushNotificationConfig(ctx, params)
69+
}
70+
71+
func (c *A2AClient) GetPushNotificationConfig(ctx context.Context, params *models.GetTaskPushNotificationConfigParams) (*models.TaskPushNotificationConfig, error) {
72+
return c.cli.GetPushNotificationConfig(ctx, params)
73+
}
74+
75+
// todo: list/delete notification config & agent/authenticatedExtendedCard
76+
77+
type ServerStreamingWrapper struct {
78+
s models.ResponseReader
79+
}
80+
81+
func (s *ServerStreamingWrapper) Recv() (resp *models.SendMessageStreamingResponseUnion, err error) {
82+
defer func() {
83+
if err != nil {
84+
s.s.Close()
85+
}
86+
}()
87+
return s.s.Read()
88+
}
89+
90+
func (s *ServerStreamingWrapper) Close() error {
91+
return s.s.Close()
92+
}

a2a/client/client_test.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
package client

a2a/examples/a2a.go

Lines changed: 0 additions & 21 deletions
This file was deleted.

0 commit comments

Comments
 (0)