Stream Processing Basics in JavaScript
Streams are powerful for processing large amounts of data efficiently. This article covers understanding streams, creating custom streams, and practical applications.
Introduction
Streams enable:
- Memory-efficient data processing
- Handling large files
- Real-time data processing
- Backpressure handling
- Composable data pipelines
Understanding streams helps you:
- Process large datasets
- Build efficient applications
- Handle real-time data
- Manage memory effectively
Stream Fundamentals
What are Streams?
// Streams process data in chunks rather than all at once
// Benefits:
// - Memory efficient (don't load entire file)
// - Real-time processing
// - Backpressure handling
// - Composable pipelines
// Example: Reading a large file
const fs = require('fs');
// ❌ Bad: Load entire file into memory
const data = fs.readFileSync('large-file.txt', 'utf8');
// ✅ Good: Stream the file
const stream = fs.createReadStream('large-file.txt', 'utf8');
stream.on('data', chunk => {
console.log('Chunk:', chunk);
});
Stream Events
// Streams emit events for different states
const fs = require('fs');
const stream = fs.createReadStream('file.txt');
// 'data' event: chunk of data available
stream.on('data', chunk => {
console.log('Data:', chunk);
});
// 'end' event: no more data
stream.on('end', () => {
console.log('Stream ended');
});
// 'error' event: error occurred
stream.on('error', error => {
console.error('Error:', error);
});
// 'pause' event: stream paused
stream.on('pause', () => {
console.log('Stream paused');
});
// 'resume' event: stream resumed
stream.on('resume', () => {
console.log('Stream resumed');
});
Readable Streams
Creating Readable Streams
// ✅ Good: Create custom readable stream
const { Readable } = require('stream');
// Readable stream that emits the integers 0..max-1, one chunk per number,
// each followed by a newline, then ends the stream.
class CounterStream extends Readable {
  constructor(max) {
    super();
    this.current = 0; // next number to emit
    this.max = max;   // exclusive upper bound
  }

  // Called by the stream machinery whenever it wants more data.
  _read() {
    if (this.current >= this.max) {
      this.push(null); // null signals end of stream
      return;
    }
    this.push(`${this.current}\n`);
    this.current += 1;
  }
}
// Usage
const counter = new CounterStream(5);
counter.on('data', chunk => {
console.log('Count:', chunk.toString());
});
Practical Readable Stream
// ✅ Good: Stream data from API
const { Readable } = require('stream');
/**
 * Readable stream (object mode) that pulls pages of items from a
 * paginated JSON API and pushes one array of items per page.
 */
class APIStream extends Readable {
  /**
   * @param {string} url - Base endpoint; `page`/`limit` query params are appended.
   * @param {number} [pageSize=10] - Items requested per page.
   */
  constructor(url, pageSize = 10) {
    super({ objectMode: true });
    this.url = url;
    this.pageSize = pageSize;
    this.page = 1;
    this.exhausted = false;
    this.fetching = false; // guards against overlapping _read calls
  }
  async _read() {
    // _read can be invoked again while a fetch is still pending; without
    // this guard the same page could be requested twice and its items
    // pushed in duplicate.
    if (this.exhausted || this.fetching) return;
    this.fetching = true;
    try {
      const response = await fetch(
        `${this.url}?page=${this.page}&limit=${this.pageSize}`
      );
      if (!response.ok) {
        throw new Error(`API request failed with status ${response.status}`);
      }
      const data = await response.json();
      if (data.items.length === 0) {
        // Empty page means the API has no more data.
        this.exhausted = true;
        this.push(null);
        return;
      }
      this.push(data.items);
      this.page++;
    } catch (error) {
      this.destroy(error); // surface fetch/parse failures as stream errors
    } finally {
      this.fetching = false;
    }
  }
}
// Usage
const apiStream = new APIStream('https://api.example.com/items');
apiStream.on('data', items => {
console.log('Items:', items);
});
Writable Streams
Creating Writable Streams
// ✅ Good: Create custom writable stream
const { Writable } = require('stream');
// Writable stream that reports every chunk it receives via console.log.
class LogStream extends Writable {
  // Invoked once per written chunk; must call `callback` when done.
  _write(chunk, encoding, callback) {
    const text = chunk.toString();
    console.log('Log:', text);
    callback(); // tell the stream this chunk has been handled
  }
}
// Usage
const logStream = new LogStream();
logStream.write('Hello');
logStream.write('World');
logStream.end();
Practical Writable Stream
// ✅ Good: Write to database
const { Writable } = require('stream');
// Writable stream (object mode) that batches incoming records and
// persists them to a database 100 at a time.
class DatabaseStream extends Writable {
  constructor(db) {
    super({ objectMode: true });
    this.db = db;     // must expose insertMany(records) -> Promise
    this.buffer = []; // records received but not yet persisted
  }

