Overview
In Node.js, we use streams for efficient network communication, file reading and writing, standard input/output, and more.
A stream receives or sends data in the form of chunks, so it provides a better user experience. Imagine watching a movie on the web: if the browser waited for the entire movie to download, the user would get bored waiting for it to load. Instead, the browser should be able to play, or consume, the movie in chunks.
Streaming data in chunks also saves resources. Imagine we are uploading a video to the server. If the application accepts the entire video file at once, it may need a significant amount of memory, and if many upload requests arrive at the same time, the application may even run out of memory. However, if we receive and process the data in chunks, it doesn't need much memory. Node.js streams can read and write data in chunks.
Types of Streams
There are 4 types of streams: Readable, Writable, Duplex and Transform.
- Readable stream – provides the events and functions needed to read data from a stream.
- Writable stream – provides the events and functions needed to write data to a stream.
- Duplex stream – an object that is both readable and writable.
- Transform stream – a Duplex stream that we can use to modify or transform the data as we write to or read from a stream.
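All four classes come from the built-in stream module. As a quick sketch of the hierarchy (Transform extends Duplex, which extends Readable):

const { Readable, Writable, Duplex, Transform } = require("stream");
// every Transform is also a Duplex, and therefore also a Readable
console.log(new Transform() instanceof Readable); // true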
Working with Streams
There are 2 ways in which you work with streams:
- Using the built-in streams from modules such as fs, http, zlib, etc.
- Creating your own streams from the Readable and Writable classes
There is a built-in stream class for almost every operation; only in special scenarios will you end up creating your own stream classes. In this section we will build our own streams; in further sections we will use various built-in streams.
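As a small illustration of built-in streams working together, the sketch below gzips a file by piping an fs read stream through a zlib transform into an fs write stream; the file names input.txt and input.txt.gz are placeholders:

const fs = require("fs");
const zlib = require("zlib");
// read input.txt, compress it on the fly, and write input.txt.gz
fs.createReadStream("input.txt")
  .pipe(zlib.createGzip())     // built-in Transform stream
  .pipe(fs.createWriteStream("input.txt.gz"));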
Writable stream
A Writable stream represents the destination to which we write the data.
Methods
- Writable.write(chunk, encoding, callback) – writes a chunk to the stream
- Writable.end(chunk, encoding, callback) – ends the stream
- Writable.cork() – subsequent data will be buffered in memory
- Writable.uncork() – flushes the data from memory that was buffered after calling the cork() method
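A minimal sketch of these methods in action, using a hypothetical sink that just logs whatever reaches it:

const { Writable } = require("stream");
const sink = new Writable({
  write(chunk, encoding, callback) {
    console.log("flushed:", chunk.toString());
    callback();
  }
});
sink.cork();                 // buffer subsequent writes in memory
sink.write("alpha ");
sink.write("beta");
process.nextTick(() => sink.uncork()); // flush the corked writes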
Events
- finish – triggers when we call the end() method on the stream
- error – triggers when an error occurs while writing data to the stream
- pipe – triggers when this stream is passed into the pipe() method of a Readable stream
- unpipe – triggers when the unpipe() method is called on a Readable stream that was piping into this stream
- drain – triggers when it becomes safe to write to the stream again, after the internal buffer (which had reached highWaterMark) has drained
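The drain event is the basis of backpressure handling. A minimal sketch, assuming a hypothetical slow sink with a deliberately tiny highWaterMark so that a single write overflows the buffer:

const { Writable } = require("stream");
const slowSink = new Writable({
  highWaterMark: 4, // bytes – tiny on purpose
  write(chunk, encoding, callback) {
    setTimeout(callback, 100); // simulate a slow destination
  }
});
if (!slowSink.write("hello world")) {
  // write() returned false: the buffer is over highWaterMark
  slowSink.once("drain", () => {
    console.log("buffer drained, safe to write again");
    slowSink.end("more data");
  });
}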
Attributes
- writable – true if we can write data to the stream, false if we can't
- writableEnded – true if the end() method has already been called
- writableFinished – true if end() has been called and all data has been flushed
- writableHighWaterMark – the highWaterMark set when the stream was created; once the internal buffer reaches this size, write() returns false
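A short sketch that prints these attributes at different points in a stream's life (the no-op sink is hypothetical):

const { Writable } = require("stream");
const out = new Writable({
  write(chunk, encoding, callback) { callback(); }
});
console.log(out.writable);              // true – we can write
console.log(out.writableHighWaterMark); // 16384 – the default, in bytes
out.end("last chunk");
console.log(out.writableEnded);         // true – end() has been called
out.on("finish", () => {
  console.log(out.writableFinished);    // true – all data flushed
});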
Writable Stream Implementation Example
In this example, we treat the dataArray property as our destination.
const { Writable } = require("stream");

const writableStream = new Writable({
  // called for every chunk written to the stream
  write(chunk, encoding, callback) {
    this.dataArray[this.index] = chunk.toString();
    this.index++;
    console.log("Data written to stream.");
    callback(); // signal that the chunk has been processed
  }
});

writableStream.dataArray = []; // our destination
writableStream.index = 0;

writableStream.on('finish', () => {
  console.log('Finish event called.');
  console.log("Is stream writable? " + writableStream.writable);
});

writableStream.on('close', () => {
  console.log('Close event called.');
  console.log("Is stream writable? " + writableStream.writable);
});

writableStream.on('error', () => {
  console.log('Error occurred.');
});

console.log("Is stream writable? " + writableStream.writable);
writableStream.write("alpha");
writableStream.end("beta");    // writes "beta" and ends the stream
writableStream.write("gamma"); // write after end – emits 'error'
console.log(writableStream.dataArray);
writableStream.destroy();
Output:
Is stream writable? true
Data written to stream.
Data written to stream.
Error occurred.
[ 'alpha', 'beta' ]
Finish event called.
Is stream writable? false
Readable streams
A Readable stream helps pass data to Writable/Duplex/Transform streams. Basically, it represents a stream that points to the source of data and reads that data in chunks.
An example of Readable stream is fs.ReadStream.
Methods
- pause() – switches from flowing mode to pause mode and stops emitting the data event
- resume() – switches from pause mode to flowing mode and starts emitting the data event
- pipe(destination) – writes data into the destination whenever data is available to read
- unpipe(destination) – data will no longer be fed into the destination stream automatically
- read() – returns a chunk of data from the internal buffer; returns null if no data is available in the buffer
- push(chunk) – adds a chunk of data to the internal queue, where it can be read by the read() method or passed to the listeners; pushing null ends the stream.
Events
- data – A readable stream will emit data event whenever there is data available in a stream to consume.
- end – A readable stream will emit “end” event when there is no more data available in a stream to consume.
- error – A readable stream will emit “error” event when there is an error while reading data from the stream.
- close – A readable stream will emit “close” event when the data source object is closed.
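Since fs.ReadStream is a Readable, we can watch all four events on it. A minimal sketch, assuming a placeholder file name notes.txt:

const fs = require("fs");
const reader = fs.createReadStream("notes.txt");
reader.on("data", (chunk) => console.log("chunk of " + chunk.length + " bytes"));
reader.on("end", () => console.log("no more data"));
reader.on("error", (err) => console.log("error: " + err.message));
reader.on("close", () => console.log("source closed"));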
Attributes
- readable – true if Readable stream is not yet ended
- readableFlowing – null if no listener is added; true if in flowing mode; false if pause() or unpipe() methods are called on Readable stream
- readableHighWaterMark – value of highWaterMark passed while creating the Readable stream
Readable Stream Reading Modes
Readable streams have 2 reading modes:
- Pause Mode – When we do not add listeners to a readable stream, it will be in Pause Mode. In this mode, we have to call the read() method explicitly to read data
- Flowing Mode – When we add a listener or pipe the stream to a destination, it will be in Flowing Mode. In this mode, data is automatically read and fed into the subscribed listeners, or piped into the destination via the pipe method
By default, a readable stream will be in pause mode.
Pause Mode to Flowing Mode
A Readable stream switches from pause mode to flowing mode in the following cases:
- When you add a "data" event handler on the Readable stream
- When you call the pipe() method on the Readable stream to pipe the output to a Writable stream
- When you call the resume() method on the Readable stream
Flowing Mode to Pause Mode
A Readable stream switches from flowing mode to pause mode in the following cases:
- When you call the pause() method on the Readable stream
- When you call the unpipe() method on the Readable stream
Three Flowing states of Readable stream
- When there are no listeners subscribed on a Readable stream, we say that the flowing state is null. The readableFlowing attribute's value will be null.
- When we add a listener, the stream changes to the flowing state. The readableFlowing value changes to true.
- When we call the pause() method or the unpipe() method, the stream changes to the non-flowing state. The readableFlowing value changes to false.
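A minimal sketch of the three states, using the Readable.from() helper to build a small stream:

const { Readable } = require("stream");
const letters = Readable.from(["a", "b", "c"]);
console.log(letters.readableFlowing); // null – no listener yet
letters.on("data", (chunk) => console.log("got: " + chunk));
console.log(letters.readableFlowing); // true – the listener switched it to flowing mode
letters.pause();
console.log(letters.readableFlowing); // false – pause() switched it to non-flowing
letters.resume(); // back to flowing; "got: a" ... "got: c" print afterwards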
Consuming Data from Readable Stream
We can consume data from the stream in either of the two ways below, but not both together:
- By subscribing to events
- By using the pipe function
For example, if you want to read a file and write its contents to another file, you can straight away use the pipe function, as shown in the sketch below. The pipe function automatically pipes all the chunks of data from the readable stream to the writable stream.
However, if you want to perform this in a more customized way, you can use the event-based approach. Streams inherit from EventEmitter, so we can use the published events to act on the data while reading from or writing to the stream.
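For the file-copy case mentioned above, the pipe approach is a one-liner; source.txt and destination.txt are placeholder names:

const fs = require("fs");
// copy source.txt to destination.txt chunk by chunk
fs.createReadStream("source.txt").pipe(fs.createWriteStream("destination.txt"));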
Readable Stream Implementation Pause Mode Example
const stream = require("stream");

const readable = new stream.Readable({
  // called whenever the stream wants more data
  read(size) {
    if (this.index < this.dataArray.length) {
      this.push(this.dataArray[this.index]);
      this.index++;
    } else {
      this.push(null); // no more data – end the stream
    }
  }
});

readable.dataArray = ["alpha", "beta"];
readable.index = 0;

// no listeners attached, so the stream is in pause mode;
// we read from it explicitly
console.log(readable.read().toString());
console.log(readable.read().toString());
Output:
alpha
beta
Readable Stream Implementation Flowing Mode with Listener
const stream = require("stream");

const readable = new stream.Readable({
  read(size) {
    if (this.index < this.dataArray.length) {
      this.push(this.dataArray[this.index]);
      this.index++;
    } else {
      this.push(null); // end the stream once the array is exhausted
    }
  }
});

readable.dataArray = ["alpha", "beta"];
readable.index = 0;

// adding a 'data' listener switches the stream to flowing mode
readable.on("data", function(data) {
  console.log(data.toString());
});
Readable Stream Implementation Flowing Mode using Pipe method
const stream = require("stream");

const readable = new stream.Readable({
  read(size) {
    if (this.index < this.dataArray.length) {
      this.push(this.dataArray[this.index]);
      this.index++;
    } else {
      this.push(null);
    }
  }
});

readable.dataArray = ["alpha", "beta"];
readable.index = 0;

// piping also switches the stream to flowing mode
readable.pipe(process.stdout);
Duplex Stream
A Duplex stream is an object that one can both read from and write to. It implements both Readable and Writable, so it acquires the methods and events of both.
Duplex Stream Implementation
In the program below, we implement a Duplex stream. We pass an array to the Duplex stream's constructor; the Duplex then iterates over the array and pushes each element one by one. If we pass the Duplex stream as the destination to a pipe method, it will call the write method and print the values that were pushed from the read method.
Since it is both Readable and Writable, it needs to implement both the _read and _write methods. Because MyArrayDuplex is both Readable and Writable, we can even pass the instance to its own pipe method. The pipe method first invokes the _read method of the Duplex; within _read, we iterate over the data and push it to whoever has subscribed or piped.
const stream = require("stream");

class MyArrayDuplex extends stream.Duplex {
  constructor(arr) {
    super();
    this.dataArr = arr;
  }

  // Readable side: push every element, then signal end of data
  _read(size) {
    console.log("In Duplex.read method...");
    this.dataArr.forEach(element => {
      this.push(element);
    });
    this.push(null); // no more data
    console.log("Exiting Duplex.read method...");
  }

  // Writable side: print whatever is written to us
  _write(chunk, encoding, callback) {
    console.log("In Duplex.write method...");
    console.log(chunk.toString());
    callback();
    console.log("Exiting Duplex.write method...");
  }
}

const arr = ["alpha", "beta"];
const myDuplex = new MyArrayDuplex(arr);
// pipe the stream into itself: reads feed the writes
myDuplex.pipe(myDuplex);
Output:
In Duplex.read method...
Exiting Duplex.read method...
In Duplex.write method...
alpha
Exiting Duplex.write method...
In Duplex.write method...
beta
Exiting Duplex.write method...
Transform
A Transform stream is a Duplex stream that reads data from a source and can transform it before pushing it further. When we implement a Transform, we need not implement the read and write methods; instead, we implement a _transform method.
Transform Stream Implementation
We implement a simple use case using a Transform stream: we pipe a Readable stream into the Transform stream, and the Transform stream into the standard output stream. The Readable stream reads a string and pushes it via the pipe method; the Transform stream converts the string to uppercase and pushes it further, and the standard output stream prints it to the screen.
const stream = require("stream");

// a Readable that emits a single string and then ends
const helloWorld = new stream.Readable({
  read(size) {
    this.push("Hello World");
    this.push(null);
  }
});

class MyCaseTransformer extends stream.Transform {
  // receives each chunk, pushes the uppercased version onward
  _transform(data, encoding, callback) {
    console.log("In transform method...");
    this.push(data.toString().toUpperCase());
    callback();
    console.log("\nExiting transform method...");
  }
}

const caseTransformer = new MyCaseTransformer();
helloWorld.pipe(caseTransformer).pipe(process.stdout);
Output:
In transform method...
HELLO WORLD
Exiting transform method...