Skip to main content
⚡ Calmops

Recurrent Neural Networks (RNNs) and LSTMs: Sequence Processing

Recurrent Neural Networks (RNNs) and LSTMs: Sequence Processing

Recurrent Neural Networks (RNNs) are designed for sequential data where order matters. They maintain a hidden state that captures information from previous time steps, making them well suited to time series, text, and other sequential tasks.

RNN Fundamentals

Why RNNs for Sequences?

Traditional feedforward networks treat inputs independently, but sequences have temporal dependencies:

  • Previous words influence the next word in text
  • Past prices influence future stock prices
  • Previous frames affect the next video frame

RNNs solve this through recurrent connections that maintain hidden state across time steps.

Basic RNN Architecture

import numpy as np
import matplotlib.pyplot as plt

class SimpleRNN:
    """Minimal many-to-one vanilla RNN trained with backpropagation through time.

    The network reads a batch of sequences, updates a tanh hidden state at
    every time step, and produces one softmax distribution per sequence from
    the final hidden state.

    Weights use the row-vector convention, i.e. activations are computed as
    ``x @ W``:

    * ``Wxh``: (input_size, hidden_size) input-to-hidden weights
    * ``Whh``: (hidden_size, hidden_size) hidden-to-hidden weights
    * ``Why``: (hidden_size, output_size) hidden-to-output weights
    * ``bh``:  (1, hidden_size) hidden bias
    * ``by``:  (1, output_size) output bias
    """

    def __init__(self, input_size, hidden_size, output_size, learning_rate=0.01):
        """Create the network with small random weights and zero biases.

        Args:
            input_size: Number of features per time step.
            hidden_size: Width of the recurrent hidden state.
            output_size: Number of output classes.
            learning_rate: Step size used by ``backward`` for the SGD update.
        """
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.learning_rate = learning_rate

        # Initialize weights
        self.Wxh = np.random.randn(input_size, hidden_size) * 0.01
        self.Whh = np.random.randn(hidden_size, hidden_size) * 0.01
        self.Why = np.random.randn(hidden_size, output_size) * 0.01
        self.bh = np.zeros((1, hidden_size))
        self.by = np.zeros((1, output_size))

    def tanh(self, x):
        """Return the element-wise hyperbolic tangent of ``x``."""
        return np.tanh(x)

    def tanh_derivative(self, x):
        """Return d tanh(x)/dx evaluated at the pre-activation ``x``.

        Do not pass an already-activated value ``h = tanh(x)`` here; for that
        case the derivative is simply ``1 - h ** 2``.
        """
        return 1 - np.tanh(x) ** 2

    def softmax(self, x):
        """Return the row-wise softmax of a 2-D array of logits."""
        # Subtracting the row maximum keeps np.exp from overflowing without
        # changing the result.
        exp_x = np.exp(x - np.max(x, axis=1, keepdims=True))
        return exp_x / np.sum(exp_x, axis=1, keepdims=True)

    def forward(self, X):
        """Run the forward pass over a batch of sequences.

        Args:
            X: Array of shape (batch_size, seq_len, input_size).

        Returns:
            Array of shape (batch_size, output_size) holding the softmax
            class probabilities computed from the final hidden state.

        Side effects:
            Stores the per-step inputs in ``self.x_cache`` and the hidden
            states (including the initial zero state) in ``self.h_cache``
            for use by ``backward``.
        """
        batch_size, seq_len, _ = X.shape

        # Initialize hidden state
        h = np.zeros((batch_size, self.hidden_size))

        # Store activations for backprop; h_cache[t] is h_{t-1} and
        # h_cache[t + 1] is h_t.
        self.h_cache = [h]
        self.x_cache = []

        # Process sequence
        for t in range(seq_len):
            x_t = X[:, t, :]
            self.x_cache.append(x_t)

            # Hidden state update: h_t = tanh(x_t @ Wxh + h_{t-1} @ Whh + bh)
            h = self.tanh(np.dot(x_t, self.Wxh) + np.dot(h, self.Whh) + self.bh)
            self.h_cache.append(h)

        # Output: y = h_T @ Why + by
        y = np.dot(h, self.Why) + self.by
        output = self.softmax(y)

        return output

    def backward(self, X, y, output):
        """Backpropagation through time (BPTT) followed by an SGD update.

        Must be called after ``forward`` on the same ``X`` because it reads
        ``self.h_cache`` and ``self.x_cache``.

        Args:
            X: Array of shape (batch_size, seq_len, input_size); only its
                shape is used here.
            y: Targets with the same shape as ``output``. ``output - y`` is
                the cross-entropy gradient w.r.t. the logits when each row
                of ``y`` is a one-hot (or probability) vector.
            output: Softmax probabilities returned by ``forward``.

        Side effects:
            Updates all weights and biases in place. Gradients are summed
            over the batch and clipped element-wise to [-5, 5] first.
        """
        batch_size, seq_len, _ = X.shape

        # Output layer gradient
        dy = output - y
        dWhy = np.dot(self.h_cache[-1].T, dy)
        dby = np.sum(dy, axis=0, keepdims=True)

        # Backprop through time
        dh = np.dot(dy, self.Why.T)
        dWxh = np.zeros_like(self.Wxh)
        dWhh = np.zeros_like(self.Whh)
        dbh = np.zeros_like(self.bh)

        for t in range(seq_len - 1, -1, -1):
            # h_cache[t + 1] holds h_t, which is already tanh(pre-activation),
            # so the derivative through tanh is 1 - h_t ** 2. Feeding h_t to
            # tanh_derivative would apply tanh a second time.
            h_t = self.h_cache[t + 1]
            dh_raw = dh * (1.0 - h_t ** 2)

            # Accumulate gradients
            dWxh += np.dot(self.x_cache[t].T, dh_raw)
            dWhh += np.dot(self.h_cache[t].T, dh_raw)
            dbh += np.sum(dh_raw, axis=0, keepdims=True)

            # Propagate to previous time step
            dh = np.dot(dh_raw, self.Whh.T)

        # Clip gradients to prevent explosion
        for dparam in [dWxh, dWhh, dWhy, dbh, dby]:
            np.clip(dparam, -5, 5, out=dparam)

        # Update weights
        self.Wxh -= self.learning_rate * dWxh
        self.Whh -= self.learning_rate * dWhh
        self.Why -= self.learning_rate * dWhy
        self.bh -= self.learning_rate * dbh
        self.by -= self.learning_rate * dby

# Example: simple sequence classification on random data
X = np.random.randn(32, 10, 5)  # batch_size=32, seq_len=10, input_size=5

# One-hot targets for 3 classes. SimpleRNN.backward computes `output - y`,
# so every target row must be a valid distribution over the classes rather
# than three independent random integers.
labels = np.random.randint(0, 3, 32)
y = np.eye(3)[labels]  # shape (32, 3)

rnn = SimpleRNN(input_size=5, hidden_size=20, output_size=3)
output = rnn.forward(X)
print(f"Output shape: {output.shape}")

The Vanishing Gradient Problem

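RNNs suffer from vanishing gradients when backpropagating through many time steps: the gradient is multiplied by a similar factor at every step, so a factor below 1 shrinks it exponentially with sequence length.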

# Visualization of vanishing gradients
import numpy as np
import matplotlib.pyplot as plt

# Simulate gradient flow through time
def simulate_gradient_flow(num_steps, gradient_factor=0.9):
    """Model gradient magnitude as repeated multiplication by a fixed factor.

    Starts from a magnitude of 1.0 and multiplies by ``gradient_factor``
    once per step.

    Returns:
        A list of ``num_steps + 1`` magnitudes, beginning with 1.0.
    """
    magnitude = 1.0
    history = [magnitude]

    for _step in range(num_steps):
        magnitude = magnitude * gradient_factor
        history.append(magnitude)

    return history

# Plot the simulated decay for several per-step gradient factors
steps = 50
plt.figure(figsize=(10, 5))

for factor in (0.5, 0.8, 0.9, 0.95):
    grads = simulate_gradient_flow(steps, factor)
    plt.plot(grads, label=f'Factor: {factor}')

# Log-scaled y-axis so geometric decay shows up as straight lines
plt.yscale('log')
plt.title('Vanishing Gradient Problem in RNNs')
plt.xlabel('Time Step')
plt.ylabel('Gradient Magnitude')
plt.grid(True)
plt.legend()
plt.show()

LSTMs: Long Short-Term Memory

LSTMs mitigate the vanishing gradient problem through gating mechanisms: the additive cell-state update gives gradients a more direct path across many time steps.

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# LSTM cell components
class LSTMCell:
    """Reference sketch of the LSTM cell equations (no implementation).

    With [h_{t-1}, x_t] denoting the previous hidden state concatenated with
    the current input:

        f_t     = sigmoid(Wf @ [h_{t-1}, x_t] + bf)    # forget gate
        i_t     = sigmoid(Wi @ [h_{t-1}, x_t] + bi)    # input gate
        C_tilde = tanh(Wc @ [h_{t-1}, x_t] + bc)       # candidate cell state
        C_t     = f_t * C_{t-1} + i_t * C_tilde        # new cell state
        o_t     = sigmoid(Wo @ [h_{t-1}, x_t] + bo)    # output gate
        h_t     = o_t * tanh(C_t)                      # new hidden state

    The class body is intentionally empty; it only documents the math.
    """

# Build LSTM model with Keras
# Example input dimensions so the snippet runs on its own; replace them with
# the (timesteps, features) shape of your data.
timesteps, features = 50, 1

# Two stacked LSTM layers followed by a small dense head for binary
# classification. The first LSTM returns the full sequence so the second
# LSTM receives one vector per time step.
# NOTE(review): activation='relu' overrides the LSTM default ('tanh'); per
# the Keras docs a non-tanh activation cannot use the cuDNN kernel -- confirm
# this is intended.
lstm_model = keras.Sequential([
    layers.LSTM(128, activation='relu', input_shape=(timesteps, features),
                return_sequences=True),
    layers.Dropout(0.2),
    layers.LSTM(64, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(32, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

lstm_model.compile(
    optimizer='adam',
    loss='binary_crossentropy',
    metrics=['accuracy']
)

Building RNN/LSTM Models with TensorFlow/Keras

Time Series Prediction

import numpy as np
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.preprocessing import MinMaxScaler

# Generate synthetic time series data
def generate_time_series(num_samples=1000, seq_length=50):
    """Build sliding-window samples from a noisy sine wave.

    The series is ``sin`` evaluated on ``num_samples`` evenly spaced points
    in [0, 100] plus Gaussian noise (mean 0, std 0.1). Each sample is a
    window of ``seq_length`` consecutive values and its target is the value
    immediately after the window.

    Returns:
        A tuple ``(X, y)`` of NumPy arrays: the windows and their targets.
    """
    signal = np.sin(np.linspace(0, 100, num_samples))
    noise = np.random.normal(0, 0.1, num_samples)
    data = signal + noise

    num_windows = len(data) - seq_length
    windows = [data[start:start + seq_length] for start in range(num_windows)]
    targets = [data[start + seq_length] for start in range(num_windows)]

    return np.array(windows), np.array(targets)

# Build the windowed dataset and append a trailing feature axis
X, y = generate_time_series(num_samples=1000, seq_length=50)
X = X.reshape(*X.shape, 1)  # (samples, timesteps) -> (samples, timesteps, 1)

# Chronological 80/20 train/test split (no shuffling)
split = int(len(X) * 0.8)
X_train, y_train = X[:split], y[:split]
X_test, y_test = X[split:], y[split:]

# Stacked LSTM regressor: the first LSTM emits the whole sequence so the
# second one can consume it; the final Dense(1) outputs the next value.
model = keras.Sequential()
model.add(layers.LSTM(64, activation='relu', input_shape=(50, 1), return_sequences=True))
model.add(layers.Dropout(0.2))
model.add(layers.LSTM(32, activation='relu'))
model.add(layers.Dropout(0.2))
model.add(layers.Dense(16, activation='relu'))
model.add(layers.Dense(1))

model.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Train, holding out 20% of the training windows for validation
fit_options = dict(epochs=20, batch_size=32, validation_split=0.2, verbose=1)
history = model.fit(X_train, y_train, **fit_options)

# Report mean absolute error on the held-out test windows
test_loss, test_mae = model.evaluate(X_test, y_test)
print(f"Test MAE: {test_mae:.4f}")

# Predict the next value for the first ten test windows
predictions = model.predict(X_test[:10])

Text Generation with RNN

# Simple character-level text generation
text = "The quick brown fox jumps over the lazy dog"

# Vocabulary: each distinct character gets an integer id in sorted order
chars = sorted(set(text))
idx_to_char = dict(enumerate(chars))
char_to_idx = {c: i for i, c in idx_to_char.items()}

# Each training example is a window of `seq_length` character ids; its
# label is the id of the character that follows the window.
seq_length = 10
X_text, y_text = [], []

for i in range(len(text) - seq_length):
    window, next_char = text[i:i + seq_length], text[i + seq_length]
    X_text.append([char_to_idx[c] for c in window])
    y_text.append(char_to_idx[next_char])

X_text = np.array(X_text)
y_text = np.array(y_text)

# Embedding -> two stacked LSTMs -> softmax over the vocabulary
vocab_size = len(chars)
text_layers = [
    layers.Embedding(vocab_size, 32, input_length=seq_length),
    layers.LSTM(128, return_sequences=True),
    layers.Dropout(0.2),
    layers.LSTM(64),
    layers.Dropout(0.2),
    layers.Dense(vocab_size, activation='softmax'),
]
text_model = keras.Sequential(text_layers)

# Labels are integer ids, hence the sparse variant of the loss
text_model.compile(optimizer='adam', loss='sparse_categorical_crossentropy', metrics=['accuracy'])

# To train: text_model.fit(X_text, y_text, epochs=50, batch_size=16)

Building RNNs with PyTorch

import torch
import torch.nn as nn
import torch.optim as optim

class LSTMModel(nn.Module):
    """Stacked LSTM followed by a linear layer on the last hidden state.

    Args:
        input_size: Number of features per time step.
        hidden_size: Width of each LSTM layer's hidden state.
        num_layers: Number of stacked LSTM layers.
        output_size: Size of the final linear projection.
        dropout: Dropout probability applied between stacked LSTM layers.
            Ignored when ``num_layers`` is 1 because there is no
            inter-layer connection to drop.
    """

    def __init__(self, input_size, hidden_size, num_layers, output_size, dropout=0.2):
        super().__init__()
        self.hidden_size = hidden_size
        self.num_layers = num_layers

        # nn.LSTM applies dropout only between stacked layers and emits a
        # UserWarning if a non-zero value is combined with a single layer,
        # so pass 0.0 in that case.
        inter_layer_dropout = dropout if num_layers > 1 else 0.0

        self.lstm = nn.LSTM(
            input_size, hidden_size, num_layers,
            batch_first=True, dropout=inter_layer_dropout
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Map a batch-first input sequence to one output vector per sample.

        Args:
            x: Tensor passed to the batch-first LSTM, i.e. laid out as
                (batch, seq_len, input_size).

        Returns:
            Tensor of shape (batch, output_size).
        """
        # LSTM forward pass; only the final hidden states are needed
        _, (h_n, _) = self.lstm(x)

        # h_n[-1] is the final hidden state of the top LSTM layer
        return self.fc(h_n[-1])

# Pick the GPU when available and place the model on it
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = LSTMModel(input_size=10, hidden_size=64, num_layers=2, output_size=1)
model = model.to(device)

# Regression setup: mean-squared-error loss optimized with Adam
criterion = nn.MSELoss()
optimizer = optim.Adam(model.parameters(), lr=0.001)

# Dummy data
X_train = torch.randn(100, 50, 10).to(device)  # batch_size=100, seq_len=50, input_size=10
y_train = torch.randn(100, 1).to(device)

# Full-batch training: one optimizer step per epoch
for epoch in range(10):
    # Gradients accumulate across backward() calls, so reset them first
    optimizer.zero_grad()

    loss = criterion(model(X_train), y_train)
    loss.backward()
    optimizer.step()

    # Log every second epoch (epochs are reported 1-based)
    if epoch % 2 == 1:
        print(f"Epoch {epoch+1}, Loss: {loss.item():.4f}")

GRU: Gated Recurrent Unit

GRU is a simpler alternative to LSTM with fewer parameters:

# GRU model with Keras
# Example input dimensions so the snippet runs on its own; replace them with
# the (timesteps, features) shape of your data.
timesteps, features = 50, 1

# Two stacked GRU layers followed by a small dense head for binary
# classification. The first GRU returns the full sequence so the second
# GRU receives one vector per time step.
gru_model = keras.Sequential([
    layers.GRU(128, activation='relu', input_shape=(timesteps, features),
               return_sequences=True),
    layers.Dropout(0.2),
    layers.GRU(64, activation='relu'),
    layers.Dropout(0.2),
    layers.Dense(32, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

gru_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

# GRU with PyTorch
class GRUModel(nn.Module):
    """Stacked batch-first GRU followed by a linear layer on the last hidden state."""

    def __init__(self, input_size, hidden_size, num_layers, output_size):
        super().__init__()
        self.gru = nn.GRU(
            input_size,
            hidden_size,
            num_layers,
            batch_first=True,
        )
        self.fc = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Return the linear projection of the top layer's final hidden state."""
        # The per-step outputs are not needed; keep only the final states
        _, final_states = self.gru(x)
        top_layer_state = final_states[-1]
        return self.fc(top_layer_state)

Bidirectional RNNs

Process sequences in both directions:

# Bidirectional LSTM
# Example input dimensions so the snippet runs on its own; replace them with
# the (timesteps, features) shape of your data.
timesteps, features = 50, 1

# Each Bidirectional wrapper runs its LSTM over the sequence in both
# directions. The first one returns the full sequence so the second
# bidirectional layer receives one vector per time step.
bidirectional_model = keras.Sequential([
    layers.Bidirectional(
        layers.LSTM(64, activation='relu', return_sequences=True),
        input_shape=(timesteps, features)
    ),
    layers.Dropout(0.2),
    layers.Bidirectional(layers.LSTM(32, activation='relu')),
    layers.Dropout(0.2),
    layers.Dense(16, activation='relu'),
    layers.Dense(1, activation='sigmoid')
])

bidirectional_model.compile(optimizer='adam', loss='binary_crossentropy', metrics=['accuracy'])

Best Practices

  1. Gradient clipping: Prevent exploding gradients in RNNs
  2. Use LSTM/GRU: Better than vanilla RNN for long sequences
  3. Bidirectional processing: When full sequence is available
  4. Stateful RNNs: For very long sequences, process in chunks
  5. Sequence padding: Handle variable-length sequences
  6. Dropout: Prevent overfitting in deep RNNs

Common Pitfalls

Bad Practice:

# Don't: use a vanilla RNN for long sequences (1000 time steps here)
model = keras.Sequential([
    layers.SimpleRNN(64, input_shape=(1000, 10)),  # Vanishing gradients!
    layers.Dense(1)
])

# Don't: drop the time dimension when reshaping; this yields a 2-D array,
# while the recurrent layer above declares a (timesteps, features) input
X = X.reshape(-1, 10)  # Missing time dimension

# Don't: train without gradient clipping
# NOTE(review): `model` is never compiled in this snippet; presumably the
# compile step was omitted for brevity -- confirm before running.
model.fit(X, y, epochs=100)  # May explode

Good Practice:

# Do: use an LSTM for long sequences (1000 time steps here)
model = keras.Sequential([
    layers.LSTM(64, input_shape=(1000, 10)),
    layers.Dense(1)
])

# Do: keep the time dimension -> (samples, timesteps, features)
# NOTE(review): `timesteps` and `features` are not defined in this snippet;
# presumably they should match input_shape above, i.e. (1000, 10) -- confirm.
X = X.reshape(-1, timesteps, features)

# Do: enable gradient clipping by passing clipvalue to the optimizer
optimizer = keras.optimizers.Adam(clipvalue=1.0)
model.compile(optimizer=optimizer, loss='mse')

Conclusion

RNNs and LSTMs enable processing of sequential data with temporal dependencies. LSTMs mitigate the vanishing gradient problem through gating mechanisms, making them suitable for long sequences. Understand the architecture, use an appropriate variant (LSTM or GRU), and monitor training carefully to build effective sequence models.

Comments