Class: LogStash::Outputs::S3::FileRepository

Inherits:
Object
  • Object
show all
Defined in:
lib/logstash/outputs/s3/file_repository.rb

Defined Under Namespace

Classes: FactoryInitializer, PrefixedValue

Constant Summary collapse

# Default `sweeper_interval` of #initialize: number of seconds between two
# runs of the stale-factory sweeper timer task.
DEFAULT_STATE_SWEEPER_INTERVAL_SECS =
60
# Default `stale_time` of #initialize, in seconds (15 minutes); it is handed
# to the FactoryInitializer when the repository is built.
DEFAULT_STALE_TIME_SECS =
15 * 60

Instance Method Summary collapse

Constructor Details

#initialize(tags, encoding, temporary_directory, stale_time = DEFAULT_STALE_TIME_SECS, sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS) ⇒ FileRepository

Returns a new instance of FileRepository.



64
65
66
67
68
69
70
71
72
73
74
75
76
# File 'lib/logstash/outputs/s3/file_repository.rb', line 64

# Builds an empty repository and immediately starts the stale sweeper.
#
# `tags`, `encoding`, `temporary_directory` and `stale_time` are forwarded
# untouched to FactoryInitializer; `sweeper_interval` is kept for the sweeper
# timer task created by #start_stale_sweeper.
#
# @param tags forwarded to FactoryInitializer
# @param encoding forwarded to FactoryInitializer
# @param temporary_directory forwarded to FactoryInitializer
# @param stale_time forwarded to FactoryInitializer (defaults to DEFAULT_STALE_TIME_SECS)
# @param sweeper_interval interval used by the stale sweeper
#   (defaults to DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
def initialize(tags, encoding, temporary_directory,
               stale_time = DEFAULT_STALE_TIME_SECS,
               sweeper_interval = DEFAULT_STATE_SWEEPER_INTERVAL_SECS)
  @sweeper_interval = sweeper_interval
  @factory_initializer = FactoryInitializer.new(tags, encoding, temporary_directory, stale_time)

  # Entries are keyed by prefix: the path needs to contain the prefix so that,
  # when Logstash is started again after a crash, the remote structure is kept.
  @prefixed_factories = Concurrent::Map.new

  # Must come last: the sweeper reads both the interval and the map set above.
  start_stale_sweeper
end

Instance Method Details

#each_factory(prefixes) {|factory| ... } ⇒ void

This method returns an undefined value.

Yields each non-deleted file factory while the current thread has exclusive access to it.

Parameters:

  • prefixes (Array<String>)

    : the prefix keys

Yield Parameters:

  • factory (TemporaryFileFactory)



120
121
122
123
124
125
126
127
# File 'lib/logstash/outputs/s3/file_repository.rb', line 120

# Yields every factory registered under one of +prefixes+, skipping prefixes
# that have no entry and entries that are marked deleted. Each factory is
# yielded from inside its entry's `with_lock` block.
#
# @param prefixes [Array<String>] the prefix keys to visit
# @yieldparam factory the factory handed out by the entry's `with_lock`
# @return [void]
def each_factory(prefixes)
  prefixes.each do |prefix_key|
    entry = @prefixed_factories.get(prefix_key)
    next if entry.nil? # nothing registered under this prefix

    entry.with_lock do |factory|
      # the deleted flag is only inspected once we are inside the lock
      next if entry.deleted?

      yield factory
    end
  end
end

#each_files ⇒ Object



82
83
84
85
86
# File 'lib/logstash/outputs/s3/file_repository.rb', line 82

# Yields the `current` value of every factory that #each_factory visits for
# all known prefix keys.
#
# @yieldparam file the value returned by the factory's `current`
def each_files
  all_prefixes = keys
  each_factory(all_prefixes) { |file_factory| yield file_factory.current }
end

#get_factory(prefix_key) {|factory| ... } ⇒ Object

Yields the file factory while the current thread has exclusive access to it, creating a new one if one does not exist or if the current one is being reaped by the stale watcher.

Parameters:

  • prefix_key (String)

    : the prefix key

Yield Parameters:

  • factory (TemporaryFileFactory)

    : a temporary file factory that this thread has exclusive access to

Yield Returns:

  • (Object)

    : a value to return; should NOT be the factory, which should be contained by the exclusive access scope.

Returns:

  • (Object)

    : the value returned by the provided block



95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
# File 'lib/logstash/outputs/s3/file_repository.rb', line 95

