Skip to content

Commit

Permalink
simple telegram client #1065 (#1114)
Browse files Browse the repository at this point in the history
* feat(telegram): wip telegram send message

* feat(telegram): add telegram simple notification client

* feat(telegram): clippy warning

* feat(message): add a message_service object that handle a collection of MessageServiceTraits

* feat(ci): add telegram api key

* feat(message): fix review

* feat(message): use message service in simple_market_maker bot

* feat(message): fix compilation and review

* feat(simple_market_maker_bot): add swap notification

* feat(style): clone macro at the beginning

* feat(style): nest use statement

* feat(simple_bot): more detailed errors for swap history

* feat(lp_swap): log an error if the swap is not present in DB.

- If you have a swap that is not present in DB the function was returning an error, it's too aggressive.

`Error when querying swap history order cannot be created for: KMD/LTC`

- Instead skip the erroneous swaps and continue the iteration

* feat(simple_market_maker): add the possibility to control the loop refresh_rate

- Useful for small coins that doesn't update often price and to avoid taker to dump on you
- Add a default precision for notification, 8 should be enough for readable message.

* feat(wasm): use now_float() instead of system function for time manipulation

* feat(dispatcher): add a simple event dispatcher

* feat(dispatcher): use dispatcher in lp_bot

* feat(dispatcher): make wasm great again

* feat(dispatcher): fix code review

* feat(bot_cfg): add new options to prevent flash crash

* feat(simple_market_maker_bot): use V2 API for price

* feat(telegram): remove usage of unwrap_or for telegram send message

- The operation in unwrap_or was evaluated even if the option was not None

* feat(telegram): move trading bot cfg into running state

* feat(telegram): better syntax for chat id

* feat(telegram): cargo clippy

* feat(dispatcher): use TypeId to avoid construction of type

* feat(review): fix sergey review

* feat(default): make the dispatcher default constructible without implying the events

* feat(review): continue review fixing, extract more_code, introduce BotEvent

* feat(bot): add stopped event

* feat(bot): simplify on_trading_bot_event

* feat(message_service): introduce message_service ctx in MmArc

* feat(message_service): make init_message_service compile in wasm

* feat(lp_bot): add min_volume_usd settings

* feat(tests): add min_volume_usd to unit tests

* feat(lp_bot): add nomics support

* feat(lp_bot): simplify logging

* feat(log): log the time used to process a whole tick of bot logic

* feat(lp_bot): add max_volume_usd as balance_percentage alternative

* feat(lp_bot): use volume settings based on review

* feat(bot): use new volume approach in unit test

* feat(lp_bot): fix unit tests

* feat(lp_bot): fix minor changes review comment

* feat(lp_bot): add forex endpoint
  • Loading branch information
Milerius authored Nov 17, 2021
1 parent 89c975f commit fbd47e1
Show file tree
Hide file tree
Showing 19 changed files with 925 additions and 130 deletions.
2 changes: 2 additions & 0 deletions azure-pipelines-build-stage-job.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ parameters:
bob_userpass: ''
alice_passphrase: ''
alice_userpass: ''
telegram_api_key: ''

jobs:
- job: ${{ parameters.name }}
Expand Down Expand Up @@ -64,6 +65,7 @@ jobs:
BOB_USERPASS: $(${{ parameters.bob_userpass }})
ALICE_PASSPHRASE: $(${{ parameters.alice_passphrase }})
ALICE_USERPASS: $(${{ parameters.alice_userpass }})
TELEGRAM_API_KEY: $(${{ parameters.telegram_api_key }})
RUST_LOG: debug
condition: or( eq( variables['Build.Reason'], 'PullRequest' ), eq( variables['Build.SourceBranchName'], 'mm2.1' ), eq( variables['Build.SourceBranchName'], 'dev' ) )
- bash: |
Expand Down
3 changes: 3 additions & 0 deletions azure-pipelines.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ stages:
bob_userpass: 'BOB_USERPASS_LINUX'
alice_passphrase: 'ALICE_PASSPHRASE_LINUX'
alice_userpass: 'ALICE_USERPASS_LINUX'
telegram_api_key: 'TELEGRAM_API_KEY'

- template: azure-pipelines-build-stage-job.yml # Template reference
parameters:
Expand All @@ -40,6 +41,7 @@ stages:
bob_userpass: 'BOB_USERPASS_MAC'
alice_passphrase: 'ALICE_PASSPHRASE_MAC'
alice_userpass: 'ALICE_USERPASS_MAC'
telegram_api_key: 'TELEGRAM_API_KEY'

