Skip to main content
โšก Calmops

Advanced GraphQL Patterns in Rust: Federation, Directives, and Authorization

Advanced GraphQL Patterns in Rust: Federation, Directives, and Authorization

TL;DR: This guide covers advanced GraphQL patterns for production Rust APIs. You’ll learn schema federation, custom directives, field-level authorization, real-time subscriptions, and optimization techniques for high-scale deployments.


Introduction

Once you’ve built a basic GraphQL API, advanced patterns unlock:

  • Federation - Compose multiple services into one API
  • Custom Directives - Transform schema behavior
  • Field-Level Authorization - Granular access control
  • Subscriptions - Real-time data updates
  • Performance - Dataloaders, caching, query analysis

Schema Federation

Federate multiple GraphQL services into a unified supergraph.

Setting Up Federation

[dependencies]
async-graphql = { version = "7.0", features = ["apollo federation"] }

Federated Schema Definition

use async_graphql::{
    schema::{EmptySubscription, Schema},
    federation::Entity,
    Object, SimpleObject, InputObject, ID,
};

pub type AppSchema = Schema<QueryRoot, MutationRoot, SubscriptionRoot>;

pub struct QueryRoot;

#[Object]
impl QueryRoot {
    #[entity]
    async fn user(&self, id: ID) -> Option<User> {
        // Fetch from database
        User::find(&id).await.ok()
    }

    #[entity]
    async fn post(&self, id: ID) -> Option<Post> {
        Post::find(&id).await.ok()
    }

    async fn users(&self, after: Option<String>, first: i32) -> Connection<String, User> {
        // Pagination logic
        User::paginate(after, first).await
    }
}

Key Entity Resolution

use async_graphql::federation::Entity;

#[derive(SimpleObject, Clone)]
pub struct User {
    pub id: ID,
    pub username: String,
    pub email: String,
    pub created_at: i64,
}

#[derive(InputObject)]
pub struct UserKey {
    pub id: ID,
}

impl Entity for User {
    type Key = UserKey;

    async fn find(key: &Self::Key) -> async_graphql::Result<Option<Self>> {
        Ok(User::find_by_id(&key.id).await?)
    }
}

Extending External Types

use async_graphql::federation::External;

#[derive(External, Clone)]
struct ProductId(String);

#[derive(SimpleObject, Clone)]
struct Product {
    #[external]
    pub id: ProductId,
    
    #[graphql(override(from = "products"))]
    pub reviews: Vec<Review>,
}

Custom Directives

Creating Custom Directives

use async_graphql::{Schema, CustomDirective, Directive, InputValue, ServerResult, Value, Context};

#[derive(CustomDirective, Debug)]
pub struct DeprecatedDirective {
    pub reason: Option<String>,
}

impl DeprecatedDirective {
    pub fn new(reason: Option<String>) -> Self {
        Self { reason }
    }
}

pub struct DeprecatedDirectiveRuntime;

impl Directive for DeprecatedDirectiveRuntime {
    fn name(&self) -> &str {
        "deprecated"
    }

    fn arguments(&self) -> Vec<InputValue> {
        vec![InputValue::new("reason", Value::String(String::new()))]
    }

    fn execute(&self, _ctx: &Context, _args: &std::collections::HashMap<String, Value>) -> ServerResult<Value> {
        // Directive execution logic
        Ok(Value::Null)
    }
}

Using Directives in Schema

use async_graphql::SimpleObject;

#[derive(SimpleObject)]
pub struct User {
    #[graphql(visible = "isAuthenticated")]
    pub email: String,
    
    #[graphql(deprecated = "Use posts instead")]
    pub user_posts: Vec<Post>,
    
    #[graphql(complex)]
    pub internal_id: String,
}

fn isAuthenticated(ctx: &Context<'_>) -> bool {
    ctx.data::<CurrentUser>()
        .map(|u| u.is_authenticated())
        .unwrap_or(false)
}

Rate Limiting Directive

use async_graphql::{CustomDirective, Directive, InputValue, ServerResult, Value, Context};
use std::sync::Arc;
use std::collections::HashMap;
use tokio::sync::RwLock;

#[derive(CustomDirective)]
pub struct RateLimitDirective {
    pub max_requests: i32,
    pub window_seconds: i32,
}

pub struct RateLimitState {
    requests: Arc<RwLock<HashMap<String, Vec<std::time::Instant>>>>,
}

impl RateLimitDirective {
    pub fn new(max_requests: i32, window_seconds: i32) -> Self {
        Self { max_requests, window_seconds }
    }
}

pub struct RateLimitRuntime {
    state: RateLimitState,
}

impl RateLimitRuntime {
    pub fn new() -> Self {
        Self {
            state: RateLimitState {
                requests: Arc::new(RwLock::new(HashMap::new())),
            },
        }
    }
}

