Class: Peatio::Upstream::Base
- Inherits:
-
Object
- Object
- Peatio::Upstream::Base
- Defined in:
- lib/peatio/upstream/base.rb
Constant Summary collapse
- DEFAULT_DELAY = 1
- WEBSOCKET_CONNECTION_RETRY_DELAY = 2
Instance Attribute Summary collapse
- #logger ⇒ Object
  Returns the value of attribute logger.
Instance Method Summary collapse
- #build_error(response) ⇒ Object
- #initialize(config) ⇒ Base (constructor)
  A new instance of Base.
- #mount ⇒ Object
- #notify_public_trade(trade) ⇒ Object
- #on_trade(trade) ⇒ Object
- #subscribe_orderbook(_market, _ws) ⇒ Object
- #subscribe_trades(_market, _ws) ⇒ Object
- #to_s ⇒ Object
- #trade_json(trade) ⇒ Object
- #ws_connect ⇒ Object
- #ws_connect_public ⇒ Object
- #ws_read_message(msg) ⇒ Object
- #ws_read_public_message(msg) ⇒ Object
Constructor Details
#initialize(config) ⇒ Base
Returns a new instance of Base.
11 12 13 14 15 16 17 18 19 20 21 22 |
# File 'lib/peatio/upstream/base.rb', line 11
#
# Stores the upstream settings taken from +config+, prepares an empty list of
# public-trade callbacks and finally calls +mount+.
#
# Keys read from +config+: "rest", "source", "target" and "amqp" (String keys)
# and :faraday_adapter (Symbol key, defaulting to :em_synchrony).
# NOTE(review): both key styles are used on the same object; presumably
# +config+ tolerates either, confirm against the callers.
#
# @param config [#[]] upstream configuration
def initialize(config)
  @config = config

  # Connection settings.
  @host    = config["rest"]
  @adapter = config[:faraday_adapter] || :em_synchrony
  @ws_status = false

  # Market identifiers and the message-queue handle.
  @market    = config['source']
  @target    = config['target']
  @peatio_mq = config['amqp']

  @public_trades_cb = []
  @logger = Peatio::Logger.logger

  # Must run last: mount appends to @public_trades_cb.
  mount
end
Instance Attribute Details
#logger ⇒ Object
Returns the value of attribute logger.
9 10 11 |
# File 'lib/peatio/upstream/base.rb', line 9
#
# Reader for the logger attribute.
#
# @return [Object] the current value of @logger
def logger
  @logger
end
Instance Method Details
#build_error(response) ⇒ Object
109 110 111 112 113 |
# File 'lib/peatio/upstream/base.rb', line 109
#
# Turns an HTTP response into an error payload.
#
# The body is parsed as JSON and the parsed value is returned. If anything
# goes wrong while doing so (any StandardError), a plain-text summary built
# from the response status and reason phrase is returned instead.
#
# @param response [#body, #env] response whose +env+ exposes +status+ and
#   +reason_phrase+
# @return [Object, String] parsed JSON body, or "Code: ... Message: ..."
def build_error(response)
  JSON.parse(response.body)
rescue StandardError
  # The exception itself is not needed; only the fallback text is returned.
  "Code: #{response.env.status} Message: #{response.env.reason_phrase}"
end
#mount ⇒ Object
24 25 26 |
# File 'lib/peatio/upstream/base.rb', line 24
#
# Registers this object's +on_trade+ method as a public-trade callback by
# appending the bound Method object to @public_trades_cb.
#
# @return [Array] the callback list after the append
def mount
  trade_handler = method(:on_trade)
  @public_trades_cb << trade_handler
end
#notify_public_trade(trade) ⇒ Object
101 102 103 |
# File 'lib/peatio/upstream/base.rb', line 101
#
# Calls every registered public-trade callback with +trade+, in registration
# order. Entries that are nil are skipped.
#
# @param trade [Object] the trade handed to each callback
# @return [Array] the callback list (the result of +each+)
def notify_public_trade(trade)
  @public_trades_cb.each do |callback|
    callback.call(trade) unless callback.nil?
  end
end
#on_trade(trade) ⇒ Object
78 79 80 81 82 83 84 85 86 87 |
# File 'lib/peatio/upstream/base.rb', line 78
#
# Forwards one trade to the message queue held in @peatio_mq:
#
# 1. logs the trade at info level,
# 2. enqueues a "public" / "trades" event for @market carrying the raw trade,
# 3. publishes the +trade_json+ form under :trade with headers marking the
#    message as type :upstream for @market.
#
# @param trade [Object] the trade to forward
def on_trade(trade)
  logger.info { "Publishing trade event: #{trade.inspect}" }

  @peatio_mq.enqueue_event("public", @market, "trades", { trades: [trade] })

  payload = trade_json(trade)
  options = {
    headers: {
      type: :upstream,
      market: @market,
    }
  }
  @peatio_mq.publish(:trade, payload, options)