- template: azure-pipelines-build-stage-job.yml # Template reference
parameters:
Expand All @@ -49,6 +51,7 @@ stages:
bob_userpass: 'BOB_USERPASS_WIN'
alice_passphrase: 'ALICE_PASSPHRASE_WIN'
alice_userpass: 'ALICE_USERPASS_WIN'
telegram_api_key: 'TELEGRAM_API_KEY'

- stage: Release
displayName: Release
Expand Down
1 change: 1 addition & 0 deletions mm2src/common/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ pub mod big_int_str;
pub mod crash_reports;
pub mod custom_futures;
pub mod duplex_mutex;
pub mod event_dispatcher;
pub mod for_tests;
pub mod grpc_web;
pub mod iguana_utils;
Expand Down
148 changes: 148 additions & 0 deletions mm2src/common/event_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
use crate::mm_ctx::MmArc;
use async_trait::async_trait;
use std::any::TypeId;
use std::collections::{HashMap, HashSet};

pub type Listeners<EventType> = Vec<Box<dyn EventListener<Event = EventType>>>;
pub type DispatchTable = HashMap<TypeId, Vec<usize>>;

#[async_trait]
pub trait EventListener: 'static + Send + Sync {
type Event;
async fn process_event_async(&self, ctx: MmArc, event: Self::Event);
fn get_desired_events(&self) -> Vec<TypeId>;
fn listener_id(&self) -> &'static str;
}

pub trait EventUniqueId {
fn event_id(&self) -> TypeId;
}

pub struct Dispatcher<EventType> {
listeners: Listeners<EventType>,
listeners_id: HashSet<String>,
dispatch_table: DispatchTable,
}

impl<EventType> Default for Dispatcher<EventType>
where
EventType: Clone + EventUniqueId,
{
fn default() -> Self {
Dispatcher {
listeners: vec![],
listeners_id: Default::default(),
dispatch_table: Default::default(),
}
}
}

impl<EventType: 'static> Dispatcher<EventType>
where
EventType: Clone + EventUniqueId,
{
pub fn add_listener(&mut self, listen: impl EventListener<Event = EventType>) {
if self.listeners_id.contains(listen.listener_id()) {
return;
}
self.listeners_id.insert(listen.listener_id().to_string());
let id = self.listeners.len();
let events = listen.get_desired_events();
self.listeners.push(Box::new(listen));
for ev in events {
self.dispatch_table.entry(ev).or_default().push(id);
}
}

pub async fn dispatch_async(&self, ctx: MmArc, ev: EventType) {
if let Some(interested) = self.dispatch_table.get(&ev.event_id()) {
for id in interested.iter().copied() {
self.listeners[id].process_event_async(ctx.clone(), ev.clone()).await;
}
}
}

#[cfg(test)]
pub fn nb_listeners(&self) -> usize { self.listeners.len() }
}