impl Directive for RateLimitRuntime {
    fn name(&self) -> &str {
        "rateLimit"
    }

    fn arguments(&self) -> Vec<InputValue> {
        vec![
            InputValue::new("maxRequests", Value::Int(async_graphql::Number::from(100))),
            InputValue::new("windowSeconds", Value::Int(async_graphql::Number::from(60))),
        ]
    }

    async fn execute(&self, ctx: &Context, args: &HashMap<String, Value>) -> ServerResult<Value> {
        let max_requests = args.get("maxRequests")
            .and_then(|v| v.as_i64())
            .unwrap_or(100) as i32;
            
        let window_seconds = args.get("windowSeconds")
            .and_then(|v| v.as_i64())
            .unwrap_or(60) as u64;
        
        let client_id = ctx.data::<ClientId>()
            .map(|c| c.0.clone())
            .unwrap_or_else(|_| "anonymous".to_string());
        
        let mut requests = self.state.requests.write().await;
        let now = std::time::Instant::now();
        
        let client_requests = requests.entry(client_id).or_insert_with(Vec::new);
        client_requests.retain(|t| now.duration_since(*t).as_secs() < window_seconds);
        
        if client_requests.len() >= max_requests as usize {
            return Err(async_graphql::ServerError::new(
                "Rate limit exceeded",
                Some(vec![async_graphql::Error::new(
                    "Too many requests, please try again later"
                )])
            ));
        }
        
        client_requests.push(now);
        Ok(Value::Null)
    }
}

Field-Level Authorization

Authorization Context

use async_graphql::{Context, Object, SimpleObject, ID};
use std::sync::Arc;
use tokio::sync::RwLock;

#[derive(Clone, SimpleObject)]
pub struct User {
    pub id: ID,
    pub username: String,
    pub email: String,
    pub role: UserRole,
}

#[derive(Clone, Copy, SimpleObject, Eq, PartialEq)]
pub enum UserRole {
    Admin,
    Moderator,
    User,
    Guest,
}

pub struct CurrentUser {
    pub id: ID,
    pub username: String,
    pub role: UserRole,
}

impl CurrentUser {
    pub fn is_admin(&self) -> bool {
        self.role == UserRole::Admin
    }
    
    pub fn can_read(&self, resource_owner_id: &ID) -> bool {
        self.is_admin() || self.id == *resource_owner_id
    }
}

pub struct AuthService {
    pub current_user: Arc<RwLock<Option<CurrentUser>>>,
}

impl AuthService {
    pub fn new() -> Self {
        Self {
            current_user: Arc::new(RwLock::new(None)),
        }
    }
    
    pub async fn set_user(&self, user: CurrentUser) {
        let mut current = self.current_user.write().await;
        *current = Some(user);
    }
}

Field-Level Authorization Implementation

use async_graphql::{Context, Object, SimpleObject, ID, Schema};
use crate::auth::{CurrentUser, UserRole};

pub struct QueryRoot;

#[Object]
impl QueryRoot {
    // Public - accessible to everyone
    async fn public_post(&self, id: ID) -> Option<Post> {
        Post::find(&id).await.ok()
    }
    
    // Protected - requires authentication
    #[graphql(visible = "isAuthenticated")]
    async fn my_posts(&self, ctx: &Context<'_>) -> async_graphql::Result<Vec<Post>> {
        let user = ctx.data::<CurrentUser>()?;
        Post::find_by_user(&user.id).await
    }
    
    // Admin only
    #[graphql(visible = "isAdmin")]
    async fn all_users(&self, ctx: &Context<'_>) -> async_graphql::Result<Vec<User>> {
        User::all().await
    }
    
    // Owner or admin
    #[graphql(visible = "canViewUser")]
    async fn user(&self, ctx: &Context<'_>, id: ID) -> async_graphql::Result<User> {
        let current = ctx.data::<CurrentUser>()?;
        
        if current.is_admin() || current.id == id {
            return User::find(&id).await
                .ok_or_else(|| async_graphql::Error::from("User not found"));
        }
        
        Err(async_graphql::Error::from("Forbidden"))
    }
}

fn isAuthenticated(ctx: &Context<'_>) -> bool {
    ctx.data::<CurrentUser>().is_ok()
}

fn isAdmin(ctx: &Context<'_>) -> bool {
    ctx.data::<CurrentUser>()
        .map(|u| u.role == UserRole::Admin)
        .unwrap_or(false)
}

fn canViewUser(ctx: &Context<'_>, id: &ID) -> bool {
    ctx.data::<CurrentUser>()
        .map(|u| u.is_admin() || u.id == *id)
        .unwrap_or(false)
}

Role-Based Access Control (RBAC)

