Skip to content

Commit f48c51f

Browse files
authored
新增基于grpc的双向通信DEMO (#113)
1 parent a197fe9 commit f48c51f

File tree

6 files changed

+609
-0
lines changed

6 files changed

+609
-0
lines changed

practise/grpc/README.md

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,83 @@
1+
# gRPC Usage
2+
3+
### 目标
4+
基于grpc的双向通信demo
5+
6+
### protoc (Protocol buffer compiler) 安装
7+
8+
Install `protoc` from [protobuf](https://github.com/protocolbuffers/protobuf/releases)
9+
```shell
10+
protoc --version # Ensure compiler version is 3+
11+
```
12+
13+
Install `Go plugins` for the `protoc`
14+
15+
```shell
16+
# Install the protocol compiler plugins for Go using the following commands:
17+
go install google.golang.org/protobuf/cmd/[email protected]
18+
go install google.golang.org/grpc/cmd/[email protected]
19+
20+
# Update your PATH so that the protoc compiler can find the plugins:
21+
export PATH="$PATH:$(go env GOPATH)/bin"
22+
```
23+
24+
### 构建 gRPC 服务
25+
26+
定义 `.proto` 文件,本文以 `pixiu.proto` 为例
27+
28+
```protobuf
29+
syntax="proto3";
30+
31+
option go_package = "go-learning/practise/grpc-practise/tunnel/tunnel";
32+
33+
package tunnel;
34+
35+
service Tunnel {
36+
// Client 调用此方法建立连接
37+
rpc Connect(stream Request) returns (stream Response);
38+
}
39+
40+
message Request {
41+
string type = 1; // "client_call" 或 "server_call"
42+
bytes payload = 2;
43+
}
44+
45+
message Response {
46+
bytes result = 1;
47+
}
48+
```
49+
50+
### 生成 `gRPC` 代码
51+
52+
```shell
53+
protoc --go_out=. --go_opt=paths=source_relative \
54+
--go-grpc_out=. --go-grpc_opt=paths=source_relative \
55+
tunnel/tunnel.proto
56+
```
57+
58+
执行命令之后,会在 `tunnel` 目录生成 `tunnel.pb.go``tunnel_grpc.pb.go` 代码文件
59+
- `tunnel.pb.go` 结构体
60+
- `tunnel_grpc.pb.go`: 客户端和服务端代码
61+
62+
### 实现 gRPC 服务端
63+
```
64+
```
65+
66+
### 实现 gRPC 客户端
67+
```
68+
```
69+
70+
### 执行
71+
72+
启动 `gRPC server`
73+
``` shell
74+
go run server.go
75+
```
76+
77+
执行 `gRPC client`
78+
``` shell
79+
go run client.go
80+
81+
# 回显
82+
2022/03/20 19:43:13 say hello caoyingjun 12345
83+
```

practise/grpc/client.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
package main
2+
3+
import (
4+
"context"
5+
6+
"log"
7+
"time"
8+
9+
"google.golang.org/grpc"
10+
"google.golang.org/grpc/credentials/insecure"
11+
12+
pd "go-learning/practise/grpc/tunnel"
13+
)
14+
15+
var (
16+
addr = "127.0.0.1:8092"
17+
)
18+
19+
func main() {
20+
conn, err := grpc.Dial(addr, grpc.WithTransportCredentials(insecure.NewCredentials()))
21+
if err != nil {
22+
log.Fatalf("failed to connect rpc server %v", err)
23+
}
24+
defer conn.Close()
25+
26+
c := pd.NewTunnelClient(conn)
27+
28+
stream, err := c.Connect(context.Background())
29+
if err != nil {
30+
log.Fatalf("%v", err)
31+
}
32+
33+
clientId := "node2"
34+
35+
// 启动协程,接受服务段回调 client 的请求
36+
go func() {
37+
for {
38+
msg, err := stream.Recv()
39+
if err != nil {
40+
log.Printf("Receive error: %v", err)
41+
return
42+
}
43+
log.Printf("Received from server: %s", msg.Result)
44+
}
45+
}()
46+
47+
// 启动客户端定时测试DEMO
48+
ticker := time.NewTicker(10 * time.Second)
49+
defer ticker.Stop()
50+
51+
for range ticker.C {
52+
ts := time.Now().String()
53+
if err = stream.Send(&pd.Request{
54+
Type: clientId,
55+
Payload: []byte(ts),
56+
}); err != nil {
57+
log.Println("调用服务端失败", err)
58+
}
59+
}
60+
}

practise/grpc/server.go

Lines changed: 90 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
package main
2+
3+
import (
4+
"fmt"
5+
"io"
6+
"log"
7+
"net"
8+
"sync"
9+
10+
"github.com/gin-gonic/gin"
11+
"google.golang.org/grpc"
12+
13+
pd "go-learning/practise/grpc/tunnel"
14+
)
15+
16+
type server struct {
17+
pd.UnimplementedTunnelServer
18+
19+
clients map[string]pd.Tunnel_ConnectServer
20+
lock sync.RWMutex
21+
}
22+
23+
func (s *server) Connect(stream pd.Tunnel_ConnectServer) error {
24+
for {
25+
req, err := stream.Recv()
26+
if err == io.EOF {
27+
return nil
28+
}
29+
if err != nil {
30+
log.Printf("tream.Recv %v", err)
31+
return err
32+
}
33+
34+
s.lock.Lock()
35+
_, ok := s.clients[req.Type]
36+
if !ok {
37+
s.clients[req.Type] = stream
38+
}
39+
s.lock.Unlock()
40+
41+
// TODO 目前是DEMO
42+
log.Printf("Received from %s %s", req.Type, string(req.Payload))
43+
}
44+
}
45+
func (s *server) Call(c *gin.Context) {
46+
_, _ = s.CallClient(c.Query("clientId"), nil)
47+
}
48+
49+
func (s *server) CallClient(clientId string, data []byte) ([]byte, error) {
50+
stream, ok := s.clients[clientId]
51+
if !ok {
52+
return nil, fmt.Errorf("client not connected")
53+
}
54+
55+
// 发送调用请求
56+
err := stream.Send(&pd.Response{
57+
Result: []byte(clientId + " server callback"),
58+
})
59+
if err != nil {
60+
return nil, err
61+
}
62+
63+
return nil, err
64+
}
65+
66+
func main() {
67+
listener, err := net.Listen("tcp", ":8092")
68+
if err != nil {
69+
log.Fatalf("failed to listen %v", err)
70+
}
71+
72+
cs := &server{clients: make(map[string]pd.Tunnel_ConnectServer)}
73+
74+
s := grpc.NewServer()
75+
pd.RegisterTunnelServer(s, cs)
76+
77+
go func() {
78+
log.Printf("grpc listening at %v", listener.Addr())
79+
if err = s.Serve(listener); err != nil {
80+
log.Fatalf("failed to serve %v", err)
81+
}
82+
}()
83+
84+
r := gin.Default()
85+
r.GET("/ping", cs.Call)
86+
log.Printf("http listening at %v", ":8093")
87+
if err = r.Run(":8093"); err != nil {
88+
log.Fatalf("failed to start http: %v", err)
89+
}
90+
}

0 commit comments

Comments
 (0)