Skip to main content
⚡ Calmops

gRPC: High-Performance RPC Framework

gRPC: High-Performance RPC Framework

Introduction

gRPC is a modern, high-performance RPC framework built on HTTP/2. It enables efficient communication between microservices with features like bidirectional streaming, multiplexing, and automatic code generation. This guide covers implementing gRPC services in Go.

gRPC uses Protocol Buffers for serialization and HTTP/2 for transport, providing significant performance advantages over traditional REST APIs.

gRPC Service Definition

Basic Service Definition

// user.proto
syntax = "proto3";

package user;

option go_package = "github.com/example/user/pb";

// User is the user record returned by every UserService RPC.
message User {
  // Identifier of the user; GetUserRequest.id refers to this value.
  string id = 1;
  // Name of the user.
  string name = 2;
  // Email address of the user.
  string email = 3;
}

// GetUserRequest is the request message of UserService.GetUser.
message GetUserRequest {
  // Identifier of the user to fetch.
  string id = 1;
}

// CreateUserRequest is the request message of UserService.CreateUser.
// It carries no id field; the identifier appears only on the returned User.
message CreateUserRequest {
  // Name for the new user.
  string name = 1;
  // Email address for the new user.
  string email = 2;
}

// UserService provides user operations: two unary RPCs and one
// server-streaming RPC.
service UserService {
  // GetUser is a unary RPC: one GetUserRequest in, one User out.
  rpc GetUser(GetUserRequest) returns (User);
  // CreateUser is a unary RPC: one CreateUserRequest in, one User out.
  rpc CreateUser(CreateUserRequest) returns (User);
  // ListUsers is a server-streaming RPC: one Empty request in, a stream of
  // User messages out.
  rpc ListUsers(Empty) returns (stream User);
}

// Empty is a message with no fields; ListUsers takes it as its request type.
message Empty {}

Good: Proper gRPC Implementation

package main

import (
	"context"
	"fmt"
	"log"
	"net"
	"sync"

	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	pb "github.com/example/user/pb"
)

// UserServer is an in-memory implementation of the UserService gRPC service.
// It embeds pb.UnimplementedUserServiceServer and stores users in a map keyed
// by user ID.
type UserServer struct {
	pb.UnimplementedUserServiceServer

	// mu is the lock used to guard users.
	mu sync.RWMutex
	// users maps a user ID to its stored record.
	users map[string]*pb.User
}

// NewUserServer returns a UserServer whose user map is allocated and empty.
func NewUserServer() *UserServer {
	srv := new(UserServer)
	srv.users = make(map[string]*pb.User)
	return srv
}

// GetUser returns the stored user whose ID equals req.Id.
//
// Failures are reported as gRPC statuses:
//   - codes.InvalidArgument when req.Id is empty;
//   - the status matching ctx.Err() (Canceled or DeadlineExceeded) when the
//     request context is already done;
//   - codes.NotFound when no user is stored under that ID.
//
// The returned pointer is the same one held in the server's map.
func (s *UserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	if req.Id == "" {
		return nil, status.Error(codes.InvalidArgument, "user ID is required")
	}

	// Translate the context error into its own status code rather than always
	// answering Canceled, so an expired deadline surfaces as DeadlineExceeded.
	if err := ctx.Err(); err != nil {
		return nil, status.FromContextError(err).Err()
	}

	s.mu.RLock()
	user, exists := s.users[req.Id]
	s.mu.RUnlock()

	if !exists {
		return nil, status.Errorf(codes.NotFound, "user not found: %s", req.Id)
	}

	return user, nil
}

// CreateUser stores a new user built from req and returns it.
//
// Failures are reported as gRPC statuses:
//   - codes.InvalidArgument when req.Name or req.Email is empty;
//   - the status matching ctx.Err() (Canceled or DeadlineExceeded) when the
//     request context is already done.
//
// The ID is "user-N", where N is the number of stored users plus one. IDs
// therefore stay unique only as long as users are never removed from the map.
func (s *UserServer) CreateUser(ctx context.Context, req *pb.CreateUserRequest) (*pb.User, error) {
	if req.Name == "" {
		return nil, status.Error(codes.InvalidArgument, "name is required")
	}
	if req.Email == "" {
		return nil, status.Error(codes.InvalidArgument, "email is required")
	}

	if err := ctx.Err(); err != nil {
		return nil, status.FromContextError(err).Err()
	}

	// Generate the ID and insert under one write lock. Reading len(s.users)
	// outside the lock is a data race, and two concurrent calls could compute
	// the same ID and overwrite each other.
	s.mu.Lock()
	defer s.mu.Unlock()

	user := &pb.User{
		Id:    fmt.Sprintf("user-%d", len(s.users)+1),
		Name:  req.Name,
		Email: req.Email,
	}
	s.users[user.Id] = user

	return user, nil
}

// ListUsers streams every stored user to the client, in no particular order
// (Go map iteration order is unspecified).
//
// It returns the stream context's error if the context is done before all
// users are sent, and a codes.Internal status if a Send fails.
func (s *UserServer) ListUsers(req *pb.Empty, stream pb.UserService_ListUsersServer) error {
	// Snapshot the users under the read lock and release it before sending.
	// Send can block on a slow client, and holding the lock across it would
	// stall every writer for the duration of the stream.
	s.mu.RLock()
	snapshot := make([]*pb.User, 0, len(s.users))
	for _, user := range s.users {
		snapshot = append(snapshot, user)
	}
	s.mu.RUnlock()

	ctx := stream.Context()
	for _, user := range snapshot {
		if err := ctx.Err(); err != nil {
			return err
		}

		if err := stream.Send(user); err != nil {
			return status.Errorf(codes.Internal, "failed to send user: %v", err)
		}
	}

	return nil
}

// StartServer listens on the given TCP port (all interfaces), registers a
// fresh UserServer on a new gRPC server, and serves until Serve returns.
// A listen failure is wrapped and returned; otherwise the result of Serve is
// returned as-is.
func StartServer(port string) error {
	addr := ":" + port
	lis, err := net.Listen("tcp", addr)
	if err != nil {
		return fmt.Errorf("failed to listen: %w", err)
	}

	srv := grpc.NewServer()
	pb.RegisterUserServiceServer(srv, NewUserServer())

	log.Printf("Starting gRPC server on port %s", port)
	return srv.Serve(lis)
}

// main starts the server on port 50051 and exits via log.Fatal if
// StartServer returns an error.
func main() {
	err := StartServer("50051")
	if err != nil {
		log.Fatal(err)
	}
}

Bad: Improper gRPC Implementation

package main

import (
	"context"
	pb "github.com/example/user/pb"
)

// BAD: counterpart to UserServer used to illustrate the anti-patterns below.
// Note that there is no mutex next to the users map, and nothing shown here
// ever allocates the map.
type BadUserServer struct {
	pb.UnimplementedUserServiceServer
	users map[string]*pb.User
}

// BAD: No error handling.
// The map lookup result is returned directly with a nil error, so an unknown
// or empty ID yields a nil *pb.User and no error status at all.
func (s *BadUserServer) GetUser(ctx context.Context, req *pb.GetUserRequest) (*pb.User, error) {
	// No validation
	// No context checking
	// No error handling
	return s.users[req.Id], nil
}

// BAD: No streaming error handling.
// The error returned by stream.Send is discarded and the stream context is
// never consulted, so the loop keeps going after a failed send and the method
// always returns nil.
func (s *BadUserServer) ListUsers(req *pb.Empty, stream pb.UserService_ListUsersServer) error {
	for _, user := range s.users {
		// No context checking
		// No error handling
		stream.Send(user)
	}
	return nil
}

Problems:

  • No input validation
  • No error handling
  • No context management
  • No proper status codes

gRPC Client Implementation

package main

import (
	"context"
	"fmt"
	"io"
	"log"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
	pb "github.com/example/user/pb"
)

// UserClient wraps the gRPC client
type UserClient struct {
	client pb.UserServiceClient
	conn   *grpc.ClientConn
}

// NewUserClient dials address using insecure transport credentials and
// returns a UserClient wrapping the resulting connection. A dial failure is
// wrapped and returned with a nil client.
func NewUserClient(address string) (*UserClient, error) {
	creds := insecure.NewCredentials()
	conn, err := grpc.Dial(address, grpc.WithTransportCredentials(creds))
	if err != nil {
		return nil, fmt.Errorf("failed to connect: %w", err)
	}

	uc := &UserClient{conn: conn}
	uc.client = pb.NewUserServiceClient(conn)
	return uc, nil
}

// GetUser fetches the user with the given id, bounding the call with a
// 5-second timeout derived from ctx.
func (c *UserClient) GetUser(ctx context.Context, id string) (*pb.User, error) {
	callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	req := &pb.GetUserRequest{Id: id}
	return c.client.GetUser(callCtx, req)
}

// CreateUser asks the server to create a user with the given name and email,
// bounding the call with a 5-second timeout derived from ctx.
func (c *UserClient) CreateUser(ctx context.Context, name, email string) (*pb.User, error) {
	callCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
	defer cancel()

	req := &pb.CreateUserRequest{Name: name, Email: email}
	return c.client.CreateUser(callCtx, req)
}

// ListUsers opens the ListUsers stream and collects every user it yields.
// The whole exchange is bounded by a 10-second timeout derived from ctx.
// Any error other than io.EOF aborts the call and discards what was
// collected so far.
func (c *UserClient) ListUsers(ctx context.Context) ([]*pb.User, error) {
	callCtx, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	stream, err := c.client.ListUsers(callCtx, &pb.Empty{})
	if err != nil {
		return nil, err
	}

	var collected []*pb.User
	for {
		user, recvErr := stream.Recv()
		switch {
		case recvErr == io.EOF:
			// The server finished the stream cleanly.
			return collected, nil
		case recvErr != nil:
			return nil, recvErr
		}
		collected = append(collected, user)
	}
}

// Close releases the underlying gRPC connection and reports any error from
// doing so.
func (c *UserClient) Close() error {
	conn := c.conn
	return conn.Close()
}

// main runs the demo and exits through log.Fatal on the first failure.
// The work lives in run so that its deferred client.Close executes before
// log.Fatal terminates the process; log.Fatal calls os.Exit, which does not
// run deferred functions.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to the server at localhost:50051, creates a user, fetches it
// back by ID, and lists all users, printing each result. It returns the first
// error encountered.
func run() error {
	client, err := NewUserClient("localhost:50051")
	if err != nil {
		return err
	}
	defer client.Close()

	ctx := context.Background()

	// Create user
	created, err := client.CreateUser(ctx, "John Doe", "john.doe@example.com")
	if err != nil {
		return err
	}
	fmt.Printf("Created user: %v\n", created)

	// Get user
	fetched, err := client.GetUser(ctx, created.Id)
	if err != nil {
		return err
	}
	fmt.Printf("Retrieved user: %v\n", fetched)

	// List users
	users, err := client.ListUsers(ctx)
	if err != nil {
		return err
	}
	fmt.Printf("All users: %v\n", users)

	return nil
}

Streaming Patterns

Server-Side Streaming

// GetUsersByDomain is a server-streaming RPC that sends every stored user
// whose email address ends in "@<domain>".
//
// req.Domain may be given with or without a leading "@". The match is exact
// on the domain part, so "example.com" does not match "evil-example.com" or
// "mail.example.com". An empty domain is rejected with codes.InvalidArgument
// rather than matching every user.
//
// It returns the stream context's error if the context is done before all
// matches are sent, and a codes.Internal status if a Send fails.
func (s *UserServer) GetUsersByDomain(req *pb.GetUsersByDomainRequest, stream pb.UserService_GetUsersByDomainServer) error {
	domain := strings.TrimPrefix(req.Domain, "@")
	if domain == "" {
		return status.Error(codes.InvalidArgument, "domain is required")
	}
	// Anchor on "@" so only the full domain part of the address can match.
	suffix := "@" + domain

	// Collect the matches under the read lock, then release it before
	// sending. Send can block on a slow client, and holding the lock across it
	// would stall writers.
	s.mu.RLock()
	var matches []*pb.User
	for _, user := range s.users {
		if strings.HasSuffix(user.Email, suffix) {
			matches = append(matches, user)
		}
	}
	s.mu.RUnlock()

	ctx := stream.Context()
	for _, user := range matches {
		if err := ctx.Err(); err != nil {
			return err
		}

		if err := stream.Send(user); err != nil {
			return status.Errorf(codes.Internal, "failed to send: %v", err)
		}
	}

	return nil
}

Client-Side Streaming

// CreateUsers is a client-streaming RPC: it stores one user per received
// request and, once the client closes its side (io.EOF), replies with a
// single CreateUsersResponse listing every user created on this stream.
//
// A receive failure other than io.EOF ends the RPC with codes.Internal. Users
// stored before the failure remain in the map.
func (s *UserServer) CreateUsers(stream pb.UserService_CreateUsersServer) error {
	var createdUsers []*pb.User

	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return stream.SendAndClose(&pb.CreateUsersResponse{
				Users: createdUsers,
			})
		}
		if err != nil {
			return status.Errorf(codes.Internal, "failed to receive: %v", err)
		}

		// Generate the ID and insert under one write lock. Reading
		// len(s.users) outside the lock is a data race, and concurrent
		// streams could compute the same ID and overwrite each other.
		s.mu.Lock()
		user := &pb.User{
			Id:    fmt.Sprintf("user-%d", len(s.users)+1),
			Name:  req.Name,
			Email: req.Email,
		}
		s.users[user.Id] = user
		s.mu.Unlock()

		createdUsers = append(createdUsers, user)
	}
}

Bidirectional Streaming

// SyncUsers is a bidirectional-streaming RPC: for each received message it
// stores (or overwrites) the user under req.Id and immediately sends the
// stored user back on the same stream. It returns nil when the client closes
// its side (io.EOF).
//
// A message with an empty ID ends the RPC with codes.InvalidArgument. Such a
// user would be stored under the key "" and could never be fetched, because
// GetUser rejects empty IDs. Receive and send failures end the RPC with
// codes.Internal.
func (s *UserServer) SyncUsers(stream pb.UserService_SyncUsersServer) error {
	for {
		req, err := stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			return status.Errorf(codes.Internal, "failed to receive: %v", err)
		}

		if req.Id == "" {
			return status.Error(codes.InvalidArgument, "user ID is required")
		}

		user := &pb.User{
			Id:    req.Id,
			Name:  req.Name,
			Email: req.Email,
		}

		s.mu.Lock()
		s.users[user.Id] = user
		s.mu.Unlock()

		if err := stream.Send(user); err != nil {
			return status.Errorf(codes.Internal, "failed to send: %v", err)
		}
	}
}

Interceptors

package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc"
)

// UnaryServerInterceptor wraps a unary handler with timing: it invokes the
// handler, logs the full method name, the elapsed time and the handler's
// error, and passes the handler's response and error through unchanged.
func UnaryServerInterceptor(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	began := time.Now()
	resp, err := handler(ctx, req)
	log.Printf("RPC: %s, Duration: %v, Error: %v", info.FullMethod, time.Since(began), err)
	return resp, err
}

// StreamServerInterceptor wraps a streaming handler with timing: it invokes
// the handler, logs the full method name, the elapsed time and the handler's
// error, and returns that error unchanged.
func StreamServerInterceptor(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error {
	began := time.Now()
	err := handler(srv, ss)
	log.Printf("Stream RPC: %s, Duration: %v, Error: %v", info.FullMethod, time.Since(began), err)
	return err
}

// NewServerWithInterceptors returns a gRPC server with the logging unary and
// stream interceptors installed.
func NewServerWithInterceptors() *grpc.Server {
	opts := []grpc.ServerOption{
		grpc.UnaryInterceptor(UnaryServerInterceptor),
		grpc.StreamInterceptor(StreamServerInterceptor),
	}
	return grpc.NewServer(opts...)
}

Best Practices

1. Error Handling

Always use proper gRPC status codes:

return status.Errorf(codes.InvalidArgument, "invalid input")
return status.Errorf(codes.NotFound, "resource not found")
return status.Errorf(codes.Internal, "internal error")

2. Timeouts

Always set timeouts on client calls:

ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()

3. Connection Pooling

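Reuse a single connection for multiple calls and service clients. gRPC multiplexes concurrent RPCs over one HTTP/2 connection, so there is no need to dial per call: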

// Dial once and share the connection. Transport credentials are mandatory:
// without a credentials option grpc.Dial returns an error, so the error must
// be checked rather than discarded.
conn, err := grpc.Dial(
	"localhost:50051",
	grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
	log.Fatalf("failed to connect: %v", err)
}
defer conn.Close()

// Both service clients issue their RPCs over the same connection.
client1 := pb.NewUserServiceClient(conn)
client2 := pb.NewOrderServiceClient(conn)

Common Pitfalls

1. No Timeout Management

Always set timeouts to prevent hanging requests.

2. Ignoring Stream Errors

Check for errors when sending/receiving on streams.

3. Not Validating Input

Always validate request data before processing.

4. Improper Error Codes

Use appropriate gRPC status codes for different error scenarios.

Resources

Summary

gRPC provides a powerful framework for building high-performance microservices. Key takeaways:

  • Use Protocol Buffers for efficient serialization
  • Implement proper error handling with status codes
  • Leverage streaming for efficient data transfer
  • Use interceptors for cross-cutting concerns
  • Always set timeouts on client calls
  • Validate input data thoroughly
  • Reuse connections for efficiency

By following these practices, you can build robust, high-performance gRPC services in Go.

Comments