Skip to content

Commit 17503e5

Browse files
authored
Merge pull request #2099 from c9s/c9s/xmaker/trade-signal-refactor
REFACTOR: [xmaker] improve TradeVolumeWindowSignal memory usage
2 parents dab1aba + eaebe18 commit 17503e5

File tree

2 files changed

+410
-38
lines changed

2 files changed

+410
-38
lines changed

pkg/strategy/xmaker/signal_trade.go

Lines changed: 171 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -2,19 +2,19 @@ package xmaker
22

33
import (
44
"context"
5+
"math"
56
"sync"
67
"time"
78

89
"github.com/prometheus/client_golang/prometheus"
10+
"github.com/sirupsen/logrus"
911

1012
"github.com/c9s/bbgo/pkg/bbgo"
1113
"github.com/c9s/bbgo/pkg/fixedpoint"
1214
"github.com/c9s/bbgo/pkg/types"
1315
)
1416

1517
const tradeSliceCapacityLimit = 10000
16-
const tradeSliceShrinkThreshold = tradeSliceCapacityLimit * 4 / 5
17-
const tradeSliceShrinkSize = tradeSliceCapacityLimit * 1 / 5
1818

1919
var tradeVolumeWindowSignalMetrics = prometheus.NewGaugeVec(
2020
prometheus.GaugeOpts{
@@ -26,23 +26,53 @@ func init() {
2626
prometheus.MustRegister(tradeVolumeWindowSignalMetrics)
2727
}
2828

29+
// TradeVolumeWindowSignal uses a fixed capacity ring buffer to store trades.
2930
type TradeVolumeWindowSignal struct {
3031
Threshold fixedpoint.Value `json:"threshold"`
3132
Window types.Duration `json:"window"`
3233

34+
DecayRate float64 `json:"decayRate"` // decay rate for weighting trades (per second), 0 means no decay
35+
36+
// new unexported fields for frequency algorithm coefficients:
37+
//
38+
// Alpha: represents the volume contribution of each trade. A higher Alpha makes the trade amount
39+
// more significant in the overall score. Adjust this if you believe trade amounts should have more weight.
40+
//
41+
// Beta: represents the frequency contribution of each trade. Each trade contributes a constant value
42+
// (scaled by decay) to indicate its occurrence. Increase Beta if you want the frequency (number of trades)
43+
// to have a greater impact on the final signal.
44+
ConsiderFreq bool `json:"considerFreq"`
45+
Alpha float64 `json:"alpha"` // coefficient for trade volume (default 1.0)
46+
Beta float64 `json:"beta"` // coefficient for trade frequency (default 1.0)
47+
48+
// Use fixed capacity ring buffer
3349
trades []types.Trade
50+
start int // ring buffer start index
51+
count int // current number of stored trades
52+
3453
symbol string
3554

3655
mu sync.Mutex
3756
}
3857

58+
// handleTrade adds a trade into the ring buffer.
3959
func (s *TradeVolumeWindowSignal) handleTrade(trade types.Trade) {
4060
s.mu.Lock()
41-
s.trades = append(s.trades, trade)
42-
s.trades = types.ShrinkSlice(s.trades, tradeSliceShrinkThreshold, tradeSliceShrinkSize)
43-
s.mu.Unlock()
61+
defer s.mu.Unlock()
62+
63+
if s.count < tradeSliceCapacityLimit {
64+
// If not full, add trade directly.
65+
idx := (s.start + s.count) % tradeSliceCapacityLimit
66+
s.trades[idx] = trade
67+
s.count++
68+
} else {
69+
// If ring buffer is full, overwrite the oldest trade and update start index.
70+
s.trades[s.start] = trade
71+
s.start = (s.start + 1) % tradeSliceCapacityLimit
72+
}
4473
}
4574

75+
// Bind preallocates the fixed capacity ring buffer and binds the market trade callback.
4676
func (s *TradeVolumeWindowSignal) Bind(ctx context.Context, session *bbgo.ExchangeSession, symbol string) error {
4777
s.symbol = symbol
4878

@@ -54,32 +84,53 @@ func (s *TradeVolumeWindowSignal) Bind(ctx context.Context, session *bbgo.Exchan
5484
s.Threshold = fixedpoint.NewFromFloat(0.7)
5585
}
5686

57-
s.trades = make([]types.Trade, 0, tradeSliceCapacityLimit)
87+
// set defaults for frequency coefficients if not provided
88+
if s.Alpha == 0 {
89+
s.Alpha = 1.0
90+
}
91+
if s.Beta == 0 {
92+
s.Beta = 1.0
93+
}
94+
95+
// Preallocate fixed capacity ring buffer.
96+
s.trades = make([]types.Trade, tradeSliceCapacityLimit)
97+
s.start = 0
98+
s.count = 0
5899

59100
session.MarketDataStream.OnMarketTrade(s.handleTrade)
60101
return nil
61102
}
62103

104+
// filterTrades returns trades not before startTime, while updating the ring buffer.
63105
func (s *TradeVolumeWindowSignal) filterTrades(startTime time.Time) []types.Trade {
64-
startIdx := 0
65-
66106
s.mu.Lock()
67107
defer s.mu.Unlock()
68108

69-
for idx, td := range s.trades {
70-
// skip trades before the start time
71-
if td.Time.Before(startTime) {
72-
continue
109+
newStart := s.start
110+
n := s.count
111+
// Find first trade with time after startTime.
112+
for i := 0; i < s.count; i++ {
113+
idx := (s.start + i) % tradeSliceCapacityLimit
114+
if !s.trades[idx].Time.Before(startTime) {
115+
newStart = idx
116+
n = s.count - i
117+
break
73118
}
74-
75-
startIdx = idx
76-
break
77119
}
120+
// Update ring buffer: set start and count.
121+
s.start = newStart
122+
s.count = n
78123

79-
s.trades = s.trades[startIdx:]
80-
return s.trades
124+
// Copy valid data to a new slice for return.
125+
res := make([]types.Trade, n, n)
126+
for i := 0; i < n; i++ {
127+
idx := (s.start + i) % tradeSliceCapacityLimit
128+
res[i] = s.trades[idx]
129+
}
130+
return res
81131
}
82132

133+
// aggTradeVolume aggregates the buy and sell trade volumes.
83134
func (s *TradeVolumeWindowSignal) aggTradeVolume(trades []types.Trade) (buyVolume, sellVolume float64) {
84135
for _, td := range trades {
85136
if td.IsBuyer {
@@ -92,25 +143,116 @@ func (s *TradeVolumeWindowSignal) aggTradeVolume(trades []types.Trade) (buyVolum
92143
return buyVolume, sellVolume
93144
}
94145

95-
func (s *TradeVolumeWindowSignal) CalculateSignal(_ context.Context) (float64, error) {
146+
// aggDecayedTradeVolume aggregates trade volumes with exponential decay.
147+
// It calculates a weight for each trade as: weight = exp(-DecayRate * deltaSeconds),
148+
// where deltaSeconds is the difference between now and the trade's timestamp.
149+
func aggDecayedTradeVolume(now time.Time, trades []types.Trade, decayRate float64) (buyVolume, sellVolume float64) {
150+
for _, td := range trades {
151+
// calculate time difference in seconds
152+
delta := now.Sub(td.Time.Time()).Seconds()
153+
weight := math.Exp(-decayRate * delta)
154+
weightedQty := td.Quantity.Float64() * weight
155+
if td.IsBuyer {
156+
buyVolume += weightedQty
157+
} else {
158+
sellVolume += weightedQty
159+
}
160+
}
161+
return
162+
}
163+
164+
// calculateSignal computes the signal based on buy and sell volumes.
165+
// If the absolute value of the signal is below the threshold, zero is returned.
166+
func calculateSignal(buyVol, sellVol, threshold float64) float64 {
167+
total := buyVol + sellVol
168+
if total == 0 {
169+
return 0.0
170+
}
171+
// signal ranges from -1 to 1.
172+
sig := (buyVol - sellVol) / total
173+
if math.Abs(sig) < threshold {
174+
return 0.0
175+
}
176+
return sig
177+
}
178+
179+
// CalculateSignal computes the trading signal using decayed trade volumes.
180+
// If DecayRate > 0, older trades have less influence.
181+
// The computed signal is scaled by 2x.
182+
func (s *TradeVolumeWindowSignal) CalculateSignal(ctx context.Context) (float64, error) {
183+
if s.ConsiderFreq && s.Alpha != 0.0 && s.Beta != 0.0 {
184+
return s.CalculateSignalWithFrequency(ctx)
185+
}
186+
96187
now := time.Now()
97188
trades := s.filterTrades(now.Add(-time.Duration(s.Window)))
98-
buyVolume, sellVolume := s.aggTradeVolume(trades)
99-
totalVolume := buyVolume + sellVolume
100-
189+
var buyVolume, sellVolume float64
190+
if s.DecayRate > 0 {
191+
buyVolume, sellVolume = aggDecayedTradeVolume(now, trades, s.DecayRate)
192+
} else {
193+
buyVolume, sellVolume = s.aggTradeVolume(trades)
194+
}
101195
threshold := s.Threshold.Float64()
102-
buyRatio := buyVolume / totalVolume
103-
sellRatio := sellVolume / totalVolume
104196

105-
sig := 0.0
106-
if buyRatio > threshold {
107-
sig = buyRatio * 2.0
108-
} else if sellRatio > threshold {
109-
sig = -sellRatio * 2.0
197+
// Use the refined algorithm.
198+
sig := calculateSignal(buyVolume, sellVolume, threshold)
199+
sig *= 2.0
200+
201+
logrus.Infof("[TradeVolumeWindowSignal] signal=%f, buyVolume=%f, sellVolume=%f", sig, buyVolume, sellVolume)
202+
tradeVolumeWindowSignalMetrics.WithLabelValues(s.symbol).Set(sig)
203+
return sig, nil
204+
}
205+
206+
// aggDecayedTradeScore aggregates decayed trade volume and frequency scores.
207+
// For each trade, weight = exp(-decayRate * deltaSeconds) if decayRate > 0, else 1.
208+
// The score for each trade is computed as: (Alpha * quantity * weight + Beta * weight),
209+
// then summed separately for buy and sell trades.
210+
func aggDecayedTradeScore(
211+
now time.Time, trades []types.Trade, decayRate, alpha, beta float64,
212+
) (buyScore, sellScore float64) {
213+
for _, td := range trades {
214+
delta := now.Sub(td.Time.Time()).Seconds()
215+
weight := 1.0
216+
if decayRate > 0 {
217+
weight = math.Exp(-decayRate * delta)
218+
}
219+
score := alpha*td.Quantity.Float64()*weight + beta*weight
220+
if td.IsBuyer {
221+
buyScore += score
222+
} else {
223+
sellScore += score
224+
}
225+
}
226+
return
227+
}
228+
229+
// calculateSignalWithFrequency computes the signal using aggregated buy and sell scores.
230+
// It returns 0 if total score is zero or below the threshold.
231+
func calculateSignalWithFrequency(buyScore, sellScore, threshold float64) float64 {
232+
total := buyScore + sellScore
233+
if total == 0 {
234+
return 0.0
235+
}
236+
sig := (buyScore - sellScore) / total
237+
if math.Abs(sig) < threshold {
238+
return 0.0
110239
}
240+
return sig
241+
}
242+
243+
// CalculateSignalWithFrequency computes the trading signal using decayed trade volume and frequency.
244+
// It integrates both the trade amount and the trade occurrence (frequency) using the configured coefficients.
245+
// The final signal is scaled by 2x.
246+
func (s *TradeVolumeWindowSignal) CalculateSignalWithFrequency(_ context.Context) (float64, error) {
247+
now := time.Now()
248+
trades := s.filterTrades(now.Add(-time.Duration(s.Window)))
249+
buyScore, sellScore := aggDecayedTradeScore(now, trades, s.DecayRate, s.Alpha, s.Beta)
250+
threshold := s.Threshold.Float64()
111251

112-
log.Infof("[TradeVolumeWindowSignal] %f buy/sell = %f/%f", sig, buyVolume, sellVolume)
252+
sig := calculateSignalWithFrequency(buyScore, sellScore, threshold)
253+
sig *= 2.0
113254

255+
logrus.Infof("[TradeVolumeWindowSignal] frequency signal=%f, buyScore=%f, sellScore=%f", sig, buyScore, sellScore)
114256
tradeVolumeWindowSignalMetrics.WithLabelValues(s.symbol).Set(sig)
115257
return sig, nil
116258
}

0 commit comments

Comments
 (0)