Skip to content

Commit

Permalink
Implement broadcast-related methods for client
Browse files Browse the repository at this point in the history
  • Loading branch information
sejongk committed Sep 6, 2023
1 parent 669a739 commit 8b2a9a7
Show file tree
Hide file tree
Showing 3 changed files with 332 additions and 70 deletions.
244 changes: 182 additions & 62 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -409,6 +409,8 @@ func (c *Client) Watch(
}

rch := make(chan WatchResponse)
bch := make(chan *api.BroadcastEvent)

stream, err := c.client.WatchDocument(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.WatchDocumentRequest{
Expand All @@ -420,69 +422,11 @@ func (c *Client) Watch(
return nil, err
}

handleResponse := func(pbResp *api.WatchDocumentResponse) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_Event:
eventType, err := converter.FromEventType(resp.Event.Type)
if err != nil {
return nil, err
}

cli, err := time.ActorIDFromHex(resp.Event.Publisher)
if err != nil {
return nil, err
}

switch eventType {
case types.DocumentChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
cli.String(): p,
},
}, nil
}
}
return nil, ErrUnsupportedWatchResponseType
}

pbResp, err := stream.Recv()
if err != nil {
return nil, err
}
if _, err := handleResponse(pbResp); err != nil {
if _, err := handleResponse(pbResp, doc, bch); err != nil {
return nil, err
}

Expand All @@ -494,7 +438,7 @@ func (c *Client) Watch(
close(rch)
return
}
resp, err := handleResponse(pbResp)
resp, err := handleResponse(pbResp, doc, bch)
if err != nil {
rch <- WatchResponse{Err: err}
close(rch)
Expand All @@ -520,7 +464,7 @@ func (c *Client) Watch(
go func() {
for {
select {
case e := <-doc.Events():
case e := <-doc.DocEvents():
t := PresenceChanged
if e.Type == document.WatchedEvent {
t = DocumentWatched
Expand All @@ -534,10 +478,107 @@ func (c *Client) Watch(
}
}()

go func() {
for {
select {
case r := <-doc.BroadcastRequests():
// TODO(sejongk): How to deal with errors in the goroutine?
switch r.RequestType {
case document.Broadcast:
_ = c.broadcast(ctx, doc, r.EventType, r.Payload)
case document.Subscribe:
_ = c.subscribeBroadcastEvent(ctx, doc, r.EventType)
case document.Unsubscribe:
_ = c.unsubscribeBroadcastEvent(ctx, doc, r.EventType)
}
case b := <-bch:
if handler, ok := doc.BroadcastEventHandlers()[b.Type]; ok && handler != nil {
err := handler(b.Type, b.Publisher, b.Payload)
if err != nil {
if c.logger.Core().Enabled(zap.DebugLevel) {
c.logger.Debug("broadcast event handling error", zap.Error(err))
}
}
}
case <-ctx.Done():
return
}
}
}()

return rch, nil
}

func (c *Client) findDocKey(docID string) (key.Key, error) {
func handleResponse(
pbResp *api.WatchDocumentResponse,
doc *document.Document,
bch chan *api.BroadcastEvent,
) (*WatchResponse, error) {
switch resp := pbResp.Body.(type) {
case *api.WatchDocumentResponse_Initialization_:
var clientIDs []string
for _, clientID := range resp.Initialization.ClientIds {
id, err := time.ActorIDFromHex(clientID)
if err != nil {
return nil, err
}
clientIDs = append(clientIDs, id.String())
}

doc.SetOnlineClients(clientIDs...)
return nil, nil
case *api.WatchDocumentResponse_DocEvent:
// Handle a document event
eventType, err := converter.FromEventType(resp.DocEvent.Type)
if err != nil {
return nil, err
}

cli, err := time.ActorIDFromHex(resp.DocEvent.Publisher)
if err != nil {
return nil, err
}

switch eventType {
case types.DocumentChangedEvent:
return &WatchResponse{Type: DocumentChanged}, nil
case types.DocumentWatchedEvent:
doc.AddOnlineClient(cli.String())
if doc.Presence(cli.String()) == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentWatched,
Presences: map[string]innerpresence.Presence{
cli.String(): doc.Presence(cli.String()),
},
}, nil
case types.DocumentUnwatchedEvent:
p := doc.Presence(cli.String())
doc.RemoveOnlineClient(cli.String())
if p == nil {
return nil, nil
}

return &WatchResponse{
Type: DocumentUnwatched,
Presences: map[string]innerpresence.Presence{
cli.String(): p,
},
}, nil
}

case *api.WatchDocumentResponse_BroadcastEvent:
// Handle a broadcast event
bch <- resp.BroadcastEvent
return nil, nil
}
return nil, ErrUnsupportedWatchResponseType
}

// FindDocKey returns the document key of the given document id.
func (c *Client) FindDocKey(docID string) (key.Key, error) {
for _, attachment := range c.attachments {
if attachment.docID.String() == docID {
return attachment.doc.Key(), nil
Expand Down Expand Up @@ -650,6 +691,85 @@ func (c *Client) Remove(ctx context.Context, doc *document.Document) error {
return nil
}

func (c *Client) broadcast(ctx context.Context, doc *document.Document, eventType string, payload []byte) error {
if c.status != activated {
return ErrClientNotActivated
}

attachment, ok := c.attachments[doc.Key()]
if !ok {
return ErrDocumentNotAttached
}

_, err := c.client.Broadcast(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.BroadcastRequest{
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
Event: &api.BroadcastEvent{
Type: eventType,
Publisher: c.id.String(),
Payload: payload,
},
},
)
if err != nil {
return err
}

return nil
}

func (c *Client) subscribeBroadcastEvent(ctx context.Context, doc *document.Document, eventType string) error {
if c.status != activated {
return ErrClientNotActivated
}

attachment, ok := c.attachments[doc.Key()]
if !ok {
return ErrDocumentNotAttached
}

_, err := c.client.SubscribeBroadcastEvent(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.SubscribeBroadcastEventRequest{
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
Type: eventType,
},
)
if err != nil {
return err
}

return nil
}

func (c *Client) unsubscribeBroadcastEvent(ctx context.Context, doc *document.Document, eventType string) error {
if c.status != activated {
return ErrClientNotActivated
}

attachment, ok := c.attachments[doc.Key()]
if !ok {
return ErrDocumentNotAttached
}

_, err := c.client.UnsubscribeBroadcastEvent(
withShardKey(ctx, c.options.APIKey, doc.Key().String()),
&api.UnsubscribeBroadcastEventRequest{
ClientId: c.id.String(),
DocumentId: attachment.docID.String(),
Type: eventType,
},
)
if err != nil {
return err
}

return nil
}

/**
* withShardKey returns a context with the given shard key in metadata.
*/
Expand Down
Loading

0 comments on commit 8b2a9a7

Please sign in to comment.