Skip to content

Commit

Permalink
router: migrate connection automatically during scale-in and scale-out (
Browse files Browse the repository at this point in the history
  • Loading branch information
djshow832 authored May 19, 2022
1 parent d0f9096 commit 07234b4
Show file tree
Hide file tree
Showing 5 changed files with 339 additions and 96 deletions.
11 changes: 6 additions & 5 deletions pkg/proxy/driver/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,15 +51,16 @@ type Router interface {

type RedirectableConn interface {
SetEventReceiver(receiver ConnEventReceiver)
Redirect(addr string) error
Redirect(addr string)
ConnectionID() uint64
}

type ConnEventReceiver interface {
AddConn(addr string, conn RedirectableConn)
BeginRedirect(from, to string, conn RedirectableConn)
FinishRedirect(from, to string, conn RedirectableConn)
CloseConn(addr string, conn RedirectableConn)
OnConnCreated(addr string, conn RedirectableConn)
OnRedirectBegin(from, to string, conn RedirectableConn)
OnRedirectSucceed(from, to string, conn RedirectableConn)
OnRedirectFail(from, to string, conn RedirectableConn)
OnConnClosed(addr string, conn RedirectableConn)
}

type Stmt interface {
Expand Down
29 changes: 16 additions & 13 deletions pkg/proxy/router/backend_observer.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,19 @@ const (
healthCheckTimeout = 1 * time.Second
)

type onBackendChanged func(removed, added map[string]*BackendInfo)
type BackendEventReceiver interface {
OnBackendChanged(removed, added map[string]*BackendInfo)
}

type BackendInfo struct {
*infosync.TopologyInfo
status BackendStatus
}

type BackendObserver struct {
backendInfo map[string]*BackendInfo
onBackendChanged onBackendChanged
cancelFunc context.CancelFunc
backendInfo map[string]*BackendInfo
eventReceiver BackendEventReceiver
cancelFunc context.CancelFunc
}

var globalEtcdClient atomic.Value
Expand Down Expand Up @@ -124,16 +126,17 @@ func GetEtcdClient() *clientv3.Client {
return etcdClient.(*clientv3.Client)
}

func NewBackendObserver(onBackendChanged onBackendChanged) (*BackendObserver, error) {
bo := &BackendObserver{
backendInfo: make(map[string]*BackendInfo),
onBackendChanged: onBackendChanged,
func NewBackendObserver(eventReceiver BackendEventReceiver) (*BackendObserver, error) {
if GetEtcdClient() == nil {
return nil, nil
}
if GetEtcdClient() != nil {
childCtx, cancelFunc := context.WithCancel(context.Background())
bo.cancelFunc = cancelFunc
go bo.observe(childCtx)
bo := &BackendObserver{
backendInfo: make(map[string]*BackendInfo),
eventReceiver: eventReceiver,
}
childCtx, cancelFunc := context.WithCancel(context.Background())
bo.cancelFunc = cancelFunc
go bo.observe(childCtx)
return bo, nil
}

Expand Down Expand Up @@ -251,7 +254,7 @@ func (bo *BackendObserver) notifyIfChanged(backendInfo map[string]*BackendInfo)
}
}
if len(removedBackends) > 0 || len(addedBackends) > 0 {
bo.onBackendChanged(removedBackends, addedBackends)
bo.eventReceiver.OnBackendChanged(removedBackends, addedBackends)
}
bo.backendInfo = backendInfo
}
Expand Down
Loading

0 comments on commit 07234b4

Please sign in to comment.