1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
use crate::logging::init_logging;
use crate::orders::Book;
use anyhow::Result;
use async_channel::{unbounded, Receiver, Sender};
use config::builder::{ConfigBuilder, DefaultState};
use config::{Config, ConfigError, Environment};
use serde::Deserialize;
use serde_json::Value;
use slog::{error, info, Logger};

pub type CfgBuilder = ConfigBuilder<DefaultState>;

/// Worker Context
///
///  A context is the basic configuration for a worker
#[derive(Clone)]
pub struct Context<T> {
    /// name of the worker
    pub name: String,
    /// configuration
    pub cfg: Config,
    /// logging
    pub logger: Logger,
    /// Use this to send messages to another worker
    pub sender: Sender<T>,
    /// Use this to receive messages from another worker
    pub receiver: Receiver<T>,
}

#[derive(Debug, Clone)]
pub struct WsInfo {
    pub name: String,
    pub url: String,
}

/// Websocket payload
///
/// This struct is used by the websocket consumer to communicate back to the main application
/// once a new websocket message arrives
#[derive(Debug, Clone)]
pub struct WsPayload {
    /// gateway name
    pub name: String,
    /// websocket url
    pub url: String,
    /// message payload
    pub value: Value,
}

/// Gateway command
///
/// This struct is used by the websocket consumer to communicate back to the main application
/// once a new websocket message arrives
#[derive(Debug, Clone)]
pub struct NewBookSnapshot {
    /// gateway name
    pub name: String,
}

/// Orderbook snapshot message
///
/// An orderbook snapshot is a representation of the order book at a given time.
/// It can be used as the starting point of an in-memory order book.
#[derive(Debug, Clone)]
pub struct BookSnapshot {
    /// gateway name
    pub name: String,
    /// sequence number (use this to discard deltas with sequence prior to this)
    pub sequence: usize,
    /// the order book snapshot
    pub book: Book,
}

/// Internal message enum
///
/// This enum is used to send messages between different coroutines
#[derive(Debug, Clone)]
pub enum InnerMessage {
    /// heartbeat message
    Heartbeat,
    /// clean exit
    Exit,
    /// exit with failure
    Failure,
    /// websocket message
    WsConnected(WsInfo),
    /// websocket disconnect
    WsDisconnected(WsInfo),
    /// websocket payload
    WsPayload(WsPayload),
    /// request for a new BookSnapshot
    NewBookSnapshot(NewBookSnapshot),
    /// Orderbook snapshot
    BookSnapshot(BookSnapshot),
}

/// A context for a courotine worker
pub type WorkerContext = Context<InnerMessage>;

pub fn create_config() -> CfgBuilder {
    Config::builder().add_source(Environment::default())
}

impl<T> Context<T> {
    pub fn new(name: &str, config: Option<Config>) -> Self {
        let (sender, receiver) = unbounded();
        let cfg = match config {
            Some(cfg) => cfg,
            None => create_config().build().expect("config"),
        };
        let logger = init_logging(&cfg).unwrap();
        Self {
            name: name.to_owned(),
            cfg,
            logger,
            sender,
            receiver,
        }
    }

    /// Get a value from config or a default one
    pub fn get_or<'de, C: Deserialize<'de>>(
        &self,
        key: &str,
        default: C,
    ) -> Result<C, ConfigError> {
        let v = self.cfg.get(key);

        if let Err(ConfigError::NotFound(_)) = v {
            Ok(default)
        } else {
            v
        }
    }

    pub fn try_send(&self, msg: T) {
        self.sender.try_send(msg).unwrap();
    }

    pub async fn send(&self, msg: T) {
        self.sender.send(msg).await.unwrap();
    }
}

pub async fn wrap_result(context: &WorkerContext, result: Result<()>) {
    match result {
        Ok(()) => {
            info!(context.logger, "{} - exited", context.name);
            context.send(InnerMessage::Exit).await;
        }
        Err(err) => {
            error!(
                context.logger,
                "{} - unexpected error - {}", context.name, err
            );
            context.send(InnerMessage::Failure).await;
        }
    };
}