Skip to content

Commit

Permalink
Merge pull request #1 from apache/feature/dubbo-2.7.5
Browse files Browse the repository at this point in the history
Feature/dubbo 2.7.5
  • Loading branch information
lzp0412 authored Apr 22, 2020
2 parents a594afd + 370681a commit ad18357
Show file tree
Hide file tree
Showing 47 changed files with 1,919 additions and 241 deletions.
2 changes: 2 additions & 0 deletions common/constant/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
VERSION_KEY = "version"
INTERFACE_KEY = "interface"
PATH_KEY = "path"
PROTOCOL_KEY = "protocol"
SERVICE_KEY = "service"
METHODS_KEY = "methods"
TIMEOUT_KEY = "timeout"
Expand All @@ -40,6 +41,7 @@ const (
TOKEN_KEY = "token"
LOCAL_ADDR = "local-addr"
REMOTE_ADDR = "remote-addr"
PATH_SEPARATOR = "/"
DUBBO_KEY = "dubbo"
RELEASE_KEY = "release"
ANYHOST_KEY = "anyhost"
Expand Down
62 changes: 62 additions & 0 deletions common/extension/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package extension

import (
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)

var (
globalEventDispatcher observer.EventDispatcher
initEventListeners []observer.EventListener
)

var (
dispatchers = make(map[string]func() observer.EventDispatcher, 8)
)

// SetEventDispatcher by name
func SetEventDispatcher(name string, v func() observer.EventDispatcher) {
dispatchers[name] = v
}

// SetAndInitGlobalDispatcher
func SetAndInitGlobalDispatcher(name string) {
if len(name) == 0 {
name = "direct"
}
if globalEventDispatcher != nil {
logger.Warnf("EventDispatcher already init. It will be replaced")
}
if dp, ok := dispatchers[name]; !ok || dp == nil {
panic("EventDispatcher for " + name + " is not existing, make sure you have import the package.")
}
globalEventDispatcher = dispatchers[name]()
globalEventDispatcher.AddEventListeners(initEventListeners)
}

// GetGlobalDispatcher
func GetGlobalDispatcher() observer.EventDispatcher {
return globalEventDispatcher
}

// AddEventListener it will be added in global event dispatcher
func AddEventListener(listener observer.EventListener) {
initEventListeners = append(initEventListeners, listener)
}
8 changes: 4 additions & 4 deletions common/extension/metadata_report_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,20 @@
package extension

import (
"github.com/apache/dubbo-go/metadata"
"github.com/apache/dubbo-go/metadata/report/factory"
)

var (
metaDataReportFactories = make(map[string]func() metadata.MetadataReportFactory, 8)
metaDataReportFactories = make(map[string]func() factory.MetadataReportFactory, 8)
)

// SetMetadataReportFactory ...
func SetMetadataReportFactory(name string, v func() metadata.MetadataReportFactory) {
func SetMetadataReportFactory(name string, v func() factory.MetadataReportFactory) {
metaDataReportFactories[name] = v
}

// GetMetadataReportFactory ...
func GetMetadataReportFactory(name string) metadata.MetadataReportFactory {
func GetMetadataReportFactory(name string) factory.MetadataReportFactory {
if metaDataReportFactories[name] == nil {
panic("metadata report for " + name + " is not existing, make sure you have import the package.")
}
Expand Down
64 changes: 64 additions & 0 deletions common/observer/dispatcher/direct_event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dispatcher

import (
"reflect"
)

import (
"github.com/apache/dubbo-go/common/extension"
"github.com/apache/dubbo-go/common/logger"
"github.com/apache/dubbo-go/common/observer"
)

func init() {
extension.SetEventDispatcher("direct", NewDirectEventDispatcher)
}

// DirectEventDispatcher is align with DirectEventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
// Dispatcher event to listener direct
type DirectEventDispatcher struct {
observer.BaseListenable
}

// NewDirectEventDispatcher ac constructor of DirectEventDispatcher
func NewDirectEventDispatcher() observer.EventDispatcher {
return &DirectEventDispatcher{}
}

// Dispatch event directly
func (ded *DirectEventDispatcher) Dispatch(event observer.Event) {
if event == nil {
logger.Warnf("[DirectEventDispatcher] dispatch event nil")
return
}
eventType := reflect.TypeOf(event).Elem()
value, loaded := ded.ListenersCache.Load(eventType)
if !loaded {
return
}
listenersSlice := value.([]observer.EventListener)
for _, listener := range listenersSlice {
if err := listener.OnEvent(event); err != nil {
logger.Warnf("[DirectEventDispatcher] dispatch event error:%v", err)
}
}
}
75 changes: 75 additions & 0 deletions common/observer/dispatcher/direct_event_dispatcher_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package dispatcher

import (
"fmt"
"reflect"
"testing"
)

import (
"github.com/apache/dubbo-go/common/observer"
)

func TestDirectEventDispatcher_Dispatch(t *testing.T) {
ded := NewDirectEventDispatcher()
ded.AddEventListener(&TestEventListener{})
ded.AddEventListener(&TestEventListener1{})
ded.Dispatch(&TestEvent{})
ded.Dispatch(nil)
}

type TestEvent struct {
observer.BaseEvent
}

type TestEventListener struct {
observer.BaseListenable
observer.EventListener
}

func (tel *TestEventListener) OnEvent(e observer.Event) error {
fmt.Println("TestEventListener")
return nil
}

func (tel *TestEventListener) GetPriority() int {
return -1
}

func (tel *TestEventListener) GetEventType() reflect.Type {
return reflect.TypeOf(&TestEvent{})
}

type TestEventListener1 struct {
observer.EventListener
}

func (tel *TestEventListener1) OnEvent(e observer.Event) error {
fmt.Println("TestEventListener1")
return nil
}

func (tel *TestEventListener1) GetPriority() int {
return 1
}

func (tel *TestEventListener1) GetEventType() reflect.Type {
return reflect.TypeOf(TestEvent{})
}
66 changes: 66 additions & 0 deletions common/observer/event.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package observer

import (
"fmt"
"math/rand"
"time"
)

func init() {
rand.Seed(time.Now().UnixNano())
}

// Event is align with Event interface in Java.
// it's the top abstraction
// Align with 2.7.5
type Event interface {
fmt.Stringer
GetSource() interface{}
GetTimestamp() time.Time
}

// BaseEvent is the base implementation of Event
// You should never use it directly
type BaseEvent struct {
Source interface{}
Timestamp time.Time
}

// GetSource return the source
func (b *BaseEvent) GetSource() interface{} {
return b.Source
}

// GetTimestamp return the Timestamp when the event is created
func (b *BaseEvent) GetTimestamp() time.Time {
return b.Timestamp
}

// String return a human readable string representing this event
func (b *BaseEvent) String() string {
return fmt.Sprintf("BaseEvent[source = %#v]", b.Source)
}

func newBaseEvent(source interface{}) *BaseEvent {
return &BaseEvent{
Source: source,
Timestamp: time.Now(),
}
}
27 changes: 27 additions & 0 deletions common/observer/event_dispatcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package observer

// EventDispatcher is align with EventDispatcher interface in Java.
// it's the top abstraction
// Align with 2.7.5
type EventDispatcher interface {
Listenable
// Dispatch event
Dispatch(event Event)
}
48 changes: 48 additions & 0 deletions common/observer/event_listener.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package observer

import (
"reflect"
)

import (
gxsort "github.com/dubbogo/gost/sort"
)

// EventListener is an new interface used to align with dubbo 2.7.5
// It contains the Prioritized means that the listener has its priority
type EventListener interface {
gxsort.Prioritizer
// OnEvent handle this event
OnEvent(e Event) error
// GetEventType listen which event type
GetEventType() reflect.Type
}

// ConditionalEventListener only handle the event which it can handle
type ConditionalEventListener interface {
EventListener
// Accept will make the decision whether it should handle this event
Accept(e Event) bool
}

// TODO (implement ConditionalEventListener)
type ServiceInstancesChangedListener struct {
ServiceName string
}
Loading

0 comments on commit ad18357

Please sign in to comment.