Streams – Readable Stream, Writable Stream, Duplex Stream and Transform Stream

Overview

In Node.js, streams provide an efficient way to handle network communication, file reading and writing, standard input/output, and more.

A stream receives or sends data in chunks, which makes for a better user experience. Imagine watching a movie on the web: if the browser waited for the entire movie to download, the user would get bored waiting for it to load. Instead, the browser plays the movie in chunks as they arrive.

Streaming data in chunks also saves resources. Imagine uploading a video to a server: if the application accepts the entire video file at once, it may need a significant amount of memory, and if many upload requests arrive at the same time, it may run out of memory entirely. If we instead receive and process the data in chunks, far less memory is needed. Node.js streams read and write data in exactly this way.

Types of Streams

There are four types of streams: Readable, Writable, Duplex, and Transform (a sketch after the list below shows a built-in example of each).

  1. Readable stream – provides the events and functions needed to read data from a stream.
  2. Writable stream – provides the events and functions needed to write data to a stream.
  3. Duplex stream – an object that is both readable and writable.
  4. Transform stream – a Duplex stream that can modify or transform the data as it is written to or read from the stream.
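
To make the four types concrete, here is a quick sketch using built-in classes (the file names are placeholders):

const fs = require("fs");
const net = require("net");
const zlib = require("zlib");

fs.createReadStream("input.txt");   // Readable: a source of data
fs.createWriteStream("output.txt"); // Writable: a destination for data
new net.Socket();                   // Duplex: readable and writable (a TCP socket)
zlib.createGzip();                  // Transform: modifies data as it passes through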

Working with Streams

There are two ways to work with streams:

  1. Using the built-in streams from modules such as fs, http, and zlib
  2. Creating your own streams from the Readable and Writable classes

There is a built-in stream class for almost every operation; only in special scenarios will you need to create your own stream classes. In this section we will build our own streams; in later sections we will use the various built-in streams.

Writable stream

A Writable stream represents the destination to which we write the data.

Methods

  • Writable.write(chunk, encoding, callback) – writes a chunk of data to the stream
  • Writable.end(chunk, encoding, callback) – optionally writes a final chunk, then ends the stream
  • Writable.cork() – causes subsequent writes to be buffered in memory
  • Writable.uncork() – flushes the data that was buffered in memory after cork() was called (see the sketch after this list)
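
As a minimal sketch of cork() and uncork(): the writes issued while the stream is corked are held in memory and flushed together once uncork() is called.

const { Writable } = require("stream");

const ws = new Writable({
  write(chunk, encoding, callback) {
    console.log("Flushed: " + chunk.toString());
    callback();
  }
});

ws.cork();
ws.write("alpha"); // buffered in memory, not flushed yet
ws.write("beta");  // buffered in memory, not flushed yet
process.nextTick(() => ws.uncork()); // flushes both buffered writes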

Events

  • finish – emitted after end() has been called and all data has been flushed
  • error – emitted when an error occurs while writing data to the stream
  • pipe – emitted when this stream is passed to the pipe() method of a Readable stream
  • unpipe – emitted when unpipe() is called on a Readable stream that was piping into this stream
  • drain – emitted when it is safe to write again, after a write() call returned false because the internal buffer reached highWaterMark (see the backpressure sketch below)
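
The drain event is the basis of backpressure handling. Below is a sketch with an artificially small highWaterMark and a deliberately slow sink: writing stops whenever write() returns false and resumes on drain.

const { Writable } = require("stream");

const slowSink = new Writable({
  highWaterMark: 4, // bytes; artificially small to force backpressure
  write(chunk, encoding, callback) {
    setTimeout(callback, 10); // simulate a slow destination
  }
});

let i = 0;
function writeNext() {
  while (i < 10) {
    const ok = slowSink.write("chunk-" + i++);
    if (!ok) {
      // internal buffer reached highWaterMark; wait for 'drain' to continue
      slowSink.once("drain", writeNext);
      return;
    }
  }
  slowSink.end();
}
writeNext();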

Attributes

  • writable – true if it is safe to write data to the stream, false otherwise
  • writableEnded – true if the end() method has been called
  • writableFinished – true if end() has been called and all data has been flushed
  • writableHighWaterMark – the highWaterMark value set when the stream was created; once the internal buffer holds this much data, write() starts returning false (the sketch below shows these attributes in action)
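
A small sketch of how these attributes change over the stream's lifetime:

const { Writable } = require("stream");

const ws = new Writable({
  write(chunk, encoding, callback) { callback(); }
});

console.log(ws.writableHighWaterMark); // 16384 bytes by default
ws.end("done");
console.log(ws.writableEnded);    // true: end() has been called
console.log(ws.writableFinished); // false: data not yet flushed
ws.on("finish", () => {
  console.log(ws.writableFinished); // true: all data flushed
});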

Writable Stream Implementation Example

In this example, we treat dataArray as our destination. Note that the write("gamma") call comes after end(), so it triggers the error event instead of writing, and "gamma" never reaches dataArray (the exact ordering of the output and the presence of a close event can vary between Node.js versions).

const { Writable } = require("stream");

const writableStream = new Writable({
    // Called once per chunk; stores the chunk in our destination array
    write(chunk, encoding, callback) {
        this.dataArray[this.index] = chunk.toString();
        this.index++;
        console.log("Data written to stream.");
        callback();
    }
});

writableStream.dataArray = [];
writableStream.index = 0;

writableStream.on('finish', () => {
  console.log('Finish event called.');
  console.log("Is stream writable? " + writableStream.writable);
});

writableStream.on('close', () => {
  console.log('Close event called.');
  console.log("Is stream writable? " + writableStream.writable);
});

writableStream.on('error', () => {
  console.log('Error occurred.');
});

console.log("Is stream writable? " + writableStream.writable);
writableStream.write("alpha");
writableStream.end("beta");    // writes a final chunk, then ends the stream
writableStream.write("gamma"); // write after end: triggers the 'error' event
console.log(writableStream.dataArray);
writableStream.destroy();

Output:

Is stream writable? true
Data written to stream.
Data written to stream.
Error occurred.
[ 'alpha', 'beta' ]
Finish event called.
Is stream writable? false

Readable streams

A Readable stream represents a source of data that is read in chunks; it can feed data to Writable, Duplex, or Transform streams.

An example of a Readable stream is fs.ReadStream.
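
For instance, fs.createReadStream returns a Readable stream that can be piped straight into a Writable stream (the file names here are placeholders):

const fs = require("fs");

const source = fs.createReadStream("input.txt");      // Readable
const destination = fs.createWriteStream("copy.txt"); // Writable

// Copies input.txt to copy.txt chunk by chunk, with backpressure handled for us
source.pipe(destination);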

Methods

  • pause() – switches from flowing mode to paused mode and stops the emission of data events
  • resume() – switches from paused mode to flowing mode and resumes the emission of data events (see the throttling sketch after this list)
  • pipe(destination) – writes data into the destination stream whenever data is available to read
  • unpipe(destination) – stops feeding data into the destination stream automatically
  • read() – returns a chunk of data from the internal buffer, or null if no data is available
  • push(chunk) – adds a chunk of data to the internal queue, where it can be consumed by the read() method or by a data listener; pushing null ends the stream
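
As a sketch of pause() and resume(), the snippet below throttles consumption of a hypothetical input.txt:

const fs = require("fs");

const rs = fs.createReadStream("input.txt");

rs.on("data", (chunk) => {
  console.log("Received " + chunk.length + " bytes");
  rs.pause();                         // stop emitting 'data' events
  setTimeout(() => rs.resume(), 100); // resume the flow after 100 ms
});

rs.on("end", () => console.log("No more data."));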

Events

  • data – emitted whenever there is data available in the stream to consume
  • end – emitted when there is no more data available in the stream to consume
  • error – emitted when an error occurs while reading data from the stream
  • close – emitted when the underlying data source is closed

Attributes

  • readable – true if the Readable stream has not yet ended
  • readableFlowing – null if no consumer is attached; true while in flowing mode; false after pause() or unpipe() has been called
  • readableHighWaterMark – the highWaterMark value passed when the Readable stream was created

Readable Stream Reading Modes

Readable streams have two reading modes:

  1. Paused Mode – when no listeners are attached to a readable stream, it is in paused mode. In this mode, we must call the read() method explicitly to read data.
  2. Flowing Mode – when we add a data listener or pipe the stream to a destination, it switches to flowing mode. In this mode, data is read automatically and fed to the subscribed listeners, or piped into the destination via the pipe() method.

By default, a readable stream is in paused mode.

Paused Mode to Flowing Mode

A Readable stream switches from paused mode to flowing mode in the following cases:

  • When you subscribe a “data” event handler on the Readable stream
  • When you call the pipe() method on the Readable stream to pipe its output to a Writable stream
  • When you call the resume() method on the Readable stream

Flowing Mode to Paused Mode

A Readable stream switches from flowing mode to paused mode in the following cases:

  • When you call the pause() method on the Readable stream
  • When you call the unpipe() method on the Readable stream

Three Flowing States of a Readable Stream

  1. When no consumers are subscribed to the Readable stream, the flowing state is null: the readableFlowing attribute is null.
  2. When we add a listener, the stream changes to the flowing state: readableFlowing changes to true.
  3. When we call the pause() or unpipe() method, the stream changes to the non-flowing state: readableFlowing changes to false. The sketch below shows all three states.
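
These three states can be observed directly; a minimal sketch:

const { Readable } = require("stream");

const readable = new Readable({ read() {} });
console.log(readable.readableFlowing); // null: no consumer attached yet

readable.on("data", () => {});
console.log(readable.readableFlowing); // true: the 'data' listener starts the flow

readable.pause();
console.log(readable.readableFlowing); // false: pause() switches to non-flowing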

Consuming Data from Readable Stream

We can consume data from a stream in either of the two ways below, but not both at the same time:

  1. By subscribing to events
  2. By using the pipe() function

For example, if you want to read a file and write its contents to another file, you can use the pipe() function directly: it automatically passes every chunk of data from the readable stream to the writable stream.

However, if you want more control over the process, you can use the events-based approach.

Streams inherit from EventEmitter, so we can subscribe to the published events and act on the data as it is read from or written to the stream.
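
An events-based version of the file copy might look like the sketch below (file names are placeholders; note that, unlike pipe(), this naive version does not handle backpressure):

const fs = require("fs");

const source = fs.createReadStream("input.txt");
const destination = fs.createWriteStream("copy.txt");

source.on("data", (chunk) => {
  // inspect or modify each chunk here before writing it out
  destination.write(chunk);
});
source.on("end", () => destination.end());
source.on("error", (err) => console.error("Read failed:", err));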

Readable Stream Implementation Paused Mode Example

const stream = require("stream");

const readable = new stream.Readable({
  // Called whenever the stream needs more data
  read(size) {
    if (this.index < this.dataArray.length) {
      this.push(this.dataArray[this.index]);
      this.index++;
    } else {
      this.push(null); // no more data; end the stream
    }
  }
});

readable.dataArray = ["alpha","beta"];
readable.index = 0;

console.log(readable.read().toString());
console.log(readable.read().toString());

Output:

alpha
beta

Readable Stream Implementation Flowing Mode with Listener

const stream = require("stream");

const readable = new stream.Readable({
  // Called whenever the stream needs more data
  read(size) {
    if (this.index < this.dataArray.length) {
      this.push(this.dataArray[this.index]);
      this.index++;
    } else {
      this.push(null); // no more data; end the stream
    }
  }
});

readable.dataArray = ["alpha","beta"];
readable.index = 0;

readable.on("data", function(data) {
  console.log(data.toString());
});

Readable Stream Implementation Flowing Mode using Pipe method

const stream = require("stream");

const readable = new stream.Readable({
  // Called whenever the stream needs more data
  read(size) {
    if (this.index < this.dataArray.length) {
      this.push(this.dataArray[this.index]);
      this.index++;
    } else {
      this.push(null); // no more data; end the stream
    }
  }
});

readable.dataArray = ["alpha","beta"];
readable.index = 0;

readable.pipe(process.stdout);

Duplex Stream

A Duplex stream is an object that can be both read from and written to. It implements both the Readable and Writable interfaces, so it inherits the methods and events of both.

Duplex Stream Implementation

In the program below, we implement a Duplex stream. We pass an array to the Duplex stream's constructor, and the stream iterates over the array and pushes each element one by one. When we pass the Duplex stream as the destination to a pipe() call, its write method is invoked and prints the values that were pushed from the read method.

Since it is both Readable and Writable, the class must implement both the _read and _write methods. Because MyArrayDuplex is both Readable and Writable, we can even pipe the instance into itself: pipe() first invokes the _read method, which iterates over the data and pushes each element to whatever is subscribed or piped.

const stream = require("stream");

class MyArrayDuplex extends stream.Duplex {
  
  constructor(arr) {
    super();
    this.dataArr = arr;
  }

  _read(size) {
    console.log("In Duplex.read method...");
   
    this.dataArr.forEach(element => {
      this.push(element);
    });

    this.push(null);
    console.log("Exiting Duplex.read method...");
  }

  _write(chunk, encoding, callback) {
    console.log("In Duplex.write method...");
    console.log(chunk.toString());
    callback();
    console.log("Exiting Duplex.write method...");
  }
}

const arr = ["alpha", "beta"];
const myDuplex = new MyArrayDuplex(arr);
myDuplex.pipe(myDuplex); // pipe the Duplex into itself: _read feeds _write

Output:

In Duplex.read method...
Exiting Duplex.read method...
In Duplex.write method...
alpha
Exiting Duplex.write method...
In Duplex.write method...
beta
Exiting Duplex.write method...

Transform Stream

A Transform stream is a Duplex stream that reads data and can transform it before pushing it along. When implementing a Transform, we do not implement the _read and _write methods; instead we implement a single _transform method.

Transform Stream Implementation

Below we implement a simple use case for a Transform stream: we pipe a Readable stream into the Transform stream, and the Transform stream into standard output. The Readable stream pushes a string, the Transform stream converts it to uppercase and pushes it along, and the standard output stream prints it to the screen.

const stream = require("stream");

const helloWorld = new stream.Readable({
  read(size) {
    this.push("Hello World");
    this.push(null);
  }
});

class MyCaseTransformer extends stream.Transform {
  _transform(data, encoding, callback) {
    console.log("In transform method...");
    this.push(data.toString().toUpperCase());
    callback();
    console.log("\nExiting transform method...");
  }
}

const caseTransformer = new MyCaseTransformer();
helloWorld.pipe(caseTransformer).pipe(process.stdout);

Output:

In transform method...
HELLO WORLD
Exiting transform method...
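
Node.js also ships built-in Transform streams. As a closing sketch, zlib.createGzip() returns a Transform stream that compresses whatever passes through it (file names are placeholders):

const fs = require("fs");
const zlib = require("zlib");

fs.createReadStream("input.txt")               // Readable source
  .pipe(zlib.createGzip())                     // built-in Transform: gzip compression
  .pipe(fs.createWriteStream("input.txt.gz")); // Writable destination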