Class: Fluent::Plugin::GcloudPubSubInput

Inherits:
Input
  • Object
show all
Defined in:
lib/fluent/plugin/in_gcloud_pubsub.rb

Defined Under Namespace

Classes: FailedParseError, RPCServlet

Constant Summary collapse

DEFAULT_PARSER_TYPE =
'json'

Instance Method Summary collapse

Instance Method Details

#configure(conf) ⇒ Object



103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 103

# Applies the plugin configuration and prepares per-instance state.
#
# Converts legacy parser parameters, lets the parent class process +conf+,
# resets the RPC/pull bookkeeping, selects the tag extraction method and
# builds the parser.
#
# @param conf the configuration element handed over by the framework
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  # No RPC server or thread exists yet, and pulling is not stopped.
  @rpc_srv = @rpc_thread = nil
  @stop_pull = false

  # Without a tag key the static tag method is used; otherwise the dynamic one.
  tag_resolver = @tag_key.nil? ? :static_tag : :dynamic_tag
  @extract_tag = method(tag_resolver)

  @parser = parser_create
end

#shutdown ⇒ Object



134
135
136
137
138
139
140
141
142
143
144
145
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 134

# Stops the plugin.
#
# Shuts down the RPC server when one is present, drops the RPC thread
# reference, raises the @stop_subscribing flag, waits for every subscribe
# thread to finish and finally delegates to the parent implementation.
#
# The method tolerates being called when #start never got as far as
# assigning @subscribe_threads, so a failed start does not surface as a
# NoMethodError on nil during shutdown.
def shutdown
  if @rpc_srv
    @rpc_srv.shutdown
    @rpc_srv = nil
  end
  # Only the reference is cleared here; the thread itself is not joined.
  @rpc_thread = nil

  # Presumably polled by the subscribe threads so they can leave their loop
  # (the subscribe method is not visible here).
  @stop_subscribing = true
  # Array() turns a never-assigned (nil) thread list into an empty one.
  Array(@subscribe_threads).each(&:join)
  super
end

#start ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 119

# Starts the plugin.
#
# After the parent's start, optionally starts the RPC endpoint (when
# @enable_rpc is set), builds the Pub/Sub subscriber from the configured
# project, key, topic and subscription, and spawns @pull_threads threads
# that each run #subscribe.
def start
  super
  start_rpc if @enable_rpc

  @subscriber = Fluent::GcloudPubSub::Subscriber.new(@project, @key, @topic, @subscription)
  log.debug("connected subscription:#{@subscription} in project #{@project}")

  @emit_guard = Mutex.new
  @stop_subscribing = false
  @subscribe_threads = []
  # One named thread per configured puller: in_gcloud_pubsub_subscribe_<n>.
  @pull_threads.times do |thread_no|
    thread_name = :"in_gcloud_pubsub_subscribe_#{thread_no}"
    @subscribe_threads << thread_create(thread_name, &method(:subscribe))
  end
end

#start_pull ⇒ Object



152
153
154
155
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 152

# Clears the @stop_pull flag and logs that pulling from the subscription
# has been started.
def start_pull
  @stop_pull = false
  message = "start pull from subscription:#{@subscription}"
  log.info(message)
end

#status_of_pull ⇒ Object



157
158
159
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 157

# Reports the pull state as a string.
#
# @return [String] 'stopped' when @stop_pull is truthy, 'started' otherwise
def status_of_pull
  return 'stopped' if @stop_pull

  'started'
end

#stop_pull ⇒ Object



147
148
149
150
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 147

# Raises the @stop_pull flag and logs that pulling from the subscription
# has been stopped.
def stop_pull
  @stop_pull = true
  message = "stop pull from subscription:#{@subscription}"
  log.info(message)
end