Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Websocket replacement for Eventstream #20543

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/google/uuid v1.3.0
github.com/gorilla/feeds v1.1.1
github.com/gorilla/sessions v1.2.1
github.com/gorilla/websocket v1.4.2
github.com/hashicorp/go-version v1.4.0
github.com/hashicorp/golang-lru v0.5.4
github.com/huandu/xstrings v1.3.2
Expand Down Expand Up @@ -194,7 +195,6 @@ require (
github.com/gorilla/handlers v1.5.1 // indirect
github.com/gorilla/mux v1.8.0 // indirect
github.com/gorilla/securecookie v1.1.1 // indirect
github.com/gorilla/websocket v1.4.2 // indirect
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 // indirect
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway v1.16.0 // indirect
Expand Down
21 changes: 19 additions & 2 deletions modules/context/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,31 @@ func (r *Response) Before(f func(ResponseWriter)) {
r.befores = append(r.befores, f)
}

// hijackerResponse wraps the Response to allow casting as a Hijacker if the underlying response is a hijacker
type hijackerResponse struct {
*Response
http.Hijacker
}

// NewResponse creates a response
func NewResponse(resp http.ResponseWriter) *Response {
func NewResponse(resp http.ResponseWriter) ResponseWriter {
if v, ok := resp.(*Response); ok {
return v
}
return &Response{
hijacker, ok := resp.(http.Hijacker)

response := &Response{
ResponseWriter: resp,
status: 0,
befores: make([]func(ResponseWriter), 0),
}
if ok {
// ensure that the Response we return is also hijackable
return hijackerResponse{
Response: response,
Hijacker: hijacker,
}
}

return response
}
235 changes: 235 additions & 0 deletions routers/web/events/websocket.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Copyright 2022 The Gitea Authors. All rights reserved.
// Use of this source code is governed by a MIT-style
// license that can be found in the LICENSE file.

package events

import (
"net/http"
"net/url"
"time"

"code.gitea.io/gitea/modules/context"
"code.gitea.io/gitea/modules/eventsource"
"code.gitea.io/gitea/modules/graceful"
"code.gitea.io/gitea/modules/json"
"code.gitea.io/gitea/modules/log"
"code.gitea.io/gitea/modules/setting"
"code.gitea.io/gitea/routers/web/auth"
"github.com/gorilla/websocket"
)

const (
writeWait = 10 * time.Second
pongWait = 60 * time.Second
pingPeriod = (pongWait * 9) / 10

maximumMessageSize = 2048
readMessageChanSize = 20 // <- I've put 20 here because it seems like a reasonable buffer but it may to increase
)

type readMessage struct {
messageType int
message []byte
err error
}

// Events listens for events
func Websocket(ctx *context.Context) {
upgrader := websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: func(r *http.Request) bool {
origin := r.Header["Origin"]
if len(origin) == 0 {
return true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
return true
return false

Origin header will be present even on first-party requests, so we can require it.

}
u, err := url.Parse(origin[0])
if err != nil {
return false
}
appURLURL, err := url.Parse(setting.AppURL)
if err != nil {
return true
}

return u.Host == appURLURL.Host
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure this is safe, it should probably be stricter and include protocol and port as well. Thought we could also lean more on the cookie header. When the cookie has SameSite=strict, it could technically serve as the only authentification mechanism on the websocket. Thought, origin check is always good to have.

},
}

// Because http proxies will tend not to pass these headers
ctx.Req.Header.Add("Upgrade", "websocket")
ctx.Req.Header.Add("Connection", "Upgrade")

conn, err := upgrader.Upgrade(ctx.Resp, ctx.Req, nil)
if err != nil {
log.Error("Unable to upgrade due to error: %v", err)
return
}
defer conn.Close()

notify := ctx.Done()
shutdownCtx := graceful.GetManager().ShutdownContext()

eventChan := make(<-chan *eventsource.Event)
uid := int64(0)
unregister := func() {}
if ctx.IsSigned {
uid = ctx.Doer.ID
eventChan = eventsource.GetManager().Register(uid)
unregister = func() {
go func() {
eventsource.GetManager().Unregister(uid, eventChan)
// ensure the messageChan is closed
for {
_, ok := <-eventChan
if !ok {
break
}
}
}()
}
}
defer unregister()

readMessageChan := make(chan readMessage, readMessageChanSize)
go readMessagesFromConnToChan(conn, readMessageChan)

pingTicker := time.NewTicker(pingPeriod)

for {
select {
case <-notify:
return
case <-shutdownCtx.Done():
return
case <-pingTicker.C:
// ensure that we're not already cancelled
select {
case <-notify:
return
case <-shutdownCtx.Done():
return
default:
}

if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Error("unable to SetWriteDeadline: %v", err)
return
}
if err := conn.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
log.Error("unable to send PingMessage: %v", err)
return
}
case message, ok := <-readMessageChan:
if !ok {
break
}

// ensure that we're not already cancelled
select {
case <-notify:
return
case <-shutdownCtx.Done():
return
default:
}

// FIXME: HANDLE MESSAGES
log.Info("Got Message: %d:%s:%v", message.messageType, message.message, message.err)
case event, ok := <-eventChan:
if !ok {
break
}

// ensure that we're not already cancelled
select {
case <-notify:
return
case <-shutdownCtx.Done():
return
default:
}

// Handle events
if event.Name == "logout" {
if ctx.Session.ID() == event.Data {
event = &eventsource.Event{
Name: "logout",
Data: "here",
}
_ = writeEvent(conn, event)
go unregister()
auth.HandleSignOut(ctx)
break
}
// Replace the event - we don't want to expose the session ID to the user
event = &eventsource.Event{
Name: "logout",
Data: "elsewhere",
}
}
if err := writeEvent(conn, event); err != nil {
return
}
}
}
}

