
Stream Processing Basics in JavaScript

May 8, 2026 · Larry Qu · 6 min read

Streams let you process large amounts of data efficiently. This article covers how streams work, how to create custom readable, writable, and transform streams, and practical applications.

Introduction

Streams enable:

  • Memory-efficient data processing
  • Handling large files
  • Real-time data processing
  • Backpressure handling
  • Composable data pipelines

Understanding streams helps you:

  • Process large datasets
  • Build efficient applications
  • Handle real-time data
  • Manage memory effectively

Stream Fundamentals

What are Streams?

// Streams process data in chunks rather than all at once
// Benefits:
// - Memory efficient (don't load entire file)
// - Real-time processing
// - Backpressure handling
// - Composable pipelines

// Example: Reading a large file
const fs = require('fs');

// ❌ Bad: Load entire file into memory
const data = fs.readFileSync('large-file.txt', 'utf8');

// ✅ Good: Stream the file
const stream = fs.createReadStream('large-file.txt', 'utf8');
stream.on('data', chunk => {
  console.log('Chunk:', chunk);
});

Stream Events

// Streams emit events for different states
const fs = require('fs');
const stream = fs.createReadStream('file.txt');

// 'data' event: chunk of data available
stream.on('data', chunk => {
  console.log('Data:', chunk);
});

// 'end' event: no more data
stream.on('end', () => {
  console.log('Stream ended');
});

// 'error' event: error occurred
stream.on('error', error => {
  console.error('Error:', error);
});

// 'pause' event: stream paused
stream.on('pause', () => {
  console.log('Stream paused');
});

// 'resume' event: stream resumed
stream.on('resume', () => {
  console.log('Stream resumed');
});

Readable Streams

Creating Readable Streams

// ✅ Good: Create custom readable stream
const { Readable } = require('stream');

class CounterStream extends Readable {
  constructor(max) {
    super();
    this.current = 0;
    this.max = max;
  }

  _read() {
    if (this.current < this.max) {
      this.push(`${this.current}\n`);
      this.current++;
    } else {
      this.push(null); // Signal end of stream
    }
  }
}

// Usage
const counter = new CounterStream(5);
counter.on('data', chunk => {
  console.log('Count:', chunk.toString());
});
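
Because CounterStream is an ordinary Readable, it can also be piped instead of consumed with 'data' handlers. A minimal sketch using the same class as above:

// Alternatively: pipe the counter straight to stdout
new CounterStream(5).pipe(process.stdout);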

Practical Readable Stream

// ✅ Good: Stream data from API
const { Readable } = require('stream');

class APIStream extends Readable {
  constructor(url, pageSize = 10) {
    super({ objectMode: true });
    this.url = url;
    this.pageSize = pageSize;
    this.page = 1;
    this.exhausted = false;
  }

  async _read() {
    if (this.exhausted) return;

    try {
      const response = await fetch(
        `${this.url}?page=${this.page}&limit=${this.pageSize}`
      );
      const data = await response.json();

      if (data.items.length === 0) {
        this.exhausted = true;
        this.push(null);
        return;
      }

      this.push(data.items);
      this.page++;
    } catch (error) {
      this.destroy(error);
    }
  }
}

// Usage
const apiStream = new APIStream('https://api.example.com/items');
apiStream.on('data', items => {
  console.log('Items:', items);
});

Writable Streams

Creating Writable Streams

// ✅ Good: Create custom writable stream
const { Writable } = require('stream');

class LogStream extends Writable {
  _write(chunk, encoding, callback) {
    console.log('Log:', chunk.toString());
    callback(); // Signal completion
  }
}

// Usage
const logStream = new LogStream();
logStream.write('Hello');
logStream.write('World');
logStream.end();

Practical Writable Stream

// ✅ Good: Write to database
const { Writable } = require('stream');

class DatabaseStream extends Writable {
  constructor(db) {
    super({ objectMode: true });
    this.db = db;
    this.buffer = [];
  }

  _write(record, encoding, callback) {
    this.buffer.push(record);

    // Batch insert every 100 records
    if (this.buffer.length >= 100) {
      this.db.insertMany(this.buffer)
        .then(() => {
          this.buffer = [];
          callback();
        })
        .catch(callback);
    } else {
      callback();
    }
  }

  _final(callback) {
    // Insert remaining records
    if (this.buffer.length > 0) {
      this.db.insertMany(this.buffer)
        .then(() => callback())
        .catch(callback);
    } else {
      callback();
    }
  }
}

// Usage
const dbStream = new DatabaseStream(database);
readableStream.pipe(dbStream);

Transform Streams

Creating Transform Streams

// ✅ Good: Create custom transform stream
const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  _transform(chunk, encoding, callback) {
    const uppercase = chunk.toString().toUpperCase();
    this.push(uppercase);
    callback();
  }
}

// Usage
const transform = new UppercaseTransform();
transform.on('data', chunk => {
  console.log('Transformed:', chunk.toString());
});

transform.write('hello');
transform.write('world');
transform.end();

Practical Transform Streams

// ✅ Good: Parse JSON lines
const { Transform } = require('stream');

class JSONLinesTransform extends Transform {
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line

    for (const line of lines) {
      if (line.trim()) {
        try {
          const obj = JSON.parse(line);
          this.push(obj);
        } catch (error) {
          this.emit('error', error);
        }
      }
    }

    callback();
  }

  _flush(callback) {
    if (this.buffer.trim()) {
      try {
        const obj = JSON.parse(this.buffer);
        this.push(obj);
      } catch (error) {
        this.emit('error', error);
      }
    }
    callback();
  }
}

// Usage
const jsonStream = new JSONLinesTransform();
readableStream.pipe(jsonStream);
jsonStream.on('data', obj => {
  console.log('Parsed:', obj);
});

// ✅ Good: CSV to JSON transform
const { Transform } = require('stream');

class CSVToJSONTransform extends Transform {
  constructor() {
    super({ objectMode: true });
    this.headers = null;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // Keep incomplete line for the next chunk

    for (const line of lines) {
      if (!line.trim()) continue;

      if (!this.headers) {
        // First non-empty line is the header row
        this.headers = line.split(',').map(h => h.trim());
      } else {
        const values = line.split(',');
        const obj = {};
        this.headers.forEach((header, i) => {
          obj[header] = values[i]?.trim();
        });
        this.push(obj);
      }
    }

    callback();
  }

  _flush(callback) {
    // Handle a final line without a trailing newline
    if (this.buffer.trim() && this.headers) {
      const values = this.buffer.split(',');
      const obj = {};
      this.headers.forEach((header, i) => {
        obj[header] = values[i]?.trim();
      });
      this.push(obj);
    }
    callback();
  }
}

// Usage
const csvStream = new CSVToJSONTransform();
fs.createReadStream('data.csv').pipe(csvStream);
csvStream.on('data', obj => {
  console.log('Row:', obj);
});

Piping Streams

Basic Piping

// ✅ Good: Pipe streams together
const fs = require('fs');

// Read file → Write to stdout
fs.createReadStream('input.txt').pipe(process.stdout);

// Read file → Transform → Write file
fs.createReadStream('input.txt')
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream('output.txt'));
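
Piping also composes with Node's built-in transform streams. As one more illustration, here is a sketch that compresses a file with the zlib module (the file names are placeholders):

const fs = require('fs');
const zlib = require('zlib');

// Read → gzip → write; pipe handles backpressure between each stage
fs.createReadStream('input.txt')
  .pipe(zlib.createGzip())
  .pipe(fs.createWriteStream('input.txt.gz'));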

Error Handling in Pipes

// ✅ Good: Handle errors in pipe chain
const fs = require('fs');

fs.createReadStream('input.txt')
  .on('error', error => {
    console.error('Read error:', error);
  })
  .pipe(new UppercaseTransform())
  .on('error', error => {
    console.error('Transform error:', error);
  })
  .pipe(fs.createWriteStream('output.txt'))
  .on('error', error => {
    console.error('Write error:', error);
  })
  .on('finish', () => {
    console.log('Done');
  });
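
Attaching an error handler to every stage works, but it is easy to miss one, and pipe() does not destroy the other streams when one fails. On Node 10+, stream.pipeline() gives you a single callback for the whole chain and cleans everything up on error. A sketch reusing the UppercaseTransform from earlier:

const { pipeline } = require('stream');
const fs = require('fs');

pipeline(
  fs.createReadStream('input.txt'),
  new UppercaseTransform(),
  fs.createWriteStream('output.txt'),
  error => {
    if (error) {
      console.error('Pipeline failed:', error);
    } else {
      console.log('Pipeline succeeded');
    }
  }
);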

Backpressure Handling

// ✅ Good: Handle backpressure
const fs = require('fs');

const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', chunk => {
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    console.log('Backpressure: pausing read');
    readable.pause();
  }
});

writable.on('drain', () => {
  console.log('Drain: resuming read');
  readable.resume();
});

// Or use pipe (handles backpressure automatically)
readable.pipe(writable);
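
Readable streams are also async iterables (Node 10+), which gives you a third option: a for await...of loop that pulls chunks on demand. A sketch of the same copy, waiting for 'drain' whenever the writable's buffer fills up:

const fs = require('fs');

async function copyWithBackpressure() {
  const readable = fs.createReadStream('large-file.txt');
  const writable = fs.createWriteStream('output.txt');

  for await (const chunk of readable) {
    // write() returns false when the internal buffer is full
    if (!writable.write(chunk)) {
      await new Promise(resolve => writable.once('drain', resolve));
    }
  }

  writable.end();
}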

Practical Stream Patterns

Processing Large Files

// ✅ Good: Process large file line by line
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('large-file.txt'),
  crlfDelay: Infinity
});

rl.on('line', line => {
  console.log('Line:', line);
  // Process line
});

rl.on('close', () => {
  console.log('Done');
});
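
The readline interface is async iterable as well, so the same processing can be written as a loop, which makes it easier to await asynchronous work per line:

const fs = require('fs');
const readline = require('readline');

async function processLines() {
  const rl = readline.createInterface({
    input: fs.createReadStream('large-file.txt'),
    crlfDelay: Infinity
  });

  for await (const line of rl) {
    console.log('Line:', line);
    // await asynchronous processing here if needed
  }
}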

Streaming API Response

// ✅ Good: Stream API response
async function streamAPI(url) {
  const response = await fetch(url);
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  while (true) {
    const { done, value } = await reader.read();
    if (done) break;

    const chunk = decoder.decode(value, { stream: true });
    console.log('Chunk:', chunk);
  }
}

// Usage
streamAPI('https://api.example.com/data');
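
Note that fetch returns a web ReadableStream rather than a Node stream. If you want to feed the response into the pipe-based patterns above, newer Node versions (17+) provide Readable.fromWeb() to convert it. A sketch, assuming the response body should be written to a local file:

const fs = require('fs');
const { Readable } = require('stream');

async function saveAPIResponse(url) {
  const response = await fetch(url);
  // Convert the web ReadableStream into a Node.js readable stream
  Readable.fromWeb(response.body).pipe(fs.createWriteStream('response.json'));
}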

Combining Multiple Streams

// ✅ Good: Combine multiple streams into one
const { PassThrough } = require('stream');

const combined = new PassThrough();

// end: false stops the first source that finishes from ending the combined stream
let remaining = 3;
for (const source of [stream1, stream2, stream3]) {
  source.pipe(combined, { end: false });
  source.on('end', () => {
    if (--remaining === 0) combined.end();
  });
}

combined.on('data', chunk => {
  console.log('Combined:', chunk);
});

Best Practices

  1. Use streams for large data:
    // ✅ Good
    fs.createReadStream('large-file.txt').pipe(process.stdout);
    
  2. Handle errors properly:
    // ✅ Good
    stream.on('error', error => {
      console.error('Error:', error);
    });
    
  3. Use pipe for composition:
    // ✅ Good
    readable.pipe(transform).pipe(writable);
    
  4. Handle backpressure:
    // ✅ Good - pipe handles automatically
    readable.pipe(writable);
    

Common Mistakes

  1. Not handling errors:
    // ❌ Bad
    stream.pipe(destination);
    
    // ✅ Good
    stream.on('error', handleError).pipe(destination);
    
  2. Ignoring backpressure:
    // ❌ Bad
    readable.on('data', chunk => {
      writable.write(chunk);
    });
    
    // ✅ Good
    readable.pipe(writable);
    
  3. Loading entire file:
    // ❌ Bad - memory intensive
    const data = fs.readFileSync('large-file.txt');
    
    // ✅ Good - memory efficient
    fs.createReadStream('large-file.txt');
    

Summary

Streams are essential for efficient data processing. Key takeaways:

  • Readable streams for data sources
  • Writable streams for data destinations
  • Transform streams for data transformation
  • Pipe for composing streams
  • Handle backpressure automatically with pipe
  • Error handling at each stage
  • Memory-efficient processing
