Skip to main content
⚡ Calmops

Stream Processing Basics in JavaScript

Stream Processing Basics in JavaScript

Streams are powerful for processing large amounts of data efficiently. This article covers understanding streams, creating custom streams, and practical applications.

Introduction

Streams enable:

  • Memory-efficient data processing
  • Handling large files
  • Real-time data processing
  • Backpressure handling
  • Composable data pipelines

Understanding streams helps you:

  • Process large datasets
  • Build efficient applications
  • Handle real-time data
  • Manage memory effectively

Stream Fundamentals

What are Streams?

// Streams process data in chunks rather than all at once
// Benefits:
// - Memory efficient (don't load entire file)
// - Real-time processing
// - Backpressure handling
// - Composable pipelines

// Example: Reading a large file
const fs = require('fs');

// ❌ Bad: Load entire file into memory — blocks the event loop and
// allocates the whole file as a single string
const data = fs.readFileSync('large-file.txt', 'utf8');

// ✅ Good: Stream the file — chunks arrive incrementally, so memory
// use stays bounded regardless of file size
const stream = fs.createReadStream('large-file.txt', 'utf8');
stream.on('data', chunk => {
  console.log('Chunk:', chunk);
});

Stream Events

// Streams emit events for different states
const fs = require('fs');
const stream = fs.createReadStream('file.txt');

// One listener per lifecycle event:
//   'data'   - a chunk of data is available
//   'end'    - no more data will be emitted
//   'error'  - an error occurred
//   'pause'  - the stream was paused
//   'resume' - the stream was resumed
stream.on('data', chunk => console.log('Data:', chunk));
stream.on('end', () => console.log('Stream ended'));
stream.on('error', error => console.error('Error:', error));
stream.on('pause', () => console.log('Stream paused'));
stream.on('resume', () => console.log('Stream resumed'));

Readable Streams

Creating Readable Streams

// โœ… Good: Create custom readable stream
const { Readable } = require('stream');

class CounterStream extends Readable {
  /**
   * Readable stream that emits the integers 0 .. max-1, one per line.
   * @param {number} max - how many numbers to emit before ending.
   */
  constructor(max) {
    super();
    this.current = 0;
    this.max = max;
  }

  // Invoked by the stream machinery each time the consumer wants data.
  _read() {
    if (this.current >= this.max) {
      // Pushing null tells the Readable machinery the stream is done.
      this.push(null);
      return;
    }
    this.push(`${this.current}\n`);
    this.current += 1;
  }
}

// Usage
const counter = new CounterStream(5);
counter.on('data', chunk => {
  console.log('Count:', chunk.toString());
});

Practical Readable Stream

// โœ… Good: Stream data from API
const { Readable } = require('stream');

class APIStream extends Readable {
  /**
   * Object-mode Readable that pulls pages from a paginated JSON API and
   * pushes each page's `items` array as one chunk. Ends when the API
   * returns an empty page.
   * @param {string} url - base endpoint; `page`/`limit` query params are appended.
   * @param {number} [pageSize=10] - items requested per page.
   */
  constructor(url, pageSize = 10) {
    super({ objectMode: true });
    this.url = url;
    this.pageSize = pageSize;
    this.page = 1;
    this.exhausted = false;
    this.fetching = false; // guards against overlapping requests
  }

  async _read() {
    // _read may be invoked again while a previous fetch is still in
    // flight; without this guard the same page would be requested more
    // than once and chunks could be pushed out of order.
    if (this.exhausted || this.fetching) return;
    this.fetching = true;

    try {
      const response = await fetch(
        `${this.url}?page=${this.page}&limit=${this.pageSize}`
      );
      if (!response.ok) {
        throw new Error(`Request failed with status ${response.status}`);
      }
      const data = await response.json();

      if (data.items.length === 0) {
        this.exhausted = true;
        this.push(null);
        return;
      }

      this.push(data.items);
      this.page++;
    } catch (error) {
      // destroy() surfaces the failure as an 'error' event on the stream.
      this.destroy(error);
    } finally {
      this.fetching = false;
    }
  }
}

