123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104 |
- 'use strict';
- const applyRetryableWrites = require('../utils').applyRetryableWrites;
- const applyWriteConcern = require('../utils').applyWriteConcern;
- const MongoError = require('../core').MongoError;
- const OperationBase = require('./operation').OperationBase;
- class BulkWriteOperation extends OperationBase {
- constructor(collection, operations, options) {
- super(options);
- this.collection = collection;
- this.operations = operations;
- }
- execute(callback) {
- const coll = this.collection;
- const operations = this.operations;
- let options = this.options;
- // Add ignoreUndfined
- if (coll.s.options.ignoreUndefined) {
- options = Object.assign({}, options);
- options.ignoreUndefined = coll.s.options.ignoreUndefined;
- }
- // Create the bulk operation
- const bulk =
- options.ordered === true || options.ordered == null
- ? coll.initializeOrderedBulkOp(options)
- : coll.initializeUnorderedBulkOp(options);
- // Do we have a collation
- let collation = false;
- // for each op go through and add to the bulk
- try {
- for (let i = 0; i < operations.length; i++) {
- // Get the operation type
- const key = Object.keys(operations[i])[0];
- // Check if we have a collation
- if (operations[i][key].collation) {
- collation = true;
- }
- // Pass to the raw bulk
- bulk.raw(operations[i]);
- }
- } catch (err) {
- return callback(err, null);
- }
- // Final options for retryable writes and write concern
- let finalOptions = Object.assign({}, options);
- finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
- finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);
- const writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};
- const capabilities = coll.s.topology.capabilities();
- // Did the user pass in a collation, check if our write server supports it
- if (collation && capabilities && !capabilities.commandsTakeCollation) {
- return callback(new MongoError('server/primary/mongos does not support collation'));
- }
- // Execute the bulk
- bulk.execute(writeCon, finalOptions, (err, r) => {
- // We have connection level error
- if (!r && err) {
- return callback(err, null);
- }
- r.insertedCount = r.nInserted;
- r.matchedCount = r.nMatched;
- r.modifiedCount = r.nModified || 0;
- r.deletedCount = r.nRemoved;
- r.upsertedCount = r.getUpsertedIds().length;
- r.upsertedIds = {};
- r.insertedIds = {};
- // Update the n
- r.n = r.insertedCount;
- // Inserted documents
- const inserted = r.getInsertedIds();
- // Map inserted ids
- for (let i = 0; i < inserted.length; i++) {
- r.insertedIds[inserted[i].index] = inserted[i]._id;
- }
- // Upserted documents
- const upserted = r.getUpsertedIds();
- // Map upserted ids
- for (let i = 0; i < upserted.length; i++) {
- r.upsertedIds[upserted[i].index] = upserted[i]._id;
- }
- // Return the results
- callback(null, r);
- });
- }
- }
- module.exports = BulkWriteOperation;
|