use async_graphql::{SimpleObject, InputObject};
use std::collections::HashSet;

#[derive(InputObject)]
pub struct PermissionInput {
    pub resource: String,
    pub action: String,
}

#[derive(Clone, SimpleObject)]
pub struct Permission {
    pub resource: String,
    pub actions: Vec<String>,
}

pub struct PermissionService {
    permissions: HashSet<(String, String)>, // (resource, action)
}

impl PermissionService {
    pub fn new() -> Self {
        let mut permissions = HashSet::new();
        
        // Admin permissions
        permissions.insert(("users".to_string(), "read".to_string()));
        permissions.insert(("users".to_string(), "write".to_string()));
        permissions.insert(("users".to_string(), "delete".to_string()));
        
        // Moderator permissions
        permissions.insert(("posts".to_string(), "read".to_string()));
        permissions.insert(("posts".to_string(), "write".to_string()));
        permissions.insert(("posts".to_string(), "moderate".to_string()));
        
        // User permissions
        permissions.insert(("posts".to_string(), "read".to_string()));
        permissions.insert(("posts".to_string(), "write".to_string()));
        
        Self { permissions }
    }
    
    pub fn has_permission(&self, role: &str, resource: &str, action: &str) -> bool {
        let role_prefix = match role {
            "admin" => vec!["admin", "moderator", "user"],
            "moderator" => vec!["moderator", "user"],
            _ => vec!["user"],
        };
        
        for prefix in role_prefix {
            let permission_key = (format!("{}_{}", prefix, resource), action.to_string());
            if self.permissions.contains(&permission_key) {
                return true;
            }
        }
        
        false
    }
}

Real-Time Subscriptions

Subscription Implementation

use async_graphql::{Object, Subscription, Context, SimpleObject, ID};
use async_graphql_warp::graphql;
use std::sync::Arc;
use tokio::sync::broadcast;

pub struct SubscriptionRoot;

#[Subscription]
impl SubscriptionRoot {
    async fn on_post_added(
        &self,
        ctx: &Context<'_>,
    ) -> Result<impl Stream<Item = Post>, async_graphql::Error> {
        let post_events = ctx.data::<Arc<PostEventBus>>()?;
        
        Ok(post_events.subscribe())
    }
    
    async fn on_user_status_changed(
        &self,
        ctx: &Context<'_>,
        user_id: ID,
    ) -> Result<impl Stream<Item = UserStatus>, async_graphql::Error> {
        let user_events = ctx.data::<Arc<UserEventBus>>()?;
        
        Ok(user_events.subscribe_for_user(user_id))
    }
}

pub struct PostEventBus {
    sender: broadcast::Sender<Post>,
}

impl PostEventBus {
    pub fn new() -> Self {
        let (sender, _) = broadcast::channel(100);
        Self { sender }
    }
    
    pub fn publish(&self, post: Post) {
        let _ = self.sender.send(post);
    }
    
    pub fn subscribe(&self) -> impl Stream<Item = Post> {
        self.sender.subscribe()
    }
}

#[derive(Clone, SimpleObject)]
pub struct Post {
    pub id: ID,
    pub title: String,
    pub content: String,
    pub author_id: ID,
}

WebSocket Subscription Server

use async_graphql_warp::graphql;
use warp::Filter;

async fn graphql_ws_handler(
    schema: AppSchema,
    http_response: warp::http::Response<warp::hyper::Body>,
) -> impl warp::reply::Response {
    let (request, _) = http_response.into_parts();
    
    graphql(&schema)
        .with_warp()
        .run(request)
        .await
}

pub fn create_routes(schema: AppSchema) -> impl Filter<Extract = (impl warp::reply::Reply,), Error = warp::Rejection> + Clone {
    let schema_filter = async_graphql_warp::graphql(schema);
    
    warp::path("graphql")
        .and(warp::post())
        .and(warp::body::json())
        .and(schema_filter)
        .and_then(|(schema, request): (AppSchema, async_graphql::Request)| async move {
            let resp = schema.execute(request).await;
            Ok::<_, std::convert::Infallible>(async_graphql_warp::GraphQLResponse::from(resp))
        })
}

DataLoader Pattern

Batch Loading

use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;

pub trait Loader<K, V> {
    fn load(&self, keys: Vec<K>) -> impl std::future::Future<Output = Result<Vec<V>, async_graphql::Error>> + Send;
}

pub struct DataLoader<K, V> {
    loader: Arc<dyn Loader<K, V> + Send + Sync>,
    cache: Arc<RwLock<HashMap<K, V>>>,
    batch_size: usize,
}

