The DataStax Node.js driver provides a set of utilities for concurrent query execution, to facilitate executing multiple queries in parallel while controlling the concurrency level.
The concurrent execution API can be useful when, for example, you want to insert a large group of rows from an Array or a Stream and evaluate failures, if any, at the end.
When an Array of arrays is provided, one query is executed per item in the Array, using each item as the query parameters.
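const cassandra = require('cassandra-driver');
const executeConcurrent = cassandra.concurrent.executeConcurrent;

const query = 'INSERT INTO table1 (id, value) VALUES (?, ?)';
const parameters = [[1, 'a'], [2, 'b'], [3, 'c']]; // ...
const result = await executeConcurrent(client, query, parameters);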
See the code examples in the driver repository for a working example.
When a Stream instance is provided, the driver reads from the input stream and executes one query per item emitted. The driver throttles reads from the input stream based on the configured concurrency level and the number of requests currently in flight.
The Stream instance should be a readable stream in object mode that emits Array instances. Each emitted Array is used as the parameters of one query.
const stream = csvStream.pipe(transformLineToArrayStream);
const result = await executeConcurrent(client, query, stream);
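As a minimal sketch of an input stream that meets these requirements, the following builds a readable stream in object mode that emits one Array per input line. The sample lines and the createLineToArrayStream helper are hypothetical and only use Node.js core stream APIs; a real application would likely read from a file or a CSV parser instead.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical input: lines in the form "id,value".
const lines = ['1,a', '2,b', '3,c'];

// Hypothetical helper: returns an object-mode Transform that turns
// each input line into an Array of query parameters.
function createLineToArrayStream() {
  return new Transform({
    objectMode: true,
    transform(line, encoding, callback) {
      const [id, value] = String(line).split(',');
      callback(null, [Number(id), value]);
    }
  });
}

// Readable.from() creates an object-mode readable from an iterable,
// so the resulting stream emits one Array instance per line.
const stream = Readable.from(lines).pipe(createLineToArrayStream());
```

A stream built this way could then be passed as the third argument to executeConcurrent, in the same position as the stream in the snippet above.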
To execute different queries concurrently, pass an Array of objects, each containing a query and its params.
const queryAndParameters = [
  { query: 'INSERT INTO videos (id, name, user_id) VALUES (?, ?, ?)',
    params: [ id, name, userId ] },
  { query: 'INSERT INTO user_videos (user_id, id, name) VALUES (?, ?, ?)',
    params: [ userId, id, name ] },
  { query: 'INSERT INTO latest_videos (id, name, user_id) VALUES (?, ?, ?)',
    params: [ id, name, userId ] },
];
const result = await executeConcurrent(client, queryAndParameters);
When raiseOnFirstError is set to false, the driver continues executing the queries even when one or more errors are encountered. The returned Promise is resolved, and you can inspect the errors property of the result to obtain information about each individual error.
const result = await executeConcurrent(client, query, parameters, { raiseOnFirstError: false });
for (const err of result.errors) {
  // ...
}
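One possible way to evaluate failures at the end is to summarize the collected errors by type. The sketch below assumes only that errors is an Array of Error instances, as described above; the countErrorsByName helper is hypothetical and not part of the driver.

```javascript
// Hypothetical helper: groups errors by their `name` property and
// counts how many errors fall into each group.
// `errors` is assumed to be an Array of Error instances.
function countErrorsByName(errors) {
  const counts = new Map();
  for (const err of errors) {
    const key = err.name || 'Error';
    counts.set(key, (counts.get(key) || 0) + 1);
  }
  return counts;
}
```

Calling countErrorsByName(result.errors) would return a Map from error name to number of occurrences, which can help decide whether the failures share a common cause.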
Use the concurrencyLevel option to set the maximum number of requests that can be executed simultaneously. It defaults to 100.
const result = await executeConcurrent(client, query, parameters, { concurrencyLevel: 200 });
Note that increasing the number of simultaneous requests results in further queueing at both the driver level and the server node level. You should find the optimal concurrency level for high throughput and low latency, based on your cluster size and hardware specifications. Setting the concurrency level higher than optimal might result in query timeouts.
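To illustrate what a concurrency level means in general, here is a minimal sketch of running asynchronous tasks with a fixed cap on how many are in flight. It is a conceptual example only, not the driver's internal implementation, and the runWithLimit function is a hypothetical name.

```javascript
// Runs async task functions with at most `limit` of them in flight.
// Illustrative only: this is not how the driver is implemented.
async function runWithLimit(tasks, limit) {
  const results = new Array(tasks.length);
  let next = 0;

  // Each worker keeps pulling the next pending task until none remain.
  async function worker() {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  // Start `limit` workers (or fewer when there are fewer tasks).
  const workerCount = Math.min(limit, tasks.length);
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}
```

With a cap like this, a new task starts only when a previous one completes. Raising the cap does not make each task faster; it only allows more work to be pending at once, which is why a value that is too high tends to shift the waiting to queues further down the line.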
If you want the driver to collect each individual ResultSet instance, use the collectResults flag.
const result = await executeConcurrent(client, query, parameters, { collectResults: true });
for (const rs of result.resultItems) {
  // ...
}