Skip to content

Commit b629b21

Browse files
authored
feat(db): add database-level aggregation
Fixes NODE-1783
1 parent 3b6957d commit b629b21

File tree

8 files changed

+514
-125
lines changed

8 files changed

+514
-125
lines changed

lib/collection.js

+2-93
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@ const deprecate = require('util').deprecate;
44
const deprecateOptions = require('./utils').deprecateOptions;
55
const checkCollectionName = require('./utils').checkCollectionName;
66
const ObjectID = require('mongodb-core').BSON.ObjectID;
7-
const AggregationCursor = require('./aggregation_cursor');
87
const MongoError = require('mongodb-core').MongoError;
98
const toError = require('./utils').toError;
109
const normalizeHintField = require('./utils').normalizeHintField;
@@ -19,10 +18,10 @@ const unordered = require('./bulk/unordered');
1918
const ordered = require('./bulk/ordered');
2019
const ChangeStream = require('./change_stream');
2120
const executeOperation = require('./utils').executeOperation;
22-
const applyWriteConcern = require('./utils').applyWriteConcern;
2321
const resolveReadPreference = require('./utils').resolveReadPreference;
2422

2523
// Operations
24+
const aggregate = require('./operations/aggregate').aggregate;
2625
const bulkWrite = require('./operations/collection_ops').bulkWrite;
2726
const checkForAtomicOperators = require('./operations/collection_ops').checkForAtomicOperators;
2827
const count = require('./operations/collection_ops').count;
@@ -1733,97 +1732,7 @@ Collection.prototype.aggregate = function(pipeline, options, callback) {
17331732
pipeline = args;
17341733
}
17351734

1736-
// Ignore readConcern option
1737-
let ignoreReadConcern = false;
1738-
1739-
// Build the command
1740-
const command = { aggregate: this.s.name, pipeline: pipeline };
1741-
1742-
// If out was specified
1743-
if (typeof options.out === 'string') {
1744-
pipeline.push({ $out: options.out });
1745-
// Ignore read concern
1746-
ignoreReadConcern = true;
1747-
} else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
1748-
ignoreReadConcern = true;
1749-
}
1750-
1751-
// Decorate command with writeConcern if out has been specified
1752-
if (
1753-
pipeline.length > 0 &&
1754-
pipeline[pipeline.length - 1]['$out'] &&
1755-
this.s.topology.capabilities().commandsTakeWriteConcern
1756-
) {
1757-
applyWriteConcern(command, { db: this.s.db, collection: this }, options);
1758-
}
1759-
1760-
// Have we specified collation
1761-
try {
1762-
decorateWithCollation(command, this, options);
1763-
} catch (err) {
1764-
if (typeof callback === 'function') return callback(err, null);
1765-
throw err;
1766-
}
1767-
1768-
// If we have bypassDocumentValidation set
1769-
if (options.bypassDocumentValidation === true) {
1770-
command.bypassDocumentValidation = options.bypassDocumentValidation;
1771-
}
1772-
1773-
// Do we have a readConcern specified
1774-
if (!ignoreReadConcern) {
1775-
decorateWithReadConcern(command, this, options);
1776-
}
1777-
1778-
// If we have allowDiskUse defined
1779-
if (options.allowDiskUse) command.allowDiskUse = options.allowDiskUse;
1780-
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
1781-
1782-
// If we are giving a hint
1783-
if (options.hint) command.hint = options.hint;
1784-
1785-
options = Object.assign({}, options);
1786-
// Ensure we have the right read preference inheritance
1787-
options.readPreference = resolveReadPreference(options, { db: this.s.db, collection: this });
1788-
1789-
// If explain has been specified add it
1790-
if (options.explain) {
1791-
if (command.readConcern || command.writeConcern) {
1792-
throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
1793-
}
1794-
command.explain = options.explain;
1795-
}
1796-
1797-
if (typeof options.comment === 'string') command.comment = options.comment;
1798-
1799-
// Validate that cursor options is valid
1800-
if (options.cursor != null && typeof options.cursor !== 'object') {
1801-
throw toError('cursor options must be an object');
1802-
}
1803-
1804-
options.cursor = options.cursor || {};
1805-
if (options.batchSize) options.cursor.batchSize = options.batchSize;
1806-
command.cursor = options.cursor;
1807-
1808-
// promiseLibrary
1809-
options.promiseLibrary = this.s.promiseLibrary;
1810-
1811-
// Set the AggregationCursor constructor
1812-
options.cursorFactory = AggregationCursor;
1813-
if (typeof callback !== 'function') {
1814-
if (!this.s.topology.capabilities()) {
1815-
throw new MongoError('cannot connect to server');
1816-
}
1817-
1818-
// Allow disk usage command
1819-
if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
1820-
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
1821-
1822-
// Execute the cursor
1823-
return this.s.topology.cursor(this.s.namespace, command, options);
1824-
}
1825-
1826-
return handleCallback(callback, null, this.s.topology.cursor(this.s.namespace, command, options));
1735+
return aggregate(this.s.db, this, pipeline, options, callback);
18271736
};
18281737

