Class: Fluent::Plugin::GcloudPubSubInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::GcloudPubSubInput
show all
- Includes:
- Compressable
- Defined in:
- lib/fluent/plugin/in_gcloud_pubsub.rb
Defined Under Namespace
Classes: FailedParseError, RPCServlet
Constant Summary
collapse
- DEFAULT_PARSER_TYPE =
'json'
Instance Method Summary
collapse
Instance Method Details
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 108
def configure(conf)
compat_parameters_convert(conf, :parser)
super
@rpc_srv = nil
@rpc_thread = nil
@stop_pull = false
@extract_tag = if @tag_key.nil?
method(:static_tag)
else
method(:dynamic_tag)
end
@parser = parser_create
@decompress = if @decompression == 'gzip'
method(:gzip_decompress)
else
method(:no_decompress)
end
end
|
#shutdown ⇒ Object
144
145
146
147
148
149
150
151
152
153
154
155
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 144
def shutdown
if @rpc_srv
@rpc_srv.shutdown
@rpc_srv = nil
end
if @rpc_thread
@rpc_thread = nil
end
@stop_subscribing = true
@subscribe_threads.each(&:join)
super
end
|
#start ⇒ Object
129
130
131
132
133
134
135
136
137
138
139
140
141
142
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 129
def start
super
start_rpc if @enable_rpc
@subscriber = Fluent::GcloudPubSub::Subscriber.new @project, @key, @topic, @subscription
log.debug "connected subscription:#{@subscription} in project #{@project}"
@emit_guard = Mutex.new
@stop_subscribing = false
@subscribe_threads = []
@pull_threads.times do |idx|
@subscribe_threads.push thread_create("in_gcloud_pubsub_subscribe_#{idx}".to_sym, &method(:subscribe))
end
end
|
#start_pull ⇒ Object
162
163
164
165
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 162
def start_pull
@stop_pull = false
log.info "start pull from subscription:#{@subscription}"
end
|
#status_of_pull ⇒ Object
167
168
169
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 167
def status_of_pull
@stop_pull ? 'stopped' : 'started'
end
|
#stop_pull ⇒ Object
157
158
159
160
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 157
def stop_pull
@stop_pull = true
log.info "stop pull from subscription:#{@subscription}"
end
|