From a5a164f23b185c275c36ed78d5462c51b6f6a9a9 Mon Sep 17 00:00:00 2001 From: henter Date: Fri, 29 Sep 2017 17:25:48 +0800 Subject: [PATCH 1/6] support TYPE_DATETIME to es date (RFC3339 UTC) --- river/sync.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/river/sync.go b/river/sync.go index 8d556b1b..41f0793a 100644 --- a/river/sync.go +++ b/river/sync.go @@ -311,6 +311,12 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in if err == nil && f != nil { return f } + case schema.TYPE_DATETIME: + switch v := value.(type) { + case string: + vt, _ := time.Parse("2006-01-02 15:04:05", string(v)) + return vt.Format(time.RFC3339) + } } return value From de56de28352fc5114450f96c275e3c457a0e6a0a Mon Sep 17 00:00:00 2001 From: henter Date: Fri, 29 Sep 2017 23:33:53 +0800 Subject: [PATCH 2/6] timezone --- river/sync.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/river/sync.go b/river/sync.go index 41f0793a..4e868279 100644 --- a/river/sync.go +++ b/river/sync.go @@ -311,12 +311,13 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in if err == nil && f != nil { return f } - case schema.TYPE_DATETIME: - switch v := value.(type) { - case string: - vt, _ := time.Parse("2006-01-02 15:04:05", string(v)) - return vt.Format(time.RFC3339) - } + case schema.TYPE_DATETIME: + switch v := value.(type) { + case string: + vt, _ := time.Parse("2006-01-02 15:04:05", string(v)) + timezone := time.Now().Local().Format("-0700") + return vt.Format("2006-01-02T15:04:05") + timezone + } } return value From 8c85a48cdd611d737dc8c18408c4e9c179ea1688 Mon Sep 17 00:00:00 2001 From: henter Date: Sat, 30 Sep 2017 11:12:03 +0800 Subject: [PATCH 3/6] fix --- river/sync.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/river/sync.go b/river/sync.go index 4e868279..01c0c586 100644 --- a/river/sync.go +++ b/river/sync.go @@ -314,9 +314,8 @@ func (r *River) makeReqColumnData(col *schema.TableColumn, value interface{}) in case schema.TYPE_DATETIME: switch v := value.(type) { case string: - vt, _ := time.Parse("2006-01-02 15:04:05", string(v)) - timezone := time.Now().Local().Format("-0700") - return vt.Format("2006-01-02T15:04:05") + timezone + vt, _ := time.ParseInLocation(mysql.TimeFormat, string(v), time.Local) + return vt.Format(time.RFC3339) } } From e94713b7b770a13c9b4fafcf979b468ea6874bcd Mon Sep 17 00:00:00 2001 From: henter Date: Sat, 30 Sep 2017 16:48:41 +0800 Subject: [PATCH 4/6] es mapping test, type datetime --- elastic/client.go | 49 +++++++++++++++++++++++++++++++++++++++++++++ river/river_test.go | 20 ++++++++++++++++++ 2 files changed, 69 insertions(+) diff --git a/elastic/client.go b/elastic/client.go index 659a5871..f18c730d 100644 --- a/elastic/client.go +++ b/elastic/client.go @@ -144,6 +144,16 @@ type BulkResponseItem struct { Found bool `json:"found"` } +type MappingResponse struct { + Code int + Properties map[string]*MappingType `json:"properties"` +} + +type MappingType struct { + Type string `json:"type"` + Fields interface{} `json:"fields"` +} + func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) { req, err := http.NewRequest(method, url, body) if err != nil { @@ -249,6 +259,45 @@ func (c *Client) CreateMapping(index string, docType string, mapping map[string] return errors.Trace(err) } +func (c *Client) GetMapping(index string, docType string) (*MappingResponse, error){ + reqUrl := fmt.Sprintf("http://%s/%s/%s/_mapping", c.Addr, + url.QueryEscape(index), + url.QueryEscape(docType)) + buf := bytes.NewBuffer(nil) + resp, err := c.DoRequest("GET", reqUrl, buf) + + if err != nil { + return nil, errors.Trace(err) + } + + defer resp.Body.Close() + + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, errors.Trace(err) + } + + var m map[string]map[string]map[string]map[string]map[string]interface{} + err = json.Unmarshal(data, &m) + if err != nil { + return nil, errors.Trace(err) + } + + properties := m[index]["mappings"][docType]["properties"] + ret := new(MappingResponse) + ret.Properties = make(map[string]*MappingType) + ret.Code = resp.StatusCode + + var mt MappingType + for k, v := range properties { + b, _ := json.Marshal(v) + err = json.Unmarshal(b, &mt) + ret.Properties[k] = &MappingType{mt.Type, mt.Fields} + } + + return ret, errors.Trace(err) +} + func (c *Client) DeleteIndex(index string) error { reqUrl := fmt.Sprintf("http://%s/%s", c.Addr, url.QueryEscape(index)) diff --git a/river/river_test.go b/river/river_test.go index 98e11636..240948ad 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -10,6 +10,7 @@ import ( . "github.com/pingcap/check" "github.com/siddontang/go-mysql-elasticsearch/elastic" "github.com/siddontang/go-mysql/client" + "github.com/siddontang/go-mysql/mysql" ) var my_addr = flag.String("my_addr", "127.0.0.1:3306", "MySQL addr") @@ -42,6 +43,7 @@ func (s *riverTestSuite) SetUpSuite(c *C) { tenum ENUM("e1", "e2", "e3"), tset SET("a", "b", "c"), tbit BIT(1) default 1, + tdatetime DATETIME DEFAULT NULL, PRIMARY KEY(id)) ENGINE=INNODB; ` @@ -214,6 +216,9 @@ func (s *riverTestSuite) testPrepareData(c *C) { table := fmt.Sprintf("test_river_%04d", i) s.testExecute(c, fmt.Sprintf("INSERT INTO %s (id, title, content, tenum, tset) VALUES (?, ?, ?, ?, ?)", table), 5+i, "abc", "hello", "e1", "a,b,c") } + + datetime := time.Now().Format(mysql.TimeFormat) + s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime) VALUES (?, ?, ?, ?, ?, ?)", 5, "first", "hello go 1", "e1", "a,b", datetime) } func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { @@ -226,6 +231,17 @@ func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { return r } +func (s *riverTestSuite) testElasticMapping(c *C) *elastic.MappingResponse { + index := "river" + docType := "river" + + r, err := s.r.es.GetMapping(index, docType) + c.Assert(err, IsNil) + + c.Assert(r.Properties["tdatetime"].Type, Equals, "date") + return r +} + func testWaitSyncDone(c *C, r *River) { <-r.canal.WaitDumpDone() @@ -250,6 +266,10 @@ func (s *riverTestSuite) TestRiver(c *C) { testWaitSyncDone(c, s.r) + var mr *elastic.MappingResponse + mr = s.testElasticMapping(c) + c.Assert(mr.Code, Equals, 200) + var r *elastic.Response r = s.testElasticGet(c, "1") c.Assert(r.Found, Equals, true) From 241db0e656a073050d327026ec67832b83048b72 Mon Sep 17 00:00:00 2001 From: henter Date: Sat, 30 Sep 2017 17:07:21 +0800 Subject: [PATCH 5/6] magic id 16 > 10+5 :-( --- river/river_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/river/river_test.go b/river/river_test.go index 240948ad..2bdab0d7 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -218,7 +218,7 @@ func (s *riverTestSuite) testPrepareData(c *C) { } datetime := time.Now().Format(mysql.TimeFormat) - s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime) VALUES (?, ?, ?, ?, ?, ?)", 5, "first", "hello go 1", "e1", "a,b", datetime) + s.testExecute(c, "INSERT INTO test_river (id, title, content, tenum, tset, tdatetime) VALUES (?, ?, ?, ?, ?, ?)", 16, "test datetime", "hello go 16", "e1", "a,b", datetime) } func (s *riverTestSuite) testElasticGet(c *C, id string) *elastic.Response { From 38c0eb08ab20c86cc6255846005c10ce910ecc5f Mon Sep 17 00:00:00 2001 From: henter Date: Sun, 1 Oct 2017 20:39:48 +0800 Subject: [PATCH 6/6] es mapping struct --- elastic/client.go | 27 ++++++++++----------------- river/river_test.go | 2 +- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/elastic/client.go b/elastic/client.go index f18c730d..24bbe3f1 100644 --- a/elastic/client.go +++ b/elastic/client.go @@ -146,12 +146,16 @@ type BulkResponseItem struct { type MappingResponse struct { Code int - Properties map[string]*MappingType `json:"properties"` + Mapping Mapping } -type MappingType struct { - Type string `json:"type"` - Fields interface{} `json:"fields"` +type Mapping map[string]struct { + Mappings map[string]struct { + Properties map[string]struct { + Type string `json:"type"` + Fields interface{} `json:"fields"` + } `json:"properties"` + } `json:"mappings"` } func (c *Client) DoRequest(method string, url string, body *bytes.Buffer) (*http.Response, error) { @@ -277,24 +281,13 @@ func (c *Client) GetMapping(index string, docType string) (*MappingResponse, err return nil, errors.Trace(err) } - var m map[string]map[string]map[string]map[string]map[string]interface{} - err = json.Unmarshal(data, &m) + ret := new(MappingResponse) + err = json.Unmarshal(data, &ret.Mapping) if err != nil { return nil, errors.Trace(err) } - properties := m[index]["mappings"][docType]["properties"] - ret := new(MappingResponse) - ret.Properties = make(map[string]*MappingType) ret.Code = resp.StatusCode - - var mt MappingType - for k, v := range properties { - b, _ := json.Marshal(v) - err = json.Unmarshal(b, &mt) - ret.Properties[k] = &MappingType{mt.Type, mt.Fields} - } - return ret, errors.Trace(err) } diff --git a/river/river_test.go b/river/river_test.go index 2bdab0d7..1cb177c5 100644 --- a/river/river_test.go +++ b/river/river_test.go @@ -238,7 +238,7 @@ func (s *riverTestSuite) testElasticMapping(c *C) *elastic.MappingResponse { r, err := s.r.es.GetMapping(index, docType) c.Assert(err, IsNil) - c.Assert(r.Properties["tdatetime"].Type, Equals, "date") + c.Assert(r.Mapping[index].Mappings[docType].Properties["tdatetime"].Type, Equals, "date") return r }