Class: LogStash::Outputs::S3::FileRepository
- Inherits:
-
Object
- Object
- LogStash::Outputs::S3::FileRepository
show all
- Defined in:
- lib/logstash/outputs/s3/file_repository.rb
Defined Under Namespace
Classes: FactoryInitializer, PrefixedValue
Constant Summary
collapse
- DEFAULT_STATE_SWEEPER_INTERVAL_SECS =
60
- DEFAULT_STALE_TIME_SECS =
15 * 60
Instance Method Summary
collapse
Constructor Details
#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository
Returns a new instance of FileRepository.
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 56
def initialize(tags, encoding, temporary_directory,
stale_time = DEFAULT_STALE_TIME_SECS,
sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
@prefixed_factories = Concurrent::Map.new
@sweeper_interval = sweeper_interval
@factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)
start_stale_sweeper
end
|
Instance Method Details
#each_files ⇒ Object
74
75
76
77
78
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 74
def each_files
@prefixed_factories.values.each do |prefixed_file|
prefixed_file.with_lock { |factory| yield factory.current }
end
end
|
#get_factory(prefix_key) ⇒ Object
81
82
83
84
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 81
def get_factory(prefix_key)
prefix_val = @prefixed_factories.fetch_or_store(prefix_key) { @factory_initializer.create_value(prefix_key) }
prefix_val.with_lock { |factory| yield factory }
end
|
#get_file(prefix_key) ⇒ Object
86
87
88
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 86
def get_file(prefix_key)
get_factory(prefix_key) { |factory| yield factory.current }
end
|
#keys ⇒ Object
70
71
72
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 70
def keys
@prefixed_factories.keys
end
|
#remove_stale(k, v) ⇒ Object
98
99
100
101
102
103
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 98
def remove_stale(k, v)
if v.stale?
@prefixed_factories.delete_pair(k, v)
v.delete!
end
end
|
#shutdown ⇒ Object
90
91
92
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 90
def shutdown
stop_stale_sweeper
end
|
#size ⇒ Object
94
95
96
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 94
def size
@prefixed_factories.size
end
|
#start_stale_sweeper ⇒ Object
105
106
107
108
109
110
111
112
113
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 105
def start_stale_sweeper
@stale_sweeper = Concurrent::TimerTask.new(:execution_interval => @sweeper_interval) do
LogStash::Util.set_thread_name("S3, Stale factory sweeper")
@prefixed_factories.each { |k, v| remove_stale(k,v) }
end
@stale_sweeper.execute
end
|
#stop_stale_sweeper ⇒ Object
115
116
117
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 115
def stop_stale_sweeper
@stale_sweeper.shutdown
end
|