Method: Mongo::Client#watch

Defined in:
lib/mongo/client.rb

#watch(pipeline = [], options = {}) ⇒ ChangeStream

Note:

A change stream only allows ‘majority’ read concern.

Note:

This helper method is preferable to running a raw aggregation with a $changeStream stage, for the purpose of supporting resumability.

As of version 3.6 of the MongoDB server, a “$changeStream“ pipeline stage is supported in the aggregation framework. As of version 4.0, this stage allows users to request that notifications are sent for all changes that occur in the client’s cluster.

Examples:

Get change notifications for the client’s cluster.

client.watch([{ '$match' => { operationType: { '$in' => ['insert', 'replace'] } } }])

Options Hash (options):

  • :full_document (String)

    Allowed values: nil, ‘default’, ‘updateLookup’, ‘whenAvailable’, ‘required’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘default’. By default, the change notification for partial updates will include a delta describing the changes to the document.

    When set to ‘updateLookup’, the change notification for partial updates will include both a delta describing the changes to the document as well as a copy of the entire document that was changed from some time after the change occurred.

    When set to ‘whenAvailable’, configures the change stream to return the post-image of the modified document for replace and update change events if the post-image for this event is available.

    When set to ‘required’, the same behavior as ‘whenAvailable’ except that an error is raised if the post-image is not available.

  • :full_document_before_change (String)

    Allowed values: nil, ‘whenAvailable’, ‘required’, ‘off’.

    The default is to not send a value (i.e. nil), which is equivalent to ‘off’.

    When set to ‘whenAvailable’, configures the change stream to return the pre-image of the modified document for replace, update, and delete change events if it is available.

    When set to ‘required’, the same behavior as ‘whenAvailable’ except that an error is raised if the pre-image is not available.

  • :resume_after (BSON::Document, Hash)

    Specifies the logical starting point for the new change stream.

  • :max_await_time_ms (Integer)

    The maximum amount of time for the server to wait on new documents to satisfy a change stream query.

  • :batch_size (Integer)

    The number of documents to return per batch.

  • :collation (BSON::Document, Hash)

    The collation to use.

  • :session (Session)

    The session to use.

  • :start_at_operation_time (BSON::Timestamp)

    Only return changes that occurred at or after the specified timestamp. Any command run against the server will return a cluster time that can be used here. Only recognized by server versions 4.0+.

  • :comment (Object)

    A user-provided comment to attach to this command.

  • :show_expanded_events (Boolean)

    Enables the server to send the ‘expanded’ list of change stream events. The list of additional events included with this flag set are: createIndexes, dropIndexes, modify, create, shardCollection, reshardCollection, refineCollectionShardKey.

Since:

  • 2.6.0



1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
# File 'lib/mongo/client.rb', line 1112

def watch(pipeline = [], options = {})
  return use(Database::ADMIN).watch(pipeline, options) unless database.name == Database::ADMIN

  view_options = options.dup
  view_options[:cursor_type] = :tailable_await if options[:max_await_time_ms]

  Mongo::Collection::View::ChangeStream.new(
    Mongo::Collection::View.new(self["#{Database::COMMAND}.aggregate"], {}, view_options),
    pipeline,
    Mongo::Collection::View::ChangeStream::CLUSTER,
    options)
end