// bulk_write.js (3.1 KB)
  1. 'use strict';
  2. const applyRetryableWrites = require('../utils').applyRetryableWrites;
  3. const applyWriteConcern = require('../utils').applyWriteConcern;
  4. const MongoError = require('../core').MongoError;
  5. const OperationBase = require('./operation').OperationBase;
  /**
   * Operation that loads a list of write operations into a bulk builder obtained
   * from the collection, executes it, and decorates the returned result with
   * count fields and index -> _id maps.
   */
  class BulkWriteOperation extends OperationBase {
    /**
     * @param {object} collection Collection the bulk write runs against.
     * @param {object[]} operations Write operations; the first key of each entry
     *   is treated as the operation type.
     * @param {object} [options] Passed to the OperationBase constructor;
     *   execute() reads them back from `this.options`.
     */
    constructor(collection, operations, options) {
      super(options);
      this.collection = collection;
      this.operations = operations;
    }

    /**
     * Build and run the bulk write.
     *
     * @param {function} callback Called as `callback(err, result)`. `err` is set
     *   when adding an operation throws, when a collation is requested but the
     *   topology capabilities lack `commandsTakeCollation`, or when the bulk
     *   execution yields an error without a result.
     */
    execute(callback) {
      const coll = this.collection;
      const operations = this.operations;
      let options = this.options;

      // Add ignoreUndefined; copy first so the stored options are not mutated.
      if (coll.s.options.ignoreUndefined) {
        options = Object.assign({}, options);
        options.ignoreUndefined = coll.s.options.ignoreUndefined;
      }

      // Create the bulk operation: ordered unless `ordered` is set to something
      // other than true / null / undefined.
      const bulk =
        options.ordered === true || options.ordered == null
          ? coll.initializeOrderedBulkOp(options)
          : coll.initializeUnorderedBulkOp(options);

      // Do we have a collation
      let collation = false;

      // For each op go through and add to the bulk. Anything thrown while adding
      // (including a malformed entry) is reported through the callback.
      try {
        for (let i = 0; i < operations.length; i++) {
          // Get the operation type
          const key = Object.keys(operations[i])[0];

          // Check if we have a collation
          if (operations[i][key].collation) {
            collation = true;
          }

          // Pass to the raw bulk
          bulk.raw(operations[i]);
        }
      } catch (err) {
        return callback(err, null);
      }

      // Final options for retryable writes and write concern
      let finalOptions = Object.assign({}, options);
      finalOptions = applyRetryableWrites(finalOptions, coll.s.db);
      finalOptions = applyWriteConcern(finalOptions, { db: coll.s.db, collection: coll }, options);

      const writeCon = finalOptions.writeConcern ? finalOptions.writeConcern : {};
      const capabilities = coll.s.topology.capabilities();

      // Did the user pass in a collation, check if our write server supports it
      if (collation && capabilities && !capabilities.commandsTakeCollation) {
        return callback(new MongoError('server/primary/mongos does not support collation'));
      }

      // Execute the bulk
      bulk.execute(writeCon, finalOptions, (err, r) => {
        // No result to decorate: either a connection level error, or nothing
        // came back at all. Hand whatever we got to the caller rather than
        // dereferencing a missing result inside this async callback.
        if (!r) {
          return callback(err, null);
        }

        // Fetch the upserted ids once; used for both the count and the id map.
        const upserted = r.getUpsertedIds();

        r.insertedCount = r.nInserted;
        r.matchedCount = r.nMatched;
        r.modifiedCount = r.nModified || 0;
        r.deletedCount = r.nRemoved;
        r.upsertedCount = upserted.length;
        r.upsertedIds = {};
        r.insertedIds = {};

        // Update the n
        r.n = r.insertedCount;

        // Map inserted ids by their operation index
        const inserted = r.getInsertedIds();
        for (let i = 0; i < inserted.length; i++) {
          r.insertedIds[inserted[i].index] = inserted[i]._id;
        }

        // Map upserted ids by their operation index
        for (let i = 0; i < upserted.length; i++) {
          r.upsertedIds[upserted[i].index] = upserted[i]._id;
        }

        // Return the results
        callback(null, r);
      });
    }
  }
  85. module.exports = BulkWriteOperation;