Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add heroes to the room summary #2373

Merged
merged 7 commits into from
Apr 26, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions syncapi/storage/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type Database interface {
GetStateDeltas(ctx context.Context, device *userapi.Device, r types.Range, userID string, stateFilter *gomatrixserverlib.StateFilter) ([]types.StateDelta, []string, error)
RoomIDsWithMembership(ctx context.Context, userID string, membership string) ([]string, error)
MembershipCount(ctx context.Context, roomID, membership string, pos types.StreamPosition) (int, error)
GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error)

RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error)

Expand Down
38 changes: 31 additions & 7 deletions syncapi/storage/postgres/memberships_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@ import (
"database/sql"
"fmt"

"github.com/lib/pq"
"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
Expand Down Expand Up @@ -61,9 +63,13 @@ const selectMembershipCountSQL = "" +
" SELECT DISTINCT ON (room_id, user_id) room_id, user_id, membership FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 ORDER BY room_id, user_id, stream_pos DESC" +
") t WHERE t.membership = $3"

const selectHeroesSQL = "" +
"SELECT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership = ANY($3) LIMIT 5"

type membershipsStatements struct {
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
selectHeroesStmt *sql.Stmt
}

func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
Expand All @@ -72,13 +78,11 @@ func NewPostgresMembershipsTable(db *sql.DB) (tables.Memberships, error) {
if err != nil {
return nil, err
}
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
return nil, err
}
if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil {
return nil, err
}
return s, nil
return s, sqlutil.StatementList{
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectHeroesStmt, selectHeroesSQL},
}.Prepare(db)
}

