Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added replication support #267

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,24 @@ module.exports.connections = {
password : 'password',
database : 'MySQL Database Name'

// OR (replication / clusterPool)
replication: {
canRetry: true,
defaultSelector: 'RR',

read: [
{host: 'localhost'}
],

write: [
{host: 'localhost', user: 'username', password: 'password'}
],

readwrite: [
{host: 'localhost', user: 'username', password: 'password'}
]
},

// OR (explicit sets take precedence)
module : 'sails-mysql',
url : 'mysql2://USER:PASSWORD@HOST:PORT/DATABASENAME'
Expand Down
80 changes: 35 additions & 45 deletions lib/adapter.js
Original file line number Diff line number Diff line change
Expand Up @@ -84,23 +84,23 @@ module.exports = (function() {


// Direct access to query
query: function(connectionName, collectionName, query, data, cb, connection) {
query: function(connectionName, collectionName, query, data, cb, connection, queryType) {

if (_.isFunction(data)) {
cb = data;
data = null;
}

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __QUERY__, cb);
return spawnConnection(queryType || 'WRITE', connectionName, __QUERY__, cb);
} else {
__QUERY__(connection, cb);
}

function __QUERY__(connection, cb) {

// Run query
log('MySQL.query: ', query);
log('MySQL.query: ', connection._clusterId || '', query);

if (data) connection.query(query, data, cb);
else connection.query(query, cb);
Expand All @@ -114,7 +114,7 @@ module.exports = (function() {
describe: function(connectionName, collectionName, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __DESCRIBE__, cb);
return spawnConnection('READ', connectionName, __DESCRIBE__, cb);
} else {
__DESCRIBE__(connection, cb);
}
Expand All @@ -132,8 +132,8 @@ module.exports = (function() {
var pkQuery = 'SHOW INDEX FROM ' + tableName;

// Run query
log('MySQL.describe: ', query);
log('MySQL.describe(pk): ', pkQuery);
log('MySQL.describe: ', connection._clusterId || '', query);
log('MySQL.describe(pk): ', connection._clusterId || '', pkQuery);

connection.query(query, function __DESCRIBE__(err, schema) {
if (err) {
Expand Down Expand Up @@ -191,7 +191,7 @@ module.exports = (function() {
var self = this;

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __DEFINE__, cb);
return spawnConnection('WRITE', connectionName, __DEFINE__, cb);
} else {
__DEFINE__(connection, cb);
}
Expand Down Expand Up @@ -222,7 +222,7 @@ module.exports = (function() {


// Run query
log('MYSQL.define: ', query);
log('MYSQL.define: ', connection._clusterId || '', query);

connection.query(query, function __DEFINE__(err, result) {
if (err) return cb(err);
Expand Down Expand Up @@ -250,7 +250,7 @@ module.exports = (function() {
}

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __DROP__, cb);
return spawnConnection('WRITE', connectionName, __DROP__, cb);
} else {
__DROP__(connection, cb);
}
Expand All @@ -270,7 +270,7 @@ module.exports = (function() {
var query = 'DROP TABLE ' + tableName;

// Run query
log('MYSQL.drop: ', query);
log('MYSQL.drop: ', connection._clusterId || '', query);

connection.query(query, function __DROP__(err, result) {
if (err) {
Expand All @@ -294,7 +294,7 @@ module.exports = (function() {
addAttribute: function (connectionName, collectionName, attrName, attrDef, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __ADD_ATTRIBUTE__, cb);
return spawnConnection('WRITE', connectionName, __ADD_ATTRIBUTE__, cb);
} else {
__ADD_ATTRIBUTE__(connection, cb);
}
Expand All @@ -308,7 +308,7 @@ module.exports = (function() {
var query = sql.addColumn(tableName, attrName, attrDef);

// Run query
log('MYSQL.addAttribute: ', query);
log('MYSQL.addAttribute: ', connection._clusterId || '', query);

connection.query(query, function(err, result) {
if (err) return cb(err);
Expand All @@ -324,7 +324,7 @@ module.exports = (function() {
removeAttribute: function (connectionName, collectionName, attrName, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __REMOVE_ATTRIBUTE__, cb);
return spawnConnection('WRITE', connectionName, __REMOVE_ATTRIBUTE__, cb);
} else {
__REMOVE_ATTRIBUTE__(connection, cb);
}
Expand All @@ -338,7 +338,7 @@ module.exports = (function() {
var query = sql.removeColumn(tableName, attrName);

// Run query
log('MYSQL.removeAttribute: ', query);
log('MYSQL.removeAttribute: ', connection._clusterId || '', query);

connection.query(query, function(err, result) {
if (err) return cb(err);
Expand All @@ -358,7 +358,7 @@ module.exports = (function() {
create: function(connectionName, collectionName, data, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __CREATE__, cb);
return spawnConnection('WRITE', connectionName, __CREATE__, cb);
} else {
__CREATE__(connection, cb);
}
Expand Down Expand Up @@ -389,7 +389,7 @@ module.exports = (function() {
}

// Run query
log('MySQL.create: ', _query.query);
log('MySQL.create: ', connection._clusterId || '', _query.query);

connection.query(_query.query, function(err, result) {
if (err) return cb( handleQueryError(err) );
Expand Down Expand Up @@ -419,7 +419,7 @@ module.exports = (function() {
createEach: function (connectionName, collectionName, valuesList, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __CREATE_EACH__, cb);
return spawnConnection('WRITE', connectionName, __CREATE_EACH__, cb);
} else {
__CREATE_EACH__(connection, cb);
}
Expand Down Expand Up @@ -453,7 +453,7 @@ module.exports = (function() {
}

// Run query
log('MySQL.createEach: ', _query.query);
log('MySQL.createEach: ', connection._clusterId || '', _query.query);

connection.query(_query.query, function(err, results) {
if (err) return cb( handleQueryError(err) );
Expand All @@ -480,7 +480,7 @@ module.exports = (function() {
var query = 'SELECT * FROM ' + mysql.escapeId(tableName) + ' WHERE ' + mysql.escapeId(pk) + ' IN (' + records + ');';

// Run Query returing results
log('MYSQL.createEach: ', query);
log('MYSQL.createEach: ', connection._clusterId || '', query);

connection.query(query, function(err, results) {
if(err) return cb(err);
Expand All @@ -502,7 +502,7 @@ module.exports = (function() {
join: function (connectionName, collectionName, options, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __JOIN__, cb);
return spawnConnection('READ', connectionName, __JOIN__, cb);
} else {
__JOIN__(connection, cb);
}
Expand Down Expand Up @@ -798,7 +798,7 @@ module.exports = (function() {
find: function(connectionName, collectionName, options, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __FIND__, cb);
return spawnConnection('READ', connectionName, __FIND__, cb);
} else {
__FIND__(connection, cb);
}
Expand Down Expand Up @@ -829,7 +829,7 @@ module.exports = (function() {
}

// Run query
log('MYSQL.find: ', _query.query[0]);
log('MYSQL.find: ', connection._clusterId || '', _query.query[0]);

connection.query(_query.query[0], function(err, result) {
if(err) return cb(err);
Expand All @@ -845,7 +845,7 @@ module.exports = (function() {
count: function(connectionName, collectionName, options, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __COUNT__, cb);
return spawnConnection('READ', connectionName, __COUNT__, cb);
} else {
__COUNT__(connection, cb);
}
Expand Down Expand Up @@ -876,7 +876,7 @@ module.exports = (function() {
}

// Run query
log('MYSQL.count: ', _query.query[0]);
log('MYSQL.count: ', connection._clusterId || '', _query.query[0]);

connection.query(_query.query[0], function(err, result) {
if(err) return cb(err);
Expand All @@ -892,7 +892,7 @@ module.exports = (function() {
stream: function(connectionName, collectionName, options, stream, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __STREAM__);
return spawnConnection('READ', connectionName, __STREAM__);
} else {
__STREAM__(connection);
}
Expand All @@ -904,21 +904,10 @@ module.exports = (function() {
var tableName = collectionName;

// Build find query
var schema = connectionObject.schema;
var _query;

var sequel = new Sequel(schema, sqlOptions);

// Build a query for the specific query strategy
try {
_query = sequel.find(collectionName, options);
} catch(e) {
return cb(e);
}
var query = _query.query[0];
var query = sql.selectQuery(tableName, options);

// Run query
log('MySQL.stream: ', query);
log('MySQL.stream: ', connection._clusterId || '', query);

var dbStream = connection.query(query);

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems odd, I may have squished someone elses commit here.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try pulling in the latest from master

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bump? :)

Expand Down Expand Up @@ -951,7 +940,7 @@ module.exports = (function() {
update: function(connectionName, collectionName, options, values, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __UPDATE__, cb);
return spawnConnection('WRITE', connectionName, __UPDATE__, cb);
} else {
__UPDATE__(connection, cb);
}
Expand All @@ -974,7 +963,7 @@ module.exports = (function() {
return cb(e);
}

log('MySQL.update(before): ', _query.query[0]);
log('MySQL.update(before): ', connection._clusterId || '', _query.query[0]);

connection.query(_query.query[0], function(err, results) {
if(err) return cb(err);
Expand Down Expand Up @@ -1009,7 +998,7 @@ module.exports = (function() {
}

// Run query
log('MySQL.update: ', _query.query);
log('MySQL.update: ', connection._clusterId || '', _query.query);

connection.query(_query.query, function(err, result) {
if (err) return cb( handleQueryError(err) );
Expand All @@ -1031,7 +1020,7 @@ module.exports = (function() {
}

// Run query
log('MySQL.update(after): ', _query.query[0]);
log('MySQL.update(after): ', connection._clusterId || '', _query.query[0]);

connection.query(_query.query[0], function(err, result) {
if(err) return cb(err);
Expand All @@ -1047,7 +1036,7 @@ module.exports = (function() {
destroy: function(connectionName, collectionName, options, cb, connection) {

if(_.isUndefined(connection)) {
return spawnConnection(connectionName, __DESTROY__, cb);
return spawnConnection('WRITE', connectionName, __DESTROY__, cb);
} else {
__DESTROY__(connection, cb);
}
Expand Down Expand Up @@ -1079,7 +1068,7 @@ module.exports = (function() {
},

destroyRecords: ['findRecords', function(next) {
log('MySQL.destroy: ', _query.query);
log('MySQL.destroy: ', connection._clusterId || '', _query.query);

connection.query(_query.query, next);
}]
Expand Down Expand Up @@ -1117,8 +1106,9 @@ module.exports = (function() {
* @param {Function} fn
* @param {[type]} cb
*/
function spawnConnection(connectionName, fn, cb) {
function spawnConnection(queryType, connectionName, fn, cb) {

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be careful that this doesn't break anywhere. Did you check for all uses? Does this get exposed to user code? (if so, it's a BC break).

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looked like an internal call, but someone else may be more aware of the ramifications.

_spawnConnection(
queryType,
getConnectionObject(connectionName),
fn,
wrapCallback(cb)
Expand Down
35 changes: 34 additions & 1 deletion lib/connections/register.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,13 @@ var utils = require('../utils');

module.exports = {};

function inheritConfigProperties (source, dest, properties) {
properties.forEach(function (name) {
if (_.isUndefined(dest[name])) {
dest[name] = source[name];
}
});
}

module.exports.configure = function ( connections ) {

Expand Down Expand Up @@ -70,9 +77,35 @@ module.exports.configure = function ( connections ) {

var activeConnection = connections[connection.identity];

// Create a connection pool cluster if configured to do so.
// (and set up the necessary `releaseConnection` functionality to drain it.)
if (activeConnection.config.replication) {
activeConnection.connection.poolCluster = mysql.createPoolCluster({
canRetry: activeConnection.config.replication.canRetry || true,
defaultSelector: activeConnection.config.replication.defaultSelector || 'RR'
});

activeConnection.config.replication.read = activeConnection.config.replication.read || [];
activeConnection.config.replication.write = activeConnection.config.replication.write || [];
activeConnection.config.replication.readwrite = activeConnection.config.replication.readwrite || [];

activeConnection.config.replication.read = activeConnection.config.replication.read.concat(activeConnection.config.replication.readwrite);
activeConnection.config.replication.write = activeConnection.config.replication.write.concat(activeConnection.config.replication.readwrite);

activeConnection.config.replication.read.forEach(function (config, index) {
inheritConfigProperties(activeConnection.config, config, ['user', 'password', 'database']);
activeConnection.connection.poolCluster.add('READ' + index, config);
});

activeConnection.config.replication.write.forEach(function (config, index) {
inheritConfigProperties(activeConnection.config, config, ['user', 'password', 'database']);
activeConnection.connection.poolCluster.add('WRITE' + index, config);
});

activeConnection.connection.releaseConnection = _releaseConnection.poolfully;
// Create a connection pool if configured to do so.
// (and set up the necessary `releaseConnection` functionality to drain it.)
if (activeConnection.config.pool) {
} else if (activeConnection.config.pool) {
activeConnection.connection.pool = mysql.createPool(activeConnection.config);
activeConnection.connection.releaseConnection = _releaseConnection.poolfully;
}
Expand Down
Loading