Indexing error due to multiple removeHandler #55

Open
Jjjpan opened this issue Jun 17, 2022 · 3 comments

Jjjpan commented Jun 17, 2022

At line 141, in func Publish:

// Publish executes callback defined for a topic. Any additional argument will be transferred to the callback.
func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock() // will unlock if handler is not found or always after setUpPublish
	defer bus.lock.Unlock()
	if handlers, ok := bus.handlers[topic]; ok && 0 < len(handlers) {
		// Handlers slice may be changed by removeHandler and Unsubscribe during iteration,
		// so make a copy and iterate the copied slice.
		copyHandlers := make([]*eventHandler, len(handlers))
		copy(copyHandlers, handlers)
		for i, handler := range copyHandlers {
			if handler.flagOnce {
				bus.removeHandler(topic, i) // after the first removal, i no longer matches the index in bus.handlers
			}
			if !handler.async {
				bus.doPublish(handler, topic, args...)
			} else {
				bus.wg.Add(1)
				if handler.transactional {
					bus.lock.Unlock()
					handler.Lock()
					bus.lock.Lock()
				}
				go bus.doPublishAsync(handler, topic, args...)
			}
		}
	}
}

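i and handler range over copyHandlers, while the remove operation acts on bus.handlers. After the first removal, bus.handlers is shorter than the copy, so i no longer points at the same handler. 🤨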

Jjjpan commented Jun 17, 2022

In a test case with several Subscribe and SubscribeOnce calls, bus.removeHandler is called with an out-of-range index.

To show this, print a message from the out-of-range branch of removeHandler in event_bus.go:

func (bus *EventBus) removeHandler(topic string, idx int) {
	if _, ok := bus.handlers[topic]; !ok {
		return
	}
	l := len(bus.handlers[topic])

	if !(0 <= idx && idx < l) {
		fmt.Println("remove out of range")
		return
	}

	copy(bus.handlers[topic][idx:], bus.handlers[topic][idx+1:])
	bus.handlers[topic][l-1] = nil // or the zero value of T
	bus.handlers[topic] = bus.handlers[topic][:l-1]
}

Test case in event_bus_test.go:

func TestManySubscribeAndManySubscribe(t *testing.T) {
	bus := New()
	event := "topic"
	flag := 0
	fn := func() { flag += 1 }
	bus.SubscribeOnce(event, fn)
	bus.SubscribeOnce(event, fn)
	bus.Subscribe(event, fn)
	bus.Subscribe(event, fn)
	bus.SubscribeOnce(event, fn)
	bus.SubscribeOnce(event, fn)
	bus.Publish(event)

	if flag != 6 {
		t.Fail()
	}
}

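Test result (the test still passes because every handler in copyHandlers is invoked; only the removals go wrong):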

=== RUN   TestManySubscribeAndManySubscribe
remove out of range
remove out of range
--- PASS: TestManySubscribeAndManySubscribe (0.00s)
PASS
ok      eventbus/v1     (cached)
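
To see where the two "remove out of range" lines come from, the removal logic can be replayed on a plain slice. This is a minimal sketch, not the library's code: `handler`, `removeAt`, and `drainWithCopyIndex` are hypothetical names, and the callbacks themselves are left out. It uses the same subscription order as the test above.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for the library's eventHandler.
type handler struct {
	name string
	once bool
}

// removeAt mirrors the bounds check in removeHandler, but on a plain slice.
// It reports whether idx was in range.
func removeAt(hs []handler, idx int) ([]handler, bool) {
	if idx < 0 || idx >= len(hs) {
		return hs, false
	}
	return append(hs[:idx], hs[idx+1:]...), true
}

// drainWithCopyIndex iterates over a copy but removes from the live slice
// using the copy's index, which is what Publish does in this issue.
func drainWithCopyIndex(live []handler) (remaining []string, outOfRange int) {
	snapshot := make([]handler, len(live))
	copy(snapshot, live)
	for i, h := range snapshot {
		if !h.once {
			continue
		}
		var ok bool
		if live, ok = removeAt(live, i); !ok {
			outOfRange++
		}
	}
	for _, h := range live {
		remaining = append(remaining, h.name)
	}
	return remaining, outOfRange
}

func main() {
	live := []handler{
		{"once1", true}, {"once2", true},
		{"sub1", false}, {"sub2", false},
		{"once3", true}, {"once4", true},
	}
	remaining, outOfRange := drainWithCopyIndex(live)
	fmt.Println(remaining, outOfRange) // [once2 sub2 once3 once4] 2
}
```

In this model the second removal deletes sub1 instead of once2, and the last two removals fall outside the shrunken slice. Three once-handlers stay subscribed and one regular subscriber is lost, so asserting on a second Publish would likely expose the bug.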

Jjjpan commented Jun 17, 2022

Would it be better to use an offset variable to track how far the indexes in handlers have shifted relative to copyHandlers? 🙄

Maybe like this:

func (bus *EventBus) Publish(topic string, args ...interface{}) {
	bus.lock.Lock()
	defer bus.lock.Unlock()

	if topicHandlers, ok := bus.handlers[topic]; ok && len(topicHandlers) > 0 {
		copiedHandlers := make([]*eventHandler, len(topicHandlers))
		copy(copiedHandlers, topicHandlers)
		offset := 0
		for index, handler := range copiedHandlers {
			if handler.flagOnce {
				bus.removeHandler(topic, index+offset)
				offset--
			}
			if !handler.async {
				bus.doPublish(handler, topic, args...)
			} else {
				bus.wg.Add(1)
				if handler.transactional {
					bus.lock.Unlock()
					handler.lock.Lock()
					bus.lock.Lock()
				}
				go bus.doPublishAsync(handler, topic, args...)
			}
		}
	}
}
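
The offset idea can be checked on a plain slice with the same subscription order as the test case. This is a sketch under assumptions, not a patch for the library: `handler`, `removeAt`, and `drainWithOffset` are hypothetical names, and no callbacks are invoked.

```go
package main

import "fmt"

// handler is a hypothetical stand-in for the library's eventHandler.
type handler struct {
	name string
	once bool
}

// removeAt mirrors the bounds check in removeHandler, but on a plain slice.
// It reports whether idx was in range.
func removeAt(hs []handler, idx int) ([]handler, bool) {
	if idx < 0 || idx >= len(hs) {
		return hs, false
	}
	return append(hs[:idx], hs[idx+1:]...), true
}

// drainWithOffset iterates over a copy and corrects the copy's index by the
// number of removals already made before touching the live slice.
func drainWithOffset(live []handler) (remaining []string, outOfRange int) {
	snapshot := make([]handler, len(live))
	copy(snapshot, live)
	offset := 0
	for i, h := range snapshot {
		if !h.once {
			continue
		}
		var ok bool
		if live, ok = removeAt(live, i+offset); !ok {
			outOfRange++
		}
		offset-- // every removal shifts later live indexes down by one
	}
	for _, h := range live {
		remaining = append(remaining, h.name)
	}
	return remaining, outOfRange
}

func main() {
	live := []handler{
		{"once1", true}, {"once2", true},
		{"sub1", false}, {"sub2", false},
		{"once3", true}, {"once4", true},
	}
	remaining, outOfRange := drainWithOffset(live)
	fmt.Println(remaining, outOfRange) // [sub1 sub2] 0
}
```

The correction only holds while nothing else changes the live slice during the loop. In Publish, bus.lock is released briefly for transactional handlers, so another goroutine could subscribe or unsubscribe in that window; whether that matters in practice would need checking against the real code.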

varfrog commented Aug 9, 2022

This project seems to not be maintained any more. I'm just looking around the issues here. Maybe there's a maintained fork...