func (s *membershipsStatements) UpsertMembership(
Expand Down Expand Up @@ -108,3 +112,23 @@ func (s *membershipsStatements) SelectMembershipCount(
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
return
}

func (s *membershipsStatements) SelectHeroes(
ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
) (heroes []string, err error) {
stmt := sqlutil.TxStmt(txn, s.selectHeroesStmt)
var rows *sql.Rows
rows, err = stmt.QueryContext(ctx, roomID, userID, pq.StringArray(memberships))
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
var hero string
for rows.Next() {
if err = rows.Scan(&hero); err != nil {
return
}
heroes = append(heroes, hero)
}
return heroes, rows.Err()
}
4 changes: 4 additions & 0 deletions syncapi/storage/shared/syncserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,10 @@ func (d *Database) MembershipCount(ctx context.Context, roomID, membership strin
return d.Memberships.SelectMembershipCount(ctx, nil, roomID, membership, pos)
}

func (d *Database) GetRoomHeroes(ctx context.Context, roomID, userID string, memberships []string) ([]string, error) {
return d.Memberships.SelectHeroes(ctx, nil, roomID, userID, memberships)
}

func (d *Database) RecentEvents(ctx context.Context, roomID string, r types.Range, eventFilter *gomatrixserverlib.RoomEventFilter, chronologicalOrder bool, onlySyncEvents bool) ([]types.StreamEvent, bool, error) {
return d.OutputEvents.SelectRecentEvents(ctx, nil, roomID, r, eventFilter, chronologicalOrder, onlySyncEvents)
}
Expand Down
50 changes: 43 additions & 7 deletions syncapi/storage/sqlite3/memberships_table.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ import (
"context"
"database/sql"
"fmt"
"strings"

"github.com/matrix-org/dendrite/internal"
"github.com/matrix-org/dendrite/internal/sqlutil"
"github.com/matrix-org/dendrite/syncapi/storage/tables"
"github.com/matrix-org/dendrite/syncapi/types"
Expand Down Expand Up @@ -61,10 +63,14 @@ const selectMembershipCountSQL = "" +
" SELECT * FROM syncapi_memberships WHERE room_id = $1 AND stream_pos <= $2 GROUP BY user_id HAVING(max(stream_pos))" +
") t WHERE t.membership = $3"

const selectHeroesSQL = "" +
"SELECT DISTINCT user_id FROM syncapi_memberships WHERE room_id = $1 AND user_id != $2 AND membership IN ($3) LIMIT 5"

type membershipsStatements struct {
db *sql.DB
upsertMembershipStmt *sql.Stmt
selectMembershipCountStmt *sql.Stmt
selectHeroesStmt *sql.Stmt
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we have to prepare to do variadics, there isn't much point in preparing it up front. Normally I just leave these commented out so that it's obvious there's something but we don't waste the effort preparing a statement we never use.

}

func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
Expand All @@ -75,13 +81,11 @@ func NewSqliteMembershipsTable(db *sql.DB) (tables.Memberships, error) {
if err != nil {
return nil, err
}
if s.upsertMembershipStmt, err = db.Prepare(upsertMembershipSQL); err != nil {
return nil, err
}
if s.selectMembershipCountStmt, err = db.Prepare(selectMembershipCountSQL); err != nil {
return nil, err
}
return s, nil
return s, sqlutil.StatementList{
{&s.upsertMembershipStmt, upsertMembershipSQL},
{&s.selectMembershipCountStmt, selectMembershipCountSQL},
{&s.selectHeroesStmt, selectHeroesSQL},
}.Prepare(db)
}

func (s *membershipsStatements) UpsertMembership(
Expand Down Expand Up @@ -111,3 +115,35 @@ func (s *membershipsStatements) SelectMembershipCount(
err = stmt.QueryRowContext(ctx, roomID, pos, membership).Scan(&count)
return
}

func (s *membershipsStatements) SelectHeroes(
ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string,
) (heroes []string, err error) {
stmtSQL := strings.Replace(selectHeroesSQL, "($3)", sqlutil.QueryVariadicOffset(len(memberships), 2), 1)
stmt, err := s.db.Prepare(stmtSQL)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You'll need to close this too, otherwise the prepared statements leak.

if err != nil {
return
}
params := []interface{}{
roomID, userID,
}
for _, membership := range memberships {
params = append(params, membership)
}

stmt = sqlutil.TxStmt(txn, stmt)
var rows *sql.Rows
rows, err = stmt.QueryContext(ctx, params...)
if err != nil {
return
}
defer internal.CloseAndLogIfError(ctx, rows, "SelectHeroes: rows.close() failed")
var hero string
for rows.Next() {
if err = rows.Scan(&hero); err != nil {
return
}
heroes = append(heroes, hero)
}
return heroes, rows.Err()
}
1 change: 1 addition & 0 deletions syncapi/storage/tables/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ type Receipts interface {
type Memberships interface {
UpsertMembership(ctx context.Context, txn *sql.Tx, event *gomatrixserverlib.HeaderedEvent, streamPos, topologicalPos types.StreamPosition) error
SelectMembershipCount(ctx context.Context, txn *sql.Tx, roomID, membership string, pos types.StreamPosition) (count int, err error)
SelectHeroes(ctx context.Context, txn *sql.Tx, roomID, userID string, memberships []string) (heroes []string, err error)
}

type NotificationData interface {
Expand Down
56 changes: 45 additions & 11 deletions syncapi/streams/stream_pdu.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"database/sql"
"fmt"
"sort"
"sync"
"time"

"github.com/matrix-org/dendrite/internal/caching"
roomserverAPI "github.com/matrix-org/dendrite/roomserver/api"
"github.com/matrix-org/dendrite/syncapi/types"
userapi "github.com/matrix-org/dendrite/userapi/api"
"github.com/matrix-org/gomatrixserverlib"
"github.com/tidwall/gjson"
"go.uber.org/atomic"
)

Expand All @@ -30,6 +33,7 @@ type PDUStreamProvider struct {
workers atomic.Int32
// userID+deviceID -> lazy loading cache
lazyLoadCache *caching.LazyLoadCache
rsAPI roomserverAPI.RoomserverInternalAPI
}

func (p *PDUStreamProvider) worker() {
Expand Down Expand Up @@ -290,16 +294,11 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
}
}

// Work out how many members are in the room.
joinedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Join, latestPosition)
invitedCount, _ := p.DB.MembershipCount(ctx, delta.RoomID, gomatrixserverlib.Invite, latestPosition)

switch delta.Membership {
case gomatrixserverlib.Join:
jr := types.NewJoinResponse()
if hasMembershipChange {
jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
p.addRoomSummary(ctx, jr, delta.RoomID, device.UserID, latestPosition)
}
jr.Timeline.PrevBatch = &prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
Expand Down Expand Up @@ -332,6 +331,45 @@ func (p *PDUStreamProvider) addRoomDeltaToResponse(
return latestPosition, nil
}

func (p *PDUStreamProvider) addRoomSummary(ctx context.Context, jr *types.JoinResponse, roomID, userID string, latestPosition types.StreamPosition) {
// Work out how many members are in the room.
joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, latestPosition)
invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, latestPosition)

jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount

fetchStates := []gomatrixserverlib.StateKeyTuple{
{EventType: gomatrixserverlib.MRoomName},
{EventType: gomatrixserverlib.MRoomCanonicalAlias},
}
// Check if the room has a name or a canonical alias
latestState := &roomserverAPI.QueryLatestEventsAndStateResponse{}
err := p.rsAPI.QueryLatestEventsAndState(ctx, &roomserverAPI.QueryLatestEventsAndStateRequest{StateToFetch: fetchStates, RoomID: roomID}, latestState)
if err != nil {
return
}
// Check if the room has a name or canonical alias, if so, return.
for _, ev := range latestState.StateEvents {
switch ev.Type() {
case gomatrixserverlib.MRoomName:
if gjson.GetBytes(ev.Content(), "name").Str != "" {
return
}
case gomatrixserverlib.MRoomCanonicalAlias:
if gjson.GetBytes(ev.Content(), "alias").Str != "" {
return
}
}
}
heroes, err := p.DB.GetRoomHeroes(ctx, roomID, userID, []string{"join", "invite"})
if err != nil {
return
}
sort.Strings(heroes)
jr.Summary.Heroes = heroes
}

func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
ctx context.Context,
roomID string,
Expand Down Expand Up @@ -416,9 +454,7 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
prevBatch.Decrement()
}