18291738
/**

lib/db.js

+46
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ const CONSTANTS = require('./constants');
2323

2424
// Operations
2525
const addUser = require('./operations/db_ops').addUser;
26+
const aggregate = require('./operations/aggregate').aggregate;
2627
const collections = require('./operations/db_ops').collections;
2728
const createCollection = require('./operations/db_ops').createCollection;
2829
const createIndex = require('./operations/db_ops').createIndex;
@@ -263,6 +264,44 @@ Db.prototype.command = function(command, options, callback) {
263264
return executeOperation(this.s.topology, executeCommand, [this, command, options, callback]);
264265
};
265266

267+
/**
268+
* Execute an aggregation framework pipeline against the database, needs MongoDB >= 3.6
269+
* @method
270+
* @param {object} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
271+
* @param {object} [options] Optional settings.
272+
* @param {(ReadPreference|string)} [options.readPreference] The preferred read preference (ReadPreference.PRIMARY, ReadPreference.PRIMARY_PREFERRED, ReadPreference.SECONDARY, ReadPreference.SECONDARY_PREFERRED, ReadPreference.NEAREST).
273+
* @param {object} [options.cursor] Return the query as cursor, on 2.6 > it returns as a real cursor on pre 2.6 it returns as an emulated cursor.
274+
* @param {number} [options.cursor.batchSize] The batchSize for the cursor
275+
* @param {boolean} [options.explain=false] Explain returns the aggregation execution plan (requires mongodb 2.6 >).
276+
* @param {boolean} [options.allowDiskUse=false] allowDiskUse lets the server know if it can use disk to store temporary results for the aggregation (requires mongodb 2.6 >).
277+
* @param {number} [options.maxTimeMS] maxTimeMS specifies a cumulative time limit in milliseconds for processing operations on the cursor. MongoDB interrupts the operation at the earliest following interrupt point.
278+
* @param {boolean} [options.bypassDocumentValidation=false] Allow driver to bypass schema validation in MongoDB 3.2 or higher.
279+
* @param {boolean} [options.raw=false] Return document results as raw BSON buffers.
280+
* @param {boolean} [options.promoteLongs=true] Promotes Long values to number if they fit inside the 53 bits resolution.
281+
* @param {boolean} [options.promoteValues=true] Promotes BSON values to native types where possible, set to false to only receive wrapper types.
282+
* @param {boolean} [options.promoteBuffers=false] Promotes Binary BSON values to native Node Buffers.
283+
* @param {object} [options.collation] Specify collation (MongoDB 3.4 or higher) settings for update operation (see 3.4 documentation for available fields).
284+
* @param {string} [options.comment] Add a comment to an aggregation command
285+
* @param {string|object} [options.hint] Add an index selection hint to an aggregation command
286+
* @param {ClientSession} [options.session] optional session to use for this operation
287+
* @param {Database~aggregationCallback} callback The command result callback
288+
* @return {(null|AggregationCursor)}
289+
*/
290+
Db.prototype.aggregate = function(pipeline, options, callback) {
291+
if (typeof options === 'function') {
292+
callback = options;
293+
options = {};
294+
}
295+
296+
// If we have no options or callback we are doing
297+
// a cursor based aggregation
298+
if (options == null && callback == null) {
299+
options = {};
300+
}
301+
302+
return aggregate(this, '1', pipeline, options, callback);
303+
};
304+
266305
/**
267306
* Return the Admin db instance
268307
* @method
@@ -281,6 +320,13 @@ Db.prototype.admin = function() {
281320
* @param {Collection} collection The collection instance.
282321
*/
283322

