Class: PeatioIrix::Binance

Inherits:
Peatio::Upstream::Base
  • Object
show all
Defined in:
lib/peatio_irix/binance.rb

Constant Summary collapse

MIN_INCREMENT_COUNT_TO_SNAPSHOT =
100
MIN_PERIOD_TO_SNAPSHOT =
5
MAX_PERIOD_TO_SNAPSHOT =
60

Instance Attribute Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(config) ⇒ Binance

WS huobi global websocket: ‘wss://api.huobi.pro/ws/’ WS for krw markets websocket: ‘wss://api-cloud.huobi.co.kr/ws/’


17
18
19
20
21
22
23
24
25
26
27
28
29
30
# File 'lib/peatio_irix/binance.rb', line 17

def initialize(config)
  super
  @connection = Faraday.new(url: (config['rest']).to_s) do |builder|
    builder.response :json
    builder.response :logger if config['debug']
    builder.adapter(@adapter)
    unless config['verify_ssl'].nil?
      builder.ssl[:verify] = config['verify_ssl']
    end
  end
  @ping_set = false
  @rest = (config['rest']).to_s
  @ws_url = (config['websocket']).to_s
end

Instance Attribute Details

#asksObject

Returns the value of attribute asks.


10
11
12
# File 'lib/peatio_irix/binance.rb', line 10

def asks
  @asks
end

#bidsObject

Returns the value of attribute bids.


10
11
12
# File 'lib/peatio_irix/binance.rb', line 10

def bids
  @bids
end

#increment_countObject

Returns the value of attribute increment_count.


10
11
12
# File 'lib/peatio_irix/binance.rb', line 10

def increment_count
  @increment_count
end

#sequence_numberObject

Returns the value of attribute sequence_number.


10
11
12
# File 'lib/peatio_irix/binance.rb', line 10

def sequence_number
  @sequence_number
end

#snapObject

Returns the value of attribute snap.


10
11
12
# File 'lib/peatio_irix/binance.rb', line 10

def snap
  @snap
end

#snapshot_timeObject

Returns the value of attribute snapshot_time.


10
11
12
# File 'lib/peatio_irix/binance.rb', line 10

def snapshot_time
  @snapshot_time
end

Instance Method Details

#detect_order(msg) ⇒ Object


57
58
59
60
61
62
63
64
65
66
# File 'lib/peatio_irix/binance.rb', line 57

def detect_order(msg)
  if @increment_count < MIN_INCREMENT_COUNT_TO_SNAPSHOT && @snapshot_time <= Time.now - MAX_PERIOD_TO_SNAPSHOT
    publish_snapshot
    @increment_count = 0
  elsif @increment_count >= MIN_INCREMENT_COUNT_TO_SNAPSHOT && @snapshot_time < Time.now - MIN_PERIOD_TO_SNAPSHOT
    publish_snapshot
    @increment_count = 0
  end
  fill_increment(msg)
end

#detect_trade(msg) ⇒ Object


116
117
118
119
120
121
122
123
124
125
126
# File 'lib/peatio_irix/binance.rb', line 116

def detect_trade(msg)
  trade =
    {
      tid: msg['t'],
      amount: msg['q'].to_d,
      price: msg['p'].to_d,
      date: msg['T'] / 1000,
      taker_type: msg['b'] > msg['a'] ? 'buy' : 'sell'
    }
  notify_public_trade(trade)
end

#fill_increment(inc) ⇒ Object


68
69
70
71
72
# File 'lib/peatio_irix/binance.rb', line 68

def fill_increment(inc)
  fill_side(inc, 'b')
  fill_side(inc, 'a')
  @increment_count += 1
end

#fill_side(inc, side) ⇒ Object


74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
# File 'lib/peatio_irix/binance.rb', line 74

