Skip to content

Commit 6ff0fb6

Browse files
authored
Add pipeline clean (#966)
1 parent 3eefa52 commit 6ff0fb6

File tree

9 files changed

+88
-22
lines changed

9 files changed

+88
-22
lines changed

pkg/container/vector/vector.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -395,17 +395,23 @@ func (v *Vector) Dup(proc *process.Process) (*Vector, error) {
395395
Ref: v.Ref,
396396
}, nil
397397
case types.T_char, types.T_varchar, types.T_json:
398+
var err error
399+
var data []byte
400+
398401
vs := v.Col.(*types.Bytes)
399-
data, err := proc.Alloc(int64(len(vs.Data)))
400-
if err != nil {
401-
return nil, err
402-
}
403402
ws := &types.Bytes{
404-
Data: data,
405403
Offsets: make([]uint32, len(vs.Offsets)),
406404
Lengths: make([]uint32, len(vs.Lengths)),
407405
}
408-
copy(ws.Data, vs.Data)
406+
if len(vs.Data) > 0 {
407+
if data, err = proc.Alloc(int64(len(vs.Data))); err != nil {
408+
return nil, err
409+
}
410+
ws.Data = data
411+
copy(ws.Data, vs.Data)
412+
} else {
413+
ws.Data = make([]byte, 0)
414+
}
409415
copy(ws.Offsets, vs.Offsets)
410416
copy(ws.Lengths, vs.Lengths)
411417
return &Vector{

pkg/sql/colexec/merge/merge.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,9 @@ func Call(proc *process.Process, _ interface{}) (bool, error) {
3434
}
3535
for i := 0; i < len(proc.Reg.MergeReceivers); i++ {
3636
reg := proc.Reg.MergeReceivers[i]
37+
if reg.Ch == nil {
38+
continue
39+
}
3740
v := <-reg.Ch
3841
if v == nil {
3942
reg.Ch = nil

pkg/sql/colexec/transfer/transfer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -69,7 +69,7 @@ func Call(proc *process.Process, arg interface{}) (bool, error) {
6969
vecs := n.vecs[:0]
7070
for i := range bat.Vecs {
7171
if bat.Vecs[i].Or {
72-
vec, err := bat.Vecs[i].Dup(n.Proc)
72+
vec, err := bat.Vecs[i].Dup(proc)
7373
if err != nil {
7474
clean(vecs, n.Proc)
7575
return false, err

pkg/sql/compile/compile.go

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -71,8 +71,9 @@ func (c *compile) Build() ([]*Exec, error) {
7171
es := make([]*Exec, len(stmts))
7272
for i, stmt := range stmts {
7373
es[i] = &Exec{
74-
c: c,
75-
stmt: stmt,
74+
c: c,
75+
stmt: stmt,
76+
affectRows: 0,
7677
}
7778
}
7879
return es, nil
@@ -147,6 +148,18 @@ func (e *Exec) Columns() []*Col {
147148
return e.resultCols
148149
}
149150

151+
func (e *Exec) IncreaseAffectedRows(n uint64) {
152+
e.affectRows += n
153+
}
154+
155+
func (e *Exec) SetAffectedRows(n uint64) {
156+
e.affectRows = n
157+
}
158+
159+
func (e *Exec) GetAffectedRows() uint64 {
160+
return e.affectRows
161+
}
162+
150163
// Run applies the scopes to the specified data object
151164
// and run through the instruction in each of the scope.
152165
func (e *Exec) Run(ts uint64) error {
@@ -179,8 +192,10 @@ func (e *Exec) Run(ts uint64) error {
179192
case Insert:
180193
wg.Add(1)
181194
go func(s *Scope) {
182-
if err := s.Insert(ts); err != nil {
195+
if rows, err := s.Insert(ts); err != nil {
183196
e.err = err
197+
} else {
198+
e.SetAffectedRows(rows)
184199
}
185200
wg.Done()
186201
}(e.scopes[i])
@@ -242,7 +257,6 @@ func (e *Exec) Run(ts uint64) error {
242257
}(e.scopes[i])
243258
}
244259
}
245-
246260
wg.Wait()
247261
return e.err
248262
}

pkg/sql/compile/scope.go

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,9 +128,11 @@ func (s *Scope) RemoteRun(e engine.Engine) error {
128128

129129
arg := s.Instructions[len(s.Instructions)-1].Arg.(*transfer.Argument)
130130
defer func() {
131-
arg.Reg.Wg.Add(1)
132-
arg.Reg.Ch <- nil
133-
arg.Reg.Wg.Wait()
131+
if arg.Reg.Ch != nil {
132+
arg.Reg.Wg.Add(1)
133+
arg.Reg.Ch <- nil
134+
arg.Reg.Wg.Wait()
135+
}
134136
}()
135137
encoder, decoder := rpcserver.NewCodec(1 << 30)
136138
conn := goetty.NewIOSession(goetty.WithCodec(encoder, decoder))
@@ -161,17 +163,23 @@ func (s *Scope) RemoteRun(e engine.Engine) error {
161163
if err != nil {
162164
return err
163165
}
166+
if arg.Reg.Ch == nil {
167+
if bat != nil {
168+
bat.Clean(s.Proc)
169+
}
170+
continue
171+
}
164172
arg.Reg.Wg.Add(1)
165173
arg.Reg.Ch <- bat
166174
arg.Reg.Wg.Wait()
167175
}
168176
return nil
169177
}
170178

171-
func (s *Scope) Insert(ts uint64) error {
179+
func (s *Scope) Insert(ts uint64) (uint64, error) {
172180
o, _ := s.Operator.(*insert.Insert)
173181
defer o.R.Close()
174-
return o.R.Write(ts, o.Bat)
182+
return uint64(o.Bat.Vecs[0].Length()), o.R.Write(ts, o.Bat)
175183
}
176184

177185
func (s *Scope) Explain(u interface{}, fill func(interface{}, *batch.Batch) error) error {

pkg/sql/compile/top.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (c *compile) compileTopOutput(o *top.Top, mp map[string]uint64) ([]*Scope,
3232
mp[g.Name]++
3333
}
3434
}
35-
ss, err := c.compile(o.Prev.(*projection.Projection), mp)
35+
ss, err := c.compileOutput(o.Prev.(*projection.Projection), mp)
3636
if err != nil {
3737
return nil, err
3838
}

pkg/sql/compile/types.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,8 @@ type Exec struct {
8585
resultCols []*Col
8686
scopes []*Scope
8787
c *compile
88+
//affectRows stores the number of rows affected while insert / update / delete
89+
affectRows uint64
8890
//e a dbengine instance
8991
e engine.Engine
9092
//stmt ast of a single sql

pkg/sql/protocol/protocol.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1126,8 +1126,10 @@ func EncodeVector(v *vector.Vector, buf *bytes.Buffer) error {
11261126
size += uint64(v)
11271127
}
11281128
buf.Write(encoding.EncodeUint64(size))
1129-
for i, j := int64(0), int64(cnt); i < j; i++ {
1130-
buf.Write(vs.Get(i))
1129+
if size > 0 {
1130+
for i, j := int64(0), int64(cnt); i < j; i++ {
1131+
buf.Write(vs.Get(i))
1132+
}
11311133
}
11321134
}
11331135
case types.T_tuple:

pkg/vm/pipeline/pipeline.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ package pipeline
1616

1717
import (
1818
"bytes"
19+
"fmt"
20+
"matrixone/pkg/container/batch"
1921
"matrixone/pkg/vm"
2022
"matrixone/pkg/vm/engine"
2123
"matrixone/pkg/vm/mempool"
@@ -86,6 +88,9 @@ func (p *Pipeline) Run(segs []engine.Segment, proc *process.Process) (bool, erro
8688
}
8789

8890
func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) {
91+
var end bool
92+
var err error
93+
8994
proc.Mp = mempool.New()
9095
defer func() {
9196
proc.Reg.InputBatch = nil
@@ -96,11 +101,13 @@ func (p *Pipeline) RunMerge(proc *process.Process) (bool, error) {
96101
vm.Clean(p.instructions, proc)
97102
return false, err
98103
}
99-
i := 0
100104
for {
101-
i++
102105
proc.Reg.InputBatch = nil
103-
if end, err := vm.Run(p.instructions, proc); err != nil || end {
106+
if end, err = vm.Run(p.instructions, proc); err != nil || end {
107+
{
108+
fmt.Printf("+++%p begin clean\n", p)
109+
}
110+
p.clean(proc)
104111
return end, err
105112
}
106113
}
@@ -121,6 +128,30 @@ func (p *Pipeline) prefetch(segs []engine.Segment, proc *process.Process) *queue
121128
return q
122129
}
123130

131+
func (p *Pipeline) clean(proc *process.Process) {
132+
for _, reg := range proc.Reg.MergeReceivers {
133+
if reg.Ch != nil {
134+
v := <-reg.Ch
135+
switch {
136+
case v == nil:
137+
reg.Ch = nil
138+
reg.Wg.Done()
139+
default:
140+
bat := v.(*batch.Batch)
141+
if bat == nil || bat.Attrs == nil {
142+
reg.Ch = nil
143+
reg.Wg.Done()
144+
} else {
145+
bat.Clean(proc)
146+
reg.Ch = nil
147+
reg.Wg.Done()
148+
}
149+
}
150+
}
151+
}
152+
153+
}
154+
124155
// prefetch
125156
func (q *queue) prefetch(attrs []string) error {
126157
if q.prefetchIndex == len(q.blocks) {

0 commit comments

Comments
 (0)