// Usage
// NOTE(review): _read calls destroy(error) on failure, which surfaces as
// an 'error' event — attach an 'error' handler here as well, or the
// first failed request becomes an uncaught exception.
const apiStream = new APIStream('https://api.example.com/items');
apiStream.on('data', items => {
  console.log('Items:', items);
});

Writable Streams

Creating Writable Streams

// โœ… Good: Create custom writable stream
const { Writable } = require('stream');

class LogStream extends Writable {
  /**
   * Minimal Writable sink: prints every chunk it receives to the console.
   * @param {Buffer|string} chunk - data to log.
   */
  _write(chunk, encoding, callback) {
    console.log('Log:', chunk.toString());
    callback(); // tell the machinery this chunk has been fully handled
  }
}

// Usage: write two chunks, then close the stream.
const logStream = new LogStream();
logStream.write('Hello');
logStream.end('World'); // end(chunk) writes a final chunk, then finishes

Practical Writable Stream

// โœ… Good: Write to database
const { Writable } = require('stream');

class DatabaseStream extends Writable {
  /**
   * Object-mode Writable that buffers incoming records and bulk-inserts
   * them via `db.insertMany`, 100 records per batch. Any remainder is
   * flushed when the stream finishes.
   * @param {{ insertMany: (records: object[]) => Promise<any> }} db
   */
  constructor(db) {
    super({ objectMode: true });
    this.db = db;
    this.buffer = [];
  }

  _write(record, encoding, callback) {
    this.buffer.push(record);
    if (this.buffer.length < 100) {
      callback();
      return;
    }
    // Batch is full: persist it before accepting more records.
    this.#flush(callback);
  }

  _final(callback) {
    // Producer side ended: persist whatever is still buffered.
    if (this.buffer.length === 0) {
      callback();
      return;
    }
    this.#flush(callback);
  }

  // Insert the buffered batch, reset the buffer, and report completion
  // (or the insert error) through the stream callback.
  #flush(callback) {
    const batch = this.buffer;
    this.buffer = [];
    this.db.insertMany(batch)
      .then(() => callback())
      .catch(callback);
  }
}

// Usage
// NOTE(review): assumes `database` and `readableStream` are defined
// elsewhere; attach 'error' handlers on both streams — pipe() does not
// forward errors between them.
const dbStream = new DatabaseStream(database);
readableStream.pipe(dbStream);

Transform Streams

Creating Transform Streams

// โœ… Good: Create custom transform stream
const { Transform } = require('stream');

class UppercaseTransform extends Transform {
  /** Transform that upper-cases every chunk of text passing through. */
  _transform(chunk, encoding, callback) {
    // callback(null, data) pushes the result and signals completion in one step.
    callback(null, chunk.toString().toUpperCase());
  }
}

// Usage
const transform = new UppercaseTransform();
transform.on('data', chunk => {
  console.log('Transformed:', chunk.toString());
});

for (const word of ['hello', 'world']) {
  transform.write(word);
}
transform.end();

Practical Transform Streams

// โœ… Good: Parse JSON lines
const { Transform } = require('stream');

class JSONLinesTransform extends Transform {
  /**
   * Transform that parses newline-delimited JSON (NDJSON) text into
   * objects, pushing one object per complete line. A line split across
   * input chunks is buffered until its closing newline arrives.
   */
  constructor() {
    super({ objectMode: true });
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // keep the trailing incomplete line

    for (const line of lines) {
      if (!line.trim()) continue;
      try {
        this.push(JSON.parse(line));
      } catch (error) {
        // Fail through the callback so the stream is destroyed cleanly.
        // (Emitting 'error' directly, as before, left the machinery
        // believing this chunk was still being processed.)
        callback(error);
        return;
      }
    }

    callback();
  }

