Working with Large Datasets in Go
Introduction
Processing large datasets requires careful memory management and efficient algorithms. This guide covers techniques for handling large datasets in Go without exhausting system resources.
Proper dataset handling ensures your applications can process gigabytes or terabytes of data efficiently.
Memory-Efficient Data Processing
Streaming vs Loading
package main
import (
"bufio"
"fmt"
"os"
"strings"
)
// BadApproach: Load entire file into memory
func BadLoadLargeFile(filename string) ([]string, error) {
data, err := os.ReadFile(filename)
if err != nil {
return nil, err
}
// Entire file in memory - problematic for large files
lines := strings.Split(string(data), "\n")
return lines, nil
}
// GoodApproach: Stream file line by line
func StreamLargeFile(filename string, processor func(string) error) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
scanner := bufio.NewScanner(file)
for scanner.Scan() {
if err := processor(scanner.Text()); err != nil {
return err
}
}
return scanner.Err()
}
// Example usage
func StreamExample() {
lineCount := 0
err := StreamLargeFile("large_file.txt", func(line string) error {
lineCount++
return nil
})
if err != nil {
fmt.Println("stream failed:", err)
return
}
fmt.Printf("Processed %d lines\n", lineCount)
}
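One caveat: bufio.Scanner rejects any line longer than bufio.MaxScanTokenSize (64 KB) by default, so Scan stops and scanner.Err() reports bufio.ErrTooLong. If your data may contain long lines, give the scanner a larger buffer first. A minimal sketch using the same imports as the streaming example above; the 1 MB cap is an arbitrary assumption:
// ScanWithLargeLines streams a file whose lines may exceed the default
// 64 KB Scanner token limit by enlarging the scanner's buffer.
func ScanWithLargeLines(filename string, processor func(string) error) error {
    file, err := os.Open(filename)
    if err != nil {
        return err
    }
    defer file.Close()
    scanner := bufio.NewScanner(file)
    // Assumption: no single line exceeds 1 MB.
    scanner.Buffer(make([]byte, 64*1024), 1024*1024)
    for scanner.Scan() {
        if err := processor(scanner.Text()); err != nil {
            return err
        }
    }
    return scanner.Err()
}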
Good: Proper Large Dataset Handling
package main
import (
"bufio"
"context"
"fmt"
"io"
"os"
"sync"
)
// ChunkReader reads data in chunks
type ChunkReader struct {
reader io.Reader
chunkSize int
}
// NewChunkReader creates a new chunk reader
func NewChunkReader(reader io.Reader, chunkSize int) *ChunkReader {
return &ChunkReader{
reader: reader,
chunkSize: chunkSize,
}
}
// ReadChunk reads up to chunkSize bytes; it allocates a fresh buffer on every call
func (cr *ChunkReader) ReadChunk() ([]byte, error) {
chunk := make([]byte, cr.chunkSize)
n, err := cr.reader.Read(chunk)
if n > 0 {
return chunk[:n], nil
}
return nil, err
}
// ChunkedProcessor processes data in chunks
type ChunkedProcessor struct {
chunkSize int
workers int
}
// NewChunkedProcessor creates a new chunked processor
func NewChunkedProcessor(chunkSize, workers int) *ChunkedProcessor {
return &ChunkedProcessor{
chunkSize: chunkSize,
workers: workers,
}
}
// ProcessFile processes a file in chunks
func (cp *ChunkedProcessor) ProcessFile(ctx context.Context, filename string, processor func([]byte) error) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
reader := NewChunkReader(file, cp.chunkSize)
chunkChan := make(chan []byte, cp.workers)
errChan := make(chan error, cp.workers+1) // room for every worker plus the producer
// Producer goroutine
go func() {
defer close(chunkChan)
for {
chunk, err := reader.ReadChunk()
if len(chunk) > 0 {
select {
case <-ctx.Done():
return
case chunkChan <- chunk:
}
}
if err == io.EOF {
return
}
if err != nil {
errChan <- err // report read errors instead of dropping them silently
return
}
}
}()
// Consumer goroutines
var wg sync.WaitGroup
for i := 0; i < cp.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for chunk := range chunkChan {
if err := processor(chunk); err != nil {
errChan <- err
cancel() // stop the producer so it does not block forever on a full channel
return
}
}
}()
}
wg.Wait()
// Return the first reported error, if any (producer or worker)
select {
case err := <-errChan:
return err
default:
}
return nil
}
// LineProcessor processes file line by line efficiently
type LineProcessor struct {
bufferSize int
workers int
}
// NewLineProcessor creates a new line processor
func NewLineProcessor(bufferSize, workers int) *LineProcessor {
return &LineProcessor{
bufferSize: bufferSize,
workers: workers,
}
}
// ProcessLines processes lines from a file
func (lp *LineProcessor) ProcessLines(ctx context.Context, filename string, processor func(string) error) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close()
ctx, cancel := context.WithCancel(ctx)
defer cancel()
scanner := bufio.NewScanner(file)
scanner.Buffer(make([]byte, lp.bufferSize), lp.bufferSize)
lineChan := make(chan string, lp.workers)
errChan := make(chan error, lp.workers)
// Producer
go func() {
defer close(lineChan)
for scanner.Scan() {
select {
case <-ctx.Done():
return
case lineChan <- scanner.Text():
}
}
}()
// Consumers
var wg sync.WaitGroup
for i := 0; i < lp.workers; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for line := range lineChan {
if err := processor(line); err != nil {
errChan <- err // surface worker errors instead of swallowing them
cancel()       // stop the producer
return
}
}
}()
}
wg.Wait()
close(errChan)
// Return the first worker error, if any, before checking the scanner
for err := range errChan {
if err != nil {
return err
}
}
return scanner.Err()
}
// Example usage
func LargeDatasetExample() {
processor := NewChunkedProcessor(1024*1024, 4) // 1MB chunks, 4 workers
ctx := context.Background()
err := processor.ProcessFile(ctx, "large_file.bin", func(chunk []byte) error {
fmt.Printf("Processed %d bytes\n", len(chunk))
return nil
})
if err != nil {
fmt.Println("processing failed:", err)
}
}
Bad: Improper Large Dataset Handling
package main
import (
"os"
"strings"
)
// BAD: Load entire file into memory
func BadProcessLargeFile(filename string) {
data, _ := os.ReadFile(filename)
// Entire file in memory - will crash with large files
lines := strings.Split(string(data), "\n")
for _, line := range lines {
_ = line // process line
}
}
// BAD: No chunking
func BadNoChunking(data []byte) {
// Process all at once
// No memory management
}
Problems:
- Loads entire file into memory
- No chunking
- No parallelization
- No error handling
Memory Optimization Techniques
Object Pooling
package main
import (
"sync"
)
// BufferPool manages reusable buffers
type BufferPool struct {
pool *sync.Pool
size int
}
// NewBufferPool creates a new buffer pool
func NewBufferPool(size int) *BufferPool {
return &BufferPool{
size: size,
pool: &sync.Pool{
New: func() interface{} {
return make([]byte, size)
},
},
}
}
// Get gets a buffer from the pool
func (bp *BufferPool) Get() []byte {
return bp.pool.Get().([]byte)
}
// Put returns a buffer to the pool
func (bp *BufferPool) Put(buf []byte) {
if cap(buf) == bp.size {
bp.pool.Put(buf)
}
}
// Example usage
func PoolingExample() {
pool := NewBufferPool(1024)
buf := pool.Get()
defer pool.Put(buf)
// Use buffer
copy(buf, "data")
}
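The ReadChunk method shown earlier allocates a fresh slice for every chunk; a pool removes that allocation churn. A minimal sketch combining the two, assuming the BufferPool above is in the same package (plus an io import) and that the processor does not retain the slice after it returns:
// ProcessWithPool reads from r into pooled buffers, hands each filled
// slice to processor, and returns the buffer to the pool afterwards.
func ProcessWithPool(r io.Reader, pool *BufferPool, processor func([]byte) error) error {
    for {
        buf := pool.Get()
        n, err := r.Read(buf)
        if n > 0 {
            if perr := processor(buf[:n]); perr != nil {
                pool.Put(buf)
                return perr
            }
        }
        pool.Put(buf)
        if err == io.EOF {
            return nil
        }
        if err != nil {
            return err
        }
    }
}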
Lazy Loading
package main
import (
"fmt"
"sync"
)
// LazyLoader loads data on demand
type LazyLoader struct {
loader func() (interface{}, error)
data interface{}
err error
once sync.Once
}
// NewLazyLoader creates a new lazy loader
func NewLazyLoader(loader func() (interface{}, error)) *LazyLoader {
return &LazyLoader{
loader: loader,
}
}
// Get gets the data, loading if necessary
func (ll *LazyLoader) Get() (interface{}, error) {
ll.once.Do(func() {
ll.data, ll.err = ll.loader()
})
return ll.data, ll.err
}
// Example usage
func LazyLoadingExample() {
loader := NewLazyLoader(func() (interface{}, error) {
// Load data only when needed
return "expensive data", nil
})
data, _ := loader.Get()
fmt.Println(data)
}
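Because Get is guarded by sync.Once, concurrent callers share a single load. A small sketch of that guarantee (the goroutine count is arbitrary):
func ConcurrentLazyExample() {
    loader := NewLazyLoader(func() (interface{}, error) {
        // Expensive load; runs exactly once even with many callers
        return "expensive data", nil
    })
    var wg sync.WaitGroup
    for i := 0; i < 10; i++ {
        wg.Add(1)
        go func() {
            defer wg.Done()
            if data, err := loader.Get(); err == nil {
                fmt.Println(data)
            }
        }()
    }
    wg.Wait()
}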
Pagination and Batching
package main
import (
"fmt"
)
// Paginator handles pagination
type Paginator struct {
totalItems int
pageSize int
}
// NewPaginator creates a new paginator
func NewPaginator(totalItems, pageSize int) *Paginator {
return &Paginator{
totalItems: totalItems,
pageSize: pageSize,
}
}
// GetPage returns the half-open index range [start, end) for a 1-based page number
func (p *Paginator) GetPage(pageNum int) (start, end int) {
start = (pageNum - 1) * p.pageSize
end = start + p.pageSize
if end > p.totalItems {
end = p.totalItems
}
return start, end
}
// TotalPages returns total number of pages
func (p *Paginator) TotalPages() int {
pages := p.totalItems / p.pageSize
if p.totalItems%p.pageSize > 0 {
pages++
}
return pages
}
// Example usage
func PaginationExample() {
paginator := NewPaginator(1000, 100)
for page := 1; page <= paginator.TotalPages(); page++ {
start, end := paginator.GetPage(page)
fmt.Printf("Page %d: items %d-%d\n", page, start, end)
}
}
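The same paginator can drive batch processing so that only one page of work is handled at a time. A hedged sketch; the items slice and the processBatch callback are hypothetical placeholders:
// ProcessInBatches walks items one page at a time, stopping at the first error.
func ProcessInBatches(items []string, pageSize int, processBatch func([]string) error) error {
    paginator := NewPaginator(len(items), pageSize)
    for page := 1; page <= paginator.TotalPages(); page++ {
        start, end := paginator.GetPage(page)
        // items[start:end] is the only batch in active use at this point
        if err := processBatch(items[start:end]); err != nil {
            return err
        }
    }
    return nil
}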
Best Practices
1. Stream Data
// Process data as it arrives
scanner := bufio.NewScanner(file)
for scanner.Scan() {
process(scanner.Text())
}
2. Use Chunking
// Process in manageable chunks
chunkSize := 1024 * 1024 // 1MB
3. Implement Pagination
// Load data in pages
paginator := NewPaginator(totalItems, pageSize)
4. Monitor Memory
// Track memory usage
var m runtime.MemStats
runtime.ReadMemStats(&m)
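A fuller sketch of periodic memory reporting during a long-running job (the interval and the reported fields are arbitrary choices):
package main
import (
    "fmt"
    "runtime"
    "time"
)
// ReportMemory logs allocation statistics every interval until stop is closed.
func ReportMemory(interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-stop:
            return
        case <-ticker.C:
            var m runtime.MemStats
            runtime.ReadMemStats(&m)
            fmt.Printf("alloc=%d MiB sys=%d MiB numGC=%d\n",
                m.Alloc/1024/1024, m.Sys/1024/1024, m.NumGC)
        }
    }
}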
Common Pitfalls
1. Loading Entire Dataset
Always stream or chunk data.
2. No Memory Management
Use object pooling and lazy loading.
3. Insufficient Parallelization
Use multiple workers for processing.
4. No Progress Tracking
Monitor processing progress.
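A minimal progress-tracking sketch using sync/atomic (atomic.Int64 requires Go 1.19+); workers are assumed to call Record once per processed item:
package main
import (
    "fmt"
    "sync/atomic"
    "time"
)
// ProgressTracker counts processed items and reports the running total.
type ProgressTracker struct {
    processed atomic.Int64
}
// Record notes that n more items were processed; safe for concurrent use.
func (pt *ProgressTracker) Record(n int64) {
    pt.processed.Add(n)
}
// Report prints the running count every interval until stop is closed.
func (pt *ProgressTracker) Report(interval time.Duration, stop <-chan struct{}) {
    ticker := time.NewTicker(interval)
    defer ticker.Stop()
    for {
        select {
        case <-stop:
            fmt.Printf("done: %d items processed\n", pt.processed.Load())
            return
        case <-ticker.C:
            fmt.Printf("progress: %d items processed\n", pt.processed.Load())
        }
    }
}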
Summary
Efficient large dataset handling is crucial. Key takeaways:
- Stream data instead of loading entirely
- Use chunking for manageable processing
- Implement pagination for large datasets
- Use object pooling for memory efficiency
- Parallelize processing
- Monitor memory usage
- Test with realistic data volumes
By mastering large dataset handling, you can process massive amounts of data efficiently.