Class: Fluent::FlumeInput::FluentFlumeHandler

Inherits:
Object
  • Object
show all
Defined in:
lib/fluent/plugin/in_flume.rb

Instance Attribute Summary collapse

Instance Method Summary collapse

Instance Attribute Details

#add_prefixObject

Returns the value of attribute add_prefix.



114
115
116
# File 'lib/fluent/plugin/in_flume.rb', line 114

def add_prefix
  @add_prefix
end

#default_tagObject

Returns the value of attribute default_tag.



113
114
115
# File 'lib/fluent/plugin/in_flume.rb', line 113

def default_tag
  @default_tag
end

#logObject

Returns the value of attribute log.



116
117
118
# File 'lib/fluent/plugin/in_flume.rb', line 116

def log
  @log
end

#msg_formatObject

Returns the value of attribute msg_format.



115
116
117
# File 'lib/fluent/plugin/in_flume.rb', line 115

def msg_format
  @msg_format
end

#routerObject

Returns the value of attribute router.



117
118
119
# File 'lib/fluent/plugin/in_flume.rb', line 117

def router
  @router
end

#tag_fieldObject

Returns the value of attribute tag_field.



112
113
114
# File 'lib/fluent/plugin/in_flume.rb', line 112

def tag_field
  @tag_field
end

Instance Method Details

#ackedAppend(evt) ⇒ Object



146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
# File 'lib/fluent/plugin/in_flume.rb', line 146

def ackedAppend(evt)
  begin
    record = create_record(evt)
    if @tag_field
      tag = evt.fieldss[@tag_field] || @default_tag
      unless tag
        return # ignore
      end
    else
      tag = @default_tag
    end
    timestamp = evt.timestamp.to_i
    if @add_prefix
      router.emit(@add_prefix + '.' + tag, timestamp, record)
    else
      router.emit(tag, timestamp, record)
    end
    return EventStatus::ACK
  rescue => e
    log.error "unexpected error", :error=>$!.to_s
    log.error_backtrace
    return EventStatus::ERR
  end
end

#append(evt) ⇒ Object



119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# File 'lib/fluent/plugin/in_flume.rb', line 119

def append(evt)
  begin
    record = create_record(evt)
    if @tag_field
      tag = evt.fieldss[@tag_field] || @default_tag
      unless tag
        return # ignore
      end
    else
      tag = @default_tag
    end
    timestamp = evt.timestamp.to_i
    if @add_prefix
      router.emit(@add_prefix + '.' + tag, timestamp, record)
    else
      router.emit(tag, timestamp, record)
    end
  rescue => e
    log.error "unexpected error", :error=>$!.to_s
    log.error_backtrace
  end
end

#closeObject



171
172
# File 'lib/fluent/plugin/in_flume.rb', line 171

def close()
end

#rawAppend(evt) ⇒ Object



142
143
144
# File 'lib/fluent/plugin/in_flume.rb', line 142

def rawAppend(evt)
  log.error "rawAppend is not implemented yet: #{evt}"
end