module Mongo::Operation::SessionsSupported
Shared behavior of operations that support a session.
@since 2.5.2
@api private
Constants
- READ_COMMANDS
- ZERO_TIMESTAMP
Private Instance Methods
Adds $readPreference field to the command document.
$readPreference is only sent when the server is a mongos, following the rules described in github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#passing-read-preference-to-mongos. The topology does not matter for figuring out whether to send $readPreference since the decision is always made based on server type.
$readPreference is also sent to replica set members that understand OP_MSG.
@param [ Hash ] sel Existing command document which will be mutated.
@param [ Server::Connection ] connection The connection that the operation will be executed on.
# File lib/mongo/operation/shared/sessions_supported.rb, line 168
#
# Stores a $readPreference document in the command when one applies to
# the server behind the given connection.
#
# @param [ Hash ] sel Command document; mutated in place.
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
#
# @return [ Object | nil ] The document stored under '$readPreference',
#   or nil when nothing was added.
def add_read_preference(sel, connection)
  Lint.assert_type(connection, Server::Connection)

  # Modes for which no explicit non-primary preference was requested.
  primary_like = [nil, 'primary']

  # https://github.com/mongodb/specifications/blob/master/source/server-selection/server-selection.rst#topology-type-single
  preference =
    if connection.description.standalone?
      # Standalones never receive a read preference.
      nil
    elsif connection.server.load_balancer? || connection.description.mongos?
      # Load balancers and mongos both get the mongos form of the read
      # preference. For a mongos:
      # - $readPreference is never sent when mode is 'primary'
      # - Otherwise $readPreference is sent
      # When mode is 'secondaryPreferred' $readPreference is currently
      # required to only be sent when a non-mode field (i.e. tag_sets)
      # is present, but this causes wrong behavior (DRIVERS-1642).
      read&.to_mongos
    elsif connection.server.cluster.single?
      # Single topology: start from the application's read preference (or
      # an empty document) and force mode to primaryPreferred when the
      # mode is absent or 'primary'. Other modes are passed unchanged.
      doc = read ? BSON::Document.new(read.to_doc) : BSON::Document.new
      doc['mode'] = 'primaryPreferred' if primary_like.include?(doc['mode'])
      doc
    else
      # Replica sets: pass the application's read preference through,
      # except when the mode is absent or 'primary'.
      doc = BSON::Document.new(read&.to_doc || {})
      doc unless primary_like.include?(doc['mode'])
    end

  return unless preference

  sel['$readPreference'] = preference
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 97
#
# Copies the operation's write concern options into the command under
# :writeConcern. Does nothing when there is no write concern.
#
# @param [ Hash ] sel Command document; mutated in place.
def add_write_concern!(sel)
  return unless write_concern

  sel[:writeConcern] = write_concern.options
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 101
#
# Hands the selector to the session's add_autocommit!.
# NOTE(review): the session presumably adds the autocommit field only
# while in a transaction; confirm against the session implementation.
#
# @param [ Hash ] selector Command document passed to the session.
def apply_autocommit!(selector)
  session.add_autocommit!(selector)
end
Adds causal consistency document to the selector, if one can be constructed and the selector is for a startTransaction command.
When operations are performed in a transaction, only the first operation (the one that starts the transaction via startTransaction) may specify a read concern, and with it the causal consistency document.
# File lib/mongo/operation/shared/sessions_supported.rb, line 52
#
# Adds the causal consistency document to the selector, but only when the
# selector carries :startTransaction; other selectors are left untouched.
#
# @param [ Hash ] selector Command document; may be mutated.
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
def apply_causal_consistency!(selector, connection)
  apply_causal_consistency_if_possible(selector, connection) if selector[:startTransaction]
