Skip to content

Commit

Permalink
(#55) - improve the error handling
Browse files Browse the repository at this point in the history
My issue

I want the `load()` API Promise to reject when the JSON is wrong formatted.

My solution

- Upgrade the ldjson-stream dependency (renamed to ndjson: max-mapper/ndjson@0b2bcd0).
- Add a listener for the error event.
  • Loading branch information
oliviertassinari authored and nolanlawson committed Sep 2, 2016
1 parent adcc213 commit ea9aff7
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 13 deletions.
16 changes: 13 additions & 3 deletions lib/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

var utils = require('./pouch-utils');
var version = require('./version');
var ldj = require('ldjson-stream');
var ndj = require('ndjson');
var through = require('through2').obj;
var pick = require('lodash.pick');
var toBufferStream = require('./to-buffer-stream');
Expand Down Expand Up @@ -79,8 +79,18 @@ exports.plugin.load = utils.toPromise(function (readableStream, opts, callback)
batchSize = DEFAULT_BATCH_SIZE;
}

// We need this variable in order to call the callback only once.
// The stream is not closed when the 'error' event is emitted.
var error = null;

var queue = [];
readableStream.pipe(toBufferStream()).pipe(ldj.parse()).pipe(through(function (data, _, next) {
readableStream
.pipe(toBufferStream())
.pipe(ndj.parse())
.on('error', function (errorCatched) {
error = errorCatched;
})
.pipe(through(function (data, _, next) {
if (!data.docs) {
return next();
}
Expand All @@ -106,7 +116,7 @@ exports.plugin.load = utils.toPromise(function (readableStream, opts, callback)
.pipe(this.createWriteStream({new_edits: false}))
.on('error', callback)
.on('finish', function () {
callback(null, {ok: true});
callback(error, {ok: true});
});
});

Expand Down
12 changes: 6 additions & 6 deletions lib/writable-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ var ERROR_REV_CONFLICT = {
name: 'conflict',
message: 'Document update conflict'
};
var ldj = require('ldjson-stream');
var ndj = require('ndjson');
var ERROR_MISSING_DOC = {
status: 404,
name: 'not_found',
Expand All @@ -16,14 +16,14 @@ var ERROR_MISSING_DOC = {
function WritableStreamPouch(opts, callback) {
var api = this;
api.instanceId = Math.random().toString();
api.ldj = ldj.serialize();
api.ndj = ndj.serialize();
api.localStore = {};
api.originalName = opts.name;

// TODO: I would pass this in as a constructor opt, but
// PouchDB changed how it clones in 5.0.0 so this broke
api.setupStream = function (stream) {
api.ldj.pipe(stream);
api.ndj.pipe(stream);
};

/* istanbul ignore next */
Expand All @@ -42,7 +42,7 @@ function WritableStreamPouch(opts, callback) {
if (opts.new_edits === false) {
// assume we're only getting this with new_edits=false,
// since this adapter is just a replication target
this.ldj.write({docs: docs}, function () {
this.ndj.write({docs: docs}, function () {
callback(null, docs.map(function (doc) {
return {
ok: true,
Expand Down Expand Up @@ -70,7 +70,7 @@ function WritableStreamPouch(opts, callback) {
};

api._close = function (callback) {
this.ldj.end(callback);
this.ndj.end(callback);
};

api._getLocal = function (id, callback) {
Expand Down Expand Up @@ -115,7 +115,7 @@ function WritableStreamPouch(opts, callback) {
};
/* istanbul ignore else */
if ('last_seq' in doc) {
self.ldj.write({seq: doc.last_seq}, done);
self.ndj.write({seq: doc.last_seq}, done);
} else {
done();
}
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@
"dependencies": {
"argsarray": "0.0.1",
"inherits": "~2.0.1",
"ldjson-stream": "^1.2.1",
"ndjson": "^1.4.3",
"lodash.pick": "^4.0.0",
"pouchdb-promise": "^5.4.4",
"pouch-stream": "^0.4.0",
Expand Down
22 changes: 19 additions & 3 deletions test/test.js
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ dbs.split(',').forEach(function (db) {
});

function tests(dbName, dbType) {

var db;
var remote;
var out;
Expand Down Expand Up @@ -138,7 +137,6 @@ function tests(dbName, dbType) {
});

it('should replicate same _revs into the dest db', function () {

var stream = new MemoryStream();

return db.bulkDocs([
Expand Down Expand Up @@ -209,7 +207,6 @@ function tests(dbName, dbType) {
});

it('should dump to a string', function () {

var MemoryStream = require('memorystream');

var dumpedString = '';
Expand Down Expand Up @@ -248,5 +245,24 @@ function tests(dbName, dbType) {
docs.rows[0].id.should.equal('1');
});
});

it('should reject the promise when the source is not a stream', function () {
return remote
.load('foo')
.catch(function (err) {
err.should.be.a('error', 'TypeError: readableStream.pipe is not a function');
});
});

it('should reject the promise when the json is wrongly formatted', function () {
var writeStream = new MemoryStream();
writeStream.end('foo');

return remote
.load(writeStream)
.catch(function (err) {
err.should.be.a('error', 'Error: Could not parse row foo...');
});
});
});
}

0 comments on commit ea9aff7

Please sign in to comment.