Class: LogStash::Outputs::S3::FileRepository
- Inherits:
-
Object
- Object
- LogStash::Outputs::S3::FileRepository
show all
- Defined in:
- lib/logstash/outputs/s3/file_repository.rb
Defined Under Namespace
Classes: FactoryInitializer, PrefixedValue
Constant Summary
collapse
- DEFAULT_STATE_SWEEPER_INTERVAL_SECS =
60
- DEFAULT_STALE_TIME_SECS =
15 * 60
Instance Method Summary
collapse
Constructor Details
#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository
Returns a new instance of FileRepository.
64
65
66
67
68
69
70
71
72
73
74
75
76
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 64
# Builds an empty repository and starts the background stale sweeper.
#
# @param tags forwarded unchanged to FactoryInitializer.new
# @param encoding forwarded unchanged to FactoryInitializer.new
# @param temporary_directory forwarded unchanged to FactoryInitializer.new
# @param stale_time forwarded to FactoryInitializer.new
#   (defaults to DEFAULT_STALE_TIME_SECS)
# @param sweeper_interval execution interval handed to the sweeper timer task
#   (defaults to DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STALE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)
  @sweeper_interval = sweeper_interval
  # prefix key => entry wrapping a file factory
  @prefixed_factories = Concurrent::Map.new

  # Must come last: the sweeper reads @sweeper_interval and @prefixed_factories.
  start_stale_sweeper
end
|
Instance Method Details
#each_factory(prefixes) {|factory| ... } ⇒ void
This method returns an undefined value.
Yields each non-deleted file factory while the current thread has exclusive access to it.
120
121
122
123
124
125
126
127
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 120
# Walks the given prefix keys and yields the factory of every registered
# entry that is not flagged deleted. Each yield happens inside that entry's
# +with_lock+ block; keys with no registered entry are skipped.
#
# @param prefixes collection of prefix keys to visit (must respond to +each+)
# @yield [factory] the factory handed out by the entry's +with_lock+
def each_factory(prefixes)
  prefixes.each do |prefix_key|
    entry = @prefixed_factories.get(prefix_key)
    next if entry.nil?

    entry.with_lock do |factory|
      # The deleted check is made while holding the lock.
      next if entry.deleted?

      yield factory
    end
  end
end
|
#each_files ⇒ Object
82
83
84
85
86
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 82
# Yields the +current+ value of every factory reachable through
# #each_factory for all keys presently known to the repository.
#
# @yield [file] the value returned by +factory.current+
def each_files
  each_factory(keys) { |factory| yield factory.current }
end
|
#get_factory(prefix_key) {|factory| ... } ⇒ Object
Yields the file factory while the current thread has exclusive access to it, creating a new one if one does not exist or if the current one is being reaped by the stale watcher.
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 95
# Yields the factory registered under +prefix_key+ from inside the entry's
# +with_lock+ block. When no entry is registered, or the registered entry is
# flagged deleted, a replacement is obtained from the factory initializer and
# stored in the map before yielding.
#
# @param prefix_key key used to look the entry up in @prefixed_factories
# @yield [factory] the factory handed out by the entry's +with_lock+
# @return the caller's block result on the fast path; otherwise whatever the
#   entry's +with_lock+ returns
def get_factory(prefix_key)
  # Fast path: plain lookup, with the deleted check made inside the lock.
  entry = @prefixed_factories.get(prefix_key)
  unless entry.nil?
    entry.with_lock do |factory|
      # Non-local return: exits get_factory with the caller's block result.
      return yield(factory) unless entry.deleted?
    end
  end

  # Slow path: nothing usable was found above, so let the map's #compute keep
  # a live entry or swap in a freshly created one.
  entry = @prefixed_factories.compute(prefix_key) do |existing|
    reusable = existing && !existing.deleted?
    reusable ? existing : @factory_initializer.create_value(prefix_key)
  end

  # Note: deleted? is not re-checked here before yielding.
  entry.with_lock do |factory|
    yield factory
  end
end
|
#get_file(prefix_key) ⇒ Object
129
130
131
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 129
# Convenience wrapper around #get_factory that yields the factory's
# +current+ value instead of the factory itself.
#
# @param prefix_key key passed through to #get_factory
# @yield [file] the value returned by +factory.current+
# @return whatever #get_factory returns
def get_file(prefix_key)
  get_factory(prefix_key) do |factory|
    yield factory.current
  end
end
|
#keys ⇒ Object
78
79
80
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 78
# Lists the prefix keys currently registered in the repository.
#
# @return the result of calling +keys+ on the underlying prefix map
def keys
  @prefixed_factories.keys
end
|
#remove_if_stale(prefix_key) ⇒ Object
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 141
# Drops the entry registered under +prefix_key+ when it reports itself stale.
# The stale check and the +delete!+ call both happen inside the entry's
# +with_lock+ block. Nothing happens when the key is not registered.
#
# @param prefix_key key of the entry to inspect
# @return whatever the map's +compute_if_present+ returns
def remove_if_stale(prefix_key)
  @prefixed_factories.compute_if_present(prefix_key) do |entry|
    entry.with_lock do |_factory|
      # Handing the entry back keeps the current mapping in place.
      next entry unless entry.stale?

      # Flag the entry as deleted before it leaves the map, then hand back
      # nil so that compute_if_present drops the mapping.
      entry.delete!
      nil
    end
  end
end
|
#shutdown ⇒ Object
133
134
135
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 133
# Shuts the repository down by stopping the background stale sweeper.
#
# @return whatever #stop_stale_sweeper returns
def shutdown
  stop_stale_sweeper
end
|
#size ⇒ Object
137
138
139
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 137
# Reports how many prefix entries the repository currently holds.
#
# @return the result of calling +size+ on the underlying prefix map
def size
  @prefixed_factories.size
end
|
#start_stale_sweeper ⇒ Object
159
160
161
162
163
164
165
166
167
168
169
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 159
# Creates the timer task that sweeps stale entries and starts it. On each
# run the task names its thread and calls #remove_if_stale for every key
# that is in the map at that moment.
#
# @return whatever the timer task's +execute+ returns
def start_stale_sweeper
  @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval) do
    LogStash::Util.set_thread_name("S3, Stale factory sweeper")
    # Work from a snapshot of the keys taken at the start of the run.
    @prefixed_factories.keys.each { |prefix_key| remove_if_stale(prefix_key) }
  end
  @stale_sweeper.execute
end
|
#stop_stale_sweeper ⇒ Object
171
172
173
|
# File 'lib/logstash/outputs/s3/file_repository.rb', line 171
# Stops the background stale sweeper by calling +shutdown+ on the timer task
# created in #start_stale_sweeper.
#
# @return whatever the timer task's +shutdown+ returns
def stop_stale_sweeper
  @stale_sweeper.shutdown
end
|