Method: Mongo::Database#watch

Defined in:
lib/mongo/database.rb

#watch(pipeline = [], options = {}) ⇒ ChangeStream

Note:

A change stream only allows ‘majority’ read concern.

Note:

This helper method is preferable to running a raw aggregation with a $changeStream stage, for the purpose of supporting resumability.

As of version 3.6 of the MongoDB server, a “$changeStream“ pipeline stage is supported in the aggregation framework. As of version 4.0, this stage allows users to request that notifications are sent for all changes that occur in the client’s database.

Examples:

Get change notifications for a given database..

database.watch([{ '$match' => { operationType: { '$in' => ['insert', 'replace'] } } }])

Parameters:

  • pipeline (Array<Hash>) (defaults to: [])

    Optional additional filter operators.

  • options (Hash) (defaults to: {})

    The change stream options.

Options Hash (options):

  • :full_document (String)

    Allowed values: nil, ‘default’, ‘updateLookup’, ‘whenAvailable’, ‘required’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘default’. By default, the change notification for partial updates will include a delta describing the changes to the document.

    When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from some time after the change occurred.

    When set to ‘whenAvailable’, configures the change stream to return the post-image of the modified document for replace and update change events if the post-image for this event is available.

    When set to ‘required’, the same behavior as ‘whenAvailable’ except that an error is raised if the post-image is not available.

  • :full_document_before_change (String)

    Allowed values: nil, ‘whenAvailable’, ‘required’, ‘off’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘off’.

    When set to ‘whenAvailable’, configures the change stream to return the pre-image of the modified document for replace, update, and delete change events if it is available.

    When set to ‘required’, the same behavior as ‘whenAvailable’ except that an error is raised if the pre-image is not available.

  • :resume_after (BSON::Document, Hash)

    Specifies the logical starting point for the new change stream.

  • :max_await_time_ms (Integer)

    The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :collation (BSON::Document, Hash)

    The collation to use.

  • :session (Session)

    The session to use.

  • :start_at_operation_time (BSON::Timestamp)

    Only return changes that occurred after the specified timestamp. Any command run against the server will return a cluster time that can be used here. Only recognized by server versions 4.0+.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :show_expanded_events (Boolean)

    Enables the server to send the ‘expanded’ list of change stream events. The list of additional events included with this flag set are: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.

Returns:

  • (ChangeStream)

    The change stream object.

Since:

  • 2.6.0



526
527
528
529
530
531
532
533
534
535
# File 'lib/mongo/database.rb', line 526

def watch(pipeline = [], options = {})
  view_options = options.dup
  view_options[:cursor_type] = :tailable_await if options[:max_await_time_ms]

  Mongo::Collection::View::ChangeStream.new(
    Mongo::Collection::View.new(collection("#{COMMAND}.aggregate"), {}, view_options),
    pipeline,
    Mongo::Collection::View::ChangeStream::DATABASE,
    options)
end