def fill_side(inc, side)
  inc[side].each do |price_point|
    price = price_point[0].to_d
    amount = price_point[1].to_d
    if amount.zero?
      @snap[side].delete_if { |point| point[0] == price.to_s }
    else
      @snap[side].delete_if { |point| point[0] == price.to_s }
      @snap[side] << [price.to_s, amount.to_s]
    end
    if side == 'b'
      @bids.delete_if { |point| point[0] == price }
      @bids << [price.to_s, amount.to_s]
    elsif side == 'a'
      @asks.delete_if { |point| point[0] == price }
      @asks << [price.to_s, amount.to_s]
    end
  end
end

#publish_incrementObject


94
95
96
97
98
99
100
101
102
103
104
105
106
# File 'lib/peatio_irix/binance.rb', line 94

def publish_increment
  inc = {}
  inc['b'] = @bids.sort.reverse if @bids.present?
  inc['a'] = @asks.sort if @asks.present?
  if inc.present?
    @sequence_number += 1
    @peatio_mq.enqueue_event('public', @market, 'ob-inc',
                             bids: inc['b'], asks: inc['a'],
                             sequence: @sequence_number)
  end
  @bids = []
  @asks = []
end

#publish_snapshotObject


108
109
110
111
112
113
114
# File 'lib/peatio_irix/binance.rb', line 108

def publish_snapshot
  @snapshot_time = Time.now
  @peatio_mq.enqueue_event('public', @market, 'ob-snap',
                           bids: @snap['b'].sort.reverse,
                           asks: @snap['a'].sort,
                           sequence: @sequence_number)
end

#subscribe_orderbook(market, ws) ⇒ Object


156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
# File 'lib/peatio_irix/binance.rb', line 156

def subscribe_orderbook(market, ws)
  return unless @config['orderbook_proxy']

  @sequence_number = 0
  @increment_count = 0
  @snapshot_time = Time.now
  @bids = []
  @asks = []
  @snap = { 'a' => [], 'b' => [] }

  sub = {
    method: 'SUBSCRIBE',
    params: [
      "#{market}@depth@100ms"
    ],
    id: 1
  }

  EM.next_tick do
    ws.send(JSON.generate(sub))
  end
  Fiber.new do
    EM::Synchrony.add_periodic_timer(0.2) do
      publish_increment
    end
  end.resume
end

#subscribe_trades(market, ws) ⇒ Object


140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# File 'lib/peatio_irix/binance.rb', line 140

def subscribe_trades(market, ws)
  return unless @config['trade_proxy']

  sub = {
    method: 'SUBSCRIBE',
    params: [
      "#{market}@trade"
    ],
    id: 1
  }

  EM.next_tick do
    ws.send(JSON.generate(sub))
  end
end

#ws_connectObject


128
129
130
131
132
133
134
135
136
137
138
# File 'lib/peatio_irix/binance.rb', line 128

def ws_connect
  super
  return if @ping_set

  Fiber.new do
    EM::Synchrony.add_periodic_timer(80) do
      @ws.send(JSON.dump('ping' => Time.now.to_i))
    end
  end.resume
  @ping_set = true
end

#ws_read_message(msg) ⇒ Object


32
33
34
35
36
37
38
39
40
41
# File 'lib/peatio_irix/binance.rb', line 32

def ws_read_message(msg)
  response =  JSON.parse msg.data
  return if response.key? 'result'

  # data = Zlib::GzipReader.new(StringIO.new(msg.data.map(&:chr).join)).read
  # Rails.logger.debug { "received websocket message: #{data}" }

  # object = JSON.parse(data)
  ws_read_public_message(response)
end

#ws_read_public_message(msg) ⇒ Object


43
44
45
46
47
48
49
50
51
52
53
54
55
# File 'lib/peatio_irix/binance.rb', line 43

def ws_read_public_message(msg)
  if msg['ping'].present?
    @ws.send(JSON.dump(pong: msg['ping']))
    return
  end

  case msg['stream']
  when /#{@target}@trade/
    detect_trade(msg['data'])
  when /#{@target}@depth@100ms/
    detect_order(msg['data'])
  end
end