Skip to content

Commit 7890e48

Browse files
committed
feat: introduce a modern Connection replacement for CMAP
This introduces a new `Connection` type that is strictly callback based, for use with the forthcoming CMAP implementation. It is still an EventEmitter, but strictly for notification of command events, and coordinating events like reporting that the cluster time has been received.
1 parent 8c44044 commit 7890e48

File tree

2 files changed

+237
-0
lines changed

2 files changed

+237
-0
lines changed

lib/core/cmap/connection.js

+210
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,210 @@
1+
'use strict';
2+
3+
const EventEmitter = require('events');
4+
const MessageStream = require('./message_stream');
5+
const MongoError = require('../error').MongoError;
6+
const MongoWriteConcernError = require('../error').MongoWriteConcernError;
7+
const wp = require('../wireprotocol');
8+
const apm = require('../connection/apm');
9+
const updateSessionFromResponse = require('../sessions').updateSessionFromResponse;
10+
11+
const kStream = Symbol('stream');
12+
const kQueue = Symbol('queue');
13+
const kMessageStream = Symbol('messageStream');
14+
15+
class Connection extends EventEmitter {
16+
constructor(stream, options) {
17+
super(options);
18+
19+
this.bson = options.bson;
20+
this.description = { maxWireVersion: 5 };
21+
this.socketTimeout = typeof options.socketTimeout === 'number' ? options.socketTimeout : 360000;
22+
this.monitorCommands =
23+
typeof options.monitorCommands === 'boolean' ? options.monitorCommands : false;
24+
25+
// setup parser stream and message handling
26+
this[kQueue] = new Map();
27+
this[kMessageStream] = new MessageStream(options);
28+
this[kMessageStream].on('message', messageHandler(this));
29+
this[kStream] = stream;
30+
stream.on('error', () => {
31+
/* ignore errors, listen to `close` instead */
32+
});
33+
34+
stream.on('close', () => {
35+
this[kQueue].forEach(op => op.callback(new MongoError('Connection closed')));
36+
this[kQueue].clear();
37+
38+
this.emit('close');
39+
});
40+
41+
// hook the message stream up to the passed in stream
42+
stream.pipe(this[kMessageStream]);
43+
this[kMessageStream].pipe(stream);
44+
}
45+
46+
// the `connect` method stores the result of the handshake ismaster on the connection
47+
set ismaster(response) {
48+
this.description = response;
49+
}
50+
51+
destroy(options, callback) {
52+
if (typeof options === 'function') {
53+
callback = options;
54+
options = {};
55+
}
56+
57+
options = Object.assign({ force: false }, options);
58+
if (this[kStream] == null || this.destroyed) {
59+
this.destroyed = true;
60+
return;
61+
}
62+
63+
if (options.force) {
64+
this[kStream].destroy();
65+
this.destroyed = true;
66+
if (typeof callback === 'function') {
67+
callback(null, null);
68+
}
69+
70+
return;
71+
}
72+
73+
this[kStream].end(err => {
74+
this.destroyed = true;
75+
if (typeof callback === 'function') {
76+
callback(err, null);
77+
}
78+
});
79+
}
80+
81+
command(ns, cmd, options, callback) {
82+
// NOTE: The wire protocol methods will eventually be migrated to this class, but for now
83+
// we need to pretend we _are_ a server.
84+
const server = {
85+
description: this.description,
86+
s: {
87+
bson: this.bson,
88+
pool: { write: write.bind(this) }
89+
}
90+
};
91+
92+
wp.command(server, ns, cmd, options, callback);
93+
}
94+
}
95+
96+
function messageHandler(conn) {
97+
return function(message) {
98+
// always emit the message, in case we are streaming
99+
conn.emit('message', message);
100+
if (!conn[kQueue].has(message.responseTo)) {
101+
return;
102+
}
103+
104+
const operationDescription = conn[kQueue].get(message.responseTo);
105+
conn[kQueue].delete(message.responseTo);
106+
107+
const callback = operationDescription.cb;
108+
if (operationDescription.socketTimeoutOverride) {
109+
this[kStream].setSocketTimeout(this.socketTimeout);
110+
}
111+
112+
try {
113+
// Pass in the entire description because it has BSON parsing options
114+
message.parse(operationDescription);
115+
} catch (err) {
116+
callback(new MongoError(err));
117+
return;
118+
}
119+
120+
if (message.documents[0]) {
121+
const document = message.documents[0];
122+
const session = operationDescription.session;
123+
if (session) {
124+
updateSessionFromResponse(session, document);
125+
}
126+
127+
if (document.$clusterTime) {
128+
this.emit('clusterTimeReceived', document.$clusterTime);
129+
}
130+
131+
if (document.writeConcernError) {
132+
callback(new MongoWriteConcernError(document.writeConcernError, document));
133+
return;
134+
}
135+
136+
if (document.ok === 0 || document.$err || document.errmsg || document.code) {
137+
callback(new MongoError(document));
138+
return;
139+
}
140+
}
141+
142+
callback(null, operationDescription.fullResult ? message : message.documents[0]);
143+
};
144+
}
145+
146+
// Not meant to be called directly, the wire protocol methods call this assuming it is a `Pool` instance
147+
function write(command, options, callback) {
148+
if (typeof options === 'function') {
149+
callback = options;
150+
}
151+
152+
options = options || {};
153+
const operationDescription = {
154+
requestId: command.requestId,
155+
cb: callback,
156+
fullResult: typeof options.fullResult === 'boolean' ? options.fullResult : false,
157+
session: options.session,
158+
159+
// For BSON parsing
160+
promoteLongs: typeof options.promoteLongs === 'boolean' ? options.promoteLongs : true,
161+
promoteValues: typeof options.promoteValues === 'boolean' ? options.promoteValues : true,
162+
promoteBuffers: typeof options.promoteBuffers === 'boolean' ? options.promoteBuffers : false,
163+
raw: typeof options.raw === 'boolean' ? options.raw : false,
164+
165+
// NOTE: This property is set on the connection as part of `connect`, but should
166+
// eventually live in the `StreamDescription` attached to this connection.
167+
agreedCompressor: this.agreedCompressor
168+
};
169+
170+
if (typeof options.socketTimeout === 'number') {
171+
operationDescription.socketTimeoutOverride = true;
172+
this[kStream].setSocketTimeout(options.socketTimeout);
173+
}
174+
175+
// if command monitoring is enabled we need to modify the callback here
176+
if (this.monitorCommands) {
177+
this.emit('commandStarted', new apm.CommandStartedEvent(this, command));
178+
179+
operationDescription.started = process.hrtime();
180+
operationDescription.cb = (err, reply) => {
181+
if (err) {
182+
this.emit(
183+
'commandFailed',
184+
new apm.CommandFailedEvent(this, command, err, operationDescription.started)
185+
);
186+
} else {
187+
if (reply && reply.result && (reply.result.ok === 0 || reply.result.$err)) {
188+
this.emit(
189+
'commandFailed',
190+
new apm.CommandFailedEvent(this, command, reply.result, operationDescription.started)
191+
);
192+
} else {
193+
this.emit(
194+
'commandSucceeded',
195+
new apm.CommandSucceededEvent(this, command, reply, operationDescription.started)
196+
);
197+
}
198+
}
199+
200+
if (typeof cb === 'function') {
201+
callback(err, reply);
202+
}
203+
};
204+
}
205+
206+
this[kQueue].set(operationDescription.requestId, operationDescription);
207+
this[kMessageStream].writeCommand(command, operationDescription);
208+
}
209+
210+
module.exports = Connection;
+27
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
'use strict';
2+
3+
const Connection = require('../../../lib/core/cmap/connection');
4+
const connect = require('../../../lib/core/connection/connect');
5+
const expect = require('chai').expect;
6+
const BSON = require('bson');
7+
8+
describe('Connection', function() {
9+
it('should execute a command against a server', function(done) {
10+
const connectOptions = Object.assign(
11+
{ connectionType: Connection, bson: new BSON() },
12+
this.configuration.options
13+
);
14+
15+
connect(connectOptions, (err, conn) => {
16+
expect(err).to.not.exist;
17+
18+
conn.command('admin.$cmd', { ismaster: 1 }, (err, ismaster) => {
19+
expect(err).to.not.exist;
20+
expect(ismaster).to.exist;
21+
expect(ismaster.ok).to.equal(1);
22+
23+
conn.destroy(done);
24+
});
25+
});
26+
});
27+
});

0 commit comments

Comments
 (0)