Class: Fluent::Plugin::GcloudPubSubOutput
- Inherits:
-
Output
- Object
- Output
- Fluent::Plugin::GcloudPubSubOutput
- Includes:
- Fluent::PluginHelper::Inject
- Defined in:
- lib/fluent/plugin/out_gcloud_pubsub.rb
Constant Summary collapse
- DEFAULT_BUFFER_TYPE =
"memory"
- DEFAULT_FORMATTER_TYPE =
"json"
Instance Method Summary collapse
-
#configure(conf) ⇒ Object
rubocop:disable Metrics/MethodLength.
- #format(tag, time, record) ⇒ Object
- #formatted_to_msgpack_binary? ⇒ Boolean
- #multi_workers_ready? ⇒ Boolean
-
#start ⇒ Object
rubocop:enable Metrics/MethodLength.
- #write(chunk) ⇒ Object
Instance Method Details
#configure(conf) ⇒ Object
rubocop:disable Metrics/MethodLength
51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 |
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 51 def configure(conf) compat_parameters_convert(conf, :buffer, :formatter) super placeholder_validate!(:topic, @topic) @formatter = formatter_create if @compress_batches && !@attribute_keys.empty? # The attribute_keys option is implemented by extracting keys from the # record and setting them on the Pub/Sub message. # This is not possible in compressed mode, because we're sending just a # single Pub/Sub message that comprises many records, therefore the # attribute keys would clash. raise Fluent::ConfigError, ":attribute_keys cannot be used when compression is enabled" end @messages_published = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_per_batch") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_published_per_batch", "Number of records published to Pub/Sub per buffer flush", {}, [1, 10, 50, 100, 250, 500, 1000], ) end @bytes_published = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_messages_published_bytes") do ::Prometheus::Client.registry.histogram( :"#{@metric_prefix}_messages_published_bytes", "Total size in bytes of the records published to Pub/Sub", {}, [100, 1000, 10_000, 100_000, 1_000_000, 5_000_000, 10_000_000], ) end @compression_enabled = Fluent::GcloudPubSub::Metrics.register_or_existing(:"#{@metric_prefix}_compression_enabled") do ::Prometheus::Client.registry.gauge( :"#{@metric_prefix}_compression_enabled", "Whether compression/batching is enabled", {}, ) end @compression_enabled.set(common_labels, @compress_batches ? 1 : 0) end |
#format(tag, time, record) ⇒ Object
103 104 105 106 107 108 109 110 |
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 103 def format(tag, time, record) record = inject_values_to_record(tag, time, record) attributes = {} @attribute_keys.each do |key| attributes[key] = record.delete(key) end [@formatter.format(tag, time, record), attributes].to_msgpack end |
#formatted_to_msgpack_binary? ⇒ Boolean
112 113 114 |
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 112 def formatted_to_msgpack_binary? true end |
#multi_workers_ready? ⇒ Boolean
116 117 118 |
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 116 def multi_workers_ready? true end |
#start ⇒ Object
rubocop:enable Metrics/MethodLength
98 99 100 101 |
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 98 def start super @publisher = Fluent::GcloudPubSub::Publisher.new @project, @key, @autocreate_topic, @metric_prefix end |
#write(chunk) ⇒ Object
120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 |
# File 'lib/fluent/plugin/out_gcloud_pubsub.rb', line 120 def write(chunk) topic = extract_placeholders(@topic, chunk.) = [] size = 0 chunk.msgpack_each do |msg, attr| msg = Fluent::GcloudPubSub::Message.new(msg, attr) if msg.bytesize > @max_message_size log.warn "Drop a message because its size exceeds `max_message_size`", size: msg.bytesize next end if .length + 1 > @max_messages || size + msg.bytesize > @max_total_size publish(topic, ) = [] size = 0 end << msg size += msg.bytesize end publish(topic, ) unless .empty? rescue Fluent::GcloudPubSub::RetryableError => e log.warn "Retryable error occurs. Fluentd will retry.", error_message: e.to_s, error_class: e.class.to_s raise e rescue StandardError => e log.error "unexpected error", error_message: e.to_s, error_class: e.class.to_s log.error_backtrace raise e end |