Skip to content

Commit 9f38fac

Browse files
zhouhaoA1xyz2277OrangeBaowuyingjun-lucky
committed
rabbitmq implements of middleware interface
Co-authored-by: zhangyongxi <[email protected]> Co-authored-by: baoyinghai <[email protected]> Co-authored-by: wuyingjun <[email protected]> Signed-off-by: zhouhaoA1 <[email protected]>
1 parent 189fc9d commit 9f38fac

31 files changed

+9099
-19
lines changed

go.mod

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ require (
1616
github.com/prometheus/exporter-toolkit v0.10.0
1717
github.com/spf13/cobra v1.7.0
1818
github.com/spf13/pflag v1.0.5
19+
github.com/streadway/amqp v1.1.0
1920
github.com/stretchr/testify v1.8.3
2021
go.uber.org/atomic v1.10.0
2122
gopkg.in/natefinch/lumberjack.v2 v2.2.1

go.sum

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,8 @@ github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
490490
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
491491
github.com/stoewer/go-strcase v1.2.0 h1:Z2iHWqGXH00XYgqDmNgQbIBxf3wrNq0F3feEy0ainaU=
492492
github.com/stoewer/go-strcase v1.2.0/go.mod h1:IBiWB2sKIp3wVVQ3Y035++gc+knqhUQag1KpM8ahLw8=
493+
github.com/streadway/amqp v1.1.0 h1:py12iX8XSyI7aN/3dUT8DFIDJazNJsVJdxNVEpnQTZM=
494+
github.com/streadway/amqp v1.1.0/go.mod h1:WYSrTEYHOXHd0nwFeUXAe2G2hRnQT+deZJJf88uS9Bg=
493495
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
494496
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
495497
github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=

pkg/watcher/codec/event_codec.go

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
package codec
2+
3+
import (
4+
"bytes"
5+
"fmt"
6+
7+
"k8s.io/apimachinery/pkg/api/meta"
8+
"k8s.io/apimachinery/pkg/runtime"
9+
"k8s.io/apimachinery/pkg/watch"
10+
)
11+
12+
const EVENTTYPE = "EventTypeForLabel"
13+
14+
func EventEncode(eventType watch.EventType, obj runtime.Object, codec runtime.Codec) ([]byte, error) {
15+
accessor := meta.NewAccessor()
16+
labels, err := accessor.Labels(obj)
17+
if err != nil {
18+
return nil, err
19+
}
20+
if labels == nil {
21+
labels = make(map[string]string)
22+
}
23+
labels[EVENTTYPE] = string(eventType)
24+
err = accessor.SetLabels(obj, labels)
25+
if err != nil {
26+
return nil, err
27+
}
28+
29+
var buffer bytes.Buffer
30+
if err := codec.Encode(obj, &buffer); err != nil {
31+
return nil, err
32+
}
33+
34+
return buffer.Bytes(), nil
35+
}
36+
37+
func EventDecode(value []byte, codec runtime.Codec, newFunc func() runtime.Object) (*watch.Event, error) {
38+
into := newFunc()
39+
obj, _, err := codec.Decode(value, nil, into)
40+
if err != nil {
41+
return nil, err
42+
}
43+
44+
accessor := meta.NewAccessor()
45+
labels, err := accessor.Labels(obj)
46+
if err != nil {
47+
return nil, err
48+
}
49+
var eventType string
50+
if labels != nil {
51+
eventType = labels[EVENTTYPE]
52+
delete(labels, EVENTTYPE)
53+
if eventType == "" {
54+
return nil, fmt.Errorf("event can not find eventtype")
55+
}
56+
}
57+
err = accessor.SetLabels(obj, labels)
58+
if err != nil {
59+
return nil, err
60+
}
61+
62+
return &watch.Event{
63+
Type: watch.EventType(eventType),
64+
Object: obj,
65+
}, nil
66+
}
Lines changed: 315 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,315 @@
1+
package rabbitmq
2+
3+
import (
4+
"context"
5+
"fmt"
6+
"math/rand"
7+
"strings"
8+
"time"
9+
10+
"github.com/streadway/amqp"
11+
"k8s.io/apimachinery/pkg/runtime"
12+
"k8s.io/apimachinery/pkg/runtime/schema"
13+
"k8s.io/apimachinery/pkg/watch"
14+
"k8s.io/klog/v2"
15+
16+
"github.com/clusterpedia-io/clusterpedia/pkg/watcher/codec"
17+
watchcomponents "github.com/clusterpedia-io/clusterpedia/pkg/watcher/components"
18+
)
19+
20+
const (
21+
RoleConsumer = "consumer"
22+
RoleProducer = "producer"
23+
)
24+
25+
type QueueExchange struct {
26+
QueueName string
27+
RoutingKey string
28+
ExchangeName string
29+
ExchangeType string
30+
}
31+
32+
type RabbitClient struct {
33+
QueueExchange
34+
conn *RabbitConnection
35+
channel *amqp.Channel
36+
codec runtime.Codec
37+
started bool
38+
cliStopCh chan bool
39+
globalStopCh <-chan struct{}
40+
expiresPerSend int
41+
notifyConfirm chan amqp.Confirmation // msg send confirmed chan
42+
notifyClose chan *amqp.Error // channel closed chan
43+
role string
44+
newFunc func() runtime.Object // event decode
45+
queueExpires int64
46+
}
47+
48+
func NewProducer(queueEx QueueExchange, conn *RabbitConnection, codec runtime.Codec, expiresPerSend int, gStopCh <-chan struct{}) *RabbitClient {
49+
return &RabbitClient{
50+
QueueExchange: queueEx,
51+
conn: conn,
52+
codec: codec,
53+
cliStopCh: make(chan bool, 1),
54+
globalStopCh: gStopCh,
55+
expiresPerSend: expiresPerSend,
56+
role: RoleProducer,
57+
}
58+
}
59+
60+
func NewConsumer(queueEx QueueExchange, conn *RabbitConnection, codec runtime.Codec, gStopCh <-chan struct{}, newFunc func() runtime.Object, queueExpires int64) *RabbitClient {
61+
return &RabbitClient{
62+
QueueExchange: queueEx,
63+
conn: conn,
64+
codec: codec,
65+
cliStopCh: make(chan bool, 1),
66+
globalStopCh: gStopCh,
67+
role: RoleConsumer,
68+
newFunc: newFunc,
69+
queueExpires: queueExpires,
70+
}
71+
}
72+
73+
func NewQueue(quePrefix string, conn *RabbitConnection, queueExpires int64) string {
74+
for {
75+
ch := CreateChannel(conn)
76+
77+
randStr := fmt.Sprintf("%d%d%d%d", rand.Intn(10), rand.Intn(10), rand.Intn(10), rand.Intn(10))
78+
timeStr := time.Now().Format("2006-01-02 15-04-05")
79+
timeStr = strings.ReplaceAll(timeStr, "-", "")
80+
timeStr = strings.ReplaceAll(timeStr, " ", "")
81+
queue := fmt.Sprintf("%s_%s_%s", quePrefix, timeStr, randStr)
82+
args := make(amqp.Table, 1)
83+
args["x-expires"] = queueExpires
84+
_, err := ch.QueueDeclarePassive(queue, true, false, false, false, args)
85+
if err == nil { // queue already exist
86+
_ = ch.Close()
87+
continue
88+
} else { // declare the queue
89+
_ = ch.Close()
90+
ch := CreateChannel(conn)
91+
_, err = ch.QueueDeclare(queue, true, false, false, false, args)
92+
if err != nil {
93+
klog.Errorf("rabbitmq queueDeclare failed: %v", err)
94+
_ = ch.Close()
95+
continue
96+
} else {
97+
_ = ch.Close()
98+
return queue
99+
}
100+
}
101+
}
102+
}
103+
104+
// CreateChannel open a channel until success
105+
func CreateChannel(conn *RabbitConnection) *amqp.Channel {
106+
for {
107+
conn.tryConnect()
108+
ch, err := conn.NewChannel()
109+
if err != nil {
110+
klog.Error("open channel failed. ", err, ". retry after 1 second")
111+
time.Sleep(1 * time.Second)
112+
continue
113+
} else {
114+
return ch
115+
}
116+
}
117+
}
118+
119+
func (r *RabbitClient) Destroy() (err error) {
120+
r.cliStopCh <- true
121+
return nil
122+
}
123+
124+
func (r *RabbitClient) DestroyGvr() {
125+
klog.Info("consume stopped for client stop cmd. delete queue: ", r.QueueName)
126+
_, err := r.channel.QueueDelete(r.QueueName, false, false, true)
127+
if err != nil {
128+
klog.Errorf("delete %s queue fail. %v", r.QueueName, err.Error())
129+
} else {
130+
klog.Info("deleted queue ", r.QueueName)
131+
}
132+
_ = r.closeChannel()
133+
}
134+
135+
func (r *RabbitClient) initChannel() {
136+
for {
137+
r.channel = CreateChannel(r.conn)
138+
err := r.initQuExchange()
139+
if err != nil {
140+
klog.Error("init channel failed. ", err.Error())
141+
_ = r.closeChannel()
142+
continue
143+
} else {
144+
return
145+
}
146+
}
147+
}
148+
149+
func (r *RabbitClient) initQuExchange() error {
150+
args := make(amqp.Table, 1)
151+
args["x-expires"] = r.queueExpires
152+
err := r.channel.ExchangeDeclare(r.ExchangeName, r.ExchangeType, true, false, false, false, args)
153+
if err != nil {
154+
return fmt.Errorf("rabbitmq exchangeDeclare failed: %v", err)
155+
}
156+
157+
r.notifyClose = r.channel.NotifyClose(make(chan *amqp.Error, 1)) // listen channel close event
158+
159+
if r.role == RoleProducer {
160+
err = r.channel.Confirm(false) // set msg confirm mode
161+
if err != nil {
162+
return fmt.Errorf("rabbitmq confirm error. %v", err)
163+
}
164+
r.notifyConfirm = r.channel.NotifyPublish(make(chan amqp.Confirmation, 1))
165+
} else {
166+
_, err = r.channel.QueueDeclare(r.QueueName, true, false, false, false, args)
167+
if err != nil {
168+
return fmt.Errorf("rabbitmq queueDeclare failed: %v", err)
169+
}
170+
171+
err = r.channel.QueueBind(r.QueueName, r.RoutingKey, r.ExchangeName, false, nil)
172+
if err != nil {
173+
return fmt.Errorf("rabbitmq queueBind failed: %v", err)
174+
}
175+
176+
err = r.channel.Qos(1, 0, false)
177+
if err != nil {
178+
return fmt.Errorf("rabbitmq Qos failed: %v", err)
179+
}
180+
}
181+
return nil
182+
}
183+
184+
func (r *RabbitClient) closeChannel() (err error) {
185+
r.channel.Close()
186+
if err != nil {
187+
return fmt.Errorf("close rabbitmq channel failed: %v", err)
188+
}
189+
return
190+
}
191+
192+
// sendEventSynchro send message until success
193+
func (r *RabbitClient) sendEventSynchro(event *watch.Event, expiresPerTry int) error {
194+
msgBytes, err := codec.EventEncode(event.Type, event.Object, r.codec)
195+
if err != nil {
196+
return fmt.Errorf("event encode failed. error: %v", err.Error())
197+
}
198+
ticker := time.NewTicker(time.Duration(expiresPerTry) * time.Second)
199+
defer ticker.Stop()
200+
for {
201+
_ = r.channel.Publish(
202+
r.ExchangeName,
203+
r.RoutingKey,
204+
false,
205+
false,
206+
amqp.Publishing{
207+
ContentType: "text/plain",
208+
Body: msgBytes,
209+
})
210+
211+
select {
212+
case c := <-r.notifyConfirm:
213+
if !c.Ack {
214+
klog.Errorf("rabbit confirm ack false. retry init channel and send. exchange: %s", r.ExchangeName)
215+
} else {
216+
return nil
217+
}
218+
case <-ticker.C:
219+
klog.Errorf("send event timeout. retry init channel and send. exchange: %s", r.ExchangeName)
220+
}
221+
222+
_ = r.closeChannel()
223+
r.initChannel()
224+
}
225+
}
226+
227+
func (r *RabbitClient) Produce(eventChan chan *watchcomponents.EventWithCluster, publishEvent func(context.Context, *watchcomponents.EventWithCluster),
228+
ctx context.Context, genCrv2Event func(event *watch.Event)) {
229+
for {
230+
r.initChannel()
231+
232+
LOOP:
233+
for {
234+
select {
235+
case e := <-r.notifyClose:
236+
klog.Warningf("channel notifyClose: %v. exchange: %s. retry channel connecting", e.Error(), r.ExchangeName)
237+
break LOOP
238+
case event := <-eventChan:
239+
genCrv2Event(event.Event)
240+
err := r.sendEventSynchro(event.Event, r.expiresPerSend)
241+
if err != nil {
242+
klog.Errorf("send event error %v. exchange: %s. this should not happen normally", err.Error(), r.ExchangeName)
243+
} else {
244+
publishEvent(ctx, event)
245+
}
246+
case <-r.cliStopCh:
247+
klog.Info("produce stopped for client stop cmd. exchange: ", r.ExchangeName)
248+
_ = r.closeChannel()
249+
close(r.cliStopCh)
250+
return
251+
case <-r.globalStopCh:
252+
klog.Info("produce stopped for global publisher stopped. exchange: ", r.ExchangeName)
253+
_ = r.closeChannel()
254+
return
255+
}
256+
}
257+
}
258+
}
259+
260+
func (r *RabbitClient) Consume(enqueueFunc func(event *watch.Event), clearfunc func()) {
261+
for {
262+
r.initChannel()
263+
msgList, err := r.channel.Consume(r.QueueName, "", false, false, false, false, nil)
264+
if err != nil {
265+
klog.Errorf("consume err: ", err.Error())
266+
_ = r.closeChannel()
267+
continue
268+
}
269+
270+
LOOP:
271+
for {
272+
select {
273+
case <-r.cliStopCh:
274+
klog.Info("consume stopped for client stop cmd. delete queue: ", r.QueueName)
275+
_, err = r.channel.QueueDelete(r.QueueName, false, false, true)
276+
if err != nil {
277+
klog.Errorf("delete %s queue fail. %v", r.QueueName, err.Error())
278+
} else {
279+
klog.Info("deleted queue ", r.QueueName)
280+
}
281+
_ = r.closeChannel()
282+
close(r.cliStopCh)
283+
return
284+
case msg := <-msgList:
285+
//处理数据
286+
event, _ := codec.EventDecode(msg.Body, r.codec, r.newFunc)
287+
klog.V(7).Infof("Event in to cache %v : %v \n", event.Type, event.Object.GetObjectKind().GroupVersionKind())
288+
err = msg.Ack(true)
289+
if err != nil {
290+
klog.Errorf("msg ack error: %v. event: %v, queue: %s. retry init channel and consume...", err.Error(), event.Type, r.QueueName)
291+
break LOOP
292+
}
293+
enqueueFunc(event)
294+
case e := <-r.notifyClose:
295+
klog.Warningf("channel notifyClose: %v. queue: %s. retry channel connecting", e.Error(), r.QueueName)
296+
break LOOP
297+
case <-r.globalStopCh:
298+
klog.Info("consume stopped for global publisher stopped. delete queue: ", r.QueueName)
299+
_, err = r.channel.QueueDelete(r.QueueName, false, false, true)
300+
if err != nil {
301+
klog.Errorf("delete %s queue fail. %v", r.QueueName, err.Error())
302+
} else {
303+
klog.Info("deleted queue ", r.QueueName)
304+
}
305+
_ = r.closeChannel()
306+
return
307+
}
308+
}
309+
}
310+
}
311+
312+
func GvrString(gvr schema.GroupVersionResource) string {
313+
group := strings.ReplaceAll(gvr.Group, ".", "_")
314+
return fmt.Sprintf("%s_%s_%s", group, gvr.Version, gvr.Resource)
315+
}

0 commit comments

Comments
 (0)