diff --git a/lib/index.js b/lib/index.js index 1a29efb..56ddb90 100644 --- a/lib/index.js +++ b/lib/index.js @@ -2,7 +2,7 @@ var utils = require('./pouch-utils'); var version = require('./version'); -var ldj = require('ldjson-stream'); +var ndj = require('ndjson'); var through = require('through2').obj; var pick = require('lodash.pick'); var toBufferStream = require('./to-buffer-stream'); @@ -79,8 +79,18 @@ exports.plugin.load = utils.toPromise(function (readableStream, opts, callback) batchSize = DEFAULT_BATCH_SIZE; } + // We need this variable in order to call the callback only once. + // The stream is not closed when the 'error' event is emitted. + var error = null; + var queue = []; - readableStream.pipe(toBufferStream()).pipe(ldj.parse()).pipe(through(function (data, _, next) { + readableStream + .pipe(toBufferStream()) + .pipe(ndj.parse()) + .on('error', function (errorCatched) { + error = errorCatched; + }) + .pipe(through(function (data, _, next) { if (!data.docs) { return next(); } @@ -106,7 +116,7 @@ exports.plugin.load = utils.toPromise(function (readableStream, opts, callback) .pipe(this.createWriteStream({new_edits: false})) .on('error', callback) .on('finish', function () { - callback(null, {ok: true}); + callback(error, {ok: true}); }); }); diff --git a/lib/writable-stream.js b/lib/writable-stream.js index 094ed74..31df6eb 100644 --- a/lib/writable-stream.js +++ b/lib/writable-stream.js @@ -7,7 +7,7 @@ var ERROR_REV_CONFLICT = { name: 'conflict', message: 'Document update conflict' }; -var ldj = require('ldjson-stream'); +var ndj = require('ndjson'); var ERROR_MISSING_DOC = { status: 404, name: 'not_found', @@ -16,14 +16,14 @@ var ERROR_MISSING_DOC = { function WritableStreamPouch(opts, callback) { var api = this; api.instanceId = Math.random().toString(); - api.ldj = ldj.serialize(); + api.ndj = ndj.serialize(); api.localStore = {}; api.originalName = opts.name; // TODO: I would pass this in as a constructor opt, but // PouchDB changed how it clones in 5.0.0 so this broke api.setupStream = function (stream) { - api.ldj.pipe(stream); + api.ndj.pipe(stream); }; /* istanbul ignore next */ @@ -42,7 +42,7 @@ function WritableStreamPouch(opts, callback) { if (opts.new_edits === false) { // assume we're only getting this with new_edits=false, // since this adapter is just a replication target - this.ldj.write({docs: docs}, function () { + this.ndj.write({docs: docs}, function () { callback(null, docs.map(function (doc) { return { ok: true, @@ -70,7 +70,7 @@ function WritableStreamPouch(opts, callback) { }; api._close = function (callback) { - this.ldj.end(callback); + this.ndj.end(callback); }; api._getLocal = function (id, callback) { @@ -115,7 +115,7 @@ function WritableStreamPouch(opts, callback) { }; /* istanbul ignore else */ if ('last_seq' in doc) { - self.ldj.write({seq: doc.last_seq}, done); + self.ndj.write({seq: doc.last_seq}, done); } else { done(); } diff --git a/package.json b/package.json index 34f1b75..ef57ebb 100644 --- a/package.json +++ b/package.json @@ -36,7 +36,7 @@ "dependencies": { "argsarray": "0.0.1", "inherits": "~2.0.1", - "ldjson-stream": "^1.2.1", + "ndjson": "^1.4.3", "lodash.pick": "^4.0.0", "pouchdb-promise": "^5.4.4", "pouch-stream": "^0.4.0", diff --git a/test/test.js b/test/test.js index 0d0c228..7255c6c 100644 --- a/test/test.js +++ b/test/test.js @@ -36,7 +36,6 @@ dbs.split(',').forEach(function (db) { }); function tests(dbName, dbType) { - var db; var remote; var out; @@ -138,7 +137,6 @@ function tests(dbName, dbType) { }); it('should replicate same _revs into the dest db', function () { - var stream = new MemoryStream(); return db.bulkDocs([ @@ -209,7 +207,6 @@ function tests(dbName, dbType) { }); it('should dump to a string', function () { - var MemoryStream = require('memorystream'); var dumpedString = ''; @@ -248,5 +245,24 @@ function tests(dbName, dbType) { docs.rows[0].id.should.equal('1'); }); }); + + it('should reject the promise when the source is not a stream', function () { + return remote + .load('foo') + .catch(function (err) { + err.should.be.a('error', 'TypeError: readableStream.pipe is not a function'); + }); + }); + + it('should reject the promise when the json is wrongly formatted', function () { + var writeStream = new MemoryStream(); + writeStream.end('foo'); + + return remote + .load(writeStream) + .catch(function (err) { + err.should.be.a('error', 'Error: Could not parse row foo...'); + }); + }); }); }