
Dashboard

About

stream-json is a micro-library, which provides a set of light-weight stream components to process huge JSON files with a minimal memory footprint. It can:

  • Parse JSON files far exceeding available memory.
    • Even individual primitive data items (keys, strings, and numbers) can be streamed piece-wise.
    • Processing humongous files can take minutes and even hours. Shaving even a microsecond from each operation can save a lot of time waiting for results. That's why all stream-json components were meticulously optimized.
  • Stream using a SAX-inspired event-based API.
  • Provide utilities to handle huge Django-like JSON database dumps.
  • Support JSON Streaming protocol.
  • Follow conventions of the no-dependency micro-library stream-chain.

It was meant to be a set of building blocks for data processing pipelines organized around JSON and JavaScript objects. Users can easily create their own "blocks" using provided facilities.

Companion projects:

  • stream-csv-as-json streams huge CSV files in a format compatible with stream-json: rows as arrays of string values. If a header row is used, it can stream rows as objects with named fields.
  • stream-chain is a micro helper to create data processing pipelines from streams, functions, generators, asynchronous functions, and present them as streams suitable to be building blocks for other pipelines.
  • stream-join is a micro helper useful for merging side channels to connect different parts of a linear pipeline for more complex data processing scenarios.

Documentation 1.x

This is an overview, which can be used as a cheat sheet. Click on individual components to see detailed API documentation with examples.

The main module

The main module returns a factory function, which produces instances of Parser decorated with emit().

Parser

The heart of the package is Parser — a streaming JSON parser, which consumes text and produces a stream of tokens. Both the standard JSON and JSON Streaming are supported.

const fs = require('fs');
const {parser} = require('stream-json');

const pipeline = fs.createReadStream('data.json').pipe(parser());

Sometimes we are presented with incorrect JSON input. In that case we can use the Verifier utility, which reports the exact position of an error.
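
For example, a minimal sketch of checking a file before processing it. The module path and the verifier() factory name are assumptions here; see the Verifier page for the exact API:

const fs = require('fs');
// the path below is an assumption; check the Verifier page for the exact location
const {verifier} = require('stream-json/utils/Verifier');

fs.createReadStream('data.json')
  .pipe(verifier())
  .on('error', err => console.error('invalid JSON:', err.message))
  .on('finish', () => console.log('data.json is valid JSON'));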

Filters

Filters can edit a stream of tokens on the fly. The following filters are provided:

  • Pick picks out a subobject for processing, ignoring the rest.
    {total: 10000000, data: [...]}
    // Pick can isolate 'data' and remove the outer object completely:
    [...]
    const {pick} = require('stream-json/filters/Pick');
    const picked = pipeline.pipe(pick({filter: 'data'}));
    • Pick can be used to select one or more subobjects and stream them out as individual objects.
    • If Pick selects more than one subobject, it is usually followed by StreamValues, described below.
  • Replace replaces subobjects with something else or even removes them completely. It is used to remove unnecessary details, e.g., for performance reasons.
    [
      {data: {...}, extra: {...}},
      {data: {...}, extra: {...}},
      ...
    ]
    // Replace can remove 'extra' or replace it with something else,
    // like null (the default):
    [{data: {...}, extra: null}, ...]
    const {replace} = require('stream-json/filters/Replace');
    const replaced = pipeline.pipe(replace({filter: /^\d+\.extra\b/}));
    • Ignore removes subobjects completely. It is a helper class based on Replace.
      [{data: {...}, extra: {...}}, ...]
      // Ignore can remove 'extra':
      [{data: {...}}, ...]
      const {ignore} = require('stream-json/filters/Ignore');
      const ignored = pipeline.pipe(ignore({filter: /^\d+\.extra\b/}));
  • Filter filters out subobjects while preserving the original shape of the incoming data.
    {total: 10000000, data: [...]}
    // Filter can isolate 'data' preserving the original shape
    {data: [...]}
    const {filter} = require('stream-json/filters/Filter');
    const filtered = pipeline.pipe(filter({filter: /^data\b/}));

Filters are used after Parser and can be chained to achieve a desirable effect.
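
For example, a sketch combining two of the filters above. It assumes an input shaped like {total, data: [{…, extra: {…}}, …]} and reuses pipeline, pick(), and ignore() from the snippets above; the regular expression is illustrative:

// isolate 'data' first, then drop the bulky 'extra' fields inside it
const edited = pipeline
  .pipe(pick({filter: 'data'}))
  .pipe(ignore({filter: /^\d+\.extra\b/}));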

Streamers

In many cases working at a token level can be tedious. Frequently, while a source file is huge, individual data pieces are relatively small and can fit in memory. A typical example is a database dump.

Additionally, all streamers support efficient and flexible filtering of data items, complementing and sometimes eliminating token-level filters. See the objectFilter option in StreamBase for more details and the sketch at the end of this section.

stream-json provides the following streaming helpers:

  • StreamValues assumes that a token stream represents subsequent values and streams them out one by one. This usually happens in two major cases:
    • We parse a JSON Streaming source:
      1 "a" [] {} true
      // StreamValues will produce an object stream:
      {key: 0, value: 1}
      {key: 1, value: 'a'}
      {key: 2, value: []}
      {key: 3, value: {}}
      {key: 4, value: true}
      const {streamValues} = require('stream-json/streamers/StreamValues');
      const stream = pipeline.pipe(streamValues());
    • We Pick several subobjects:
      [{..., value: ...}, {..., value: ...}, ...]
      // Pick can isolate 'value' and stream them separately:
      [...]
      const stream = pipeline
        .pipe(pick({filter: /\bvalue\b/i}))
        .pipe(streamValues());
  • StreamArray assumes that a token stream represents an array of objects and streams out assembled JavaScript objects. Only one array is valid as input.
    [1, "a", [], {}, true]
    // StreamArray will produce an object stream:
    {key: 0, value: 1}
    {key: 1, value: 'a'}
    {key: 2, value: []}
    {key: 3, value: {}}
    {key: 4, value: true}
    const {streamArray} = require('stream-json/streamers/StreamArray');
    const stream = pipeline.pipe(streamArray());
  • StreamObject assumes that a token stream represents an object and streams out its top-level properties. Only one object is valid as input.
    {"a": 1, "b": "a", "c": [], "d": {}, "e": true}
    // StreamObject will produce an object stream:
    {key: 'a', value: 1}
    {key: 'b', value: 'a'}
    {key: 'c', value: []}
    {key: 'd', value: {}}
    {key: 'e', value: true}
    const {streamObject} = require('stream-json/streamers/StreamObject');
    const stream = pipeline.pipe(streamObject());

Streamers are used after Parser and optional filters. All of them support efficient filtering of objects while assembling: if it is determined that we have no interest in a particular object, it is abandoned and skipped without spending any more time on it. A sketch of this feature follows below.
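
The sketch assumes the objectFilter semantics described in StreamBase (returning true accepts the object, false rejects it, and undefined keeps assembling); the 'country' field is purely illustrative:

// reusing 'pipeline' and streamArray() from the examples above
const stream = pipeline.pipe(streamArray({
  objectFilter: asm => {
    const obj = asm.current; // the value assembled so far
    if (obj && 'country' in obj) return obj.country === 'CA';
    // returning undefined keeps assembling and asks again later
  }
}));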

Essentials

Classes and functions to make streaming data processing enjoyable:

  • Assembler receives a token stream and assembles JavaScript objects. It is used as a building block for streamers.
    const fs = require('fs');
    const zlib = require('zlib');
    const {chain} = require('stream-chain');
    const {parser} = require('stream-json');
    const Asm = require('stream-json/Assembler');
    
    const pipeline = chain([
      fs.createReadStream('data.json.gz'),
      zlib.createGunzip(),
      parser()
    ]);
    
    const asm = Asm.connectTo(pipeline);
    asm.on('done', asm => console.log(asm.current));
  • Disassembler is a Transform stream. It receives a stream of JavaScript objects and converts them to a token stream. It is useful for editing objects with stream algorithms or for working with an alternative source of objects.
    // chain, fs, zlib, parser, streamArray, pick, and streamValues
    // are required as in the examples above
    const {disassembler} = require('stream-json/Disassembler');
    
    const pipeline = chain([
      fs.createReadStream('array.json.gz'),
      zlib.createGunzip(),
      parser(),
      streamArray(),
      disassembler(),
      pick({filter: 'value'}),
      streamValues()
    ]);
  • Stringer is a Transform stream. It receives a token stream and converts it to text representing a JSON object. It is very useful when you want to edit a stream with filters and custom code, and save it back to a file.
    const {stringer} = require('stream-json/Stringer');
    
    chain([
      fs.createReadStream('data.json.gz'),
      zlib.createGunzip(),
      parser(),
      pick({filter: 'data'}),
      stringer(),
      zlib.createGzip(),
      fs.createWriteStream('edited.json.gz')
    ]);
  • Emitter is a Writable stream. It consumes a token stream and emits tokens as events on itself.
    const {emitter} = require('stream-json/Emitter');
    
    const e = emitter();
    
    chain([
      fs.createReadStream('data.json'),
      parser(),
      e
    ]);
    
    let counter = 0;
    e.on('startObject', () => ++counter);
    e.on('finish', () => console.log(counter, 'objects'));

Utilities

The following functions are included:

  • emit() listens to a token stream and emits tokens as events on that stream. This is a light-weight version of Emitter.
    const emit = require('stream-json/utils/emit');
    
    const pipeline = chain([
      fs.createReadStream('data.json'),
      parser()
    ]);
    emit(pipeline);
    
    let counter = 0;
    pipeline.on('startObject', () => ++counter);
    pipeline.on('finish', () => console.log(counter, 'objects'));
    When the main module is required, it returns a function that creates a Parser instance and applies emit() to it, so this simple API can be used for immediate processing.
  • withParser() creates an instance of Parser, creates an instance of a data stream with a provided function, connects them, and returns them as a chain.
    const withParser = require('stream-json/utils/withParser');
    
    const pipeline = withParser(pick, {filter: 'data'});
    Each stream provided by stream-json implements withParser(options) as a static method:
    const StreamArray = require('stream-json/streamers/StreamArray');
    
    const pipeline = StreamArray.withParser();
  • Batch accepts items and packs them into arrays of a configurable size. It can be used as a performance optimization helper (a sketch follows after this list).
  • Verifier is a Writable stream. It reads text input and either completes successfully or fails with an error that points to the offset, line, and position of the problem.
  • Utf8Stream reads buffers of text that can be cut off in the middle of a multi-byte UTF-8 character. It sends correctly decoded text downstream with all characters properly assembled. It can be used as a base class for text-processing streams. All parsers use it as a foundation.
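
Below is a sketch of using Batch to reduce per-item overhead. The module path and the batchSize option name are assumptions here; see the Batch page for the exact API:

const {batch} = require('stream-json/utils/Batch');

// chain, fs, parser, and streamArray are required as in the examples above
const pipeline = chain([
  fs.createReadStream('data.json'),
  parser(),
  streamArray(),
  batch({batchSize: 1000}), // assumed option name; see the Batch page
  data => console.log('got a batch of', data.length, 'items')
]);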

Special helpers

JSONL

In order to support JSONL more efficiently, the following helpers are provided:

  • jsonl/Parser reads a JSONL file and produces a stream of JavaScript objects, similar to StreamValues.

  • jsonl/Stringer produces a JSONL file from a stream of JavaScript objects.

    const {stringer} = require('stream-json/jsonl/Stringer');
    const {parser} = require('stream-json/jsonl/Parser');
    const {chain}  = require('stream-chain');
    
    const fs = require('fs');
    const zlib = require('zlib');
    
    // roundtrips data
    const pipeline = chain([
      fs.createReadStream('sample1.jsonl.br'),
      zlib.createBrotliDecompress(),
      parser(),
      data => data.value,
      stringer(),
      zlib.createBrotliCompress(),
      fs.createWriteStream('sample2.jsonl.br')
    ]);

Advanced use

General FAQ

Frequently asked questions and tips can be found in FAQ.

Performance tuning

Performance considerations are discussed in a separate document dedicated to Performance.

Migrating from a previous version

Documentation 0.6.x

README, which includes the documentation.

Credits

The test file tests/sample.json.gz is a combination of several publicly available datasets merged and compressed with gzip:

The test file tests/sample.jsonl.gz contains the first 100 rows of a snapshot of the "Database of COVID-19 Research Articles" for 7/9/2020, publicly provided by the CDC at https://www.cdc.gov/library/docs/covid19/ONLY_New_Articles_9July2020_Excel.xlsx, and then converted to JSONL.