func readMessagesFromConnToChan(conn *websocket.Conn, messageChan chan readMessage) {
defer func() {
close(messageChan) // Please note: this has to be within a wrapping anonymous func otherwise it will be evaluated when creating the defer
Copy link
Member

@silverwind silverwind Sep 5, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Interesting, I didn't know this. Is this evaluation behaviour something specific to the close builtin?

_ = conn.Close()
}()
conn.SetReadLimit(maximumMessageSize)
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
return
}
conn.SetPongHandler(func(string) error {
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
}
return nil
})

for {
messageType, message, err := conn.ReadMessage()
messageChan <- readMessage{
messageType: messageType,
message: message,
err: err,
}
if err != nil {
// don't need to handle the error here as it is passed down the channel
return
}
if err := conn.SetReadDeadline(time.Now().Add(pongWait)); err != nil {
log.Error("unable to SetReadDeadline: %v", err)
return
}
}
}

func writeEvent(conn *websocket.Conn, event *eventsource.Event) error {
if err := conn.SetWriteDeadline(time.Now().Add(writeWait)); err != nil {
log.Error("unable to SetWriteDeadline: %v", err)
return err
}

w, err := conn.NextWriter(websocket.TextMessage)
if err != nil {
log.Error("Unable to get writer for websocket %v", err)
return err
}

if err := json.NewEncoder(w).Encode(event); err != nil {
log.Error("Unable to create encoder for %v %v", event, err)
return err
}
if err := w.Close(); err != nil {
log.Warn("Unable to close writer for websocket %v", err)
return err
}
return nil
}
1 change: 1 addition & 0 deletions routers/web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -365,6 +365,7 @@ func RegisterRoutes(m *web.Route) {
}, reqSignOut)

m.Any("/user/events", routing.MarkLongPolling, events.Events)
m.Any("/user/websocket", routing.MarkLongPolling, events.Websocket)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this cookie-authenticated? If not, it should be.


m.Group("/login/oauth", func() {
m.Get("/authorize", bindIgnErr(forms.AuthorizationForm{}), auth.AuthorizeOAuth)
Expand Down
6 changes: 5 additions & 1 deletion web_src/js/features/eventsource.sharedworker.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,13 @@ class Source {
if (this.listening[eventType]) return;
this.listening[eventType] = true;
this.eventSource.addEventListener(eventType, (event) => {
let data;
if (event.data) {
data = JSON.parse(event.data);
}
this.notifyClients({
type: eventType,
data: event.data
data
});
});
}
Expand Down
41 changes: 25 additions & 16 deletions web_src/js/features/notification.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,21 +24,17 @@ export function initNotificationsTable() {
});
}

async function receiveUpdateCount(event) {
async function receiveUpdateCount(data, document) {
try {
const data = JSON.parse(event.data);

const notificationCount = document.querySelector('.notification_count');
if (data.Count > 0) {
notificationCount.classList.remove('hidden');
} else {
notificationCount.classList.add('hidden');
const notificationCounts = document.querySelectorAll('.notification_count');
for (const count of notificationCounts) {
count.classList.toggle('hidden', data.Count === 0);
count.textContent = `${data.Count}`;
}

notificationCount.textContent = `${data.Count}`;
await updateNotificationTable();
} catch (error) {
console.error(error, event);
console.error(error, data);
}
}

Expand All @@ -49,26 +45,39 @@ export function initNotificationCount() {
return;
}

if (notificationSettings.EventSourceUpdateTime > 0 && !!window.EventSource && window.SharedWorker) {
let worker;
let workerUrl;

if (notificationSettings.EventSourceUpdateTime > 0 && !!window.WebSocket && window.SharedWorker) {
// Try to connect to the event source via the shared worker first
worker = new SharedWorker(`${__webpack_public_path__}js/websocket.sharedworker.js`, 'notification-worker');
workerUrl = `${window.location.origin}${appSubUrl}/user/websocket`;
} else if (notificationSettings.EventSourceUpdateTime > 0 && !!window.EventSource && window.SharedWorker) {
// Try to connect to the event source via the shared worker first
const worker = new SharedWorker(`${__webpack_public_path__}js/eventsource.sharedworker.js`, 'notification-worker');
worker = new SharedWorker(`${__webpack_public_path__}js/eventsource.sharedworker.js`, 'notification-worker');
workerUrl = `${window.location.origin}${appSubUrl}/user/events`;
}

const currentDocument = document;

if (worker) {
worker.addEventListener('error', (event) => {
console.error(event);
console.error('error from listener: ', event);
});
worker.port.addEventListener('messageerror', () => {
console.error('Unable to deserialize message');
});
worker.port.postMessage({
type: 'start',
url: `${window.location.origin}${appSubUrl}/user/events`,
url: workerUrl,
});
worker.port.addEventListener('message', (event) => {
if (!event.data || !event.data.type) {
console.error(event);
console.error('Unexpected event:', event);
return;
}
if (event.data.type === 'notification-count') {
const _promise = receiveUpdateCount(event.data);
const _promise = receiveUpdateCount(event.data.data, currentDocument);
} else if (event.data.type === 'error') {
console.error(event.data);
} else if (event.data.type === 'logout') {
Expand Down
Loading