# Yields the factory registered under +prefix_key+ from inside its entry's
# `with_lock` block. When no entry exists, or the existing entry is marked
# deleted, a replacement is obtained from `@factory_initializer.create_value`
# and stored in the map before being yielded.
#
# @param prefix_key [String] the prefix key
# @yieldparam factory the factory handed out by the entry's `with_lock`
# @return [Object] on the fast path, the value returned by the block; on the
#   slow path, whatever `with_lock` returns (presumably the block's value —
#   confirm against PrefixedValue#with_lock)
def get_factory(prefix_key)

  # fast-path: if factory exists and is not deleted, yield it with exclusive access and return
  prefix_val = @prefixed_factories.get(prefix_key)
  prefix_val&.with_lock do |factory|
    # intentional local-jump to ensure deletion detection
    # is done inside the exclusive access.
    return yield(factory) unless prefix_val.deleted?
  end

  # slow-path:
  # the Concurrent::Map#get operation is lock-free, but may have returned an entry that was being deleted by
  # another thread (such as via stale detection). If we failed to retrieve a value, or retrieved one that had
  # been marked deleted, use the atomic Concurrent::Map#compute to retrieve a non-deleted entry.
  prefix_val = @prefixed_factories.compute(prefix_key) do |existing|
    existing && !existing.deleted? ? existing : @factory_initializer.create_value(prefix_key)
  end
  # NOTE(review): the deleted flag is not re-checked once the lock is taken
  # here, unlike the fast path above — confirm that an entry cannot be marked
  # deleted between `compute` returning and `with_lock` being entered.
  prefix_val.with_lock { |factory| yield factory }
end

#get_file(prefix_key) ⇒ Object



129
130
131
# File 'lib/logstash/outputs/s3/file_repository.rb', line 129

# Yields the `current` value of the factory that #get_factory provides for
# +prefix_key+.
#
# @param prefix_key the prefix key passed through to #get_factory
# @yieldparam file the value returned by the factory's `current`
# @return whatever #get_factory returns for the given block
def get_file(prefix_key)
  get_factory(prefix_key) do |file_factory|
    yield file_factory.current
  end
end

#keys ⇒ Object



78
79
80
# File 'lib/logstash/outputs/s3/file_repository.rb', line 78

# Lists the prefix keys currently held in the repository's factory map.
#
# @return the result of calling `keys` on @prefixed_factories
def keys
  @prefixed_factories.keys
end

#remove_if_stale(prefix_key) ⇒ Object



141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
# File 'lib/logstash/outputs/s3/file_repository.rb', line 141

# Drops the entry registered under +prefix_key+ when that entry reports
# itself stale; a non-stale entry is left in place and a missing key is
# handled entirely by `compute_if_present`.
#
# @param prefix_key the prefix key to inspect
# @return whatever `compute_if_present` returns for the key
def remove_if_stale(prefix_key)
  # The ATOMIC `Concurrent::Map#compute_if_present` lets us detect staleness,
  # flag the entry as deleted and remove it from the map in a single step.
  @prefixed_factories.compute_if_present(prefix_key) do |entry|
    # Exclusive access to the entry is held while staleness is checked, so the
    # deleted flag is set before the lock is released.
    entry.with_lock do |_factory|
      next entry unless entry.stale? # still in use: keep the existing entry

      entry.delete! # flag as deleted to prevent reuse
      nil # a nil result makes the map delete the key
    end
  end
end

#shutdown ⇒ Object



133
134
135
# File 'lib/logstash/outputs/s3/file_repository.rb', line 133

# Shuts the repository down; the only action taken here is stopping the
# stale sweeper via #stop_stale_sweeper.
#
# @return whatever #stop_stale_sweeper returns
def shutdown
  stop_stale_sweeper
end

#size ⇒ Object



137
138
139
# File 'lib/logstash/outputs/s3/file_repository.rb', line 137

# Reports how many entries the repository's factory map currently holds.
#
# @return the result of calling `size` on @prefixed_factories
def size
  @prefixed_factories.size
end

#start_stale_sweeper ⇒ Object



159
160
161
162
163
164
165
166
167
168
169
# File 'lib/logstash/outputs/s3/file_repository.rb', line 159

# Creates the timer task that sweeps stale entries and starts it.
#
# The task is configured with @sweeper_interval as its execution interval;
# every run names the executing thread and then calls #remove_if_stale for
# each key present in the map at that moment.
#
# @return the result of calling `execute` on the timer task
def start_stale_sweeper
  @stale_sweeper = Concurrent::TimerTask.new(execution_interval: @sweeper_interval) do
    LogStash::Util.set_thread_name("S3, Stale factory sweeper")

    # iterate over the key list returned by `keys`, not over the map itself
    @prefixed_factories.keys.each { |prefix_key| remove_if_stale(prefix_key) }
  end

  @stale_sweeper.execute
end

#stop_stale_sweeper ⇒ Object



171
172
173
# File 'lib/logstash/outputs/s3/file_repository.rb', line 171

# Shuts down the timer task stored in @stale_sweeper (the task created by
# #start_stale_sweeper).
#
# @return the result of calling `shutdown` on the timer task
def stop_stale_sweeper
  @stale_sweeper.shutdown
end