Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ test: test-unit test-integration

test-integration:
echo 'DROPPING ALL COLLECTIONS IN "sails-mongo"'
mongo sails-mongo --eval 'db.dropDatabase()'
mongosh sails-mongo --eval 'db.dropDatabase()'
echo 'Running integration tests...'
@NODE_ENV=test node test/integration/runner.js

Expand Down
64 changes: 29 additions & 35 deletions lib/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
var Connection = require('./connection');
var Collection = require('./collection');
var Errors = require('waterline-errors').adapter;
var ObjectId = require('mongodb').ObjectID;
var { ObjectId } = require('mongodb');
var _runJoins = require('waterline-cursor');
var util = require('util');
var _ = require('lodash');
Expand Down Expand Up @@ -38,36 +38,28 @@ module.exports = (function() {
password: null,
schema: false,

// Atlas flag - determines whether to use Atlas-specific configuration
atlas: false,

// Allow a URL Config String
url: null,

// Atlas specific defaults
authSource: 'admin',
replicaSet: null,

// DB Options
w: 1,
wtimeout: 0,
fsync: false,
journal: false,
readPreference: null,
nativeParser: false,
forceServerObjectId: false,
recordQueryStats: false,
retryMiliSeconds: 5000,
numberOfRetries: 5,
writeConcern: { w: 1 },
readPreference: 'primary',

// Server Options
ssl: false,
poolSize: 5,
socketOptions: {
noDelay: true,
keepAlive: 0,
connectTimeoutMS: 0,
socketTimeoutMS: 0
},
auto_reconnect: true,
disableDriverBSONSizeCheck: false,
reconnectInterval: 200

tls: false,
maxPoolSize: 5,
minPoolSize: 1,
connectTimeoutMS: 30000,
socketTimeoutMS: 360000,
retryWrites: true,
retryReads: true
},

/**
Expand Down Expand Up @@ -99,7 +91,7 @@ module.exports = (function() {

if(_err) {
return cb((function _createError(){
var msg = util.format('Failed to connect to MongoDB. Are you sure your configured Mongo instance is running?\n Error details:\n%s', util.inspect(_err, false, null));
var msg = util.format('Failed to connect to MongoDB. Are you sure your configured Mongo instance is running?\n Error details:\n%s', util.inspect(_err, false, null));
var err = new Error(msg);
err.originalError = _err;
return err;
Expand Down Expand Up @@ -136,20 +128,20 @@ module.exports = (function() {
var _connections = _.pluck(_.values(connections), 'connection');
if(!_connections.length) { return cb(); }

var dbs = _.pluck(_connections, 'db');
if(!dbs.length) { return cb(); }
var clients = _.pluck(_connections, 'client');
if(!clients.length) { return cb(); }

connections = {};
return async.each(dbs, function (db, onClosed) {
if(db === undefined) { return onClosed(); }
db.close(onClosed);
return async.each(clients, function (client, onClosed) {
if(client === undefined) { return onClosed(); }
client.close(onClosed);
}, cb);
}

if(!connections[conn]) return cb();

var dbConnection = connections[conn].connection.db;
dbConnection.close(function () {
var client = connections[conn].connection.client;
client.close(function () {
delete connections[conn];
cb();
});
Expand All @@ -171,10 +163,12 @@ module.exports = (function() {
var connectionObject = connections[connectionName];
var collection = connectionObject.collections[collectionName];
var schema = collection.schema;
var names = connectionObject.connection.db.listCollections(collectionName);
if(names.length > 0) return cb(null, schema);
cb();


connectionObject.connection.db.listCollections({ name: collectionName }).toArray(function(err, collections) {
if(err) return cb(err);
if(collections.length > 0) return cb(null, schema);
cb();
});
},

/**
Expand Down
79 changes: 37 additions & 42 deletions lib/collection.js
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ var _ = require('lodash'),
utils = require('./utils'),
Document = require('./document'),
Query = require('./query'),
ObjectId = require('mongodb').ObjectID,
{ ObjectId } = require('mongodb'),
Errors = require('waterline-errors').adapter;

/**
Expand Down Expand Up @@ -74,7 +74,8 @@ Collection.prototype.find = function find(criteria, cb) {
{ '$group': query.aggregateGroup }
];

return collection.aggregate(aggregate, function(err, results) {
return collection.aggregate(aggregate).toArray(function(err, results) {
if(err) return cb(err);

// Results have grouped by values under _id, so we extract them
var mapped = results.map(function(result) {
Expand All @@ -85,15 +86,15 @@ Collection.prototype.find = function find(criteria, cb) {
return result;
});

cb(err, mapped);
cb(null, mapped);
});
}

var where = query.criteria.where || {};
var queryOptions = _.omit(query.criteria, 'where');

// Run Normal Query on collection
collection.find(where, query.select, queryOptions).toArray(function(err, docs) {
collection.find(where, queryOptions).project(query.select).toArray(function(err, docs) {
if(err) return cb(err);
cb(null, utils.normalizeResults(docs, self.schema));
});
Expand Down Expand Up @@ -128,32 +129,24 @@ Collection.prototype.stream = function find(criteria, stream) {
var queryOptions = _.omit(query.criteria, 'where');

// Run Normal Query on collection
var dbStream = collection.find(where, queryOptions).stream();
var cursor = collection.find(where, queryOptions);

// For each data item
dbStream.on('data', function(item) {
// Pause stream
dbStream.pause();

cursor.on('data', function(item) {
cursor.pause();
var obj = utils.rewriteIds([item], self.schema)[0];

stream.write(obj, function() {
dbStream.resume();
cursor.resume();
});

});

// Handle error, an 'end' event will be emitted after this as well
dbStream.on('error', function(err) {
// Handle error
cursor.on('error', function(err) {
stream.end(err); // End stream
});

// all rows have been received
dbStream.on('close', function() {
stream.end();
});
// stream has ended
dbStream.on('end', function() {
// Handle end
cursor.on('end', function() {
stream.end();
});
};
Expand All @@ -178,9 +171,17 @@ Collection.prototype.insert = function insert(values, cb) {
return new Document(value, self.schema).values;
});

this.connection.db.collection(this.identity).insert(docs, function(err, results) {
this.connection.db.collection(this.identity).insertMany(docs, function(err, result) {
if(err) return cb(err);
cb(null, utils.rewriteIds(results.ops, self.schema));
if(!result || !result.insertedCount) return cb(null, []);

// Get inserted documents
self.connection.db.collection(self.identity)
.find({ _id: { $in: Object.values(result.insertedIds) }})
.toArray(function(err, docs) {
if(err) return cb(err);
cb(null, utils.rewriteIds(docs, self.schema));
});
});
};

Expand Down Expand Up @@ -220,19 +221,17 @@ Collection.prototype.update = function update(criteria, values, cb) {
// Lookup records being updated and grab their ID's
// Useful for later looking up the record after an insert
// Required because options may not contain an ID
collection.find(query.criteria.where, {_id: 1}).toArray(function(err, records) {
collection.find(query.criteria.where, { projection: { _id: 1 } }).toArray(function(err, records) {
if(err) return cb(err);
if(!records) return cb(Errors.NotFound);

// Build an array of records
var updatedRecords = [];

records.forEach(function(record) {
updatedRecords.push(record._id);
var updatedRecords = records.map(function(record) {
return record._id;
});

// Update the records
collection.update(query.criteria.where, { '$set': values }, { multi: true }, function(err, result) {
collection.updateMany(query.criteria.where, { '$set': values }, function(err, result) {
if(err) return cb(err);

// Look up newly inserted records to return the results of the update
Expand Down Expand Up @@ -269,24 +268,20 @@ Collection.prototype.destroy = function destroy(criteria, cb) {
}

var collection = this.connection.db.collection(self.identity);
collection.remove(query.criteria.where, function(err, results) {
collection.deleteMany(query.criteria.where, function(err, result) {
if(err) return cb(err);

// Force to array to meet Waterline API
var resultsArray = [];

// If result is not an array return an array
if(!Array.isArray(results)) {
resultsArray.push({ id: results });
return cb(null, resultsArray);
}

// Create a valid array of IDs
results.forEach(function(result) {
resultsArray.push({ id: result });
});
if(result && result.deletedCount) {
for(var i = 0; i < result.deletedCount; i++) {
resultsArray.push({ id: i });
}
}

cb(null, utils.rewriteIds(resultArray, self.schema));
cb(null, utils.rewriteIds(resultsArray, self.schema));
});
};

Expand Down Expand Up @@ -315,7 +310,7 @@ Collection.prototype.count = function count(criteria, cb) {
return cb(err);
}

this.connection.db.collection(this.identity).count(query.criteria.where, function(err, count) {
this.connection.db.collection(this.identity).countDocuments(query.criteria.where, function(err, count) {
if (err) return cb(err);
cb(null, count);
});
Expand Down Expand Up @@ -377,8 +372,8 @@ Collection.prototype._parseDefinition = function _parseDefinition(definition) {
});

// Set the identity
var ident = definition.tableName ? definition.tableName : definition.identity.toLowerCase();
this.identity = _.clone(ident);
var ident = definition.tableName ? definition.tableName : definition.identity.toLowerCase();
this.identity = _.clone(ident);
};

/**
Expand Down
Loading