  // Queue one record; flush to the database once 100 have accumulated.
  _write(record, encoding, callback) {
    this.buffer.push(record);
    if (this.buffer.length < 100) {
      callback();
      return;
    }
    this.db
      .insertMany(this.buffer)
      .then(() => {
        this.buffer = [];
        callback();
      })
      .catch(callback); // propagate insert failures as stream errors
  }

  // Flush any remaining records when the stream is ended.
  _final(callback) {
    if (this.buffer.length === 0) {
      callback();
      return;
    }
    this.db
      .insertMany(this.buffer)
      .then(() => callback())
      .catch(callback);
  }
}
// Usage
const dbStream = new DatabaseStream(database);
readableStream.pipe(dbStream);
Transform Streams
Creating Transform Streams
// ✅ Good: Create custom transform stream
const { Transform } = require('stream');
// Transform stream that upper-cases every chunk of text passing through.
class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    // callback(error, data) is shorthand for push(data) + callback().
    callback(null, chunk.toString().toUpperCase());
  }
}
// Usage
const transform = new UppercaseTransform();
transform.on('data', chunk => {
console.log('Transformed:', chunk.toString());
});
transform.write('hello');
transform.write('world');
transform.end();
Practical Transform Streams
// ✅ Good: Parse JSON lines
const { Transform } = require('stream');
/**
 * Transform stream (object-mode output) that parses newline-delimited JSON:
 * buffers partial lines across chunks and pushes one object per line.
 */
class JSONLinesTransform extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = ''; // holds an incomplete trailing line between chunks
  }
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // keep the (possibly empty) incomplete line
    for (const line of lines) {
      if (!line.trim()) continue; // skip blank lines
      try {
        this.push(JSON.parse(line));
      } catch (error) {
        // Fail the stream through the callback instead of emitting 'error'
        // directly: emitting and then continuing would keep processing
        // input after a fatal parse failure.
        callback(error);
        return;
      }
    }
    callback();
  }
  _flush(callback) {
    // Parse whatever is left once the input ends (last line often has
    // no trailing newline).
    if (!this.buffer.trim()) {
      callback();
      return;
    }
    try {
      this.push(JSON.parse(this.buffer));
      callback();
    } catch (error) {
      callback(error);
    }
  }
}
// Usage
const jsonStream = new JSONLinesTransform();
readableStream.pipe(jsonStream);
jsonStream.on('data', obj => {
console.log('Parsed:', obj);
});
// ✅ Good: CSV to JSON transform
const { Transform } = require('stream');
/**
 * Transform stream (object-mode output) that converts CSV text into objects.
 * If `headers` is omitted, the first non-empty row becomes the header row;
 * if provided, every row (including the first) is treated as data.
 * Note: uses a naive split(',') — does not handle quoted fields.
 */