// Work out how many members are in the room.
joinedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Join, r.From)
invitedCount, _ := p.DB.MembershipCount(ctx, roomID, gomatrixserverlib.Invite, r.From)
p.addRoomSummary(ctx, jr, roomID, device.UserID, r.From)

// We don't include a device here as we don't need to send down
// transaction IDs for complete syncs, but we do it anyway because Sytest demands it for:
Expand All @@ -439,8 +475,6 @@ func (p *PDUStreamProvider) getJoinResponseForCompleteSync(
}
}

jr.Summary.JoinedMemberCount = &joinedCount
jr.Summary.InvitedMemberCount = &invitedCount
jr.Timeline.PrevBatch = prevBatch
jr.Timeline.Events = gomatrixserverlib.HeaderedToClientEvents(recentEvents, gomatrixserverlib.FormatSync)
jr.Timeline.Limited = limited
Expand Down
1 change: 1 addition & 0 deletions syncapi/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func NewSyncStreamProviders(
PDUStreamProvider: &PDUStreamProvider{
StreamProvider: StreamProvider{DB: d},
lazyLoadCache: lazyLoadCache,
rsAPI: rsAPI,
},
TypingStreamProvider: &TypingStreamProvider{
StreamProvider: StreamProvider{DB: d},
Expand Down
5 changes: 4 additions & 1 deletion sytest-whitelist
Original file line number Diff line number Diff line change
Expand Up @@ -713,4 +713,7 @@ Presence can be set from sync
/state returns M_NOT_FOUND for a rejected message event
/state_ids returns M_NOT_FOUND for a rejected message event
/state returns M_NOT_FOUND for a rejected state event
/state_ids returns M_NOT_FOUND for a rejected state event
/state_ids returns M_NOT_FOUND for a rejected state event
Unnamed room comes with a name summary
Named room comes with just joined member count summary
Room summary only has 5 heroes