end
Adds causal consistency document to the selector, if one can be constructed.
In order for the causal consistency document to be constructed, causal consistency must be enabled for the session and the session must have the current operation time. Also, the topology must be a replica set or a sharded cluster; nothing is added when the server is a standalone.
# File lib/mongo/operation/shared/sessions_supported.rb, line 65
#
# Merges the session's causal consistency document into the selector's
# read concern when one is available and the server is not a standalone.
#
# @param [ Hash ] selector Command document; :readConcern is overwritten
#   when a causal consistency document exists.
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
def apply_causal_consistency_if_possible(selector, connection)
  return if connection.description.standalone?

  # causal_consistency_doc is invoked via send, so it is presumably not
  # public on the session.
  consistency = session.send(:causal_consistency_doc)
  return unless consistency

  # Precedence for the base read concern: the selector's own, then the
  # operation's, then an empty hash.
  base = selector[:readConcern] || read_concern || {}
  selector[:readConcern] =
    Options::Mapper.transform_values_to_strings(base.merge(consistency))
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 80
#
# Sets '$clusterTime' on the selector to the greater of the connection's
# and the session's cluster times. Skipped for standalones and when
# neither cluster time is known.
#
# @param [ Hash ] selector Command document; mutated in place.
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
def apply_cluster_time!(selector, connection)
  return if connection.description.standalone?

  # The session may be absent, hence the safe navigation.
  known_times = [connection.cluster_time, session&.cluster_time].compact
  latest = known_times.max

  selector['$clusterTime'] = latest if latest
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 113
#
# Passes the selector to the session's apply_read_pref!, but only for
# selectors recognized as read commands.
#
# @param [ Hash ] selector Command document passed to the session.
def apply_read_pref!(selector)
  return unless read_command?(selector)

  session.apply_read_pref!(selector)
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 216
#
# Decorates the command with everything derived from the session:
# cluster time, transaction number, session id, transaction fields,
# recovery token and snapshot read concern.
#
# @param [ Hash ] sel Command document; mutated in place.
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
#
# @raise [ Error::SnapshotSessionInvalidServerVersion ] If the session is
#   a snapshot session and the server version is below 5.0.
def apply_session_options(sel, connection)
  apply_cluster_time!(sel, connection)
  sel[:txnNumber] = BSON::Int64.new(txn_num) if txn_num
  sel.merge!(lsid: session.session_id)

  # The order of these steps is kept exactly as in the original.
  apply_start_transaction!(sel)
  apply_causal_consistency!(sel, connection)
  apply_autocommit!(sel)
  apply_txn_opts!(sel)
  suppress_read_write_concern!(sel)
  validate_read_preference!(sel)
  apply_txn_num!(sel)

  # The recovery token is attached only to commands that finish a transaction.
  if session.recovery_token && (sel[:commitTransaction] || sel[:abortTransaction])
    sel[:recoveryToken] = session.recovery_token
  end

  return unless session.snapshot?

  raise Error::SnapshotSessionInvalidServerVersion unless connection.description.server_version_gte?('5.0')

  # A snapshot session replaces whatever read concern was set earlier.
  sel[:readConcern] = {level: 'snapshot'}
  sel[:readConcern][:atClusterTime] = session.snapshot_timestamp if session.snapshot_timestamp
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 105
#
# Hands the selector to the session's add_start_transaction!.
# NOTE(review): the session presumably marks the first command of a
# transaction with startTransaction; confirm against the session
# implementation.
#
# @param [ Hash ] selector Command document passed to the session.
def apply_start_transaction!(selector)
  session.add_start_transaction!(selector)
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 109
#
# Hands the selector to the session's add_txn_num!.
# NOTE(review): the session presumably adds the transaction number when a
# transaction is active; confirm against the session implementation.
#
# @param [ Hash ] selector Command document passed to the session.
def apply_txn_num!(selector)
  session.add_txn_num!(selector)
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 117
#
# Hands the selector to the session's add_txn_opts!, together with a flag
# saying whether the selector is a read command.
#
# @param [ Hash ] selector Command document passed to the session.
def apply_txn_opts!(selector)
  session.add_txn_opts!(
    selector,
    read_command?(selector)
  )
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 245
#
# Builds the message via the parent implementation and, when the context
# has a session, serializes the message once and updates session state.
#
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
# @param [ Object ] context Operation context; must respond to #session.
#
# @return [ Object ] The message returned by the parent implementation.
#
# @raise [ Error::InternalDriverError ] If the operation has a session
#   and it differs from the context's session.
def build_message(connection, context)
  if self.session && self.session != context.session
    raise Error::InternalDriverError, "Operation session #{self.session.inspect} does not match context session #{context.session.inspect}"
  end
  # When the operation itself has no session but the context does, nothing
  # is raised: some operations are not constructed with sessions but are
  # executed in a context where a session is available.
  # This could be OK or a driver issue.
  # TODO investigate.

  message = super
  context_session = context.session

  if context_session
    # Serialize the message to detect client-side problems,
    # such as invalid BSON keys. The message will be serialized again
    # later prior to being sent to the connection.
    message.serialize(BSON::ByteBuffer.new)
    context_session.update_state!
  end

  message
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 129
#
# Assembles the full command document for this operation: the base
# selector plus write concern, database name, read preference and, where
# applicable, cluster time and session options.
#
# @param [ Server::Connection ] connection The connection the operation
#   will be executed on.
#
# @return [ BSON::Document ] The command document.
#
# @raise [ Error::LintError ] If linting is enabled and the connection is
#   not a Server::Connection.
def command(connection)
  if Lint.enabled? && !connection.is_a?(Server::Connection)
    raise Error::LintError, "Connection is not a Connection instance: #{connection}"
  end

  BSON::Document.new(selector(connection)).tap do |doc|
    add_write_concern!(doc)
    doc[Protocol::Msg::DATABASE_IDENTIFIER] = db_name
    add_read_preference(doc, connection)

    if connection.features.sessions_enabled?
      apply_cluster_time!(doc, connection)
      # Session options are applied only to acknowledged writes or to
      # operations inside a transaction.
      apply_session_options(doc, connection) if session && (acknowledged_write? || session.in_transaction?)
    elsif session && session.explicit?
      # Sessions are not enabled for this connection, but the session is
      # explicit, so its options are applied anyway.
      apply_session_options(doc, connection)
    end
  end
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 76
#
# Message flags for this operation.
#
# @return [ Array<Symbol> ] Empty for acknowledged writes, otherwise
#   [:more_to_come].
def flags
  return [] if acknowledged_write?

  [:more_to_come]
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 93
#
# Whether the selector holds a truthy value under any of the keys listed
# in READ_COMMANDS.
#
# @param [ Hash ] sel Command document to inspect.
#
# @return [ true | false ] True if any READ_COMMANDS key is set.
def read_command?(sel)
  READ_COMMANDS.any? do |command_name|
    sel[command_name]
  end
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 121
#
# Hands the selector to the session's suppress_read_write_concern!.
# NOTE(review): the session presumably strips read/write concern from
# commands where a transaction forbids them; confirm against the session
# implementation.
#
# @param [ Hash ] selector Command document passed to the session.
def suppress_read_write_concern!(selector)
  session.suppress_read_write_concern!(selector)
end
# File lib/mongo/operation/shared/sessions_supported.rb, line 125
#
# Asks the session to validate the read preference, but only for
# selectors recognized as read commands.
#
# @param [ Hash ] selector Command document passed to the session.
def validate_read_preference!(selector)
  return unless read_command?(selector)

  session.validate_read_preference!(selector)
end