323+
/**
324+
* The callback format for an aggregation call
325+
* @callback Database~aggregationCallback
326+
* @param {MongoError} error An error instance representing the error during the execution.
327+
* @param {AggregationCursor} cursor The cursor if the aggregation command was executed successfully.
328+
*/
329+
284330
const collectionKeys = [
285331
'pkFactory',
286332
'readPreference',

lib/operations/aggregate.js

+121
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
'use strict';
2+
3+
const AggregationCursor = require('../aggregation_cursor');
4+
const applyWriteConcern = require('../utils').applyWriteConcern;
5+
const decorateWithCollation = require('../utils').decorateWithCollation;
6+
const decorateWithReadConcern = require('../utils').decorateWithReadConcern;
7+
const handleCallback = require('../utils').handleCallback;
8+
const MongoError = require('mongodb-core').MongoError;
9+
const resolveReadPreference = require('../utils').resolveReadPreference;
10+
const toError = require('../utils').toError;
11+
12+
const DB_AGGREGATE_COLLECTION = 1;
13+
14+
/**
15+
* Perform an aggregate operation. See Collection.prototype.aggregate or Db.prototype.aggregate for more information.
16+
*
17+
* @method
18+
* @param {Db} db A Db instance.
19+
* @param {Collection|string} coll A collection instance or the string '1', used for db.aggregate.
20+
* @param {object} [pipeline=[]] Array containing all the aggregation framework commands for the execution.
21+
* @param {object} [options] Optional settings. See Collection.prototype.aggregate or Db.prototype.aggregate for a list of options.
22+
* @param {Db~aggregationCallback|Collection~aggregationCallback} callback The command result callback
23+
*/
24+
function aggregate(db, coll, pipeline, options, callback) {
25+
const isDbAggregate = typeof coll === 'string';
26+
const target = isDbAggregate ? db : coll;
27+
const topology = target.s.topology;
28+
let ignoreReadConcern = false;
29+
30+
if (typeof options.out === 'string') {
31+
pipeline = pipeline.concat({ $out: options.out });
32+
ignoreReadConcern = true;
33+
} else if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out']) {
34+
ignoreReadConcern = true;
35+
}
36+
37+
let command;
38+
let namespace;
39+
let optionSources;
40+
41+
if (isDbAggregate) {
42+
command = { aggregate: DB_AGGREGATE_COLLECTION, pipeline: pipeline };
43+
namespace = `${db.s.databaseName}.${DB_AGGREGATE_COLLECTION}`;
44+
45+
optionSources = { db };
46+
} else {
47+
command = { aggregate: coll.s.name, pipeline: pipeline };
48+
namespace = coll.s.namespace;
49+
50+
optionSources = { db: coll.s.db, collection: coll };
51+
}
52+
53+
const takesWriteConcern = topology.capabilities().commandsTakeWriteConcern;
54+
55+
if (!ignoreReadConcern) {
56+
decorateWithReadConcern(command, target, options);
57+
}
58+
59+
if (pipeline.length > 0 && pipeline[pipeline.length - 1]['$out'] && takesWriteConcern) {
60+
applyWriteConcern(command, optionSources, options);
61+
}
62+
63+
try {
64+
decorateWithCollation(command, target, options);
65+
} catch (err) {
66+
if (typeof callback === 'function') return callback(err, null);
67+
throw err;
68+
}
69+
70+
if (options.bypassDocumentValidation === true) {
71+
command.bypassDocumentValidation = options.bypassDocumentValidation;
72+
}
73+
74+
if (typeof options.allowDiskUse === 'boolean') command.allowDiskUse = options.allowDiskUse;
75+
if (typeof options.maxTimeMS === 'number') command.maxTimeMS = options.maxTimeMS;
76+
77+
if (options.hint) command.hint = options.hint;
78+
79+
options = Object.assign({}, options);
80+
81+
// Ensure we have the right read preference inheritance
82+
options.readPreference = resolveReadPreference(options, optionSources);
83+
84+
if (options.explain) {
85+
if (command.readConcern || command.writeConcern) {
86+
throw toError('"explain" cannot be used on an aggregate call with readConcern/writeConcern');
87+
}
88+
command.explain = options.explain;
89+
}
90+
91+
if (typeof options.comment === 'string') command.comment = options.comment;
92+
93+
// Validate that cursor options is valid
94+
if (options.cursor != null && typeof options.cursor !== 'object') {
95+
throw toError('cursor options must be an object');
96+
}
97+
98+
options.cursor = options.cursor || {};
99+
if (options.batchSize) options.cursor.batchSize = options.batchSize;
100+
command.cursor = options.cursor;
101+
102+
// promiseLibrary
103+
options.promiseLibrary = target.s.promiseLibrary;
104+
105+
// Set the AggregationCursor constructor
106+
options.cursorFactory = AggregationCursor;
107+
108+
if (typeof callback !== 'function') {
109+
if (!topology.capabilities()) {
110+
throw new MongoError('cannot connect to server');
111+
}
112+
113+
return topology.cursor(namespace, command, options);
114+
}
115+
116+
return handleCallback(callback, null, topology.cursor(namespace, command, options));
117+
}
118+
119+
module.exports = {
120+
aggregate
121+
};

test/functional/aggregation_tests.js

+36
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,42 @@ describe('Aggregation', function() {
9494
}
9595
});
9696

97+
it('should correctly execute db.aggregate() with $currentOp', {
98+
metadata: {
99+
requires: {
100+
mongodb: '>=3.6.0',
101+
topology: ['single', 'replicaset', 'sharded', 'ssl', 'heap', 'wiredtiger']
102+
}
103+
},
104+
105+
test: function(done) {
106+
const client = this.configuration.newClient({ w: 1 }, { poolSize: 1 });
107+
108+
client.connect(function(err, client) {
109+
expect(err).to.be.null;
110+
111+
// get admin db for $currentOp
112+
const db = client.db('admin');
113+
114+
db.aggregate([{ $currentOp: {} }], (err, cursor) => {
115+
expect(err).to.be.null;
116+
117+
cursor.toArray((err, result) => {
118+
expect(err).to.be.null;
119+
120+
expect(result[0].command.aggregate).to.equal(1);
121+
expect(result[0].command.pipeline).to.eql([{ $currentOp: {} }]);
122+
expect(result[0].command.cursor).to.deep.equal({});
123+
expect(result[0].command['$db']).to.equal('admin');
124+
125+
client.close();
126+
done();
127+
});
128+
});
129+
});
130+
}
131+
});
132+
97133
/**
98134
* Correctly call the aggregation framework using a pipeline expressed as an argument list.
99135
*

0 commit comments

Comments
 (0)