Skip to content

Commit 4cedaf4

Browse files
authored
Add 'blacklist-regex' config option (#166)
* add 'validation-regex' config option
* make validationRegex public
* use base struct instead of tagdesc
* fix empty regex
* shorten expressions
* cosmetic changes
* add validationRegexDropped metric
* rename validation-regex -> blacklist-regex
1 parent 93622ce commit 4cedaf4

File tree

12 files changed

+186
-15
lines changed

12 files changed

+186
-15
lines changed

carbon/app.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,7 @@ func (app *App) Start() (err error) {
256256
receiver.DropPast(uint32(conf.Tcp.DropPast.Value().Seconds())),
257257
receiver.DropLongerThan(conf.Tcp.DropLongerThan),
258258
receiver.ReadTimeout(uint32(conf.Tcp.ReadTimeout.Value().Seconds())),
259+
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
259260
)
260261

261262
if err != nil {
@@ -274,6 +275,7 @@ func (app *App) Start() (err error) {
274275
receiver.DropFuture(uint32(conf.Udp.DropFuture.Value().Seconds())),
275276
receiver.DropPast(uint32(conf.Udp.DropPast.Value().Seconds())),
276277
receiver.DropLongerThan(conf.Udp.DropLongerThan),
278+
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
277279
)
278280

279281
if err != nil {
@@ -292,6 +294,7 @@ func (app *App) Start() (err error) {
292294
receiver.DropFuture(uint32(conf.Pickle.DropFuture.Value().Seconds())),
293295
receiver.DropPast(uint32(conf.Pickle.DropPast.Value().Seconds())),
294296
receiver.DropLongerThan(conf.Pickle.DropLongerThan),
297+
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
295298
)
296299

297300
if err != nil {
@@ -309,6 +312,7 @@ func (app *App) Start() (err error) {
309312
receiver.DropFuture(uint32(conf.Grpc.DropFuture.Value().Seconds())),
310313
receiver.DropPast(uint32(conf.Grpc.DropPast.Value().Seconds())),
311314
receiver.DropLongerThan(conf.Grpc.DropLongerThan),
315+
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
312316
)
313317

314318
if err != nil {
@@ -326,6 +330,7 @@ func (app *App) Start() (err error) {
326330
receiver.DropFuture(uint32(conf.Prometheus.DropFuture.Value().Seconds())),
327331
receiver.DropPast(uint32(conf.Prometheus.DropPast.Value().Seconds())),
328332
receiver.DropLongerThan(conf.Prometheus.DropLongerThan),
333+
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
329334
)
330335

331336
if err != nil {
@@ -344,6 +349,7 @@ func (app *App) Start() (err error) {
344349
receiver.DropPast(uint32(conf.TelegrafHttpJson.DropPast.Value().Seconds())),
345350
receiver.DropLongerThan(conf.TelegrafHttpJson.DropLongerThan),
346351
receiver.ConcatChar(conf.TelegrafHttpJson.Concat),
352+
receiver.BlacklistRegex(conf.Common.BlacklistRegex),
347353
)
348354

349355
if err != nil {

carbon/config.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"bytes"
55
"fmt"
66
"io/ioutil"
7+
"regexp"
78
"strings"
89
"time"
910

@@ -25,6 +26,7 @@ type commonConfig struct {
2526
MetricEndpoint string `toml:"metric-endpoint"`
2627
MaxCPU int `toml:"max-cpu"`
2728
Enabled bool `toml:"enabled"`
29+
BlacklistRegex string `toml:"blacklist-regex"`
2830
}
2931

3032
type clickhouseConfig struct {
@@ -279,6 +281,12 @@ func ReadConfig(filename string, exactConfig bool) (*Config, error) {
279281
}
280282
}
281283

284+
if cfg.Common.BlacklistRegex != "" {
285+
if _, err := regexp.Compile(cfg.Common.BlacklistRegex); err != nil {
286+
return nil, fmt.Errorf("invalid regex in blacklist-regex option: %s", err.Error())
287+
}
288+
}
289+
282290
if cfg.Logging == nil {
283291
cfg.Logging = make([]zapwriter.Config, 0)
284292
}

receiver/base.go

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package receiver
33
import (
44
"fmt"
55
"net/http"
6+
"regexp"
67
"sort"
78
"sync"
89
"sync/atomic"
@@ -18,15 +19,16 @@ const droppedListSize = 1000
1819
type Base struct {
1920
stop.Struct
2021
stat struct {
21-
samplesReceived uint64 // atomic
22-
messagesReceived uint64 // atomic
23-
metricsReceived uint64 // atomic
24-
errors uint64 // atomic
25-
active int64 // atomic
26-
incompleteReceived uint64 // atomic
27-
futureDropped uint64 // atomic
28-
pastDropped uint64 // atomic
29-
tooLongDropped uint64 // atomic
22+
samplesReceived uint64 // atomic
23+
messagesReceived uint64 // atomic
24+
metricsReceived uint64 // atomic
25+
errors uint64 // atomic
26+
active int64 // atomic
27+
incompleteReceived uint64 // atomic
28+
futureDropped uint64 // atomic
29+
pastDropped uint64 // atomic
30+
tooLongDropped uint64 // atomic
31+
blacklistRegexDropped uint64 // atomic
3032
}
3133
droppedList [droppedListSize]string
3234
droppedListNext int
@@ -36,6 +38,7 @@ type Base struct {
3638
dropPastSeconds uint32
3739
dropTooLongLimit uint16
3840
readTimeoutSeconds uint32
41+
blacklistRegex *regexp.Regexp
3942
writeChan chan *RowBinary.WriteBuffer
4043
logger *zap.Logger
4144
Tags tags.TagConfig
@@ -85,6 +88,14 @@ func (base *Base) isDropMetricNameTooLong(name string) bool {
8588
return false
8689
}
8790

91+
func (base *Base) isMatchedByBlacklistRegex(name []byte) bool {
92+
if base.blacklistRegex != nil && base.blacklistRegex.Match(name) {
93+
atomic.AddUint64(&base.stat.blacklistRegexDropped, 1)
94+
return true
95+
}
96+
return false
97+
}
98+
8899
func (base *Base) DroppedHandler(w http.ResponseWriter, r *http.Request) {
89100
w.Header().Set("Content-Type", "text/plain")
90101

@@ -143,6 +154,8 @@ func (base *Base) SendStat(send func(metric string, value float64), fields ...st
143154
sendUint64Counter(send, f, &base.stat.pastDropped)
144155
case "tooLongDropped":
145156
sendUint64Counter(send, f, &base.stat.tooLongDropped)
157+
case "blacklistRegexDropped":
158+
sendUint64Counter(send, f, &base.stat.blacklistRegexDropped)
146159
case "errors":
147160
sendUint64Counter(send, f, &base.stat.errors)
148161
case "active":

receiver/grpc.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ func (g *GRPC) Addr() net.Addr {
3232
}
3333

3434
func (g *GRPC) Stat(send func(metric string, value float64)) {
35-
g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
35+
g.SendStat(send, "metricsReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
3636
}
3737

3838
// Listen bind port. Receive messages and send to out channel

receiver/pickle.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ func (rcv *Pickle) Addr() net.Addr {
3333

3434
func (rcv *Pickle) Stat(send func(metric string, value float64)) {
3535
rcv.SendStat(send, "metricsReceived", "messagesReceived", "errors", "active", "futureDropped", "pastDropped",
36-
"tooLongDropped")
36+
"tooLongDropped", "blacklistRegexDropped")
3737
}
3838

3939
func (rcv *Pickle) HandleConnection(conn net.Conn) {

receiver/plain.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,10 @@ func (base *Base) PlainParseLine(p []byte, now uint32, buf *tags.GraphiteBuf) ([
7070
i3--
7171
}
7272

73+
if base.isMatchedByBlacklistRegex(p[:i1]) {
74+
return nil, 0, 0, errors.New("metric name matched by blacklist regex: '" + unsafeString(p) + "'")
75+
}
76+
7377
value, err := strconv.ParseFloat(unsafeString(p[i1+1:i2]), 64)
7478
if err != nil || math.IsNaN(value) {
7579
return nil, 0, 0, errors.New("bad message: '" + unsafeString(p) + "'")

receiver/plain_test.go

Lines changed: 127 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package receiver
33
import (
44
"context"
55
"fmt"
6+
"regexp"
67
"sync"
78
"testing"
89
"time"
@@ -177,6 +178,8 @@ func TestPlainParseLine(t *testing.T) {
177178
{"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189},
178179
{"metric..name 42.15 -1\n", "metric.name", 42.15, now},
179180
{"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189},
181+
{"cpu.loadavg~ 21.4 1422642189\n", "cpu.loadavg~", 21.4, 1422642189},
182+
{"cpu.loadavg~;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg~?env=test&host=host1", 21.4, 1422642189},
180183
}
181184

182185
base := &Base{}
@@ -202,4 +205,128 @@ func TestPlainParseLine(t *testing.T) {
202205
}
203206
}
204207
}
208+
209+
tableWithValidation := [](struct {
210+
b string
211+
name string
212+
value float64
213+
timestamp uint32
214+
}){
215+
{b: "42"},
216+
{b: ""},
217+
{b: "\n"},
218+
{b: "metric..name 42 \n"},
219+
{b: "metric..name 42"},
220+
{b: "metric.name 42 a1422642189\n"},
221+
{b: "metric.name 42a 1422642189\n"},
222+
{b: "metric.name NaN 1422642189\n"},
223+
{b: "metric.name 42 NaN\n"},
224+
{"metric.name -42.76 1422642189\n", "metric.name", -42.76, 1422642189},
225+
{"metric.name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
226+
{"metric..name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
227+
{"metric...name 42.15 1422642189\n", "metric.name", 42.15, 1422642189},
228+
{"metric.name 42.15 1422642189\r\n", "metric.name", 42.15, 1422642189},
229+
{"metric.name;tag=value;k=v 42.15 1422642189\r\n", "metric.name?k=v&tag=value", 42.15, 1422642189},
230+
{"metric..name 42.15 -1\n", "metric.name", 42.15, now},
231+
{"cpu.loadavg;env=test2;host=host1;env=test 21.4 1422642189\n", "cpu.loadavg?env=test&host=host1", 21.4, 1422642189},
232+
233+
// Additional test cases for validation
234+
// Test invalid characters in metric names
235+
{b: "metric@name 42.15 1422642189\n"},
236+
{b: "metric#name 42.15 1422642189\n"},
237+
{b: "metric$name 42.15 1422642189\n"},
238+
{b: "metric%name 42.15 1422642189\n"},
239+
{b: "metric&name 42.15 1422642189\n"},
240+
{b: "metric*name 42.15 1422642189\n"},
241+
{b: "metric!name 42.15 1422642189\n"},
242+
{b: "metric name 42.15 1422642189\n"}, // space in metric name
243+
{b: "metric\tname 42.15 1422642189\n"}, // tab in metric name
244+
{b: "metric[name] 42.15 1422642189\n"},
245+
{b: "metric{name} 42.15 1422642189\n"},
246+
{b: "metric(name) 42.15 1422642189\n"},
247+
{b: "metric/name 42.15 1422642189\n"},
248+
{b: "metric\\name 42.15 1422642189\n"},
249+
{b: "metric|name 42.15 1422642189\n"},
250+
{b: "metric?name 42.15 1422642189\n"},
251+
{b: "metric<name> 42.15 1422642189\n"},
252+
{b: "metric'name' 42.15 1422642189\n"},
253+
{b: "metric\"name\" 42.15 1422642189\n"},
254+
255+
// Test valid characters that should pass
256+
{"metric-name 42.15 1422642189\n", "metric-name", 42.15, 1422642189},
257+
{"metric_name 42.15 1422642189\n", "metric_name", 42.15, 1422642189},
258+
{"metric:name 42.15 1422642189\n", "metric:name", 42.15, 1422642189},
259+
{"metric.sub.name 42.15 1422642189\n", "metric.sub.name", 42.15, 1422642189},
260+
{"metric-123_test:data 42.15 1422642189\n", "metric-123_test:data", 42.15, 1422642189},
261+
262+
// Test invalid characters in tags
263+
{b: "metric.name;tag@=value 42.15 1422642189\n"},
264+
{b: "metric.name;tag=val@ue 42.15 1422642189\n"},
265+
{b: "metric.name;t ag=value 42.15 1422642189\n"},
266+
{b: "metric.name;tag=val ue 42.15 1422642189\n"},
267+
{b: "metric.name;tag#key=value 42.15 1422642189\n"},
268+
{b: "metric.name;tag=value! 42.15 1422642189\n"},
269+
{b: "metric.name;tag=value;key=val*ue 42.15 1422642189\n"},
270+
{b: "metric.name;tag=value;k ey=value 42.15 1422642189\n"},
271+
{b: "metric.name;tag=value;key=val\tue 42.15 1422642189\n"},
272+
{b: "metric.name;tag=value;key=val\nue 42.15 1422642189\n"},
273+
274+
// Test valid tags that should pass
275+
{"metric.name;env=prod 42.15 1422642189\n", "metric.name?env=prod", 42.15, 1422642189},
276+
{"metric.name;env=prod;region=us-east-1 42.15 1422642189\n", "metric.name?env=prod&region=us-east-1", 42.15, 1422642189},
277+
{"metric.name;tag-name=tag-value 42.15 1422642189\n", "metric.name?tag-name=tag-value", 42.15, 1422642189},
278+
{"metric.name;tag_name=tag_value 42.15 1422642189\n", "metric.name?tag_name=tag_value", 42.15, 1422642189},
279+
{"metric.name;tag:name=tag:value 42.15 1422642189\n", "metric.name?tag%3Aname=tag%3Avalue", 42.15, 1422642189},
280+
{"metric.name;tag.name=tag.value 42.15 1422642189\n", "metric.name?tag.name=tag.value", 42.15, 1422642189},
281+
282+
// Test edge cases with multiple invalid characters
283+
{b: "metric@#$%name 42.15 1422642189\n"},
284+
{b: "metric.name;tag@#=value$% 42.15 1422642189\n"},
285+
{b: "met!ric.na@me;ta#g=val$ue 42.15 1422642189\n"},
286+
287+
// Test unicode characters (should fail validation)
288+
{b: "metric.名前 42.15 1422642189\n"},
289+
{b: "metric.name;tag=値 42.15 1422642189\n"},
290+
{b: "metric.name;标签=value 42.15 1422642189\n"},
291+
{b: "метрика.name 42.15 1422642189\n"},
292+
293+
// Test empty tag keys/values
294+
{b: "metric.name;=value 42.15 1422642189\n"},
295+
{b: "metric.name;= 42.15 1422642189\n"},
296+
297+
// Test metrics with numbers
298+
{"metric123 42.15 1422642189\n", "metric123", 42.15, 1422642189},
299+
{"123metric 42.15 1422642189\n", "123metric", 42.15, 1422642189},
300+
{"123 42.15 1422642189\n", "123", 42.15, 1422642189},
301+
302+
// Test metrics with only valid special characters
303+
{"metric-_.:name 42.15 1422642189\n", "metric-_.:name", 42.15, 1422642189},
304+
{"metric.name;tag-_.:key=tag-_.:value 42.15 1422642189\n", "metric.name?tag-_.%3Akey=tag-_.%3Avalue", 42.15, 1422642189},
305+
306+
// Additional tests for colon encoding
307+
{"host:port:metric 42.15 1422642189\n", "host:port:metric", 42.15, 1422642189},
308+
{"metric.name;service:port=web:8080 42.15 1422642189\n", "metric.name?service%3Aport=web%3A8080", 42.15, 1422642189},
309+
{"app:service:metric;env=prod:primary 42.15 1422642189\n", "app:service:metric?env=prod%3Aprimary", 42.15, 1422642189},
310+
}
311+
312+
baseWithValidation := &Base{blacklistRegex: regexp.MustCompile(`[^a-zA-Z0-9.;\-_:=]{1}`)}
313+
for _, p := range tableWithValidation {
314+
name, value, timestamp, err := baseWithValidation.PlainParseLine([]byte(p.b), now, &tagBuf)
315+
if p.name == "" {
316+
// expected error
317+
if err == nil {
318+
t.Fatal("error expected")
319+
}
320+
} else {
321+
if string(name) != p.name {
322+
t.Fatalf("%#v != %#v", string(name), p.name)
323+
}
324+
if value != p.value {
325+
t.Fatalf("%#v != %#v", value, p.value)
326+
}
327+
if timestamp != p.timestamp {
328+
t.Fatalf("%d != %d", timestamp, p.timestamp)
329+
}
330+
}
331+
}
205332
}

receiver/prometheus.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -199,7 +199,7 @@ func (rcv *PrometheusRemoteWrite) Addr() net.Addr {
199199
}
200200

201201
func (rcv *PrometheusRemoteWrite) Stat(send func(metric string, value float64)) {
202-
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped")
202+
rcv.SendStat(send, "samplesReceived", "errors", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
203203
}
204204

205205
// Listen bind port. Receive messages and send to out channel

receiver/receiver.go

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"net"
66
"net/http"
77
"net/url"
8+
"regexp"
89
"strings"
910

1011
"github.com/lomik/carbon-clickhouse/helper/RowBinary"
@@ -90,6 +91,18 @@ func ConcatChar(concat string) Option {
9091
}
9192
}
9293

94+
// BlacklistRegex creates option for New constructor
95+
func BlacklistRegex(regex string) Option {
96+
return func(r interface{}) error {
97+
if t, ok := r.(*Base); ok {
98+
if regex != "" {
99+
t.blacklistRegex = regexp.MustCompile(regex)
100+
}
101+
}
102+
return nil
103+
}
104+
}
105+
93106
// New creates udp, tcp, pickle receiver
94107
func New(dsn string, config tags.TagConfig, opts ...Option) (Receiver, error) {
95108
u, err := url.Parse(dsn)

receiver/tcp.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ func (rcv *TCP) Addr() net.Addr {
2828
}
2929

3030
func (rcv *TCP) Stat(send func(metric string, value float64)) {
31-
rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped")
31+
rcv.SendStat(send, "metricsReceived", "errors", "active", "futureDropped", "pastDropped", "tooLongDropped", "blacklistRegexDropped")
3232
}
3333

3434
func (rcv *TCP) HandleConnection(conn net.Conn) {

0 commit comments

Comments
 (0)