class Mongo::Cluster::CursorReaper

A manager that sends kill cursors operations at regular intervals to close cursors that have been garbage collected without being exhausted.

@api private

@since 2.3.0

Constants

FREQUENCY

The default time interval for the cursor reaper to send pending kill cursors operations.

@since 2.3.0

Attributes

cluster[R]

Public Class Methods

new(cluster) click to toggle source

Create a cursor reaper.

@param [ Cluster ] cluster The cluster.

@api private

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 42
# Set up a reaper for +cluster+ with an empty kill queue and no cursor
# ids registered as active.
#
# @param [ Cluster ] cluster The cluster.
#
# @api private
def initialize(cluster)
  # Scheduled kill specs, grouped in a Set per server address seed.
  @to_kill = {}
  # Ids of cursors that have been registered and not yet unregistered.
  @active_cursor_ids = Set.new
  @cluster = cluster
  # Serializes access to @to_kill and @active_cursor_ids.
  @mutex = Mutex.new
end

Public Instance Methods

execute()
Alias for: kill_cursors
flush()
Alias for: kill_cursors
kill_cursors() click to toggle source

Execute all pending kill cursors operations.

@example Execute pending kill cursors operations.

cursor_reaper.kill_cursors

@api private

@since 2.3.0

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 121
# Drain the queue of scheduled kill specs, sending one kill cursors
# operation for every spec whose cursor id is still registered as active
# and whose server is currently present in the cluster.
#
# Specs are removed from the queue before their operation is sent, so a
# spec is never processed twice.
#
# @example Execute pending kill cursors operations.
#   cursor_reaper.kill_cursors
#
# @return [ nil ] Once no server has any queued spec left.
#
# @api private
#
# @since 2.3.0
def kill_cursors
  # TODO optimize this to batch kill cursor operations for the same
  # server/database/collection instead of killing each cursor
  # individually.

  loop do
    target_seed = nil

    pending_spec = @mutex.synchronize do
      # Pick the first server entry that still has queued specs.
      target_seed, queued = @to_kill.find { |_seed, set| set.any? }

      # Every queue is empty: leave the method entirely.
      return if queued.nil?

      # Dequeue up front. If the kill cursors operation later fails, the
      # spec is already gone and will not be retried.
      candidate = queued.first
      queued.delete(candidate)

      # A cursor id that is no longer registered was already closed,
      # typically because it was iterated to completion. Yield nil so
      # the spec is simply dropped.
      candidate if @active_cursor_ids.include?(candidate.cursor_id)
    end

    # The dequeued spec referred to an already closed cursor; move on to
    # the next one.
    next unless pending_spec

    # KillCursors is given a plain hash rather than the kill spec itself
    # so that its API stays in line with the other operations, which all
    # accept hashes.
    op_spec = {
      cursor_ids: [pending_spec.cursor_id],
      coll_name: pending_spec.coll_name,
      db_name: pending_spec.db_name,
    }
    kill_op = Operation::KillCursors.new(op_spec)

    target = cluster.servers.find { |srv| srv.address.seed == target_seed }

    # TODO We currently don't have a server for the address that the
    # cursor is associated with. We should leave the cursor in the
    # queue to be killed at a later time (when the server comes back).
    next unless target

    context = Operation::Context.new(
      options: {
        server_api: target.options[:server_api],
        service_id: pending_spec.service_id,
      }
    )
    kill_op.execute(target, context: context)
  end
end
Also aliased as: execute, flush
register_cursor(id) click to toggle source

Register a cursor id as active.

@example Register a cursor as active.

cursor_reaper.register_cursor(id)

@param [ Integer ] id The id of the cursor to register as active.

@api private

@since 2.3.0

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 77
# Record a cursor id as active, making kill specs for it eligible for
# scheduling.
#
# @example Register a cursor as active.
#   cursor_reaper.register_cursor(id)
#
# @param [ Integer ] id The id of the cursor to register as active.
#
# @raise [ ArgumentError ] If the id is nil or 0.
#
# @api private
#
# @since 2.3.0
def register_cursor(id)
  raise ArgumentError, 'register_cursor called with nil cursor_id' if id.nil?
  raise ArgumentError, 'register_cursor called with cursor_id=0' if id == 0

  @mutex.synchronize { @active_cursor_ids.add(id) }
end
schedule_kill_cursor(kill_spec, server) click to toggle source

Schedule a kill cursors operation to be eventually executed.

@param [ Cursor::KillSpec ] kill_spec The kill specification.

@param [ Mongo::Server ] server The server to send the kill cursors operation to.

@api private

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 58
# Queue a kill spec under the given server's address seed so that a later
# kill_cursors run can process it. Specs whose cursor id is not currently
# registered as active are ignored.
#
# @param [ Cursor::KillSpec ] kill_spec The kill specification.
# @param [ Mongo::Server ] server The server to send the kill cursors
#   operation to.
#
# @api private
def schedule_kill_cursor(kill_spec, server)
  @mutex.synchronize do
    # Nothing to schedule for a cursor that is not registered as active.
    next unless @active_cursor_ids.include?(kill_spec.cursor_id)

    seed = server.address.seed
    (@to_kill[seed] ||= Set.new).add(kill_spec)
  end
end
unregister_cursor(id) click to toggle source

Unregister a cursor id, indicating that it’s no longer active.

@example Unregister a cursor.

cursor_reaper.unregister_cursor(id)

@param [ Integer ] id The id of the cursor to unregister.

@api private

@since 2.3.0

# File lib/mongo/cluster/reapers/cursor_reaper.rb, line 100
# Remove a cursor id from the set of active cursors, so that kill specs
# for it are no longer scheduled or executed.
#
# @example Unregister a cursor.
#   cursor_reaper.unregister_cursor(id)
#
# @param [ Integer ] id The id of the cursor to unregister.
#
# @raise [ ArgumentError ] If the id is nil or 0.
#
# @api private
#
# @since 2.3.0
def unregister_cursor(id)
  raise ArgumentError, 'unregister_cursor called with nil cursor_id' if id.nil?
  raise ArgumentError, 'unregister_cursor called with cursor_id=0' if id == 0

  @mutex.synchronize { @active_cursor_ids.delete(id) }
end