Skip to content

Commit efc8fc9

Browse files
author
renxiangyu
committed
fix: network connectivity speed
Signed-off-by: renxiangyu <[email protected]>
1 parent 88a3efe commit efc8fc9

File tree

3 files changed

+235
-29
lines changed

3 files changed

+235
-29
lines changed

pkg/kosmosctl/floater/check.go

Lines changed: 52 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,8 @@ type CommandCheckOptions struct {
4545

4646
SrcFloater *Floater
4747
DstFloater *Floater
48+
49+
routinesMaxNum int
4850
}
4951

5052
type PrintCheckData struct {
@@ -92,6 +94,7 @@ func NewCmdCheck() *cobra.Command {
9294
flags.StringVar(&o.Port, "port", "8889", "Port used by floater.")
9395
flags.IntVarP(&o.PodWaitTime, "pod-wait-time", "w", 30, "Time for wait pod(floater) launch.")
9496
flags.StringVar(&o.Protocol, "protocol", string(TCP), "Protocol for the network problem.")
97+
flags.IntVarP(&o.routinesMaxNum, "routines-max-number", "", 5, "Number of goroutines to use.")
9598

9699
return cmd
97100
}
@@ -199,60 +202,80 @@ func (o *CommandCheckOptions) Run() error {
199202
func (o *CommandCheckOptions) RunRange(iPodInfos []*FloatInfo, jPodInfos []*FloatInfo) []*PrintCheckData {
200203
var resultData []*PrintCheckData
201204

205+
goroutinePool := utils.NewGoroutinePool(o.routinesMaxNum)
206+
202207
if len(iPodInfos) > 0 && len(jPodInfos) > 0 {
203208
for _, iPodInfo := range iPodInfos {
204209
for _, jPodInfo := range jPodInfos {
205210
for _, ip := range jPodInfo.PodIPs {
206-
var targetIP string
207-
var err error
208-
var cmdResult *command.Result
209-
if o.DstFloater != nil {
210-
targetIP, err = netmap.NetMap(ip, o.DstFloater.CIDRsMap)
211-
} else {
212-
targetIP = ip
213-
}
214-
if err != nil {
215-
cmdResult = command.ParseError(err)
216-
} else {
217-
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
218-
cmdObj := &command.Ping{
219-
TargetIP: targetIP,
211+
routineIPodInfo := iPodInfo
212+
routineJPodInfo := jPodInfo
213+
routineIp := ip
214+
goroutinePool.Submit(func(args ...interface{}) {
215+
var targetIP string
216+
var err error
217+
var cmdResult *command.Result
218+
if o.DstFloater != nil {
219+
targetIP, err = netmap.NetMap(routineIp, o.DstFloater.CIDRsMap)
220+
} else {
221+
targetIP = routineIp
222+
}
223+
if err != nil {
224+
cmdResult = command.ParseError(err)
225+
} else {
226+
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
227+
cmdObj := &command.Ping{
228+
TargetIP: targetIP,
229+
}
230+
cmdResult = o.SrcFloater.CommandExec(routineIPodInfo, cmdObj)
220231
}
221-
cmdResult = o.SrcFloater.CommandExec(iPodInfo, cmdObj)
222-
}
223-
resultData = append(resultData, &PrintCheckData{
224-
*cmdResult,
225-
iPodInfo.NodeName, jPodInfo.NodeName, targetIP,
226-
})
232+
resultData = append(resultData, &PrintCheckData{
233+
*cmdResult,
234+
routineIPodInfo.NodeName, routineJPodInfo.NodeName, targetIP,
235+
})
236+
}, routineIPodInfo, routineJPodInfo, routineIp)
227237
}
228238
}
229239
}
230240
}
231241

242+
goroutinePool.Wait()
243+
goroutinePool.Shutdown()
244+
232245
return resultData
233246
}
234247

235248
func (o *CommandCheckOptions) RunNative(iNodeInfos []*FloatInfo, jNodeInfos []*FloatInfo) []*PrintCheckData {
236249
var resultData []*PrintCheckData
237250

251+
goroutinePool := utils.NewGoroutinePool(o.routinesMaxNum)
252+
238253
if len(iNodeInfos) > 0 && len(jNodeInfos) > 0 {
239254
for _, iNodeInfo := range iNodeInfos {
240255
for _, jNodeInfo := range jNodeInfos {
241256
for _, ip := range jNodeInfo.NodeIPs {
242-
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
243-
cmdObj := &command.Ping{
244-
TargetIP: ip,
245-
}
246-
cmdResult := o.SrcFloater.CommandExec(iNodeInfo, cmdObj)
247-
resultData = append(resultData, &PrintCheckData{
248-
*cmdResult,
249-
iNodeInfo.NodeName, jNodeInfo.NodeName, ip,
250-
})
257+
routineINodeInfo := iNodeInfo
258+
routineJNodeInfo := jNodeInfo
259+
routineIp := ip
260+
goroutinePool.Submit(func(args ...interface{}) {
261+
// ToDo RunRange && RunNative func support multiple commands, and the code needs to be optimized
262+
cmdObj := &command.Ping{
263+
TargetIP: routineIp,
264+
}
265+
cmdResult := o.SrcFloater.CommandExec(routineINodeInfo, cmdObj)
266+
resultData = append(resultData, &PrintCheckData{
267+
*cmdResult,
268+
routineINodeInfo.NodeName, routineJNodeInfo.NodeName, routineIp,
269+
})
270+
}, routineINodeInfo, routineJNodeInfo, routineIp)
251271
}
252272
}
253273
}
254274
}
255275

276+
goroutinePool.Wait()
277+
goroutinePool.Shutdown()
278+
256279
return resultData
257280
}
258281

pkg/utils/goroutine_pool.go

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
)
7+
8+
type GoroutinePool struct {
9+
pool chan int
10+
funcChan chan RoutineFunc
11+
waitGroup *sync.WaitGroup
12+
}
13+
14+
type RoutineFunc struct {
15+
f interface{}
16+
args []interface{}
17+
}
18+
19+
func NewGoroutinePool(size int) *GoroutinePool {
20+
return &GoroutinePool{
21+
pool: make(chan int, size),
22+
funcChan: make(chan RoutineFunc, size),
23+
waitGroup: &sync.WaitGroup{},
24+
}
25+
}
26+
27+
func (g *GoroutinePool) Submit(f interface{}, args ...interface{}) {
28+
g.funcChan <- RoutineFunc{f: f, args: args}
29+
g.pool <- 1
30+
g.waitGroup.Add(1)
31+
32+
go func() {
33+
task := <-g.funcChan
34+
switch f := task.f.(type) {
35+
case func():
36+
f()
37+
case func(args ...interface{}):
38+
f(task.args...)
39+
default:
40+
fmt.Println("Invalid task type")
41+
}
42+
defer g.Done()
43+
}()
44+
}
45+
46+
func (g *GoroutinePool) Wait() {
47+
g.waitGroup.Wait()
48+
}
49+
50+
func (g *GoroutinePool) Done() {
51+
<-g.pool
52+
g.waitGroup.Done()
53+
}
54+
55+
func (g *GoroutinePool) Shutdown() {
56+
close(g.pool)
57+
}

pkg/utils/goroutine_pool_test.go

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
package utils
2+
3+
import (
4+
"fmt"
5+
"sync"
6+
"testing"
7+
"time"
8+
)
9+
10+
func TestGoroutinePool_Submit(t *testing.T) {
11+
type fields struct {
12+
pool chan int
13+
funcChan chan RoutineFunc
14+
waitGroup *sync.WaitGroup
15+
}
16+
type args struct {
17+
f interface{}
18+
args []interface{}
19+
}
20+
tests := []struct {
21+
name string
22+
fields fields
23+
args []args
24+
}{
25+
// func has no parameters
26+
{"Test1", fields{
27+
pool: make(chan int, 5),
28+
funcChan: make(chan RoutineFunc, 5),
29+
waitGroup: &sync.WaitGroup{},
30+
}, []args{
31+
{
32+
f: func() {
33+
for i := 0; i < 10; i++ {
34+
fmt.Println(i)
35+
time.Sleep(1000)
36+
}
37+
},
38+
args: nil,
39+
}, {
40+
f: func() {
41+
for i := 10; i < 20; i++ {
42+
fmt.Println(i)
43+
time.Sleep(1000)
44+
}
45+
},
46+
args: nil,
47+
},
48+
}},
49+
// func has parameters
50+
{"Test2", fields{
51+
pool: make(chan int, 5),
52+
funcChan: make(chan RoutineFunc, 5),
53+
waitGroup: &sync.WaitGroup{},
54+
}, []args{
55+
{
56+
f: func(args ...interface{}) {
57+
for _, arg := range args {
58+
fmt.Println(arg)
59+
time.Sleep(1000)
60+
}
61+
},
62+
args: []interface{}{"a", "b", "c", "d"},
63+
}, {
64+
f: func(args ...interface{}) {
65+
for _, arg := range args {
66+
fmt.Println(arg)
67+
time.Sleep(1000)
68+
}
69+
},
70+
args: []interface{}{"e", "f", "g", "h"},
71+
},
72+
}},
73+
// the thread capacity is 1
74+
{"Test3", fields{
75+
pool: make(chan int, 1),
76+
funcChan: make(chan RoutineFunc, 1),
77+
waitGroup: &sync.WaitGroup{},
78+
}, []args{
79+
{
80+
f: func() {
81+
for i := 0; i < 10; i++ {
82+
fmt.Println(i)
83+
time.Sleep(1000)
84+
}
85+
},
86+
args: nil,
87+
}, {
88+
f: func() {
89+
for i := 10; i < 20; i++ {
90+
fmt.Println(i)
91+
time.Sleep(1000)
92+
}
93+
},
94+
args: nil,
95+
},
96+
}},
97+
//incorrect func parameter
98+
{"Test4", fields{
99+
pool: make(chan int, 5),
100+
funcChan: make(chan RoutineFunc, 5),
101+
waitGroup: &sync.WaitGroup{},
102+
}, []args{
103+
{
104+
f: func(a int) {
105+
fmt.Println(a)
106+
},
107+
args: []interface{}{"hello"},
108+
},
109+
}},
110+
}
111+
for _, tt := range tests {
112+
t.Run(tt.name, func(t *testing.T) {
113+
g := &GoroutinePool{
114+
pool: tt.fields.pool,
115+
funcChan: tt.fields.funcChan,
116+
waitGroup: tt.fields.waitGroup,
117+
}
118+
for _, arg := range tt.args {
119+
g.Submit(arg.f, arg.args...)
120+
}
121+
g.Wait()
122+
g.Shutdown()
123+
fmt.Println("success!")
124+
})
125+
}
126+
}

0 commit comments

Comments
 (0)