diff --git a/etc/river.toml b/etc/river.toml index a5db6b7a..72f6e815 100644 --- a/etc/river.toml +++ b/etc/river.toml @@ -16,6 +16,9 @@ es_pass = "" # Path to store data, like master.info, if not set or empty, # we must use this to support breakpoint resume syncing. # TODO: support other storage, like etcd. +# Beta feature: Store to elasticsearch +# data_dir = "es:http://username:password@hostname:9200/index/type?id=1001" +# recommend id = server_id data_dir = "./var" # Inner Http status address diff --git a/river/elasticsearch_masterinfo.go b/river/elasticsearch_masterinfo.go new file mode 100644 index 00000000..8c210353 --- /dev/null +++ b/river/elasticsearch_masterinfo.go @@ -0,0 +1,175 @@ +package river + +import ( + "bytes" + "encoding/json" + "fmt" + "net/http" + "net/url" + "regexp" + "strconv" + "strings" + "sync" + + "github.com/juju/errors" + "github.com/siddontang/go-mysql-elasticsearch/elastic" + "github.com/siddontang/go-mysql/mysql" + "gopkg.in/birkirb/loggers.v1/log" +) + +type elasticsearchMasterInfo struct { + sync.RWMutex + + es *elastic.Client + + Name string + Pos uint32 + index string + docType string + id string +} + +func loadElasticsearchMasterInfo(dataPath string) (masterInfo, error) { + var m elasticsearchMasterInfo + + if !strings.HasPrefix(dataPath, "es:") { + return &m, errors.Errorf("error elasticsearch prefix: %s", dataPath) + } + esURL, err := url.Parse(dataPath[3:]) + if err != nil { + return &m, err + } + + cfg := new(elastic.ClientConfig) + cfg.Addr = esURL.Host + if esURL.User != nil { + cfg.User = esURL.User.Username() + cfg.Password, _ = esURL.User.Password() + } + cfg.Https = esURL.Scheme == "https" + m.es = elastic.NewClient(cfg) + + paths := strings.Split(esURL.Path, "/") + m.index = paths[1] + m.docType = paths[2] + + m.id = "1" + id := esURL.Query().Get("id") + if id != "" { + m.id = id + } + + err = m.loadMasterInfo() + if err != nil { + return nil, err + } + + return &m, nil +} + +func (m *elasticsearchMasterInfo) Save(pos mysql.Position) error { + log.Infof("save position %s", pos) + + m.Lock() + defer m.Unlock() + + m.Name = pos.Name + m.Pos = pos.Pos + doc := map[string]interface{}{ + "name": m.Name, + "pos": m.Pos, + } + err := m.es.Update(m.index, m.docType, m.id, doc) + if err != nil { + log.Errorf("ES MasterInfo save error: %s", err) + return err + } + return nil +} + +func (m *elasticsearchMasterInfo) Position() mysql.Position { + m.RLock() + defer m.RUnlock() + + return mysql.Position{ + Name: m.Name, + Pos: m.Pos, + } +} + +func (m *elasticsearchMasterInfo) Close() error { + pos := m.Position() + return m.Save(pos) +} + +func (m *elasticsearchMasterInfo) loadMasterInfo() error { + mapping, err := m.es.GetMapping(m.index, m.docType) + if err != nil || mapping.Code == http.StatusNotFound { + err = m.createMapping() + } + + info, err := m.es.Get(m.index, m.docType, m.id) + if err == nil && (info.Code == http.StatusOK || info.Code == http.StatusNotFound) { + item := info.ResponseItem + if item.Found { + source := item.Source + m.Name = source["name"].(string) + m.Pos = uint32(source["pos"].(float64)) + } + return nil + } + return errors.Wrap(err, errors.New("loadMasterInfo error")) +} + +func (m *elasticsearchMasterInfo) createMapping() error { + version, _ := m.getEsVersion() + nameType := "keyword" + if version < 5 { + nameType = "string" + } + mapping := map[string]interface{}{ + "properties": map[string]interface{}{ + "name": map[string]interface{}{ + "type": nameType, + }, + "pos": map[string]interface{}{ + "type": "long", + }, + }, + } + + return m.es.CreateMapping(m.index, m.docType, mapping) +} + +func (m *elasticsearchMasterInfo) getEsVersion() (int64, error) { + reqURL := fmt.Sprintf("%s://%s/", m.es.Protocol, m.es.Addr) + resp, err := m.es.DoRequest("GET", reqURL, bytes.NewBuffer(nil)) + if err != nil { + return 0, err + } + defer resp.Body.Close() + + var esinfo EsinfoResponse + err = json.NewDecoder(resp.Body).Decode(&esinfo) + + version := esinfo.Version.Number + if version == "" { + return 0, errors.Errorf("unknow version") + } + v := regexp.MustCompile("^\\d+").FindString(version) + mainVersion, err := strconv.ParseInt(v, 10, 32) + if err == nil { + return mainVersion, nil + } + return 0, errors.Errorf("unknow version") +} + +type EsinfoResponse struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + Version EsVersionResponse `json:"version"` +} + +type EsVersionResponse struct { + Number string +} diff --git a/river/master.go b/river/file_masterinfo.go similarity index 84% rename from river/master.go rename to river/file_masterinfo.go index fe10b577..bc39a7e9 100644 --- a/river/master.go +++ b/river/file_masterinfo.go @@ -14,7 +14,7 @@ import ( "gopkg.in/birkirb/loggers.v1/log" ) -type masterInfo struct { +type fileMasterInfo struct { sync.RWMutex Name string `toml:"bin_name"` @@ -24,8 +24,8 @@ type masterInfo struct { lastSaveTime time.Time } -func loadMasterInfo(dataDir string) (*masterInfo, error) { - var m masterInfo +func loadFileMasterInfo(dataDir string) (masterInfo, error) { + var m fileMasterInfo if len(dataDir) == 0 { return &m, nil @@ -50,7 +50,7 @@ func loadMasterInfo(dataDir string) (*masterInfo, error) { return &m, errors.Trace(err) } -func (m *masterInfo) Save(pos mysql.Position) error { +func (m *fileMasterInfo) Save(pos mysql.Position) error { log.Infof("save position %s", pos) m.Lock() @@ -82,7 +82,7 @@ func (m *masterInfo) Save(pos mysql.Position) error { return errors.Trace(err) } -func (m *masterInfo) Position() mysql.Position { +func (m *fileMasterInfo) Position() mysql.Position { m.RLock() defer m.RUnlock() @@ -92,7 +92,7 @@ func (m *masterInfo) Position() mysql.Position { } } -func (m *masterInfo) Close() error { +func (m *fileMasterInfo) Close() error { pos := m.Position() return m.Save(pos) diff --git a/river/masterinfo.go b/river/masterinfo.go new file mode 100644 index 00000000..ec8d396e --- /dev/null +++ b/river/masterinfo.go @@ -0,0 +1,21 @@ +package river + +import ( + "strings" + + "github.com/siddontang/go-mysql/mysql" +) + +type masterInfo interface { + Save(pos mysql.Position) error + Position() mysql.Position + Close() error +} + +func loadMasterInfo(dataPath string) (masterInfo, error) { + if strings.HasPrefix(dataPath, "es:") { + return loadElasticsearchMasterInfo(dataPath) + } + + return loadFileMasterInfo(dataPath) +} diff --git a/river/river.go b/river/river.go index 24338960..1ddd12bd 100644 --- a/river/river.go +++ b/river/river.go @@ -34,7 +34,7 @@ type River struct { st *stat - master *masterInfo + master masterInfo syncCh chan interface{} }