diff --git a/Gopkg.lock b/Gopkg.lock index 597ef95a..615b8b59 100644 --- a/Gopkg.lock +++ b/Gopkg.lock @@ -3,7 +3,6 @@ [[projects]] branch = "master" - digest = "1:69170709219fc206d312477fe65aa8841863ca3527a545b09c87c31789b5e0ff" name = "github.com/BaritoLog/go-boilerplate" packages = [ "errkit", @@ -11,261 +10,190 @@ "slicekit", "srvkit", "testkit", - "timekit", + "timekit" ] - pruneopts = "UT" revision = "20c4c30023960bc27eb5e4662207a0bcaae04f11" [[projects]] branch = "master" - digest = "1:913e517251821e5e76ac74ed005b67a5c3067a8101f1dfd0d9fdbd7c556aaa22" name = "github.com/BaritoLog/instru" packages = ["."] - pruneopts = "UT" revision = "6c2745f219df1f34656bfa59f9875d09198c092c" [[projects]] - digest = "1:66b8ed452b31eb9075bc53295952487c333a9cb555de57ed61f5664c058d9050" name = "github.com/Shopify/sarama" packages = ["."] - pruneopts = "UT" - revision = "35324cf48e33d8260e1c7c18854465a904ade249" - version = "v1.17.0" + revision = "a6144ae922fd99dd0ea5046c8137acfb7fab0914" + version = "v1.18.0" [[projects]] - branch = "master" - digest = "1:d8abdc866ebbe05fa3bce50863e23afd4c73c804b90c908caa864e86df1db8a1" name = "github.com/bouk/monkey" packages = ["."] - pruneopts = "UT" revision = "5df1f207ff77e025801505ae4d903133a0b4353f" + version = "v1.0.0" [[projects]] - digest = "1:4fdffd1724c105db8c394019cfc2444fd23466be04812850506437361ee5de55" name = "github.com/bsm/sarama-cluster" packages = ["."] - pruneopts = "UT" - revision = "cf455bc755fe41ac9bb2861e7a961833d9c2ecc3" - version = "v2.1.13" + revision = "c618e605e15c0d7535f6c96ff8efbb0dba4fd66c" + version = "v2.1.15" [[projects]] - digest = "1:a2c1d0e43bd3baaa071d1b9ed72c27d78169b2b269f71c105ac4ba34b1be4a39" name = "github.com/davecgh/go-spew" packages = ["spew"] - pruneopts = "UT" - revision = "346938d642f2ec3594ed81d874461961cd0faa76" - version = "v1.1.0" + revision = "8991bc29aa16c548c550c7ff78260e27b9ab7c73" + version = "v1.1.1" [[projects]] - digest = "1:1f0c7ab489b407a7f8f9ad16c25a504d28ab461517a971d341388a56156c1bd7" name = "github.com/eapache/go-resiliency" packages = ["breaker"] - pruneopts = "UT" revision = "ea41b0fad31007accc7f806884dcdf3da98b79ce" version = "v1.1.0" [[projects]] branch = "master" - digest = "1:0448d1c1a596941c608762912fe7c865f88d9ffa45afb8af1edcf1401762bf7e" name = "github.com/eapache/go-xerial-snappy" packages = ["."] - pruneopts = "UT" - revision = "040cc1a32f578808623071247fdbd5cc43f37f5f" + revision = "776d5712da21bc4762676d614db1d8a64f4238b0" [[projects]] - digest = "1:444b82bfe35c83bbcaf84e310fb81a1f9ece03edfed586483c869e2c046aef69" name = "github.com/eapache/queue" packages = ["."] - pruneopts = "UT" revision = "44cc805cf13205b55f69e14bcb69867d1ae92f98" version = "v1.1.0" +[[projects]] + name = "github.com/gofrs/uuid" + packages = ["."] + revision = "370558f003bfe29580cd0f698d8640daccdcc45c" + version = "v3.1.1" + [[projects]] branch = "master" - digest = "1:2e292c0e36117b617c4df3f10ac9a7fe05e3c90bf057f1e61867c913d866e0c9" name = "github.com/golang/mock" packages = ["gomock"] - pruneopts = "UT" - revision = "22bbf0ddf08105dfa364d0a2fa619dfa71014af5" + revision = "600781dde9cca80734169b9e969d9054ccc57937" [[projects]] branch = "master" - digest = "1:4a0c6bb4805508a6287675fac876be2ac1182539ca8a32468d8128882e9d5009" name = "github.com/golang/snappy" packages = ["."] - pruneopts = "UT" revision = "2e65f85255dbc3072edf28d6b5b8efc472979f5a" [[projects]] - digest = "1:f8cb7c367c825e0c0be75f17e9b003d39b1240a1535fbbf095a18d7bb0d0c9c9" name = "github.com/hashicorp/consul" packages = ["api"] - pruneopts = "UT" - revision = "e716d1b5f8be252b3e53906c6d5632e0228f30fa" - version = "v1.2.2" + revision = "48d287ef690ada66634885640f3444dbf7b71d18" + version = "v1.2.3" [[projects]] - branch = "master" - digest = "1:77cb3be9b21ba7f1a4701e870c84ea8b66e7d74c7c8951c58155fdadae9414ec" name = "github.com/hashicorp/go-cleanhttp" packages = ["."] - pruneopts = "UT" - revision = "d5fe4b57a186c716b0e00b8c301cbd9b4182694d" + revision = "e8ab9daed8d1ddd2d3c4efba338fe2eeae2e4f18" + version = "v0.5.0" [[projects]] branch = "master" - digest = "1:45aad874d3c7d5e8610427c81870fb54970b981692930ec2a319ce4cb89d7a00" name = "github.com/hashicorp/go-rootcerts" packages = ["."] - pruneopts = "UT" revision = "6bb64b370b90e7ef1fa532be9e591a81c3493e00" [[projects]] - digest = "1:0dd7b7b01769f9df356dc99f9e4144bdbabf6c79041ea7c0892379c5737f3c44" name = "github.com/hashicorp/serf" packages = ["coordinate"] - pruneopts = "UT" revision = "d6574a5bb1226678d7010325fb6c985db20ee458" version = "v0.8.1" [[projects]] branch = "master" - digest = "1:aa3d8d42865c42626b5c1add193692d045b3188b1479f0a0a88690d21fe20083" name = "github.com/mailru/easyjson" packages = [ ".", "buffer", "jlexer", - "jwriter", + "jwriter" ] - pruneopts = "UT" - revision = "03f2033d19d5860aef995fe360ac7d395cd8ce65" + revision = "60711f1a8329503b04e1c88535f419d0bb440bff" [[projects]] - branch = "master" - digest = "1:8eb17c2ec4df79193ae65b621cd1c0c4697db3bc317fe6afdc76d7f2746abd05" name = "github.com/mitchellh/go-homedir" packages = ["."] - pruneopts = "UT" - revision = "3864e76763d94a6df2f9960b16a20a33da9f9a66" + revision = "ae18d6b8b3205b561c79e8e5f69bff09736185f4" + version = "v1.0.0" [[projects]] - branch = "master" - digest = "1:5ab79470a1d0fb19b041a624415612f8236b3c06070161a910562f2b2d064355" name = "github.com/mitchellh/mapstructure" packages = ["."] - pruneopts = "UT" - revision = "f15292f7a699fcc1a38a80977f80a046874ba8ac" + revision = "fa473d140ef3c6adf42d6b391fe76707f1f243c8" + version = "v1.0.0" [[projects]] - digest = "1:d2997d9a773171df32642730331692fdf3f98fc51911067b708f92cfdc55708b" name = "github.com/olivere/elastic" packages = [ ".", "config", - "uritemplates", + "uritemplates" ] - pruneopts = "UT" - revision = "e1da71ef4bb6941f2051f5cc1c4d3e56b1978cf3" - version = "v6.1.26" + revision = "e6cae211ee802eab70248a68fe2cb03b616744c9" + version = "v6.2.5" [[projects]] - digest = "1:29803f52611cbcc1dfe55b456e9fdac362af7248b3d29d7ea1bec0a12e71dff4" name = "github.com/pierrec/lz4" packages = [ ".", - "internal/xxh32", + "internal/xxh32" ] - pruneopts = "UT" - revision = "1958fd8fff7f115e79725b1288e0b878b3e06b00" - version = "v2.0.3" + revision = "bb6bfd13c6a262f1943c0446eb25b7f54c1fb9a2" + version = "v2.0.6" [[projects]] - digest = "1:40e195917a951a8bf867cd05de2a46aaf1806c50cf92eebf4c16f78cd196f747" name = "github.com/pkg/errors" packages = ["."] - pruneopts = "UT" revision = "645ef00459ed84a119197bfb8d8205042c6df63d" version = "v0.8.0" [[projects]] branch = "master" - digest = "1:c4556a44e350b50a490544d9b06e9fba9c286c21d6c0e47f54f3a9214597298c" name = "github.com/rcrowley/go-metrics" packages = ["."] - pruneopts = "UT" revision = "e2704e165165ec55d062f5919b4b29494e9fa790" [[projects]] - digest = "1:274f67cb6fed9588ea2521ecdac05a6d62a8c51c074c1fccc6a49a40ba80e925" - name = "github.com/satori/go.uuid" - packages = ["."] - pruneopts = "UT" - revision = "f58768cc1a7a7e77a3bd49e98cdd21419399b6a3" - version = "v1.2.0" - -[[projects]] - digest = "1:d867dfa6751c8d7a435821ad3b736310c2ed68945d05b50fb9d23aee0540c8cc" name = "github.com/sirupsen/logrus" packages = ["."] - pruneopts = "UT" revision = "3e01752db0189b9157070a0e1668a620f9a85da2" version = "v1.0.6" [[projects]] - digest = "1:b24d38b282bacf9791408a080f606370efa3d364e4b5fd9ba0f7b87786d3b679" name = "github.com/urfave/cli" packages = ["."] - pruneopts = "UT" revision = "cfb38830724cc34fedffe9a2a29fb54fa9169cd1" version = "v1.20.0" [[projects]] branch = "master" - digest = "1:3f3a05ae0b95893d90b9b3b5afdb79a9b3d96e4e36e099d841ae602e4aca0da8" name = "golang.org/x/crypto" packages = ["ssh/terminal"] - pruneopts = "UT" - revision = "c126467f60eb25f8f27e5a981f32a87e3965053f" + revision = "0e37d006457bf46f9e6692014ba72ef82c33022c" [[projects]] branch = "master" - digest = "1:76ee51c3f468493aff39dbacc401e8831fbb765104cbf613b89bef01cf4bad70" name = "golang.org/x/net" packages = ["context"] - pruneopts = "UT" - revision = "a0f8a16cb08c06df97cbdf9c47f4731ba548c33c" + revision = "26e67e76b6c3f6ce91f7c52def5af501b4e0f3a2" [[projects]] branch = "master" - digest = "1:8742e6e73627b2877c3f723bc1823d5667ec59011242480309dc90fa862512aa" name = "golang.org/x/sys" packages = [ "unix", - "windows", + "windows" ] - pruneopts = "UT" - revision = "bd9dbc187b6e1dacfdd2722a87e83093c2d7bd6e" + revision = "d641721ec2dead6fe5ca284096fe4b1fcd49e427" [solve-meta] analyzer-name = "dep" analyzer-version = 1 - input-imports = [ - "github.com/BaritoLog/go-boilerplate/errkit", - "github.com/BaritoLog/go-boilerplate/saramatestkit", - "github.com/BaritoLog/go-boilerplate/slicekit", - "github.com/BaritoLog/go-boilerplate/srvkit", - "github.com/BaritoLog/go-boilerplate/testkit", - "github.com/BaritoLog/go-boilerplate/timekit", - "github.com/BaritoLog/instru", - "github.com/Shopify/sarama", - "github.com/bsm/sarama-cluster", - "github.com/golang/mock/gomock", - "github.com/hashicorp/consul/api", - "github.com/olivere/elastic", - "github.com/satori/go.uuid", - "github.com/sirupsen/logrus", - "github.com/urfave/cli", - ] + inputs-digest = "e9bffaaebc83a627b3c0ef16751ca4e8c61fd65b391170a57a1e496392e7c6f7" solver-name = "gps-cdcl" solver-version = 1 diff --git a/flow/barito_consumer_service.go b/flow/barito_consumer_service.go index 333cb1a0..25e027f1 100644 --- a/flow/barito_consumer_service.go +++ b/flow/barito_consumer_service.go @@ -4,10 +4,11 @@ import ( "context" "fmt" "strings" + "time" "github.com/BaritoLog/go-boilerplate/errkit" "github.com/Shopify/sarama" - uuid "github.com/satori/go.uuid" + uuid "github.com/gofrs/uuid" log "github.com/sirupsen/logrus" ) @@ -20,6 +21,7 @@ const ( ErrMakeNewTopicWorker = errkit.Error("Make new topic worker failed") ErrSpawnWorkerOnNewTopic = errkit.Error("Spawn worker on new topic failed") ErrSpawnWorker = errkit.Error("Span worker failed") + ErrHaltWorker = errkit.Error("Consumer Worker Halted") PrefixEventGroupID = "nte" ) @@ -46,6 +48,7 @@ type baritoConsumerService struct { lastError error lastTimber Timber lastNewTopic string + isHalt bool } func NewBaritoConsumerService(factory KafkaFactory, groupID, elasticURL, topicSuffix, newTopicEventName string) BaritoConsumerService { @@ -63,11 +66,12 @@ func NewBaritoConsumerService(factory KafkaFactory, groupID, elasticURL, topicSu func (s *baritoConsumerService) Start() (err error) { admin, err := s.initAdmin() + if err != nil { return errkit.Concat(ErrMakeKafkaAdmin, err) } - uuid := uuid.NewV4() + uuid, _ := uuid.NewV4() s.eventWorkerGroupID = fmt.Sprintf("%s-%s", PrefixEventGroupID, uuid) log.Infof("Generate event worker group id: %s", s.eventWorkerGroupID) @@ -93,6 +97,7 @@ func (s *baritoConsumerService) Start() (err error) { func (s *baritoConsumerService) initAdmin() (admin KafkaAdmin, err error) { admin, err = s.factory.MakeKafkaAdmin() s.admin = admin + return } @@ -160,10 +165,16 @@ func (s *baritoConsumerService) logNewTopic(topic string) { log.Infof("New topic: %s", topic) } +func (s *baritoConsumerService) onElasticRetry(err error) { + s.logError(errkit.Concat(ErrElasticsearchClient, err)) + s.HaltAllWorker() +} + func (s *baritoConsumerService) onStoreTimber(message *sarama.ConsumerMessage) { // create elastic client - client, err := elasticNewClient(s.elasticUrl) + retrier := s.elasticRetrier() + elastic, err := NewElastic(retrier, s.elasticUrl) if err != nil { s.logError(errkit.Concat(ErrElasticsearchClient, err)) return @@ -178,12 +189,20 @@ func (s *baritoConsumerService) onStoreTimber(message *sarama.ConsumerMessage) { // store to elasticsearch ctx := context.Background() - err = elasticStore(client, ctx, timber) + err = elastic.Store(ctx, timber) if err != nil { s.logError(errkit.Concat(ErrStore, err)) return } + if s.isHalt { + err = s.ResumeWorker() + if err != nil { + s.logError(errkit.Concat(ErrConsumerWorker, err)) + return + } + } + s.logTimber(timber) } @@ -212,3 +231,22 @@ func (s *baritoConsumerService) WorkerMap() map[string]ConsumerWorker { func (s *baritoConsumerService) NewTopicEventWorker() ConsumerWorker { return s.newTopicEventWorker } + +func (s *baritoConsumerService) HaltAllWorker() { + if !s.isHalt { + s.isHalt = true + s.logError(ErrHaltWorker) + s.Close() + } +} + +func (s *baritoConsumerService) elasticRetrier() *ElasticRetrier { + return NewElasticRetrier(30*time.Second, s.onElasticRetry) +} + +func (s *baritoConsumerService) ResumeWorker() (err error) { + s.isHalt = false + err = s.Start() + + return +} diff --git a/flow/barito_consumer_service_test.go b/flow/barito_consumer_service_test.go index f09e2ee1..85cc3dae 100644 --- a/flow/barito_consumer_service_test.go +++ b/flow/barito_consumer_service_test.go @@ -210,3 +210,54 @@ func TestBaritoConsumerService_onNewTopicEvent_IgnoreIfTopicExist(t *testing.T) FatalIf(t, service.lastNewTopic == "topic001", "lastNewTopic should be not topic001") } + +func TestHaltAllWorker(t *testing.T) { + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + factory := NewDummyKafkaFactory() + factory.Expect_MakeKafkaAdmin_ConsumerServiceSuccess(ctrl, []string{"abc_logs"}) + factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl) + + var v interface{} = NewBaritoConsumerService(factory, "", "", "_logs", "") + service := v.(*baritoConsumerService) + + err := service.Start() + FatalIfError(t, err) + FatalIf(t, !strings.HasPrefix(service.eventWorkerGroupID, PrefixEventGroupID), "eventWorkerGroup should be have prefix") + + // service.Start() execute goroutine, so wait 1ms to make sure it come in to mainLoop + timekit.Sleep("1ms") + + worker := service.NewTopicEventWorker() + workerMap := service.WorkerMap() + + service.HaltAllWorker() + + FatalIf(t, !service.isHalt, "Consumer Worker should be halted") + FatalIf(t, !worker.IsStart(), "New Topic Event Worker should be halted") + + for _, w := range workerMap { + FatalIf(t, !w.IsStart(), "Worker should be halted") + } +} + +func TestResumeWorker(t *testing.T) { + + ctrl := gomock.NewController(t) + defer ctrl.Finish() + + factory := NewDummyKafkaFactory() + factory.Expect_MakeKafkaAdmin_ConsumerServiceSuccess(ctrl, []string{"abc_logs"}) + factory.Expect_MakeClusterConsumer_AlwaysSuccess(ctrl) + + var v interface{} = NewBaritoConsumerService(factory, "", "", "_logs", "") + service := v.(*baritoConsumerService) + + err := service.ResumeWorker() + FatalIfError(t, err) + FatalIf(t, service.isHalt, "Consumer Worker should be started") + // service.Start() execute goroutine, so wait 1ms to make sure it come in to mainLoop + timekit.Sleep("1ms") + defer service.Close() +} diff --git a/flow/consumer_worker.go b/flow/consumer_worker.go index 37121a63..1b3e9bf2 100644 --- a/flow/consumer_worker.go +++ b/flow/consumer_worker.go @@ -14,6 +14,7 @@ const ( type ConsumerWorker interface { Start() Stop() + Halt() IsStart() bool OnError(f func(error)) OnSuccess(f func(*sarama.ConsumerMessage)) @@ -28,6 +29,7 @@ type consumerWorker struct { onSuccessFunc func(*sarama.ConsumerMessage) onNotificationFunc func(*cluster.Notification) stop chan int + lastMessage *sarama.ConsumerMessage } func NewConsumerWorker(name string, consumer ClusterConsumer) ConsumerWorker { @@ -56,6 +58,13 @@ func (w *consumerWorker) Stop() { }() } +func (w *consumerWorker) Halt() { + go func() { + w.stop <- 1 + }() + log.Warnf("Halt worker '%s'", w.name) +} + func (w *consumerWorker) IsStart() bool { return w.isStart } @@ -78,8 +87,9 @@ func (w *consumerWorker) loopMain() { select { case message, ok := <-w.consumer.Messages(): if ok { - w.fireSuccess(message) w.consumer.MarkOffset(message, "") + w.fireSuccess(message) + log.Infof("Mark Offset, %s", message) } case <-w.stop: w.isStart = false diff --git a/flow/elastic.go b/flow/elastic.go index fa1af58c..2b54ad69 100644 --- a/flow/elastic.go +++ b/flow/elastic.go @@ -11,27 +11,43 @@ import ( log "github.com/sirupsen/logrus" ) -func elasticNewClient(urls ...string) (*elastic.Client, error) { - return elastic.NewClient( +type Elastic interface { + OnFailure(f func(*Timber)) + Store(ctx context.Context, timber Timber) + NewClient() +} + +type elasticClient struct { + client *elastic.Client + onFailureFunc func(*Timber) +} + +func NewElastic(retrierFunc *ElasticRetrier, urls ...string) (client elasticClient, err error) { + + c, err := elastic.NewClient( elastic.SetURL(urls...), elastic.SetSniff(false), elastic.SetHealthcheck(false), + elastic.SetRetrier(retrierFunc), ) -} -func elasticStore(client *elastic.Client, ctx context.Context, timber Timber) (err error) { + return elasticClient{ + client: c, + }, err +} +func (e *elasticClient) Store(ctx context.Context, timber Timber) (err error) { indexPrefix := timber.Context().ESIndexPrefix documentType := timber.Context().ESDocumentType indexName := fmt.Sprintf("%s-%s", indexPrefix, time.Now().Format("2006.01.02")) appSecret := timber.Context().AppSecret - exists, _ := client.IndexExists(indexName).Do(ctx) + exists, _ := e.client.IndexExists(indexName).Do(ctx) if !exists { log.Infof("ES index '%s' is not exist", indexName) index := elasticCreateIndex(indexPrefix) - _, err = client.CreateIndex(indexName). + _, err = e.client.CreateIndex(indexName). BodyJson(index). Do(ctx) instruESCreateIndex(err) @@ -42,7 +58,7 @@ func elasticStore(client *elastic.Client, ctx context.Context, timber Timber) (e document := ConvertTimberToElasticDocument(timber) - _, err = client.Index(). + _, err = e.client.Index(). Index(indexName). Type(documentType). BodyJson(document). @@ -52,6 +68,10 @@ func elasticStore(client *elastic.Client, ctx context.Context, timber Timber) (e return } +func (e *elasticClient) OnFailure(f func(*Timber)) { + e.onFailureFunc = f +} + func elasticCreateIndex(indexPrefix string) *es.Index { return &es.Index{ diff --git a/flow/elastic_retrier.go b/flow/elastic_retrier.go new file mode 100644 index 00000000..192cb8c5 --- /dev/null +++ b/flow/elastic_retrier.go @@ -0,0 +1,39 @@ +package flow + +import ( + "context" + "errors" + "fmt" + "net/http" + "syscall" + "time" + + "github.com/olivere/elastic" + log "github.com/sirupsen/logrus" +) + +type ElasticRetrier struct { + backoff elastic.Backoff + onRetryFunc func(err error) +} + +func NewElasticRetrier(t time.Duration, f func(err error)) *ElasticRetrier { + return &ElasticRetrier{ + backoff: elastic.NewConstantBackoff(t), + onRetryFunc: f, + } +} + +func (r *ElasticRetrier) Retry(ctx context.Context, retry int, req *http.Request, resp *http.Response, err error) (time.Duration, bool, error) { + + log.Warn(errors.New(fmt.Sprintf("Elasticsearch Retrier #%d", retry))) + + if err == syscall.ECONNREFUSED { + err = errors.New("Elasticsearch or network down") + } + + // Let the backoff strategy decide how long to wait and whether to stop + wait, stop := r.backoff.Next(retry) + r.onRetryFunc(err) + return wait, stop, nil +} diff --git a/flow/elastic_retrier_test.go b/flow/elastic_retrier_test.go new file mode 100644 index 00000000..39ba2ceb --- /dev/null +++ b/flow/elastic_retrier_test.go @@ -0,0 +1,32 @@ +package flow + +import ( + "context" + "testing" + "time" +) + +func mockElasticRetrier() *ElasticRetrier { + return NewElasticRetrier(1*time.Second, mockRetrier) +} + +func mockRetrier(err error) { + // Nothing to do +} + +func TestNewElasticRetrier(t *testing.T) { + r := NewElasticRetrier(1*time.Second, mockRetrier) + wait, ok, err := r.Retry(context.TODO(), 1, nil, nil, nil) + if want, got := 1*time.Second, wait; want != got { + t.Fatalf("expected %v, got %v", want, got) + } + + want := true //Loop + if got := ok; want != got { + t.Fatalf("expected %v, got %v", want, got) + } + + if err != nil { + t.Fatalf("expected nil, got %v", err) + } +} diff --git a/flow/elastic_test.go b/flow/elastic_test.go index bfcb4800..ba7250c8 100644 --- a/flow/elastic_test.go +++ b/flow/elastic_test.go @@ -24,10 +24,11 @@ func TestElasticStore_CreateIndexError(t *testing.T) { }) defer ts.Close() - client, err := elasticNewClient(ts.URL) + retrier := mockElasticRetrier() + client, err := NewElastic(retrier, ts.URL) FatalIfError(t, err) - err = elasticStore(client, context.Background(), timber) + err = client.Store(context.Background(), timber) FatalIfWrongError(t, err, "elastic: Error 500 (Internal Server Error)") FatalIf(t, instru.GetEventCount("es_create_index", "fail") != 1, "wrong total es_create_index.fail event") } @@ -45,12 +46,13 @@ func TestElasticStore_CreateindexSuccess(t *testing.T) { }) defer ts.Close() - client, err := elasticNewClient(ts.URL) + retrier := mockElasticRetrier() + client, err := NewElastic(retrier, ts.URL) FatalIfError(t, err) appSecret := timber.Context().AppSecret - err = elasticStore(client, context.Background(), timber) + err = client.Store(context.Background(), timber) FatalIfError(t, err) FatalIf(t, instru.GetEventCount("es_create_index", "success") != 1, "wrong es_store.total success event") FatalIf(t, instru.GetEventCount(fmt.Sprintf("%s_es_store", appSecret), "success") != 1, "wrong total es_store.success event") @@ -69,12 +71,13 @@ func TestElasticStoreman_store_SaveError(t *testing.T) { }) defer ts.Close() - client, err := elasticNewClient(ts.URL) + retrier := mockElasticRetrier() + client, err := NewElastic(retrier, ts.URL) FatalIfError(t, err) appSecret := timber.Context().AppSecret - err = elasticStore(client, context.Background(), timber) + err = client.Store(context.Background(), timber) FatalIfWrongError(t, err, "elastic: Error 400 (Bad Request)") FatalIf(t, instru.GetEventCount(fmt.Sprintf("%s_es_store", appSecret), "fail") != 1, "wrong total fail event") } diff --git a/flow/instrumentation.go b/flow/instrumentation.go index d8c0b597..47edba2e 100644 --- a/flow/instrumentation.go +++ b/flow/instrumentation.go @@ -49,6 +49,7 @@ func InstruApplicationSecret(appSecret string) { func GetApplicationSecretCollection() []string { collection := instru.Metric("application_group").Get("app_secrets") + if collection == nil { return []string{} } diff --git a/flow/instrumentation_test.go b/flow/instrumentation_test.go index d16bb606..4f1b10b4 100644 --- a/flow/instrumentation_test.go +++ b/flow/instrumentation_test.go @@ -2,34 +2,41 @@ package flow import ( "testing" - . "github.com/BaritoLog/go-boilerplate/testkit" - "github.com/BaritoLog/instru" + + . "github.com/BaritoLog/go-boilerplate/testkit" + "github.com/BaritoLog/instru" ) +func ResetApplicationSecretCollection() { + instru.Metric("application_group").Put("app_secrets", nil) +} + func TestContains_NotMatch(t *testing.T) { - given := []string{"a", "b"} - exist := Contains(given, "e") + given := []string{"a", "b"} + exist := Contains(given, "e") - FatalIf(t, exist, "Should not contain") + FatalIf(t, exist, "Should not contain") } func TestContains(t *testing.T) { - given := []string{"a", "b"} - exist := Contains(given, "b") + given := []string{"a", "b"} + exist := Contains(given, "b") - FatalIf(t, !exist, "Should contain") + FatalIf(t, !exist, "Should contain") } func TestGetApplicationSecretCollection_Empty(t *testing.T) { - want := GetApplicationSecretCollection() + ResetApplicationSecretCollection() + want := GetApplicationSecretCollection() FatalIf(t, len(want) > 0, "Should be empty") } func TestGetApplicationSecretCollection_Exist(t *testing.T) { - expected := []string{"some-secret"} - instru.Metric("application_group").Put("app_secrets", expected) - want := GetApplicationSecretCollection() + expected := []string{"some-secret"} + ResetApplicationSecretCollection() + instru.Metric("application_group").Put("app_secrets", expected) + want := GetApplicationSecretCollection() FatalIf(t, len(want) == 0, "Should not be empty") } @@ -43,18 +50,20 @@ func TestGetApplicationSecretCollection_Exist(t *testing.T) { // } func TestInstruApplicationSecret(t *testing.T) { - appSecret := "some-secret" - duplicateAppSecret := "some-secret" - nextAppSecret := "other-secret" - InstruApplicationSecret(appSecret) - collection := GetApplicationSecretCollection() - FatalIf(t, !Contains(collection, "some-secret"), "Should contain app secret") - InstruApplicationSecret(duplicateAppSecret) - - collection = GetApplicationSecretCollection() - FatalIf(t, len(collection) == 2, "Should not be duplicate") - - InstruApplicationSecret(nextAppSecret) - collection = GetApplicationSecretCollection() - FatalIf(t, len(collection) > 2, "Should be contain 2 app secret") + appSecret := "some-secret" + duplicateAppSecret := "some-secret" + nextAppSecret := "other-secret" + + ResetApplicationSecretCollection() + InstruApplicationSecret(appSecret) + collection := GetApplicationSecretCollection() + FatalIf(t, !Contains(collection, "some-secret"), "Should contain app secret") + InstruApplicationSecret(duplicateAppSecret) + + collection = GetApplicationSecretCollection() + FatalIf(t, len(collection) == 2, "Should not be duplicate") + + InstruApplicationSecret(nextAppSecret) + collection = GetApplicationSecretCollection() + FatalIf(t, len(collection) > 2, "Should be contain 2 app secret") } diff --git a/mock/consumer_worker.go b/mock/consumer_worker.go index 0b766ba9..2cce2299 100644 --- a/mock/consumer_worker.go +++ b/mock/consumer_worker.go @@ -5,10 +5,11 @@ package mock import ( + reflect "reflect" + sarama "github.com/Shopify/sarama" sarama_cluster "github.com/bsm/sarama-cluster" gomock "github.com/golang/mock/gomock" - reflect "reflect" ) // MockConsumerWorker is a mock of ConsumerWorker interface @@ -54,6 +55,16 @@ func (mr *MockConsumerWorkerMockRecorder) Stop() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Stop", reflect.TypeOf((*MockConsumerWorker)(nil).Stop)) } +// Start mocks base method +func (m *MockConsumerWorker) Halt() { + m.ctrl.Call(m, "Start") +} + +// Start indicates an expected call of Start +func (mr *MockConsumerWorkerMockRecorder) Halt() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Halt", reflect.TypeOf((*MockConsumerWorker)(nil).Halt)) +} + // IsStart mocks base method func (m *MockConsumerWorker) IsStart() bool { ret := m.ctrl.Call(m, "IsStart") @@ -66,6 +77,18 @@ func (mr *MockConsumerWorkerMockRecorder) IsStart() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsStart", reflect.TypeOf((*MockConsumerWorker)(nil).IsStart)) } +// IsHalt mocks base method +func (m *MockConsumerWorker) IsHalt() bool { + ret := m.ctrl.Call(m, "IsHalt") + ret0, _ := ret[0].(bool) + return ret0 +} + +// IsStart indicates an expected call of IsStart +func (mr *MockConsumerWorkerMockRecorder) IsHalt() *gomock.Call { + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "IsHalt", reflect.TypeOf((*MockConsumerWorker)(nil).IsHalt)) +} + // OnError mocks base method func (m *MockConsumerWorker) OnError(f func(error)) { m.ctrl.Call(m, "OnError", f) @@ -95,3 +118,8 @@ func (m *MockConsumerWorker) OnNotification(f func(*sarama_cluster.Notification) func (mr *MockConsumerWorkerMockRecorder) OnNotification(f interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "OnNotification", reflect.TypeOf((*MockConsumerWorker)(nil).OnNotification), f) } + +// OnHalt mocks base method +func (m *MockConsumerWorker) OnHalt(f func()) { + m.ctrl.Call(m, "OnHalt", f) +}