class CSVToJSONTransform extends Transform {
  constructor(headers) {
    super({ objectMode: true });
    this.headers = headers;
    this.buffer = ''; // holds a partial line between chunks
  }
  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    // A chunk can end mid-row, so keep the trailing partial line until
    // the rest of it arrives (or _flush runs at end of input).
    this.buffer = lines.pop();
    for (const line of lines) this._processLine(line);
    callback();
  }
  _flush(callback) {
    // The last row of a file often has no trailing newline.
    this._processLine(this.buffer);
    this.buffer = '';
    callback();
  }
  // Parse one CSV row; the first row becomes the header set when none
  // was supplied to the constructor.
  _processLine(line) {
    if (!line.trim()) return;
    const values = line.split(',');
    if (!this.headers) {
      this.headers = values.map((h) => h.trim());
      return;
    }
    const obj = {};
    this.headers.forEach((header, i) => {
      obj[String(header).trim()] = values[i]?.trim();
    });
    this.push(obj);
  }
}
// Usage
const csvStream = new CSVToJSONTransform();
fs.createReadStream('data.csv').pipe(csvStream);
csvStream.on('data', obj => {
console.log('Row:', obj);
});
Piping Streams
Basic Piping
// ✅ Good: Pipe streams together
const fs = require('fs');
// Read file โ Write to stdout
fs.createReadStream('input.txt').pipe(process.stdout);
// Read file โ Transform โ Write file
fs.createReadStream('input.txt')
.pipe(new UppercaseTransform())
.pipe(fs.createWriteStream('output.txt'));
Error Handling in Pipes
// ✅ Good: Handle errors in pipe chain
const fs = require('fs');
fs.createReadStream('input.txt')
.on('error', error => {
console.error('Read error:', error);
})
.pipe(new UppercaseTransform())
.on('error', error => {
console.error('Transform error:', error);
})
.pipe(fs.createWriteStream('output.txt'))
.on('error', error => {
console.error('Write error:', error);
})
.on('finish', () => {
console.log('Done');
});
Backpressure Handling
// ✅ Good: Handle backpressure
const fs = require('fs');
const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');
readable.on('data', chunk => {
const canContinue = writable.write(chunk);
if (!canContinue) {
console.log('Backpressure: pausing read');
readable.pause();
}
});
writable.on('drain', () => {
console.log('Drain: resuming read');
readable.resume();
});
// Or use pipe (handles backpressure automatically)
readable.pipe(writable);
Practical Stream Patterns
Processing Large Files
// ✅ Good: Process large file line by line
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface({
input: fs.createReadStream('large-file.txt'),
crlfDelay: Infinity
});
rl.on('line', line => {
console.log('Line:', line);
// Process line
});
rl.on('close', () => {
console.log('Done');
});
Streaming API Response
// ✅ Good: Stream API response
/**
 * Fetches `url` and logs the response body chunk by chunk as it arrives,
 * decoding bytes to text incrementally.
 * @param {string} url - Endpoint to stream from.
 * @throws {Error} If the response status is not OK.
 */
async function streamAPI(url) {
  const response = await fetch(url);
  if (!response.ok) {
    throw new Error(`Request failed with status ${response.status}`);
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      // { stream: true } keeps multi-byte characters that were split
      // across chunks intact between decode() calls.
      const chunk = decoder.decode(value, { stream: true });
      console.log('Chunk:', chunk);
    }
    // Flush any bytes still buffered in the decoder.
    const tail = decoder.decode();
    if (tail) console.log('Chunk:', tail);
  } finally {
    reader.releaseLock(); // release the body lock even if logging throws
  }
}
// Usage
streamAPI('https://api.example.com/data');
Combining Multiple Streams
// ✅ Good: Combine multiple streams
const { PassThrough } = require('stream');
const combined = new PassThrough();
stream1.pipe(combined);
stream2.pipe(combined);
stream3.pipe(combined);
combined.on('data', chunk => {
console.log('Combined:', chunk);
});
Best Practices
-
Use streams for large data:
// ✅ Good: fs.createReadStream('large-file.txt').pipe(process.stdout); -
Handle errors properly:
// ✅ Good: stream.on('error', error => { console.error('Error:', error); }); -
Use pipe for composition:
// ✅ Good: readable.pipe(transform).pipe(writable); -
Handle backpressure:
// ✅ Good - pipe handles backpressure automatically: readable.pipe(writable);
Common Mistakes
-
Not handling errors:
// ❌ Bad: stream.pipe(destination); // ✅ Good: stream.on('error', handleError).pipe(destination); -
Ignoring backpressure:
// ❌ Bad: readable.on('data', chunk => { writable.write(chunk); }); // ✅ Good: readable.pipe(writable); -
Loading entire file:
// ❌ Bad - memory intensive: const data = fs.readFileSync('large-file.txt'); // ✅ Good - memory efficient: fs.createReadStream('large-file.txt');
Summary
Streams are essential for efficient data processing. Key takeaways:
- Readable streams for data sources
- Writable streams for data destinations
- Transform streams for data transformation
- Pipe for composing streams
- Handle backpressure automatically with pipe
- Error handling at each stage
- Memory-efficient processing
Related Resources
- Stream API - Node.js
- Readable Streams - Node.js
- Writable Streams - Node.js
- Transform Streams - Node.js
- Streams Handbook
Next Steps
- Learn about Advanced Promise Patterns
- Explore Concurrency Patterns in JavaScript
- Study Async Generators and Iteration
- Practice with real stream applications
- Build data processing pipelines
Comments