  _flush(callback) {
    // Stream ended: parse a final line that had no trailing newline.
    if (!this.buffer.trim()) {
      callback();
      return;
    }
    try {
      const obj = JSON.parse(this.buffer);
      this.buffer = '';
      callback(null, obj); // push the last object, then finish
    } catch (error) {
      callback(error);
    }
  }
}

// Usage
// NOTE(review): `readableStream` must be defined elsewhere (e.g. a file
// or network stream producing NDJSON text).
const jsonStream = new JSONLinesTransform();
readableStream.pipe(jsonStream);
jsonStream.on('data', obj => {
  console.log('Parsed:', obj);
});
// โœ… Good: CSV to JSON transform
const { Transform } = require('stream');

class CSVToJSONTransform extends Transform {
  /**
   * Transform that converts CSV text into one plain object per row.
   *
   * If `headers` is provided it is used for every row; otherwise the
   * first line of input is parsed as the header row. Incoming text is
   * buffered so a row split across two chunks is still parsed as one
   * row (previously such rows were corrupted, the final row without a
   * trailing newline was dropped, and a supplied `headers` argument was
   * silently overwritten by the first line).
   * @param {string[]} [headers] - column names; derived from the first line when omitted.
   */
  constructor(headers) {
    super({ objectMode: true });
    this.headers = headers;
    this.buffer = '';
  }

  _transform(chunk, encoding, callback) {
    this.buffer += chunk.toString();
    const lines = this.buffer.split('\n');
    this.buffer = lines.pop(); // keep the trailing incomplete line

    for (const line of lines) {
      this.#processLine(line);
    }
    callback();
  }

  _flush(callback) {
    // Emit the final row when the input lacked a trailing newline.
    this.#processLine(this.buffer);
    this.buffer = '';
    callback();
  }

  // Handle one complete line: the header row, a data row, or blank.
  #processLine(line) {
    if (!line.trim()) return;
    if (!this.headers) {
      this.headers = line.split(',');
      return;
    }
    const values = line.split(',');
    const row = {};
    this.headers.forEach((header, i) => {
      row[header.trim()] = values[i]?.trim();
    });
    this.push(row);
  }
}

// Usage — no headers argument, so the first CSV line supplies the column names
const csvStream = new CSVToJSONTransform();
fs.createReadStream('data.csv').pipe(csvStream);
csvStream.on('data', obj => {
  console.log('Row:', obj);
});

Piping Streams

Basic Piping

// ✅ Good: Pipe streams together — pipe() moves data from a readable
// into a writable and manages backpressure automatically
const fs = require('fs');

// Read file → Write to stdout
fs.createReadStream('input.txt').pipe(process.stdout);

// Read file → Transform → Write file
fs.createReadStream('input.txt')
  .pipe(new UppercaseTransform())
  .pipe(fs.createWriteStream('output.txt'));

Error Handling in Pipes

// ✅ Good: Handle errors in pipe chain — pipe() does NOT forward errors
// downstream, so each stage needs its own 'error' listener
const fs = require('fs');

fs.createReadStream('input.txt')
  .on('error', error => {
    console.error('Read error:', error);
  })
  .pipe(new UppercaseTransform())
  .on('error', error => {
    console.error('Transform error:', error);
  })
  .pipe(fs.createWriteStream('output.txt'))
  .on('error', error => {
    console.error('Write error:', error);
  })
  .on('finish', () => {
    console.log('Done');
  });
// NOTE(review): require('stream').pipeline() does this wiring in one call
// and also destroys every stream in the chain on failure.

Backpressure Handling

// ✅ Good: Handle backpressure
const fs = require('fs');

const readable = fs.createReadStream('large-file.txt');
const writable = fs.createWriteStream('output.txt');

readable.on('data', chunk => {
  // write() returns false once the destination's internal buffer is full.
  const canContinue = writable.write(chunk);

  if (!canContinue) {
    console.log('Backpressure: pausing read');
    readable.pause();
  }
});

// 'drain' fires when the buffer has emptied and writing may resume.
writable.on('drain', () => {
  console.log('Drain: resuming read');
  readable.resume();
});