#[cfg(test)]
mod event_dispatcher_tests {
use crate::block_on;
use crate::event_dispatcher::{Dispatcher, EventListener, EventUniqueId};
use crate::mm_ctx::{MmArc, MmCtxBuilder};
use async_trait::async_trait;
use std::any::TypeId;
use std::ops::Deref;
use std::sync::Arc;
use uuid::Uuid;

#[derive(Clone, Default)]
struct EventSwapStatusChanged {
uuid: Uuid,
status: String,
}

impl EventSwapStatusChanged {
fn event_id() -> TypeId { TypeId::of::<EventSwapStatusChanged>() }
}

#[derive(Clone)]
enum AppEvents {
EventSwapStatusChanged(EventSwapStatusChanged),
}

impl EventUniqueId for AppEvents {
fn event_id(&self) -> TypeId {
match self {
AppEvents::EventSwapStatusChanged(_) => EventSwapStatusChanged::event_id(),
}
}
}

#[derive(Default)]
struct ListenerSwapStatusChanged;

#[derive(Clone)]
struct ListenerSwapStatusChangedArc(Arc<ListenerSwapStatusChanged>);

impl Deref for ListenerSwapStatusChangedArc {
type Target = ListenerSwapStatusChanged;
fn deref(&self) -> &ListenerSwapStatusChanged { &*self.0 }
}

#[async_trait]
impl EventListener for ListenerSwapStatusChangedArc {
type Event = AppEvents;

async fn process_event_async(&self, _ctx: MmArc, event: Self::Event) {
match event {
AppEvents::EventSwapStatusChanged(swap) => {
assert_eq!(swap.uuid.to_string(), "00000000-0000-0000-0000-000000000000");
assert_eq!(swap.status, "Started");
},
}
}

fn get_desired_events(&self) -> Vec<TypeId> { vec![EventSwapStatusChanged::event_id()] }

fn listener_id(&self) -> &'static str { "listener_swap_status_changed" }
}

#[test]
fn test_dispatcher() {
let mut dispatcher: Dispatcher<AppEvents> = Default::default();
let listener = ListenerSwapStatusChanged::default();
let res = ListenerSwapStatusChangedArc(Arc::new(listener));
dispatcher.add_listener(res.clone());
assert_eq!(dispatcher.nb_listeners(), 1);

let event = AppEvents::EventSwapStatusChanged(EventSwapStatusChanged {
uuid: Default::default(),
status: "Started".to_string(),
});
block_on(dispatcher.dispatch_async(MmCtxBuilder::new().into_mm_arc(), event));
dispatcher.add_listener(res);
assert_eq!(dispatcher.nb_listeners(), 1);
}
}
4 changes: 4 additions & 0 deletions mm2src/common/mm_ctx.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ pub struct MmCtx {
pub ordermatch_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
pub rate_limit_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
pub simple_market_maker_bot_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
pub dispatcher_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
pub message_service_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
pub p2p_ctx: Mutex<Option<Arc<dyn Any + 'static + Send + Sync>>>,
pub peer_id: Constructible<String>,
/// The context belonging to the `coins` crate: `CoinsContext`.
Expand Down Expand Up @@ -121,6 +123,8 @@ impl MmCtx {
ordermatch_ctx: Mutex::new(None),
rate_limit_ctx: Mutex::new(None),
simple_market_maker_bot_ctx: Mutex::new(None),
dispatcher_ctx: Mutex::new(None),
message_service_ctx: Mutex::new(None),
p2p_ctx: Mutex::new(None),
peer_id: Constructible::default(),
coins_ctx: Mutex::new(None),
Expand Down
2 changes: 1 addition & 1 deletion mm2src/common/transport/transport.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ pub use wasm_http::{slurp_post_json, slurp_url};

pub type SlurpResult = Result<(StatusCode, HeaderMap, Vec<u8>), MmError<SlurpError>>;

#[derive(Debug, Display)]
#[derive(Debug, Deserialize, Display, Serialize)]
pub enum SlurpError {
#[display(fmt = "Error deserializing '{}' response: {}", uri, error)]
ErrorDeserializing { uri: String, error: String },
Expand Down
40 changes: 40 additions & 0 deletions mm2src/lp_dispatcher.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
use crate::mm2::lp_ordermatch::TradingBotEvent;
use crate::mm2::lp_swap::MakerSwapStatusChanged;
use common::event_dispatcher::{Dispatcher, EventUniqueId};
use common::mm_ctx::{from_ctx, MmArc};
use futures::lock::Mutex as AsyncMutex;
use std::any::TypeId;
use std::sync::Arc;

#[derive(Clone)]
pub enum LpEvents {
MakerSwapStatusChanged(MakerSwapStatusChanged),
TradingBotEvent(TradingBotEvent),
}

impl From<TradingBotEvent> for LpEvents {
fn from(evt: TradingBotEvent) -> Self { LpEvents::TradingBotEvent(evt) }
}

impl EventUniqueId for LpEvents {
fn event_id(&self) -> TypeId {
match self {
LpEvents::MakerSwapStatusChanged(_) => MakerSwapStatusChanged::event_id(),
LpEvents::TradingBotEvent(event) => event.event_id(),
}
}
}

#[derive(Default)]
pub struct DispatcherContext {
pub dispatcher: AsyncMutex<Dispatcher<LpEvents>>,
}

impl DispatcherContext {
/// Obtains a reference to this crate context, creating it if necessary.
pub fn from_ctx(ctx: &MmArc) -> Result<Arc<DispatcherContext>, String> {
Ok(try_s!(from_ctx(&ctx.dispatcher_ctx, move || {
Ok(DispatcherContext::default())
})))
}
}
Loading

0 comments on commit fbd47e1

Please sign in to comment.