Skip to content

Commit 36bc1fd

Browse files
mbroadstdaprahamian
authored andcommitted
feat(execute-operation): allow execution with server selection
This is the first step to moving all server selection into the operation executor. An aspect on the operation called `EXECUTE_WITH_SELECTION` indicates that the operation will use a variant of the `execute` method which accepts a selected `Server` instance. NOTE: the retry logic is current read specific.
1 parent 781fcfa commit 36bc1fd

File tree

2 files changed

+53
-2
lines changed

2 files changed

+53
-2
lines changed

lib/operations/connect.js

+1
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ const validOptionNames = [
133133
'minSize',
134134
'monitorCommands',
135135
'retryWrites',
136+
'retryReads',
136137
'useNewUrlParser',
137138
'useUnifiedTopology',
138139
'serverSelectionTimeoutMS',

lib/operations/execute_operation.js

+52-2
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
const MongoError = require('../core').MongoError;
44
const Aspect = require('./operation').Aspect;
55
const OperationBase = require('./operation').OperationBase;
6+
const ReadPreference = require('../core').ReadPreference;
7+
const isRetryableError = require('../core/error').isRetryableError;
68

79
/**
810
* Executes the given operation with provided arguments.
@@ -65,7 +67,11 @@ function executeOperation(topology, operation, callback) {
6567
);
6668

6769
try {
68-
return operation.execute(handler);
70+
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
71+
return executeWithServerSelection(topology, operation, handler);
72+
} else {
73+
return operation.execute(handler);
74+
}
6975
} catch (e) {
7076
handler(e);
7177
throw e;
@@ -76,11 +82,55 @@ function executeOperation(topology, operation, callback) {
7682
const handler = makeExecuteCallback(resolve, reject);
7783

7884
try {
79-
return operation.execute(handler);
85+
if (operation.hasAspect(Aspect.EXECUTE_WITH_SELECTION)) {
86+
return executeWithServerSelection(topology, operation, handler);
87+
} else {
88+
return operation.execute(handler);
89+
}
8090
} catch (e) {
8191
handler(e);
8292
}
8393
});
8494
}
8595

96+
function executeWithServerSelection(topology, operation, callback) {
97+
const readPreference = operation.readPreference || ReadPreference.primary;
98+
const shouldRetryReads = topology.s.options.retryReads !== false;
99+
100+
function callbackWithRetry(err, result) {
101+
if (err == null) {
102+
return callback(null, result);
103+
}
104+
105+
if (!isRetryableError(err)) {
106+
return callback(err);
107+
}
108+
109+
// select a new server, and attempt to retry the operation
110+
topology.selectServer(readPreference, (err, server) => {
111+
if (err) {
112+
callback(err, null);
113+
return;
114+
}
115+
116+
operation.execute(server, callback);
117+
});
118+
}
119+
120+
// select a server, and execute the operation against it
121+
topology.selectServer(readPreference, (err, server) => {
122+
if (err) {
123+
callback(err, null);
124+
return;
125+
}
126+
127+
if (operation.hasAspect(Aspect.RETRYABLE) && shouldRetryReads) {
128+
operation.execute(server, callbackWithRetry);
129+
return;
130+
}
131+
132+
operation.execute(server, callback);
133+
});
134+
}
135+
86136
module.exports = executeOperation;

0 commit comments

Comments
 (0)