end
#subscribe_orderbook(_market, _ws) ⇒ Object
63 64 65 |
# File 'lib/peatio/upstream/base.rb', line 63
#
# Order-book subscription hook; +ws_connect+ invokes it with the target
# market and the websocket once the connection opens.
#
# This implementation ignores both arguments and only calls
# +method_not_implemented+.
# NOTE(review): presumably subclasses are expected to override it; confirm.
#
# @param _market [Object] unused here
# @param _ws [Object] unused here
def subscribe_orderbook(_market, _ws)
  method_not_implemented
end
#subscribe_trades(_market, _ws) ⇒ Object
59 60 61 |
# File 'lib/peatio/upstream/base.rb', line 59
#
# Trade-stream subscription hook; +ws_connect+ invokes it with the target
# market and the websocket once the connection opens.
#
# This implementation ignores both arguments and only calls
# +method_not_implemented+.
# NOTE(review): presumably subclasses are expected to override it; confirm.
#
# @param _market [Object] unused here
# @param _ws [Object] unused here
def subscribe_trades(_market, _ws)
  method_not_implemented
end
#to_s ⇒ Object
105 106 107 |
# File 'lib/peatio/upstream/base.rb', line 105
#
# Human-readable description: the class name followed by the configuration.
#
# The constructor stores the configuration in @config, so that is what gets
# printed. @opts is still preferred when it happens to be set.
# NOTE(review): nothing visible here assigns @opts; the fallback order only
# keeps the output unchanged for any code that does set it. Confirm and drop.
#
# @return [String]
def to_s
  "Exchange::#{self.class} config: #{@opts || @config}"
end
#trade_json(trade) ⇒ Object
89 90 91 92 93 94 95 96 97 98 99 |
# File 'lib/peatio/upstream/base.rb', line 89
#
# Builds the hash published for a trade.
#
# Keys are symbolized on a copy, so the caller's +trade+ is left untouched;
# the same trade object is handed to every public-trade callback.
#
# @param trade [Hash] upstream trade with tid, price, amount, date and
#   taker_type entries (String or Symbol keys)
# @return [Hash] id, price, amount, market_id, created_at and taker_type
def trade_json(trade)
  attrs = trade.deep_symbolize_keys

  {
    id: attrs[:tid],
    price: attrs[:price],
    amount: attrs[:amount],
    market_id: @market,
    # NOTE(review): date is passed straight to Time.at, presumably epoch
    # seconds; confirm against the upstream feed.
    created_at: Time.at(attrs[:date]).utc.iso8601,
    taker_type: attrs[:taker_type]
  }
end
#ws_connect ⇒ Object
28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
# File 'lib/peatio/upstream/base.rb', line 28
#
# Opens the websocket at @ws_url and installs its event handlers:
#
# * open    - subscribes to trades and order book for @target
# * message - passes the message to +ws_read_message+
# * close   - clears @ws / @ws_status, logs the disconnect and reconnects
#             after WEBSOCKET_CONNECTION_RETRY_DELAY seconds
#
# NOTE(review): @ws_url is not assigned anywhere in the code shown here;
# presumably subclasses set it before connecting.
#
# @raise [RuntimeError] if @ws_url is not set
def ws_connect
  # Validate first so a missing URL is not logged as a connection attempt.
  # The message names the class: no +id+ method is defined on this class, so
  # interpolating it would raise NameError instead of this error.
  raise "websocket url missing for #{self.class}" unless @ws_url

  logger.info { "Websocket connecting to #{@ws_url}" }
  @ws = WebSocket::Client::Simple.connect @ws_url

  @ws.on(:open) do |_e|
    subscribe_trades(@target, @ws)
    subscribe_orderbook(@target, @ws)
    logger.info { "Websocket connected" }
  end

  @ws.on(:message) do |msg|
    ws_read_message(msg)
  end

  @ws.on(:close) do |e|
    @ws = nil
    @ws_status = false
    logger.error "Websocket disconnected: #{e.code} Reason: #{e.reason}"

    # Reconnect from a fresh fiber so the sleep does not block this handler.
    Fiber.new do
      EM::Synchrony.sleep(WEBSOCKET_CONNECTION_RETRY_DELAY)
      ws_connect
    end.resume
  end
end
#ws_connect_public ⇒ Object
55 56 57 |
# File 'lib/peatio/upstream/base.rb', line 55
#
# Connects the public websocket. This is a thin wrapper that delegates
# straight to +ws_connect+ and returns whatever it returns.
def ws_connect_public
  ws_connect
end
#ws_read_message(msg) ⇒ Object
71 72 73 74 75 76 |
# File 'lib/peatio/upstream/base.rb', line 71
#
# Handles one raw websocket message: logs its data at debug level, parses
# the data as JSON and passes the parsed object to +ws_read_public_message+.
#
# @param msg [#data] websocket message whose +data+ is a JSON document
# @raise [JSON::ParserError] if +msg.data+ is not valid JSON
def ws_read_message(msg)
  logger.debug { "received websocket message: #{msg.data}" }

  object = JSON.parse(msg.data)
  ws_read_public_message(object)
end
#ws_read_public_message(msg) ⇒ Object
67 68 69 |
# File 'lib/peatio/upstream/base.rb', line 67
#
# Handles a public message. This implementation only logs it at info level.
#
# @param msg [Object] the message; it is interpolated into the log line
def ws_read_public_message(msg)
  logger.info { "received public message: #{msg}" }
end