Class: Fluent::Plugin::GcloudPubSubInput
- Inherits:
-
Input
- Object
- Input
- Fluent::Plugin::GcloudPubSubInput
show all
- Defined in:
- lib/fluent/plugin/in_gcloud_pubsub.rb
Defined Under Namespace
Classes: FailedParseError, RPCServlet
Constant Summary
collapse
- DEFAULT_PARSER_TYPE =
'json'
Instance Method Summary
collapse
Instance Method Details
#configure(conf) ⇒ Object
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 103
# Configures the plugin instance from +conf+.
#
# Runs the compat-parameter conversion for the :parser section, delegates to
# the parent class, resets the RPC handles and the pull flag, picks the tag
# extraction method, and creates the parser.
#
# @param conf the configuration object handed to this plugin
def configure(conf)
  compat_parameters_convert(conf, :parser)
  super

  # RPC handles start out empty; the pull flag starts out "not stopped".
  @rpc_srv = nil
  @rpc_thread = nil
  @stop_pull = false

  # Without a tag key the static extractor is used, otherwise the dynamic one.
  extractor_name = @tag_key.nil? ? :static_tag : :dynamic_tag
  @extract_tag = method(extractor_name)

  @parser = parser_create
end
|
#shutdown ⇒ Object
134
135
136
137
138
139
140
141
142
143
144
145
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 134
# Shuts the plugin down: stops the RPC server if one is present, signals the
# subscribe threads to stop, waits for them, then delegates to the parent.
#
# Safe to call even when +start+ never assigned @subscribe_threads (for
# example because +start+ was not run or raised early): in that case there
# is nothing to join and the parent +shutdown+ still runs.
def shutdown
  if @rpc_srv
    @rpc_srv.shutdown
    @rpc_srv = nil
  end
  # Only the reference is dropped here; the thread itself is not joined.
  # NOTE(review): presumably its lifetime is managed by whatever created it - confirm.
  @rpc_thread = nil

  @stop_subscribing = true
  # Array() turns a nil (never-started) thread list into an empty one instead
  # of raising NoMethodError and skipping the parent shutdown below.
  Array(@subscribe_threads).each(&:join)
  super
end
|
#start ⇒ Object
119
120
121
122
123
124
125
126
127
128
129
130
131
132
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 119
# Starts the plugin: delegates to the parent, optionally starts the RPC
# endpoint, builds the subscriber, and creates @pull_threads subscribe
# threads that each run +subscribe+.
def start
  super
  start_rpc if @enable_rpc

  @subscriber = Fluent::GcloudPubSub::Subscriber.new(@project, @key, @topic, @subscription)
  log.debug("connected subscription:#{@subscription} in project #{@project}")

  # Shared state is set up before any subscribe thread is created.
  @emit_guard = Mutex.new
  @stop_subscribing = false
  @subscribe_threads = []

  # One thread per configured puller, each named with its index.
  @pull_threads.times do |n|
    thread_name = :"in_gcloud_pubsub_subscribe_#{n}"
    @subscribe_threads << thread_create(thread_name, &method(:subscribe))
  end
end
|
#start_pull ⇒ Object
152
153
154
155
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 152
# Clears the @stop_pull flag and logs, at info level, that pulling from the
# subscription has started.
def start_pull
  @stop_pull = false
  log.info("start pull from subscription:#{@subscription}")
end
|
#status_of_pull ⇒ Object
157
158
159
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 157
# Reports the pull state as a string.
#
# @return [String] 'stopped' when @stop_pull is truthy, 'started' otherwise
def status_of_pull
  return 'stopped' if @stop_pull

  'started'
end
|
#stop_pull ⇒ Object
147
148
149
150
|
# File 'lib/fluent/plugin/in_gcloud_pubsub.rb', line 147
# Sets the @stop_pull flag and logs, at info level, that pulling from the
# subscription has stopped.
def stop_pull
  @stop_pull = true
  log.info("stop pull from subscription:#{@subscription}")
end
|