Skip to content

Commit

Permalink
Merge pull request #19 from gateway-fm/feat/service-tags
Browse files Browse the repository at this point in the history
feat: service tags
  • Loading branch information
dmitriyselivanov authored May 16, 2023
2 parents f53c191 + 5db5139 commit 3f24415
Show file tree
Hide file tree
Showing 6 changed files with 75 additions and 22 deletions.
19 changes: 12 additions & 7 deletions .github/workflows/main.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,33 @@ jobs:
GO111MODULE: on
steps:
- name: Check out code
uses: actions/checkout@v2
uses: actions/checkout@8f4b7f84864484a7bf31766abe9204da3cbe65b3 # v3.5.0

- name: Configure git for private modules
env:
TOKEN: ${{ secrets.PROXY_PIPELINES_GITHUB_TOKEN }}
run: git config --global url."https://oauth2:${TOKEN}@github.com/gateway-fm".insteadOf "https://github.com/gateway-fm"

- name: Setup Golang
uses: actions/setup-go@v2
# Required by golangci-lint, according to docs
- name: Setup Golang Environment
uses: actions/setup-go@4d34df0c2316fe8122ab82dc22947d607c0c91f9 # v4.0.0
with:
go-version: "1.19.*" # The Go version to download (if necessary) and use.
go-version-file: go.mod
cache: true
- run: |
go version
go clean -modcache
go mod tidy
go mod tidy
- name: Run linter
uses: golangci/golangci-lint-action@v2
uses: golangci/golangci-lint-action@v3
with:
version: latest
version: v1.52
skip-cache: true
skip-build-cache: true
skip-pkg-cache: true
skip-go-installation: true
args: --timeout=10m

- name: Test
run: go test ./...
Expand Down
7 changes: 6 additions & 1 deletion discovery/consul_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,5 +67,10 @@ func (d *ConsulDiscovery) createServiceFromConsul(srv *consul.ServiceEntry) serv
addr := d.transport.FormatAddress(srv.Service.Address)
logger.Log().Debug(fmt.Sprintf("discovered new service: %s", addr))

return service.NewService(fmt.Sprintf("%s:%d", addr, srv.Service.Port), srv.Node.Node)
tagsMap := make(map[string]struct{})
for _, t := range srv.Service.Tags {
tagsMap[t] = struct{}{}
}

return service.NewService(fmt.Sprintf("%s:%d", addr, srv.Service.Port), srv.Node.Node, tagsMap)
}
2 changes: 1 addition & 1 deletion discovery/manual_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func NewManualDiscovery(transport TransportProtocol, addrs ...string) (IServiceD
// blockchain addresses for requested networks
func (d *ManualDiscovery) Discover(string) (nodes []service.IService, err error) {
for _, n := range d.addresses {
nodes = append(nodes, service.NewService(d.transport.FormatAddress(n), ""))
nodes = append(nodes, service.NewService(d.transport.FormatAddress(n), "", nil))
}
return
}
18 changes: 13 additions & 5 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,26 +21,30 @@ type IService interface {

// NodeName return node name from discovery
NodeName() string

Tags() map[string]struct{}
}

// TODO split address field to host and port

// BaseService represent basic service
// model implementation
type BaseService struct {
id string // service unique id - sha256(address)
status Status // service current status
address string // service address to connect
nodeName string // node name from discovery
id string // service unique id - sha256(address)
status Status // service current status
address string // service address to connect
nodeName string // node name from discovery
tags map[string]struct{} // service tags
}

// NewService create new BaseService with address and discovery
func NewService(address, nodeName string) IService {
func NewService(address, nodeName string, tags map[string]struct{}) IService {
return &BaseService{
id: generateServiceID(address),
status: StatusUnHealthy,
address: address,
nodeName: nodeName,
tags: tags,
}
}

Expand Down Expand Up @@ -75,6 +79,10 @@ func (n *BaseService) SetStatus(status Status) {
n.status = status
}

func (n *BaseService) Tags() map[string]struct{} {
return n.tags
}

// generateServiceID create BaseService unique id by
// hashing given address string
func generateServiceID(addr string) string {
Expand Down
2 changes: 1 addition & 1 deletion services_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

func newHealthyService(addr string) service.IService {
srv := service.NewService(addr, "")
srv := service.NewService(addr, "", nil)

baseSrv := srv.(*service.BaseService)
baseSrv.SetStatus(service.StatusHealthy)
Expand Down
49 changes: 42 additions & 7 deletions services_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
type IServicesPool interface {
// Start run service pool discovering
// and healthchecks loops
Start()
Start(healthchecks bool)

// DiscoverServices discover all visible active
// services via service-discovery
Expand All @@ -34,6 +34,10 @@ type IServicesPool interface {

// Close Stop all service pool
Close()

SetOnNewDiscCallback(f OnNewDiscCallback)

SetOnDiscCompletedCallback(f func())
}

// ServicesPool holds information about reachable
Expand All @@ -50,6 +54,10 @@ type ServicesPool struct {
stop chan struct{}

MutationFnc func(srv service.IService) (service.IService, error)

onNewDiscCallback OnNewDiscCallback

onDiscCompletedCallback func()
}

// ServicesPoolsOpts is options that needs
Expand All @@ -65,6 +73,8 @@ type ServicesPoolsOpts struct {
CustomList IServicesList
}

type OnNewDiscCallback func(srv service.IService) error

// NewServicesPool create new Services Pool
// based on given params
func NewServicesPool(opts *ServicesPoolsOpts) IServicesPool {
Expand All @@ -83,18 +93,17 @@ func NewServicesPool(opts *ServicesPoolsOpts) IServicesPool {

}

if err := pool.DiscoverServices(); err != nil {
logger.Log().Error(fmt.Errorf("error discovering %s services: %w", pool.name, err).Error())
}

return pool
}

// Start run service pool discovering
// and healthchecks loops
func (p *ServicesPool) Start() {
func (p *ServicesPool) Start(healthchecks bool) {
go p.discoverServicesLoop()
go p.list.HealthChecksLoop()

if healthchecks {
go p.list.HealthChecksLoop()
}
}

// DiscoverServices discover all visible active
Expand Down Expand Up @@ -123,6 +132,12 @@ func (p *ServicesPool) DiscoverServices() error {
}

p.list.Add(mutatedService)

if p.onNewDiscCallback != nil {
if err := p.onNewDiscCallback(mutatedService); err != nil {
logger.Log().Warn(fmt.Sprintf("callback on new discovered service: %s", err))
}
}
}
return nil
}
Expand Down Expand Up @@ -151,6 +166,22 @@ func (p *ServicesPool) Close() {
close(p.stop)
}

func (p *ServicesPool) SetOnNewDiscCallback(f OnNewDiscCallback) {
if p == nil {
return
}

p.onNewDiscCallback = f
}

func (p *ServicesPool) SetOnDiscCompletedCallback(f func()) {
if p == nil {
return
}

p.onDiscCompletedCallback = f
}

// discoverServicesLoop spawn discovery for
// services periodically
func (p *ServicesPool) discoverServicesLoop() {
Expand All @@ -172,6 +203,10 @@ func (p *ServicesPool) discoverServicesLoop() {
if !onceShuffled {
p.list.Shuffle()
onceShuffled = true

if p.onDiscCompletedCallback != nil {
p.onDiscCompletedCallback()
}
}

Sleep(p.discoveryInterval, p.stop)
Expand Down

0 comments on commit 3f24415

Please sign in to comment.