diff --git a/.github/workflows/main.yaml b/.github/workflows/main.yaml index 0f70282..9483e3a 100644 --- a/.github/workflows/main.yaml +++ b/.github/workflows/main.yaml @@ -19,28 +19,33 @@ jobs: GO111MODULE: on steps: - name: Check out code - uses: actions/checkout@v2 + uses: actions/checkout@8f4b7f84864484a7bf31766abe9204da3cbe65b3 # v3.5.0 - name: Configure git for private modules env: TOKEN: ${{ secrets.PROXY_PIPELINES_GITHUB_TOKEN }} run: git config --global url."https://oauth2:${TOKEN}@github.com/gateway-fm".insteadOf "https://github.com/gateway-fm" - - name: Setup Golang - uses: actions/setup-go@v2 + # Required by golangci-lint, according to docs + - name: Setup Golang Environment + uses: actions/setup-go@4d34df0c2316fe8122ab82dc22947d607c0c91f9 # v4.0.0 with: - go-version: "1.19.*" # The Go version to download (if necessary) and use. + go-version-file: go.mod + cache: true - run: | go version go clean -modcache - go mod tidy + go mod tidy - name: Run linter - uses: golangci/golangci-lint-action@v2 + uses: golangci/golangci-lint-action@v3 with: - version: latest + version: v1.52 + skip-cache: true skip-build-cache: true skip-pkg-cache: true + skip-go-installation: true + args: --timeout=10m - name: Test run: go test ./... diff --git a/discovery/consul_discovery.go b/discovery/consul_discovery.go index b4d9d30..199cae3 100644 --- a/discovery/consul_discovery.go +++ b/discovery/consul_discovery.go @@ -67,5 +67,10 @@ func (d *ConsulDiscovery) createServiceFromConsul(srv *consul.ServiceEntry) serv addr := d.transport.FormatAddress(srv.Service.Address) logger.Log().Debug(fmt.Sprintf("discovered new service: %s", addr)) - return service.NewService(fmt.Sprintf("%s:%d", addr, srv.Service.Port), srv.Node.Node) + tagsMap := make(map[string]struct{}) + for _, t := range srv.Service.Tags { + tagsMap[t] = struct{}{} + } + + return service.NewService(fmt.Sprintf("%s:%d", addr, srv.Service.Port), srv.Node.Node, tagsMap) } diff --git a/discovery/manual_discovery.go b/discovery/manual_discovery.go index 1c90513..59fab2b 100644 --- a/discovery/manual_discovery.go +++ b/discovery/manual_discovery.go @@ -21,7 +21,7 @@ func NewManualDiscovery(transport TransportProtocol, addrs ...string) (IServiceD // blockchain addresses for requested networks func (d *ManualDiscovery) Discover(string) (nodes []service.IService, err error) { for _, n := range d.addresses { - nodes = append(nodes, service.NewService(d.transport.FormatAddress(n), "")) + nodes = append(nodes, service.NewService(d.transport.FormatAddress(n), "", nil)) } return } diff --git a/service/service.go b/service/service.go index 30e7156..7317b0a 100644 --- a/service/service.go +++ b/service/service.go @@ -21,6 +21,8 @@ type IService interface { // NodeName return node name from discovery NodeName() string + + Tags() map[string]struct{} } // TODO split address field to host and port @@ -28,19 +30,21 @@ type IService interface { // BaseService represent basic service // model implementation type BaseService struct { - id string // service unique id - sha256(address) - status Status // service current status - address string // service address to connect - nodeName string // node name from discovery + id string // service unique id - sha256(address) + status Status // service current status + address string // service address to connect + nodeName string // node name from discovery + tags map[string]struct{} // service tags } // NewService create new BaseService with address and discovery -func NewService(address, nodeName string) IService { +func NewService(address, nodeName string, tags map[string]struct{}) IService { return &BaseService{ id: generateServiceID(address), status: StatusUnHealthy, address: address, nodeName: nodeName, + tags: tags, } } @@ -75,6 +79,10 @@ func (n *BaseService) SetStatus(status Status) { n.status = status } +func (n *BaseService) Tags() map[string]struct{} { + return n.tags +} + // generateServiceID create BaseService unique id by // hashing given address string func generateServiceID(addr string) string { diff --git a/services_list_test.go b/services_list_test.go index 60f8d60..9e459ae 100644 --- a/services_list_test.go +++ b/services_list_test.go @@ -10,7 +10,7 @@ import ( ) func newHealthyService(addr string) service.IService { - srv := service.NewService(addr, "") + srv := service.NewService(addr, "", nil) baseSrv := srv.(*service.BaseService) baseSrv.SetStatus(service.StatusHealthy) diff --git a/services_pool.go b/services_pool.go index 81280cd..6de7ed6 100644 --- a/services_pool.go +++ b/services_pool.go @@ -15,7 +15,7 @@ import ( type IServicesPool interface { // Start run service pool discovering // and healthchecks loops - Start() + Start(healthchecks bool) // DiscoverServices discover all visible active // services via service-discovery @@ -34,6 +34,10 @@ type IServicesPool interface { // Close Stop all service pool Close() + + SetOnNewDiscCallback(f OnNewDiscCallback) + + SetOnDiscCompletedCallback(f func()) } // ServicesPool holds information about reachable @@ -50,6 +54,10 @@ type ServicesPool struct { stop chan struct{} MutationFnc func(srv service.IService) (service.IService, error) + + onNewDiscCallback OnNewDiscCallback + + onDiscCompletedCallback func() } // ServicesPoolsOpts is options that needs @@ -65,6 +73,8 @@ type ServicesPoolsOpts struct { CustomList IServicesList } +type OnNewDiscCallback func(srv service.IService) error + // NewServicesPool create new Services Pool // based on given params func NewServicesPool(opts *ServicesPoolsOpts) IServicesPool { @@ -83,18 +93,17 @@ func NewServicesPool(opts *ServicesPoolsOpts) IServicesPool { } - if err := pool.DiscoverServices(); err != nil { - logger.Log().Error(fmt.Errorf("error discovering %s services: %w", pool.name, err).Error()) - } - return pool } // Start run service pool discovering // and healthchecks loops -func (p *ServicesPool) Start() { +func (p *ServicesPool) Start(healthchecks bool) { go p.discoverServicesLoop() - go p.list.HealthChecksLoop() + + if healthchecks { + go p.list.HealthChecksLoop() + } } // DiscoverServices discover all visible active @@ -123,6 +132,12 @@ func (p *ServicesPool) DiscoverServices() error { } p.list.Add(mutatedService) + + if p.onNewDiscCallback != nil { + if err := p.onNewDiscCallback(mutatedService); err != nil { + logger.Log().Warn(fmt.Sprintf("callback on new discovered service: %s", err)) + } + } } return nil } @@ -151,6 +166,22 @@ func (p *ServicesPool) Close() { close(p.stop) } +func (p *ServicesPool) SetOnNewDiscCallback(f OnNewDiscCallback) { + if p == nil { + return + } + + p.onNewDiscCallback = f +} + +func (p *ServicesPool) SetOnDiscCompletedCallback(f func()) { + if p == nil { + return + } + + p.onDiscCompletedCallback = f +} + // discoverServicesLoop spawn discovery for // services periodically func (p *ServicesPool) discoverServicesLoop() { @@ -172,6 +203,10 @@ func (p *ServicesPool) discoverServicesLoop() { if !onceShuffled { p.list.Shuffle() onceShuffled = true + + if p.onDiscCompletedCallback != nil { + p.onDiscCompletedCallback() + } } Sleep(p.discoveryInterval, p.stop)