Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/bulk redis payload processing #9

Open
wants to merge 16 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions lib/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,12 @@ let Config = {
},
},
},
// Max number of redis payloads we attempt to process before just triggering a requery
maxRedisEventsToProcess: 300,
// Debounce interval after which we flush the queued redis payloads
debounceInterval: 100,
// Maximum wait time for flushing queued redis payloads
maxDebounceWait: 3000
};

export default Config;
17 changes: 5 additions & 12 deletions lib/mongo/lib/dispatchers.js
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { Meteor } from 'meteor/meteor';
import { DDPServer } from 'meteor/ddp-server';
import { EJSON } from 'meteor/ejson';
import { Events, RedisPipe } from '../../constants';
import RedisSubscriptionManager from '../../redis/RedisSubscriptionManager';
Expand All @@ -8,21 +7,15 @@ import getDedicatedChannel from '../../utils/getDedicatedChannel';
import Config from '../../config';
import OptimisticInvocation from '../OptimisticInvocation';

const dispatchEvents = function(optimistic, collectionName, channels, events) {
const dispatchEvents = function dispatchEventsFn(optimistic, collectionName, channels, events) {
if (optimistic) {
OptimisticInvocation.withValue(true, () => {
OptimisticInvocation.withValue(true, () => {
events.forEach(event => {
const docId = event[RedisPipe.DOC]._id;
const dedicatedChannel = getDedicatedChannel(
collectionName,
docId
);
RedisSubscriptionManager.process(dedicatedChannel, event);

channels.forEach(channelName => {
RedisSubscriptionManager.process(channelName, event);
});
const dedicatedChannel = getDedicatedChannel(collectionName, docId);
RedisSubscriptionManager.process(dedicatedChannel, [event]);
});
channels.forEach(channel => RedisSubscriptionManager.process(channel, events));
});
}

Expand Down
33 changes: 15 additions & 18 deletions lib/processors/actions/requery.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,40 +4,37 @@ import { MongoIDMap } from '../../cache/mongoIdMap';

/**
* @param observableCollection
* @param newCommer
* @param event
* @param modifiedFields
* @param documentMap
*/
export default function (observableCollection, newCommer, event, modifiedFields) {
export default function (observableCollection, documentMap) {
const { store, selector, options } = observableCollection;

const newStore = new MongoIDMap();
const freshIds = observableCollection.collection.find(
selector, { ...options, fields: { _id: 1 } }).fetch();
freshIds.forEach(doc => newStore.set(doc._id, doc));

let added = false;
store.compareWith(newStore, {
// Any documents found only on the left store
// should be removed
leftOnly(docId) {
observableCollection.remove(docId);
},
// Any documents found in both and with documentMap entries
// have received redis updates indicating there are changes
both(docId) {
if (documentMap[docId]) {
observableCollection.change(documentMap[docId])
}
},
// Any documents only present in the right store are newly
// added
rightOnly(docId) {
if (newCommer && EJSON.equals(docId, newCommer._id)) {
added = true;
observableCollection.add(newCommer);
if (documentMap[docId]) {
observableCollection.add(documentMap[docId]);
} else {
observableCollection.addById(docId);
}
}
});

// if we have an update, and we have a newcommer, that new commer may be inside the ids
// TODO: maybe refactor this in a separate action (?)
if (newCommer
&& Events.UPDATE === event
&& modifiedFields
&& !added
&& store.has(newCommer._id)) {
observableCollection.change(newCommer, modifiedFields);
}
}
88 changes: 51 additions & 37 deletions lib/processors/default.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,13 @@
import { Events } from '../constants';
import { Meteor } from 'meteor/meteor';
import Config from '../config';
import RedisPipe, { Events } from '../constants';
import requery from './actions/requery';

/**
* @param observableCollection
* @param event
* @param doc
* @param modifiedFields
*/
export default function(observableCollection, event, doc, modifiedFields) {
switch (event) {
case Events.INSERT:
handleInsert(observableCollection, doc);
break;
case Events.UPDATE:
handleUpdate(observableCollection, doc, modifiedFields);
break;
case Events.REMOVE:
handleRemove(observableCollection, doc);
break;
default:
throw new Meteor.Error(`Invalid event specified: ${event}`);
}
}

/**
* @param observableCollection
* @param doc
*/
const handleInsert = function(observableCollection, doc) {
const handleInsert = (observableCollection, doc) => {
if (
!observableCollection.contains(doc._id) &&
observableCollection.isEligible(doc)
Expand All @@ -36,30 +17,63 @@ const handleInsert = function(observableCollection, doc) {
};

/**
* @param observableCollection
* @param doc
* @param modifiedFields
*/
const handleUpdate = function(observableCollection, doc, modifiedFields) {
* @param observableCollection
* @param doc
* @param modifiedFields
*/
const handleUpdate = (observableCollection, doc, modifiedFields) => {
if (observableCollection.isEligible(doc)) {
if (observableCollection.contains(doc._id)) {
observableCollection.change(doc, modifiedFields);
} else {
observableCollection.add(doc);
}
} else {
if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
} else if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
};

/**
* @param observableCollection
* @param doc
*/
const handleRemove = function(observableCollection, doc) {
* @param observableCollection
* @param doc
*/
const handleRemove = (observableCollection, doc) => {
if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
};

/**
* @param observableCollection
* @param events
* @param documentMap
*/
export default function(observableCollection, events, documentMap) {
const needsRequery = events.length > Config.maxRedisEventsToProcess;

if (needsRequery) {
requery(observableCollection, documentMap);
return;
}

for (let i = 0; i < events.length; i++) {
const event = events[i];
const docId = event[RedisPipe.DOC]._id;
const modifiedFields = event[RedisPipe.FIELDS];
const doc = documentMap[docId];

switch (event[RedisPipe.EVENT]) {
case Events.INSERT:
handleInsert(observableCollection, doc);
break;
case Events.UPDATE:
handleUpdate(observableCollection, doc, modifiedFields);
break;
case Events.REMOVE:
handleRemove(observableCollection, doc);
break;
default:
throw new Meteor.Error(`Invalid event specified: ${event}`);
}
}
}
94 changes: 54 additions & 40 deletions lib/processors/direct.js
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
import { Events } from '../constants';
import { Meteor } from 'meteor/meteor';
import Config from '../config';
import RedisPipe, { Events } from '../constants';
import requery from './actions/requery';

/**
* @param observableCollection
* @param event
* @param doc
* @param modifiedFields
*/
export default function(observableCollection, event, doc, modifiedFields) {
switch (event) {
case Events.UPDATE:
handleUpdate(observableCollection, doc, modifiedFields);
break;
case Events.REMOVE:
handleRemove(observableCollection, doc);
break;
case Events.INSERT:
handleInsert(observableCollection, doc);
break;
default:
throw new Meteor.Error(`Invalid event specified: ${event}`);
}
}

/**
* @param observableCollection
* @param doc
*/
const handleInsert = function(observableCollection, doc) {
const handleInsert = (observableCollection, doc) => {
if (
!observableCollection.contains(doc._id) &&
observableCollection.isEligible(doc)
Expand All @@ -36,11 +18,11 @@ const handleInsert = function(observableCollection, doc) {
};

/**
* @param observableCollection
* @param doc
* @param modifiedFields
*/
const handleUpdate = function(observableCollection, doc, modifiedFields) {
* @param observableCollection
* @param doc
* @param modifiedFields
*/
const handleUpdate = (observableCollection, doc, modifiedFields) => {
const otherSelectors = observableCollection.__containsOtherSelectorsThanId;

if (otherSelectors) {
Expand All @@ -50,24 +32,56 @@ const handleUpdate = function(observableCollection, doc, modifiedFields) {
} else {
observableCollection.add(doc);
}
} else {
if (observableCollection.contains(doc._id)) {
} else if (observableCollection.contains(doc._id)) {
observableCollection.remove(doc._id);
}
}
} else if (observableCollection.contains(doc._id)) {
observableCollection.change(doc, modifiedFields);
} else {
if (observableCollection.contains(doc._id)) {
observableCollection.change(doc, modifiedFields);
} else {
observableCollection.add(doc);
}
observableCollection.add(doc);
}
};

/**
* @param observableCollection
* @param doc
*/
const handleRemove = function(observableCollection, doc) {
* @param observableCollection
* @param doc
*/
const handleRemove = (observableCollection, doc) => {
observableCollection.remove(doc._id);
};


/**
* @param observableCollection
* @param events
* @param documentMap
*/
export default function(observableCollection, events, documentMap) {
const needsRequery = events.length > Config.maxRedisEventsToProcess;

if (needsRequery) {
requery(observableCollection, documentMap);
return;
}

for (let i = 0; i < events.length; i++) {
const event = events[i];
const docId = event[RedisPipe.DOC]._id;
const modifiedFields = event[RedisPipe.FIELDS];
const doc = documentMap[docId];

switch (event[RedisPipe.EVENT]) {
case Events.INSERT:
handleInsert(observableCollection, doc);
break;
case Events.UPDATE:
handleUpdate(observableCollection, doc, modifiedFields);
break;
case Events.REMOVE:
handleRemove(observableCollection, doc);
break;
default:
throw new Meteor.Error(`Invalid event specified: ${event}`);
}
}
}
2 changes: 1 addition & 1 deletion lib/processors/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ const StrategyProcessorMap = {
export { getStrategy }

/**
* @param strategy
* @param {String} strategy
* @returns {*}
*/
export function getProcessor(strategy) {
Expand Down
Loading
Loading