|
| 1 | +use futures::{unsync, Async, Future, Poll, Stream}; |
| 2 | +use never::Never; |
| 3 | +use notify_cell::NotifyCell; |
| 4 | +use project; |
| 5 | +use rpc::{self, client, server}; |
| 6 | +use serde_json; |
| 7 | +use std::cell::RefCell; |
| 8 | +use std::rc::Rc; |
| 9 | +use window::{View, WeakViewHandle, Window}; |
| 10 | +use workspace::UserId; |
| 11 | +use ForegroundExecutor; |
| 12 | +use IntoShared; |
| 13 | + |
| 14 | +pub trait DiscussionViewDelegate { |
| 15 | + fn anchor(&self) -> Option<project::Anchor>; |
| 16 | + fn jump(&self, anchor: &project::Anchor) -> Option<project::Anchor>; |
| 17 | +} |
| 18 | + |
| 19 | +pub struct Discussion { |
| 20 | + messages: Vec<Message>, |
| 21 | + local_user_id: UserId, |
| 22 | + outgoing_message_txs: Vec<unsync::mpsc::UnboundedSender<Message>>, |
| 23 | + updates: NotifyCell<()>, |
| 24 | + client: Option<client::Service<DiscussionService>>, |
| 25 | +} |
| 26 | + |
| 27 | +#[derive(Clone, Serialize, Deserialize)] |
| 28 | +pub struct Message { |
| 29 | + text: String, |
| 30 | + anchor: Option<project::Anchor>, |
| 31 | + user_id: UserId, |
| 32 | +} |
| 33 | + |
| 34 | +pub struct DiscussionView<T: DiscussionViewDelegate> { |
| 35 | + discussion: Rc<RefCell<Discussion>>, |
| 36 | + updates: Box<Stream<Item = (), Error = ()>>, |
| 37 | + delegate: WeakViewHandle<T>, |
| 38 | +} |
| 39 | + |
| 40 | +#[derive(Deserialize)] |
| 41 | +#[serde(tag = "type")] |
| 42 | +enum DiscussionViewAction { |
| 43 | + Send { text: String }, |
| 44 | + Jump { message_index: usize }, |
| 45 | +} |
| 46 | + |
| 47 | +pub struct DiscussionService { |
| 48 | + remote_user_id: UserId, |
| 49 | + discussion: Rc<RefCell<Discussion>>, |
| 50 | + outgoing_messages: Box<Stream<Item = Message, Error = Never>>, |
| 51 | +} |
| 52 | + |
| 53 | +#[derive(Serialize, Deserialize)] |
| 54 | +pub struct ServiceRequest { |
| 55 | + text: String, |
| 56 | + anchor: Option<project::Anchor>, |
| 57 | +} |
| 58 | + |
| 59 | +impl Discussion { |
| 60 | + pub fn new(local_user_id: UserId) -> Self { |
| 61 | + Self { |
| 62 | + messages: Vec::new(), |
| 63 | + local_user_id, |
| 64 | + outgoing_message_txs: Vec::new(), |
| 65 | + updates: NotifyCell::new(()), |
| 66 | + client: None, |
| 67 | + } |
| 68 | + } |
| 69 | + |
| 70 | + pub fn remote( |
| 71 | + executor: ForegroundExecutor, |
| 72 | + local_user_id: UserId, |
| 73 | + client: client::Service<DiscussionService>, |
| 74 | + ) -> Result<Rc<RefCell<Self>>, rpc::Error> { |
| 75 | + let client_updates = client.updates()?; |
| 76 | + let discussion = Self { |
| 77 | + messages: client.state()?, |
| 78 | + local_user_id, |
| 79 | + outgoing_message_txs: Vec::new(), |
| 80 | + updates: NotifyCell::new(()), |
| 81 | + client: Some(client), |
| 82 | + }.into_shared(); |
| 83 | + |
| 84 | + let discussion_weak = Rc::downgrade(&discussion); |
| 85 | + executor.execute(Box::new(client_updates.for_each(move |message| { |
| 86 | + if let Some(discussion) = discussion_weak.upgrade() { |
| 87 | + discussion.borrow_mut().push_message(message); |
| 88 | + } |
| 89 | + |
| 90 | + Ok(()) |
| 91 | + }))); |
| 92 | + Ok(discussion) |
| 93 | + } |
| 94 | + |
| 95 | + fn updates(&self) -> impl Stream<Item = (), Error = ()> { |
| 96 | + self.updates.observe() |
| 97 | + } |
| 98 | + |
| 99 | + fn outgoing_messages(&mut self) -> impl Stream<Item = Message, Error = Never> { |
| 100 | + let (tx, rx) = unsync::mpsc::unbounded(); |
| 101 | + self.outgoing_message_txs.push(tx); |
| 102 | + rx.map_err(|_| unreachable!()) |
| 103 | + } |
| 104 | + |
| 105 | + fn send(&mut self, text: String, anchor: Option<project::Anchor>) { |
| 106 | + if let Some(ref client) = self.client { |
| 107 | + client.request(ServiceRequest { text, anchor }); |
| 108 | + } else { |
| 109 | + let user_id = self.local_user_id; |
| 110 | + self.push_message(Message { |
| 111 | + text, |
| 112 | + anchor, |
| 113 | + user_id, |
| 114 | + }); |
| 115 | + } |
| 116 | + } |
| 117 | + |
| 118 | + fn push_message(&mut self, message: Message) { |
| 119 | + self.outgoing_message_txs |
| 120 | + .retain(|tx| !tx.unbounded_send(message.clone()).is_err()); |
| 121 | + self.messages.push(message); |
| 122 | + self.updates.set(()); |
| 123 | + } |
| 124 | +} |
| 125 | + |
| 126 | +impl<T: DiscussionViewDelegate> View for DiscussionView<T> { |
| 127 | + fn component_name(&self) -> &'static str { |
| 128 | + "Discussion" |
| 129 | + } |
| 130 | + |
| 131 | + fn render(&self) -> serde_json::Value { |
| 132 | + let discussion = self.discussion.borrow(); |
| 133 | + json!({ |
| 134 | + "messages": discussion.messages.iter().enumerate().map(|(index, message)| json!({ |
| 135 | + "index": index, |
| 136 | + "text": message.text, |
| 137 | + "user_id": message.user_id |
| 138 | + })).collect::<Vec<_>>() |
| 139 | + }) |
| 140 | + } |
| 141 | + |
| 142 | + fn dispatch_action(&mut self, action: serde_json::Value, _: &mut Window) { |
| 143 | + match serde_json::from_value(action) { |
| 144 | + Ok(DiscussionViewAction::Send { text }) => { |
| 145 | + if let Some(anchor) = self.delegate.map(|delegate| delegate.anchor()) { |
| 146 | + self.discussion.borrow_mut().send(text, anchor); |
| 147 | + } |
| 148 | + } |
| 149 | + Ok(DiscussionViewAction::Jump { message_index }) => { |
| 150 | + let discussion = self.discussion.borrow(); |
| 151 | + let message = &discussion.messages[message_index]; |
| 152 | + self.delegate.map(|delegate| { |
| 153 | + if let Some(ref anchor) = message.anchor { |
| 154 | + delegate.jump(anchor); |
| 155 | + } |
| 156 | + }); |
| 157 | + } |
| 158 | + _ => eprintln!("Unrecognized action"), |
| 159 | + } |
| 160 | + } |
| 161 | +} |
| 162 | + |
| 163 | +impl<T: DiscussionViewDelegate> DiscussionView<T> { |
| 164 | + pub fn new(discussion: Rc<RefCell<Discussion>>, delegate: WeakViewHandle<T>) -> Self { |
| 165 | + let updates = discussion.borrow().updates(); |
| 166 | + Self { |
| 167 | + delegate, |
| 168 | + discussion, |
| 169 | + updates: Box::new(updates), |
| 170 | + } |
| 171 | + } |
| 172 | +} |
| 173 | + |
| 174 | +impl<T: DiscussionViewDelegate> Stream for DiscussionView<T> { |
| 175 | + type Item = (); |
| 176 | + type Error = (); |
| 177 | + |
| 178 | + fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { |
| 179 | + self.updates.poll() |
| 180 | + } |
| 181 | +} |
| 182 | + |
| 183 | +impl DiscussionService { |
| 184 | + pub fn new(remote_user_id: UserId, discussion: Rc<RefCell<Discussion>>) -> Self { |
| 185 | + let outgoing_messages = Box::new(discussion.borrow_mut().outgoing_messages()); |
| 186 | + Self { |
| 187 | + remote_user_id, |
| 188 | + discussion, |
| 189 | + outgoing_messages, |
| 190 | + } |
| 191 | + } |
| 192 | +} |
| 193 | + |
| 194 | +impl server::Service for DiscussionService { |
| 195 | + type State = Vec<Message>; |
| 196 | + type Update = Message; |
| 197 | + type Request = ServiceRequest; |
| 198 | + type Response = (); |
| 199 | + |
| 200 | + fn init(&mut self, _: &rpc::server::Connection) -> Self::State { |
| 201 | + self.discussion.borrow().messages.clone() |
| 202 | + } |
| 203 | + |
| 204 | + fn poll_update(&mut self, _: &rpc::server::Connection) -> Async<Option<Self::Update>> { |
| 205 | + self.outgoing_messages.poll().unwrap() |
| 206 | + } |
| 207 | + |
| 208 | + fn request( |
| 209 | + &mut self, |
| 210 | + request: Self::Request, |
| 211 | + _connection: &rpc::server::Connection, |
| 212 | + ) -> Option<Box<Future<Item = Self::Response, Error = Never>>> { |
| 213 | + self.discussion.borrow_mut().push_message(Message { |
| 214 | + text: request.text, |
| 215 | + anchor: request.anchor, |
| 216 | + user_id: self.remote_user_id, |
| 217 | + }); |
| 218 | + None |
| 219 | + } |
| 220 | +} |
0 commit comments