diff --git a/drainer/checkpoint/checkpoint.go b/drainer/checkpoint/checkpoint.go index c33dd2f48..57093f7a7 100644 --- a/drainer/checkpoint/checkpoint.go +++ b/drainer/checkpoint/checkpoint.go @@ -54,6 +54,12 @@ func NewCheckPoint(cfg *Config) (CheckPoint, error) { cp, err = newMysql(cfg) case "file": cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile) + case "plugin": + if cfg.Db != nil { + cp, err = newMysql(cfg) + } else { + cp, err = NewFile(cfg.InitialCommitTS, cfg.CheckPointFile) + } default: err = errors.Errorf("unsupported checkpoint type %s", cfg.CheckpointType) } diff --git a/drainer/config.go b/drainer/config.go index af8eb9eb3..47d084478 100644 --- a/drainer/config.go +++ b/drainer/config.go @@ -85,10 +85,13 @@ type SyncerConfig struct { // for backward compatibility. // disable* is keep for backward compatibility. // if both setted, the disable one take affect. - DisableCausalityFlag *bool `toml:"-" json:"disable-detect-flag"` - EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"` - DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"` - EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"` + DisableCausalityFlag *bool `toml:"-" json:"disable-detect-flag"` + EnableCausalityFlag *bool `toml:"-" json:"enable-detect-flag"` + DisableCausalityFile *bool `toml:"disable-detect" json:"disable-detect"` + EnableCausalityFile *bool `toml:"enable-detect" json:"enable-detect"` + PluginPath string `toml:"plugin-path" json:"plugin-path"` + PluginName string `toml:"plugin-name" json:"plugin-name"` + PluginCfgFile string `toml:"plugin-cfg-file" json:"plugin-cfg-file"` } // EnableDispatch return true if enable dispatch. 
@@ -216,6 +219,9 @@ func NewConfig() *Config {
 	fs.IntVar(&maxBinlogItemCount, "cache-binlog-count", defaultBinlogItemCount, "blurry count of binlogs in cache, limit cache size")
 	fs.IntVar(&cfg.SyncedCheckTime, "synced-check-time", defaultSyncedCheckTime, "if we can't detect new binlog after many minute, we think the all binlog is all synced")
 	fs.StringVar(new(string), "log-rotate", "", "DEPRECATED")
+	fs.StringVar(&cfg.SyncerCfg.PluginName, "plugin-name", "", "syncer plugin name")
+	fs.StringVar(&cfg.SyncerCfg.PluginPath, "plugin-path", "", "syncer plugin path")
+	fs.StringVar(&cfg.SyncerCfg.PluginCfgFile, "plugin-cfg-path", "", "syncer plugin's config file")
 
 	return cfg
 }
diff --git a/drainer/sync/plugin_demo.go b/drainer/sync/plugin_demo.go
new file mode 100644
index 000000000..e5034a788
--- /dev/null
+++ b/drainer/sync/plugin_demo.go
@@ -0,0 +1,56 @@
+package sync
+
+import (
+	"fmt"
+
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+	"github.com/pingcap/tidb-binlog/drainer/relay"
+	"github.com/pingcap/tidb-binlog/drainer/translator"
+	"github.com/prometheus/client_golang/prometheus"
+	"go.uber.org/zap"
+)
+
+// SyncerDemo is a demonstration Syncer that only logs each binlog item; it embeds *baseSyncer.
+type SyncerDemo struct {
+	*baseSyncer
+}
+
+// Sync logs item under the field key "item", sends it on sd.success, and always returns nil.
+func (sd *SyncerDemo) Sync(item *Item) error {
+	// demo: the item is only printed, nothing is written to a downstream system
+	log.Info("item", zap.String("item", fmt.Sprintf("%v", item)))
+	sd.success <- item
+	return nil
+}
+
+// Close performs no work in this demo and always returns nil.
+func (sd *SyncerDemo) Close() error {
+	return nil
+}
+
+// SetSafeMode ignores mode and always returns false.
+func (sd *SyncerDemo) SetSafeMode(mode bool) bool {
+	return false
+}
+
+// NewSyncerDemo returns a SyncerDemo whose baseSyncer is built from tableInfoGetter; all other arguments are unused here.
+func NewSyncerDemo(
+	cfg *DBConfig,
+	file string,
+	tableInfoGetter translator.TableInfoGetter,
+	worker int,
+	batchSize int,
+	queryHistogramVec *prometheus.HistogramVec,
+	sqlMode *string,
+	destDBType string,
+	relayer relay.Relayer,
+	info *loopbacksync.LoopBackSync,
+	enableDispatch bool,
+	enableCausility bool,
+) (Syncer, error) {
+	log.Info("call NewSyncerDemo()")
+	executor := &SyncerDemo{}
+	executor.baseSyncer = newBaseSyncer(tableInfoGetter)
+	return executor, nil
+}
diff --git a/drainer/syncer.go b/drainer/syncer.go
index 12b4255c4..7d54e4ae5 100644
--- a/drainer/syncer.go
+++ b/drainer/syncer.go
@@ -20,6 +20,7 @@ import (
 	"time"
 
 	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+	"github.com/pingcap/tidb-binlog/drainer/syncplg"
 	"github.com/pingcap/tidb-binlog/pkg/loader"
 
 	"github.com/pingcap/errors"
@@ -120,6 +121,26 @@ func createDSyncer(cfg *SyncerConfig, schema *Schema, info *loopbacksync.LoopBac
 	// only use for test
 	case "_intercept":
 		dsyncer = newInterceptSyncer()
+	case "plugin":
+		if len(cfg.PluginName) == 0 || len(cfg.PluginPath) == 0 {
+			return nil, errors.Errorf("plugin-name or plugin-path is incorrect")
+		}
+		newSyncer, err := syncplg.LoadPlugin(cfg.PluginPath, cfg.PluginName)
+		if err != nil {
+			return nil, errors.Annotate(err, "fail to load plugin dsyncer")
+		}
+
+		var relayer relay.Relayer
+		if cfg.Relay.IsEnabled() {
+			if relayer, err = relay.NewRelayer(cfg.Relay.LogDir, cfg.Relay.MaxFileSize, schema); err != nil {
+				return nil, errors.Annotate(err, "fail to create relayer")
+			}
+		}
+
+		dsyncer, err = newSyncer(cfg.To, cfg.PluginCfgFile, schema, cfg.WorkerCount, cfg.TxnBatch, queryHistogramVec, cfg.StrSQLMode, cfg.DestDBType, relayer, info, cfg.EnableDispatch(), cfg.EnableCausality())
+		if err != nil {
+			return nil, errors.Annotate(err, "fail to create plugin dsyncer")
+		}
 	default:
 		return nil, errors.Errorf("unknown DestDBType: %s", cfg.DestDBType)
 	}
diff --git a/drainer/syncplg/plugin.go b/drainer/syncplg/plugin.go
new file mode 100644
index 000000000..b08e9706e
--- /dev/null
+++ b/drainer/syncplg/plugin.go
@@ -0,0 +1,76 @@
+package syncplg
+
+import (
+	"errors"
+	"fmt"
+	"plugin"
+
+	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+	"github.com/pingcap/tidb-binlog/drainer/relay"
+	"github.com/pingcap/tidb-binlog/drainer/sync"
+	"github.com/pingcap/tidb-binlog/drainer/translator"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+const (
+	// NewPlugin is the symbol name LoadPlugin looks up in a syncer plugin; it must be a func() interface{}.
+	NewPlugin = "NewPluginFactory"
+)
+
+// FactoryInterface must be implemented by the value a plugin's NewPlugin function returns.
+type FactoryInterface interface {
+	NewSyncerPlugin(
+		cfg *sync.DBConfig,
+		file string,
+		tableInfoGetter translator.TableInfoGetter,
+		worker int,
+		batchSize int,
+		queryHistogramVec *prometheus.HistogramVec,
+		sqlMode *string,
+		destDBType string,
+		relayer relay.Relayer,
+		info *loopbacksync.LoopBackSync,
+		enableDispatch bool,
+		enableCausility bool,
+	) (sync.Syncer, error)
+}
+
+// NewSyncerFunc is the syncer constructor signature returned by LoadPlugin; it matches FactoryInterface.NewSyncerPlugin.
+type NewSyncerFunc func(
+	cfg *sync.DBConfig,
+	file string,
+	tableInfoGetter translator.TableInfoGetter,
+	worker int,
+	batchSize int,
+	queryHistogramVec *prometheus.HistogramVec,
+	sqlMode *string,
+	destDBType string,
+	relayer relay.Relayer,
+	info *loopbacksync.LoopBackSync,
+	enableDispatch bool,
+	enableCausility bool,
+) (sync.Syncer, error)
+
+// LoadPlugin opens the plugin file path/name, calls its exported NewPlugin factory, and returns the factory's NewSyncerPlugin method; it fails if the file, the symbol, or either expected type is missing.
+func LoadPlugin(path, name string) (NewSyncerFunc, error) {
+	fp := path + "/" + name
+	p, err := plugin.Open(fp)
+	if err != nil {
+		return nil, fmt.Errorf("failed to open plugin %s: %v", fp, err)
+	}
+
+	sym, err := p.Lookup(NewPlugin)
+	if err != nil {
+		return nil, fmt.Errorf("failed to look up symbol %s in plugin %s: %v", NewPlugin, fp, err)
+	}
+	newFactory, ok := sym.(func() interface{})
+	if !ok {
+		return nil, errors.New("function type is incorrect")
+	}
+	fac := newFactory()
+	plg, ok := fac.(FactoryInterface)
+	if !ok {
+		return nil, errors.New("not implement FactoryInterface")
+	}
+	return plg.NewSyncerPlugin, nil
+}
diff --git a/drainer/syncplg/syncerdemo/Makefile b/drainer/syncplg/syncerdemo/Makefile
new file mode 100644
index 000000000..e2597d1a2
--- /dev/null
+++ b/drainer/syncplg/syncerdemo/Makefile
@@ -0,0 +1,4 @@
+plugin:
+	go build -o syncerdemo.so -buildmode=plugin syncerdemo.go
+clean:
+	rm -rf syncerdemo.so
\ No newline at end of file
diff --git a/drainer/syncplg/syncerdemo/README.md b/drainer/syncplg/syncerdemo/README.md
new file mode 100644
index 000000000..323e2aab1
--- /dev/null
+++ b/drainer/syncplg/syncerdemo/README.md
@@ -0,0 +1,84 @@
+### Drainer支持插件的用法
+#### 1.1 为什么支持插件
+目前Drainer已经支持file, MySQL, TiDB, Kafka等多种通用下游组件。但是在实际使用中,一些用户所在的公司对通用组件进行了定制化或者使用了公司内部的组件,此时Drainer便无法满足要求。
+
+针对上述问题,Drainer提供了插件的形式,供用户针对特定的下游组件进行定制化开发,从而满足公司内部业务场景的需求。
+#### 1.2 插件原理
+Drainer会调用`Syncer`接口,将解析出的Binlog同步到各个下游组件中。用户只需要定制化实现`Syncer`接口即可。
+
+定制化`Syncer`时候,需要实现`Syncer`的如下接口:
+
+```
+// Syncer sync binlog item to downstream
+type Syncer interface {
+	// Sync the binlog item to downstream
+	Sync(item *Item) error
+	// will be close if Close normally or meet error, call Error() to check it
+	Successes() <-chan *Item
+	// Return not nil if fail to sync data to downstream or nil if closed normally
+	Error() <-chan error
+	// Close the Syncer, no more item can be added by `Sync`
+	// will drain all items and return nil if all successfully sync into downstream
+	Close() error
+	// SetSafeMode make the Syncer to use safe mode or not. 
If no need to handle, it should return false
+	SetSafeMode(mode bool) bool
+}
+```
+
+用户需要将上述接口进行实现,并编译成动态库(*.so)形式,通过Drainer的启动参数进行加载。
+
+我们已经实现了插件框架,用户在定制化插件时只需要关注各个`Syncer`接口的实现即可。
+
+#### 1.3 Demo介绍
+为了向用户展示Drainer插件的用法,我们在源码中编写了一个Demo。Demo涉及的文件如下:
+
+```
+# 需要实现 Syncer 接口函数,主要是业务逻辑相关的内容
+./drainer/sync/plugin_demo.go
+# 胶水代码,用来将 业务逻辑代码与插件代码进行耦合,基本不需要用户修改
+./drainer/syncplg/syncerdemo/syncerdemo.go
+# 用来将 业务代码 编译成 插件(动态库),不需要用户修改
+./drainer/syncplg/syncerdemo/Makefile
+```
+
+编写一个插件的**步骤如下**:
+
+- 步骤一:plugin_demo.go文件实现各个接口
+
+该文件中主要是需要用户实现的 `Syncer`接口的各个函数,例如例子中,我们只对binlog进行简单打印,核心代码集中在 `Sync(item *Item)`函数中,如下所示:
+
+```
+func (sd *SyncerDemo) Sync(item *Item) error {
+	//demo
+	log.Info("item", zap.String("%s", fmt.Sprintf("%v", item)))
+	sd.success <- item
+	return nil
+}
+```
+- 步骤二:编译
+
+```
+cd ./drainer/syncplg/syncerdemo/
+make
+```
+
+如果没有报错,会生成插件,如下:
+
+```
+syncerdemo.so
+```
+
+- 步骤三:配置启动参数
+
+```
+# 指定Syncer使用插件
+dest-db-type = "plugin"
+# 需要加载的插件的名称
+plugin-name = "syncerdemo.so"
+# 插件所在路径
+plugin-path = "./drainer/syncplg/syncerdemo/"
+# 插件内部可以用的配置文件,如果使用不到,可以不配置
+plugin-cfg-path = "/drainer/syncplg/syncerdemo/plgcfg.toml"
+```
+
+加载插件后,可以通过Drainer的运行日志(`./conf/drainer.log`)来查看参数是否已经正常加载;也可以用来查看插件是否加载成功。
\ No newline at end of file
diff --git a/drainer/syncplg/syncerdemo/syncerdemo.go b/drainer/syncplg/syncerdemo/syncerdemo.go
new file mode 100644
index 000000000..b6c21fafe
--- /dev/null
+++ b/drainer/syncplg/syncerdemo/syncerdemo.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"github.com/pingcap/log"
+	"github.com/pingcap/tidb-binlog/drainer/loopbacksync"
+	"github.com/pingcap/tidb-binlog/drainer/relay"
+	"github.com/pingcap/tidb-binlog/drainer/sync"
+	"github.com/pingcap/tidb-binlog/drainer/translator"
+	"github.com/prometheus/client_golang/prometheus"
+)
+
+// PluginFactory is the stateless factory type of the demo plugin; its NewSyncerPlugin method builds the syncer.
+type PluginFactory struct{}
+
+// NewPluginFactory logs that it was called and returns a PluginFactory value as an interface{}.
+func NewPluginFactory() interface{} {
+	log.Info("call NewPluginFactory")
+	return PluginFactory{}
+}
+
+// NewSyncerPlugin forwards every argument unchanged to sync.NewSyncerDemo and returns its result.
+func (pf PluginFactory) NewSyncerPlugin(
+	cfg *sync.DBConfig,
+	file string,
+	tableInfoGetter translator.TableInfoGetter,
+	worker int,
+	batchSize int,
+	queryHistogramVec *prometheus.HistogramVec,
+	sqlMode *string,
+	destDBType string,
+	relayer relay.Relayer,
+	info *loopbacksync.LoopBackSync,
+	enableDispatch bool,
+	enableCausility bool,
+) (sync.Syncer, error) {
+	return sync.NewSyncerDemo(cfg, file, tableInfoGetter, worker, batchSize, queryHistogramVec, sqlMode,
+		destDBType, relayer, info, enableDispatch, enableCausility)
+}
+
+var _ PluginFactory // blank declaration that references the PluginFactory type
+var _ = NewPluginFactory() // evaluated during package initialization; the returned value is discarded
diff --git a/drainer/util.go b/drainer/util.go
index 09c52d7b6..1f693437c 100644
--- a/drainer/util.go
+++ b/drainer/util.go
@@ -106,7 +106,7 @@ func GenCheckPointCfg(cfg *Config, id uint64) (*checkpoint.Config, error) {
 		}
 	case "":
 		switch cfg.SyncerCfg.DestDBType {
-		case "mysql", "tidb":
+		case "mysql", "tidb", "plugin":
 			checkpointCfg.CheckpointType = cfg.SyncerCfg.DestDBType
 			checkpointCfg.Db = &checkpoint.DBConfig{
 				Host: cfg.SyncerCfg.To.Host,