impl<K, V> DataLoader<K, V> 
where 
    K: Clone + Eq + std::hash::Hash + Send + 'static,
    V: Clone + Send + 'static,
{
    pub fn new(loader: Arc<dyn Loader<K, V> + Send + Sync>, batch_size: usize) -> Self {
        Self {
            loader,
            cache: Arc::new(RwLock::new(HashMap::new())),
            batch_size,
        }
    }
    
    pub async fn load_one(&self, key: K) -> Result<V, async_graphql::Error> {
        // Check cache first
        {
            let cache = self.cache.read().await;
            if let Some(value) = cache.get(&key) {
                return Ok(value.clone());
            }
        }
        
        // Load with batching
        self.load_many(vec![key])
            .await?
            .into_iter()
            .next()
            .ok_or_else(|| async_graphql::Error::from("Key not found"))
    }
    
    pub async fn load_many(&self, keys: Vec<K>) -> Result<Vec<V>, async_graphql::Error> {
        // Deduplicate and check cache
        let unique_keys: Vec<K> = keys.into_iter().collect::<std::collections::HashSet<_>>()
            .into_iter()
            .collect();
        
        let mut results = Vec::with_capacity(unique_keys.len());
        let mut uncached_keys = Vec::new();
        
        // Check cache
        {
            let cache = self.cache.read().await;
            for key in &unique_keys {
                if let Some(value) = cache.get(key) {
                    results.push(value.clone());
                } else {
                    uncached_keys.push(key.clone());
                }
            }
        }
        
        // Batch load uncached
        if !uncached_keys.is_empty() {
            let loaded = self.loader.load(uncached_keys.clone()).await?;
            
            // Update cache
            {
                let mut cache = self.cache.write().await;
                for (key, value) in unique_keys.iter().zip(loaded.iter()) {
                    cache.insert(key.clone(), value.clone());
                }
            }
            
            results.extend(loaded);
        }
        
        Ok(results)
    }
}

User Posts DataLoader

pub struct UserPostsLoader {
    pool: sqlx::PgPool,
}

#[async_trait]
impl Loader<ID, Vec<Post>> for UserPostsLoader {
    async fn load(&self, user_ids: Vec<ID>) -> Result<Vec<Vec<Post>>, async_graphql::Error> {
        let posts = sqlx::query_as!(
            Post,
            "SELECT id, title, content, author_id FROM posts WHERE author_id = ANY($1)",
            user_ids.iter().map(|id| id.to_string()).collect::<Vec<_>>()
        )
        .fetch_all(&self.pool)
        .await
        .map_err(|e| async_graphql::Error::from(e.to_string()))?;
        
        // Group by user_id
        let mut result: Vec<Vec<Post>> = vec![vec![]; user_ids.len()];
        for post in posts {
            if let Some(idx) = user_ids.iter().position(|id| id.to_string() == post.author_id) {
                result[idx].push(post);
            }
        }
        
        Ok(result)
    }
}

Query Performance Optimization

Query Complexity Analysis

use async_graphql::{Object, Context, Value, ServerResult};
use std::collections::HashMap;

pub struct ComplexityAnalyzer {
    field_costs: HashMap<String, i32>,
}

impl ComplexityAnalyzer {
    pub fn new() -> Self {
        let mut field_costs = HashMap::new();
        
        // Base costs
        field_costs.insert("Query.user".to_string(), 1);
        field_costs.insert("Query.users".to_string(), 1);
        field_costs.insert("User.posts".to_string(), 2);
        field_costs.insert("User.comments".to_string(), 2);
        
        Self { field_costs }
    }
    
    pub fn analyze(&self, query: &async_graphql::Request) -> i32 {
        let mut total = 0;
        
        for selection in query.query.selection() {
            total += self.calculate_field_cost(selection);
        }
        
        total
    }
    
    fn calculate_field_cost(&self, selection: &async_graphql::Selection) -> i32 {
        let name = selection.name();
        self.field_costs.get(name).copied().unwrap_or(1)
    }
}

pub fn enforce_query_limits(
    ctx: &Context<'_>,
    max_complexity: i32,
) -> ServerResult<()> {
    let analyzer = ctx.data::<ComplexityAnalyzer>()?;
    
    // This would need request access
    // let complexity = analyzer.analyze(&request);
    
    // if complexity > max_complexity {
    //     return Err(async_graphql::ServerError::new(
    //         "Query too complex",
    //         None,
    //     ));
    // }
    
    Ok(())
}

Conclusion

Advanced GraphQL patterns enable:

  1. Federation - Compose services into unified APIs
  2. Custom Directives - Transform schema behavior
  3. Field-Level Authorization - Granular access control
  4. Subscriptions - Real-time updates
  5. DataLoaders - Batch loading for N+1 prevention
  6. Query Analysis - Complexity and depth limits

These patterns are essential for production GraphQL APIs at scale.


External Resources


Comments