// Alternatively, pipe() handles backpressure automatically:
//   readable.pipe(writable);
// (Don't run pipe() AND the manual 'data' handler above on the same
// streams — that writes every chunk twice, which is what the original
// version of this snippet did.)

Practical Stream Patterns

Processing Large Files

// ✅ Good: Process large file line by line without loading it whole
const fs = require('fs');
const readline = require('readline');

const rl = readline.createInterface({
  input: fs.createReadStream('large-file.txt'),
  crlfDelay: Infinity // treat \r\n as a single line break
});

// 'line' fires once per complete line of input
rl.on('line', line => {
  console.log('Line:', line);
  // Process line
});

// 'close' fires when the underlying input stream ends
rl.on('close', () => {
  console.log('Done');
});

Streaming API Response

// ✅ Good: Stream API response
/**
 * Streams a fetch() response body, decoding and logging each text chunk.
 * @param {string} url - resource to fetch.
 * @throws {Error} when the response has no body, or when fetch/read fails.
 */
async function streamAPI(url) {
  const response = await fetch(url);
  if (!response.body) {
    throw new Error('Response has no body to stream');
  }
  const reader = response.body.getReader();
  const decoder = new TextDecoder();

  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;

      // { stream: true } keeps multi-byte sequences split across chunks intact.
      const chunk = decoder.decode(value, { stream: true });
      console.log('Chunk:', chunk);
    }
    // Flush any bytes the decoder still holds from a partial sequence.
    const tail = decoder.decode();
    if (tail) console.log('Chunk:', tail);
  } finally {
    reader.releaseLock(); // release the body lock even if read() throws
  }
}

// Usage
// NOTE(review): the returned promise is neither awaited nor .catch()-ed —
// a network failure here becomes an unhandled rejection.
streamAPI('https://api.example.com/data');

Combining Multiple Streams

// ✅ Good: Combine multiple streams into one
const { PassThrough } = require('stream');

const combined = new PassThrough();

// By default src.pipe(dest) calls dest.end() when src ends, so the
// combined stream would close as soon as the FIRST source finished and
// the remaining sources would hit "write after end". Pipe with
// { end: false } and close the combined stream only once every source
// has ended.
const sources = [stream1, stream2, stream3];
let remaining = sources.length;

for (const source of sources) {
  source.pipe(combined, { end: false });
  source.on('end', () => {
    remaining -= 1;
    if (remaining === 0) combined.end();
  });
  source.on('error', error => combined.destroy(error));
}

combined.on('data', chunk => {
  console.log('Combined:', chunk);
});

Best Practices

  1. Use streams for large data:

    // ✅ Good
    fs.createReadStream('large-file.txt').pipe(process.stdout);

  2. Handle errors properly:

    // ✅ Good
    stream.on('error', error => {
      console.error('Error:', error);
    });

  3. Use pipe for composition:

    // ✅ Good
    readable.pipe(transform).pipe(writable);

  4. Handle backpressure:

    // ✅ Good - pipe handles automatically
    readable.pipe(writable);


Common Mistakes

  1. Not handling errors:

    // ❌ Bad
    stream.pipe(destination);

    // ✅ Good
    stream.on('error', handleError).pipe(destination);

  2. Ignoring backpressure:

    // ❌ Bad
    readable.on('data', chunk => {
      writable.write(chunk);
    });

    // ✅ Good
    readable.pipe(writable);

  3. Loading entire file:

    // ❌ Bad - memory intensive
    const data = fs.readFileSync('large-file.txt');

    // ✅ Good - memory efficient
    fs.createReadStream('large-file.txt');


Summary

Streams are essential for efficient data processing. Key takeaways:

  • Readable streams for data sources
  • Writable streams for data destinations
  • Transform streams for data transformation
  • Pipe for composing streams
  • Handle backpressure automatically with pipe
  • Error handling at each stage
  • Memory-efficient processing

Next Steps

Comments