Class: Fluent::Plugin::GcloudPubSubInput

Inherits:
Input
  • Object
show all
Includes:
Compressable
Defined in:
lib/fluent/plugin/in_gcloud_pubsub.rb

Defined Under Namespace

Classes: FailedParseError, RPCServlet

Constant Summary collapse

DEFAULT_PARSER_TYPE =
'json'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 108

# Prepares the plugin from its configuration.
#
# Runs +compat_parameters_convert+ for the +:parser+ section, delegates to the
# superclass, resets the RPC and pull-control state, and then picks the
# tag-extraction and decompression callables.
#
# @param conf the configuration object, forwarded unchanged to the superclass
# @return [Method] the selected decompression method (value of the last assignment)
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  @rpc_srv = @rpc_thread = nil
  @stop_pull = false

  # Without a tag key the :static_tag method is used; otherwise :dynamic_tag.
  @extract_tag = method(@tag_key.nil? ? :static_tag : :dynamic_tag)
  @parser = parser_create
  # Any value other than 'gzip' selects the :no_decompress method.
  @decompress = method(@decompression == 'gzip' ? :gzip_decompress : :no_decompress)
end

#shutdown ⇒ Object



144
145
146
147
148
149
150
151
152
153
154
155
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 144

# Stops the RPC server (if one is running), flags the subscribe threads to
# stop, waits for them to finish and then delegates to the superclass.
#
# Safe to call when #start did not get as far as creating the subscribe
# threads: a missing thread list is treated as empty, so the superclass
# shutdown is still reached instead of failing with NoMethodError on nil.
def shutdown
  if @rpc_srv
    @rpc_srv.shutdown
    @rpc_srv = nil
  end
  # Only the reference is dropped here; the thread itself is not joined.
  @rpc_thread = nil

  @stop_subscribing = true
  # @subscribe_threads is assigned in #start after the subscriber has been
  # created, so it can still be nil if #start raised before that point.
  Array(@subscribe_threads).each(&:join)
  super
end

#start ⇒ Object



129
130
131
132
133
134
135
136
137
138
139
140
141
142
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 129

# Starts the plugin: runs the superclass start, optionally starts the RPC
# endpoint, builds the subscriber and creates the subscribe threads.
#
# One thread per +@pull_threads+ is created through +thread_create+, each
# running the +subscribe+ method and named
# +in_gcloud_pubsub_subscribe_<index>+.
#
# @return [Integer] +@pull_threads+ (the value of the final +times+ call)
def start
  super
  if @enable_rpc
    start_rpc
  end

  @subscriber = Fluent::GcloudPubSub::Subscriber.new(@project, @key, @topic, @subscription)
  log.debug("connected subscription:#{@subscription} in project #{@project}")

  @emit_guard = Mutex.new
  @stop_subscribing = false
  @subscribe_threads = []
  @pull_threads.times do |worker_no|
    thread_name = :"in_gcloud_pubsub_subscribe_#{worker_no}"
    @subscribe_threads << thread_create(thread_name, &method(:subscribe))
  end
end

#start_pull ⇒ Object



162
163
164
165
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 162

# Clears the +@stop_pull+ flag and logs, at info level, that pulling from the
# configured subscription has started.
#
# @return whatever +log.info+ returns
def start_pull
  @stop_pull = false
  log.info("start pull from subscription:#{@subscription}")
end

#status_of_pull ⇒ Object



167
168
169
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 167

# Reports the current pull state as a string.
#
# @return [String] 'stopped' when +@stop_pull+ is truthy, otherwise 'started'
def status_of_pull
  return 'stopped' if @stop_pull

  'started'
end

#stop_pull ⇒ Object



157
158
159
160
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 157

# Sets the +@stop_pull+ flag and logs, at info level, that pulling from the
# configured subscription has stopped.
#
# @return whatever +log.info+ returns
def stop_pull
  @stop_pull = true
  log.info("stop pull from subscription:#{@subscription}")
end