From c002aacadc3746b65b914c5a34be6dd590ca7da2 Mon Sep 17 00:00:00 2001 From: medcl Date: Thu, 17 Feb 2022 16:11:27 +0800 Subject: [PATCH] bug fix --- Makefile | 128 ++------------ bulk.go | 12 +- http.go | 47 ++++- logstash_tcp_client.go | 382 ++++++++++++++++++++--------------------- 4 files changed, 248 insertions(+), 321 deletions(-) mode change 100644 => 100755 Makefile diff --git a/Makefile b/Makefile old mode 100644 new mode 100755 index 6c74b4e..7cb6207 --- a/Makefile +++ b/Makefile @@ -1,119 +1,15 @@ SHELL=/bin/bash -CWD=$(shell pwd) -OLDGOPATH=${GOPATH} -NEWGOPATH:=${CWD}:${OLDGOPATH} -export GOPATH=$(NEWGOPATH) -PATH := $(PATH):$(GOPATH)/bin -GO111MODULE=off - -build: clean config - @go version - go build -o bin/esm - -tar: build - tar cfz bin/esm.tar.gz bin/esm - -cross-build-all-platform: clean config - go test - GOOS=windows GOARCH=amd64 go build -o bin/windows64/esm.exe - GOOS=windows GOARCH=386 go build -o bin/windows32/esm.exe - GOOS=darwin GOARCH=amd64 go build -o bin/darwin64/esm - GOOS=darwin GOARCH=386 go build -o bin/darwin32/esm - GOOS=linux GOARCH=amd64 go build -o bin/linux64/esm - GOOS=linux GOARCH=386 go build -o bin/linux32/esm - GOOS=linux GOARCH=arm go build -o bin/linux_arm/esm - GOOS=freebsd GOARCH=amd64 go build -o bin/freebsd64/esm - GOOS=freebsd GOARCH=386 go build -o bin/freebsd32/esm - GOOS=netbsd GOARCH=amd64 go build -o bin/netbsd64/esm - GOOS=netbsd GOARCH=386 go build -o bin/netbsd32/esm - GOOS=openbsd GOARCH=amd64 go build -o bin/openbsd64/esm - GOOS=openbsd GOARCH=386 go build -o bin/openbsd32/esm - - -gox-cross-build-all-platform: clean config - go get github.com/mitchellh/gox - go test - gox -output="bin/esm_{{.OS}}_{{.Arch}}" - -cross-gox-build-all-platform: clean config - go get github.com/mitchellh/gox - go test - gox -os=windows -arch=amd64 -output="bin/windows64/esm" - gox -os=windows -arch=386 -output=bin/windows32/esm - gox -os=darwin -arch=amd64 -output=bin/darwin64/esm - gox -os=darwin -arch=386 -output=bin/darwin32/esm - gox -os=linux -arch=amd64 -output=bin/linux64/esm - gox -os=linux -arch=386 -output=bin/linux32/esm - gox -os=linux -arch=arm -output=bin/linux_arm/esm - gox -os=freebsd -arch=amd64 -output=bin/freebsd64/esm - gox -os=freebsd -arch=386 -output=bin/freebsd32/esm - gox -os=netbsd -arch=amd64 -output=bin/netbsd64/esm - gox -os=netbsd -arch=386 -output=bin/netbsd32/esm - gox -os=openbsd -arch=amd64 -output=bin/openbsd64/esm - gox -os=openbsd -arch=386 -output=bin/openbsd32/esm - -cross-build: clean config - go test - GOOS=windows GOARCH=amd64 go build -o bin/windows64/esm.exe - GOOS=darwin GOARCH=amd64 go build -o bin/darwin64/esm - GOOS=linux GOARCH=amd64 go build -o bin/linux64/esm - -all: clean config cross-build - -all-platform: clean config cross-build-all-platform - -format: - gofmt -s -w -tabs=false -tabwidth=4 main.go - -clean: - rm -rif bin - mkdir bin - -config: - @echo "get Dependencies" - go env - go get gopkg.in/cheggaaa/pb.v1 - go get github.com/jessevdk/go-flags - go get github.com/olekukonko/ts - go get github.com/cihub/seelog - go get github.com/parnurzeal/gorequest - go get github.com/mattn/go-isatty - -dist: cross-build package - -dist-all: all package - -dist-all-platform: all-platform package-all-platform - -package: - @echo "Packaging" - tar cfz bin/windows64.tar.gz bin/windows64/esm.exe - tar cfz bin/darwin64.tar.gz bin/darwin64/esm - tar cfz bin/linux64.tar.gz bin/linux64/esm - -package-all-platform: - @echo "Packaging" - tar cfz bin/windows64.tar.gz bin/windows64/esm.exe - tar cfz bin/windows32.tar.gz bin/windows32/esm.exe - tar cfz bin/darwin64.tar.gz bin/darwin64/esm - tar cfz bin/darwin32.tar.gz bin/darwin32/esm - tar cfz bin/linux64.tar.gz bin/linux64/esm - tar cfz bin/linux32.tar.gz bin/linux32/esm - tar cfz bin/linux_arm.tar.gz bin/linux_arm/esm - tar cfz bin/freebsd64.tar.gz bin/freebsd64/esm - tar cfz bin/freebsd32.tar.gz bin/freebsd32/esm - tar cfz bin/netbsd64.tar.gz bin/netbsd64/esm - tar cfz bin/netbsd32.tar.gz bin/netbsd32/esm - tar cfz bin/openbsd64.tar.gz bin/openbsd64/esm - tar cfz bin/openbsd32.tar.gz bin/openbsd32/esm - - -cross-compile: - @echo "Prepare Cross Compiling" - cd $(GOROOT)/src && GOOS=windows GOARCH=amd64 ./make.bash --no-clean - cd $(GOROOT)/src && GOOS=darwin GOARCH=amd64 ./make.bash --no-clean 2> /dev/null 1> /dev/null - cd $(GOROOT)/src && GOOS=linux GOARCH=amd64 ./make.bash --no-clean 2> /dev/null 1> /dev/null - - cd $(CWD) +# APP info +APP_NAME := migrator +APP_VERSION := 1.0.0_SNAPSHOT +APP_CONFIG := $(APP_NAME).yml +APP_EOLDate := "2023-12-31 10:10:10" +APP_STATIC_FOLDER := .public +APP_STATIC_PACKAGE := public +APP_UI_FOLDER := ui +APP_PLUGIN_FOLDER := plugin + +# GO15VENDOREXPERIMENT="1" GO111MODULE=off easyjson -all domain.go +include ../framework/Makefile diff --git a/bulk.go b/bulk.go index d2f4455..d968eba 100644 --- a/bulk.go +++ b/bulk.go @@ -138,17 +138,19 @@ READ_DOCS: log.Error(err) } - // if we approach the 100mb es limit, flush to es and reset mainBuf - if mainBuf.Len()+docBuf.Len() > (c.Config.BulkSizeInMB * 1000000) { - goto CLEAN_BUFFER - } // append the doc to the main buffer mainBuf.Write(docBuf.Bytes()) // reset for next document bulkItemSize++ - docBuf.Reset() (*docCount)++ + docBuf.Reset() + + // if we approach the 100mb es limit, flush to es and reset mainBuf + if mainBuf.Len()+docBuf.Len() > (c.Config.BulkSizeInMB * 1024*1024) { + goto CLEAN_BUFFER + } + case <-idleTimeout.C: log.Debug("5s no message input") goto CLEAN_BUFFER diff --git a/http.go b/http.go index 838d0ec..be17470 100644 --- a/http.go +++ b/http.go @@ -25,6 +25,7 @@ import ( "fmt" log "github.com/cihub/seelog" "github.com/parnurzeal/gorequest" + "infini.sh/framework/core/util" "infini.sh/framework/lib/fasthttp" "io" "io/ioutil" @@ -40,6 +41,7 @@ func BasicAuth(req *fasthttp.Request,user,pass string) { } func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) { + request := gorequest.New() tr := &http.Transport{ @@ -54,7 +56,7 @@ func Get(url string,auth *Auth,proxy string) (*http.Response, string, []error) { request.SetBasicAuth(auth.User,auth.Pass) } - request.Header["Content-Type"]= "application/json" + //request.Type("application/json") if(len(proxy)>0){ request.Proxy(proxy) @@ -78,7 +80,7 @@ func Post(url string,auth *Auth, body string,proxy string)(*http.Response, strin request.SetBasicAuth(auth.User,auth.Pass) } - request.Header["Content-Type"]="application/json" + //request.Type("application/json") if(len(proxy)>0){ request.Proxy(proxy) @@ -148,14 +150,18 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte req := fasthttp.AcquireRequest() resp := fasthttp.AcquireResponse() - defer fasthttp.ReleaseRequest(req) // <- do not forget to release - defer fasthttp.ReleaseResponse(resp) // <- do not forget to release + //defer fasthttp.ReleaseRequest(req) // <- do not forget to release + //defer fasthttp.ReleaseResponse(resp) // <- do not forget to release req.SetRequestURI(loadUrl) req.Header.SetMethod(method) - req.Header.Set("Content-Type", "application/json") + //req.Header.Set("Content-Type", "application/json") + if compress { + req.Header.Set("Accept-Encoding", "gzip") + req.Header.Set("content-encoding", "gzip") + } if auth!=nil{ req.URI().SetUsername(auth.User) @@ -163,6 +169,21 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte } if len(body)>0{ + + //if compress { + // _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), data.Bytes(), fasthttp.CompressBestSpeed) + // if err != nil { + // panic(err) + // } + //} else { + // //req.SetBody(body) + // req.SetBodyStreamWriter(func(w *bufio.Writer) { + // w.Write(data.Bytes()) + // w.Flush() + // }) + // + //} + if compress{ _, err := fasthttp.WriteGzipLevel(req.BodyWriter(), body, fasthttp.CompressBestSpeed) if err != nil { @@ -171,6 +192,10 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte }else{ req.SetBody(body) + //req.SetBodyStreamWriter(func(w *bufio.Writer) { + // w.Write(body) + // w.Flush() + //}) } } @@ -183,10 +208,12 @@ func DoRequest(compress bool,method string,loadUrl string,auth *Auth,body []byte panic("empty response") } + log.Debug("received status code", resp.StatusCode, "from", string(resp.Header.Header()), "content", util.SubString(string(resp.Body()),0,500), req) + if resp.StatusCode() == http.StatusOK || resp.StatusCode() == http.StatusCreated { } else { - //fmt.Println("received status code", resp.StatusCode, "from", string(resp.Header.Header()), "content", string(resp.Body()), req) + //log.Error("received status code", resp.StatusCode, "from", string(resp.Header.Header()), "content", string(resp.Body()), req) } @@ -254,7 +281,7 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)( resp,errs := client.Do(reqest) if errs != nil { - log.Error(errs) + log.Error(util.SubString(errs.Error(),0,500)) return "",errs } @@ -270,8 +297,10 @@ func Request(method string,r string,auth *Auth,body *bytes.Buffer,proxy string)( respBody,err:=ioutil.ReadAll(resp.Body) + log.Error(util.SubString(string(respBody),0,500)) + if err != nil { - log.Error(err) + log.Error(util.SubString(string(err.Error()),0,500)) return string(respBody),err } @@ -289,7 +318,7 @@ func DecodeJson(jsonStream string, o interface{})(error) { decoder := json.NewDecoder(strings.NewReader(jsonStream)) // UseNumber causes the Decoder to unmarshal a number into an interface{} as a Number instead of as a float64. decoder.UseNumber() - decoder. + //decoder. if err := decoder.Decode(o); err != nil { fmt.Println("error:", err) diff --git a/logstash_tcp_client.go b/logstash_tcp_client.go index d1c92aa..7b97e52 100644 --- a/logstash_tcp_client.go +++ b/logstash_tcp_client.go @@ -16,194 +16,194 @@ limitations under the License. package main -import ( - "fmt" - "os" - "bufio" - "strconv" - "crypto/tls" - "net" - "log" - "encoding/binary" - "github.com/paulbellamy/ratecounter" - "time" - "io/ioutil" - "infini.sh/framework/core/util" -) - -func WriteToLogstash(c *Config) { - if c.LogstashEndpoint==""{ - fmt.Println("logstash endpoint not defined"); - return - } - - //go StartServer("127.0.0.1:5055") - - // connect to this socket - - //read files - - - - - - //fmt.Println(len(arr)) - //fmt.Println(arr[0]) - - fmt.Println(c.Workers) - conf := &tls.Config{ - InsecureSkipVerify: true, - } - counter := ratecounter.NewRateCounter(1 * time.Second) - for i:=0;i= contentSize-HEAD_SIZE) { - contentBuf = buffer.read(HEAD_SIZE, contentSize) - fmt.Println(string(contentBuf)) - continue - } - break - } - } -} +//import ( +// "fmt" +// "os" +// "bufio" +// "strconv" +// "crypto/tls" +// "net" +// "log" +// "encoding/binary" +// "github.com/paulbellamy/ratecounter" +// "time" +// "io/ioutil" +// "infini.sh/framework/core/util" +//) +// +//func WriteToLogstash(c *Config) { +// if c.LogstashEndpoint==""{ +// fmt.Println("logstash endpoint not defined"); +// return +// } +// +// //go StartServer("127.0.0.1:5055") +// +// // connect to this socket +// +// //read files +// +// +// +// +// +// //fmt.Println(len(arr)) +// //fmt.Println(arr[0]) +// +// fmt.Println(c.Workers) +// conf := &tls.Config{ +// InsecureSkipVerify: true, +// } +// counter := ratecounter.NewRateCounter(1 * time.Second) +// for i:=0;i= contentSize-HEAD_SIZE) { +// contentBuf = buffer.read(HEAD_SIZE, contentSize) +// fmt.Println(string(contentBuf)) +// continue +// } +// break +// } +// } +//}