
Rust for Data Engineering: Data Pipelines, ETL, and Apache Arrow

TL;DR: This guide explores using Rust for data engineering. You’ll learn how to work with Apache Arrow, run analytics with DuckDB, build data pipelines, and apply ETL patterns in Rust.


Introduction

Rust is becoming popular for data engineering due to:

  • Performance - Near-C speed for data processing
  • Memory Safety - No garbage collector, so no GC pauses
  • Parallelism - Straightforward parallel data processing (see the sketch after this list)
  • Ecosystem - Apache Arrow, DuckDB, and Polars bindings
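
As a quick taste of the parallelism point, rayon turns an ordinary iterator chain into a parallel one with a single method change. A minimal sketch (the records vector here is made up for illustration):

use rayon::prelude::*;

fn main() {
    // Pretend these are rows pulled from a source system
    let records: Vec<i64> = (0..1_000_000).collect();

    // par_iter() splits the work across a thread pool; the rest of the chain is unchanged
    let total: i64 = records
        .par_iter()
        .map(|x| x * 2)
        .filter(|x| x % 3 == 0)
        .sum();

    println!("total = {total}");
}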

Apache Arrow Integration

Installation

[dependencies]
arrow = "50.0"
arrow-array = "50.0"
arrow-schema = "50.0"
arrow-flight = "50.0"

Working with Arrow Arrays

use arrow_array::{Int32Array, RecordBatch, ArrayRef};
use arrow_schema::{Schema, Field, DataType};
use std::sync::Arc;

pub fn create_sample_batch() -> RecordBatch {
    // Schema: non-nullable id and name, nullable value
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
        Field::new("value", DataType::Float64, true),
    ]));

    // Build one column per field
    let id_array = Int32Array::from(vec![1, 2, 3, 4, 5]);
    let name_array = arrow_array::StringArray::from(vec![
        "alice", "bob", "charlie", "david", "eve",
    ]);
    let value_array = arrow_array::Float64Array::from(vec![
        Some(1.5), Some(2.5), None, Some(4.5), Some(5.5),
    ]);

    // Assemble the columns into a RecordBatch; try_new validates them against the schema
    RecordBatch::try_new(
        schema,
        vec![
            Arc::new(id_array) as ArrayRef,
            Arc::new(name_array) as ArrayRef,
            Arc::new(value_array) as ArrayRef,
        ],
    ).unwrap()
}
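
Columns come back from a RecordBatch as dynamically typed ArrayRef values, so you downcast them to a concrete array type before applying compute kernels. A minimal sketch of that pattern (the sum_values function name is ours, for illustration):

use arrow::compute::sum;
use arrow_array::{Float64Array, RecordBatch};

pub fn sum_values(batch: &RecordBatch) -> Option<f64> {
    // Look up the nullable "value" column and downcast it to a Float64Array
    let values = batch
        .column_by_name("value")?
        .as_any()
        .downcast_ref::<Float64Array>()?;

    // The sum kernel skips nulls and returns None if every value is null
    sum(values)
}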

DuckDB with Rust

Querying Data

[dependencies]
duckdb = { version = "0.10", features = ["bundled"] }

use duckdb::{Connection, Result};

pub fn query_with_duckdb() -> Result<()> {
    let conn = Connection::open_in_memory()?;
    
    // Create table
    conn.execute(
        "CREATE TABLE users (id INTEGER, name TEXT, value REAL)",
        [],
    )?;
    
    // Insert data
    conn.execute(
        "INSERT INTO users VALUES (1, 'alice', 1.5), (2, 'bob', 2.5), (3, 'charlie', 3.5)",
        [],
    )?;
    
    // Query
    let mut stmt = conn.prepare("SELECT * FROM users WHERE value > ?")?;
    let mut rows = stmt.query([2.0])?;
    
    while let Some(row) = rows.next()? {
        let id: i32 = row.get(0)?;
        let name: String = row.get(1)?; // strings come out of the row as owned values
        let value: f64 = row.get(2)?;
        
        println!("{}: {} - {}", id, name, value);
    }
    
    Ok(())
}
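
The duckdb crate can also hand results back as Arrow RecordBatches, which connects directly to the Arrow code above. A sketch assuming the crate's query_arrow API and its re-exported arrow module (check the method name against the duckdb version you use):

use duckdb::arrow::record_batch::RecordBatch;
use duckdb::{Connection, Result};

pub fn query_as_arrow() -> Result<Vec<RecordBatch>> {
    let conn = Connection::open_in_memory()?;
    conn.execute_batch(
        "CREATE TABLE users (id INTEGER, name TEXT, value REAL);
         INSERT INTO users VALUES (1, 'alice', 1.5), (2, 'bob', 2.5);",
    )?;

    // query_arrow streams the result set as Arrow record batches
    let mut stmt = conn.prepare("SELECT * FROM users WHERE value > 1.0")?;
    let batches: Vec<RecordBatch> = stmt.query_arrow([])?.collect();
    Ok(batches)
}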

Data Pipeline Construction

Pipeline Architecture

use arrow_array::RecordBatch;
use async_trait::async_trait; // the async-trait crate keeps the async trait object-safe

// Error type shared by all pipeline stages
#[derive(Debug)]
pub struct PipelineError(pub String);

pub struct DataPipeline {
    stages: Vec<Box<dyn PipelineStage>>,
}

#[async_trait]
pub trait PipelineStage: Send + Sync {
    fn name(&self) -> &str;
    async fn process(&self, batch: RecordBatch) -> Result<RecordBatch, PipelineError>;
}

pub struct ETLStage {
    transform: Box<dyn Fn(RecordBatch) -> Result<RecordBatch, PipelineError> + Send + Sync>,
}

impl ETLStage {
    pub fn new<F>(transform: F) -> Self
    where
        F: Fn(RecordBatch) -> Result<RecordBatch, PipelineError> + Send + Sync + 'static,
    {
        Self { transform: Box::new(transform) }
    }
}

#[async_trait]
impl PipelineStage for ETLStage {
    fn name(&self) -> &str { "etl" }

    async fn process(&self, batch: RecordBatch) -> Result<RecordBatch, PipelineError> {
        (self.transform)(batch)
    }
}
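
DataPipeline still needs a driver that threads a batch through each stage in order. A minimal sketch assuming the types above (the run method and the pass-through closure are illustrative, not from any library):

impl DataPipeline {
    pub fn new(stages: Vec<Box<dyn PipelineStage>>) -> Self {
        Self { stages }
    }

    // Feed the batch through every stage in order, stopping at the first error
    pub async fn run(&self, mut batch: RecordBatch) -> Result<RecordBatch, PipelineError> {
        for stage in &self.stages {
            batch = stage.process(batch).await?;
        }
        Ok(batch)
    }
}

// Usage: a pipeline with a single pass-through ETL stage
// let pipeline = DataPipeline::new(vec![Box::new(ETLStage::new(|batch| Ok(batch)))]);
// let out = pipeline.run(create_sample_batch()).await?;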

Reading and Writing Files

CSV Processing

use std::fs::File;
use std::sync::Arc;

use arrow::compute::concat_batches;
use arrow::csv::{ReaderBuilder, WriterBuilder};
use arrow_array::RecordBatch;
use arrow_schema::{DataType, Field, Schema};

pub fn read_csv(path: &str) -> Result<RecordBatch, Box<dyn std::error::Error>> {
    let file = File::open(path)?;
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int32, false),
        Field::new("name", DataType::Utf8, false),
    ]));

    // The reader yields one RecordBatch per chunk of rows
    let reader = ReaderBuilder::new(schema.clone())
        .with_header(true)
        .build(file)?;
    let batches = reader.collect::<Result<Vec<_>, _>>()?;

    if batches.is_empty() {
        return Err("No data".into());
    }

    // Combine all batches into a single RecordBatch
    Ok(concat_batches(&schema, &batches)?)
}

pub fn write_csv(path: &str, batch: &RecordBatch) -> Result<(), Box<dyn std::error::Error>> {
    let file = File::create(path)?;
    let mut writer = WriterBuilder::new().build(file);

    writer.write(batch)?;
    Ok(())
}
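
For files that do not fit comfortably in memory, you can stream instead of collecting: process and write each batch as the reader produces it. A sketch using the same arrow csv builders (the transform parameter is illustrative):

use std::fs::File;

use arrow::csv::{ReaderBuilder, WriterBuilder};
use arrow_array::RecordBatch;
use arrow_schema::SchemaRef;

// Read input batch by batch, apply `transform`, and append each result to the output
pub fn transform_csv(
    input: &str,
    output: &str,
    schema: SchemaRef,
    transform: impl Fn(RecordBatch) -> RecordBatch,
) -> Result<(), Box<dyn std::error::Error>> {
    let reader = ReaderBuilder::new(schema)
        .with_header(true)
        .build(File::open(input)?)?;
    let mut writer = WriterBuilder::new().build(File::create(output)?);

    for batch in reader {
        // Only one batch is held in memory at a time
        writer.write(&transform(batch?))?;
    }
    Ok(())
}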

Polars Integration

// Requires polars with the "lazy" feature enabled
use polars::prelude::*;

pub fn data_operations() -> Result<DataFrame, PolarsError> {
    let df = df!(
        "id" => [1, 2, 3, 4, 5],
        "name" => ["a", "b", "c", "d", "e"],
        "value" => [1.5, 2.5, 3.5, 4.5, 5.5]
    )?;

    // The lazy API lets filter and group_by take column expressions
    let result = df
        .lazy()
        .filter(col("value").gt(lit(2.5)))
        .group_by([col("id")])
        .agg([col("value").sum()])
        .collect()?;

    Ok(result)
}

Conclusion

Rust offers excellent tools for data engineering:

  1. Apache Arrow - Columnar data format
  2. DuckDB - Embedded analytics DB
  3. Polars bindings - DataFrame operations
  4. Pipeline patterns - ETL processing

These enable high-performance data pipelines.

