Import Data from MongoDB into Meilisearch

Created: September 22, 2022 · 11 min read

Background

MongoDB stores data as JSON-like documents in a schema-less structure. Each document has a unique _id field, typically an ObjectId.

Meilisearch is an open-source search engine written in Rust. It provides typo-tolerant full-text search with instant responses. Meilisearch accepts JSON, NDJSON, and CSV input and requires every document to have a unique primary key field, such as id.

This guide walks through the full pipeline: exporting data from MongoDB, transforming it into a Meilisearch-compatible format, and importing it at scale.

Architecture Overview

The import pipeline moves data through four stages:

┌──────────┐    ┌───────────────┐    ┌────────────────┐    ┌─────────────┐
│ MongoDB  │───▶│ Export Method │───▶│ Transform      │───▶│ Meilisearch │
│ Database │    │ (mongoexport, │    │ (normalize     │    │ Index       │
│          │    │  aggregation, │    │  types, flatten│    │             │
│          │    │  change stream)│    │  map fields)   │    │             │
└──────────┘    └───────────────┘    └────────────────┘    └─────────────┘
                       │
                       ▼
              ┌──────────────────┐
              │ Staging (JSON    │
              │ files, memory,   │
              │ or message queue)│
              └──────────────────┘

MongoDB is the source of truth. You export documents, transform fields (ObjectId to string, ISODate to string, flatten nested objects), and push the result to Meilisearch. For one-time bulk loads you export to a file. For ongoing syncs you use change streams.

MongoDB Data Export Methods

Three approaches cover most use cases. Each has different trade-offs in speed, complexity, and data fidelity.

mongoexport (Simple, File-Based)

mongoexport is a command-line tool included with MongoDB. It writes collections to JSON or CSV files.

mongoexport \
  --uri="mongodb://localhost:27017" \
  --db=my_db \
  --collection=users \
  --out=users.json \
  --jsonArray

The --jsonArray flag wraps the documents in a single JSON array, which is what Meilisearch expects when you upload with Content-Type: application/json. Without it each document appears on its own line (JSONL/NDJSON format), which Meilisearch also accepts, provided you upload it with Content-Type: application/x-ndjson.

mongodump / mongorestore (Binary, Fastest)

mongodump creates a binary BSON dump. It is faster than mongoexport for large collections but requires an extra conversion step to get plain JSON.

mongodump \
  --uri="mongodb://localhost:27017" \
  --db=my_db \
  --collection=users \
  --out=./dump

To convert BSON to JSON you use bsondump:

bsondump ./dump/my_db/users.bson > users.json

This produces JSONL/NDJSON output (one document per line). Either wrap it in an array and upload it as application/json, or send the file as-is with the Content-Type: application/x-ndjson header.
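If you prefer the array form, a few lines of Python perform the wrapping (the file names here are illustrative):

```python
import json


def jsonl_to_array(src_path, dest_path):
    """Wrap a JSONL file (one JSON document per line) into a single JSON array."""
    with open(src_path) as src:
        docs = [json.loads(line) for line in src if line.strip()]
    with open(dest_path, "w") as dest:
        json.dump(docs, dest)
    return len(docs)


# jsonl_to_array("users.json", "users_array.json")
```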

Aggregation Pipeline with $out (Most Flexible)

Running an aggregation pipeline inside MongoDB gives you full control over the output. You can rename fields, cast types, filter, and flatten inside the database before the data ever leaves.

db.users.aggregate([
  { $match: { active: true } },
  { $addFields: {
      id: { $toString: "$_id" },
      registered_at: { $toString: "$created_at" }
  }},
  { $project: { _id: 0, created_at: 0 } },
  { $out: "users_for_export" }
])

This writes the transformed documents to a new collection users_for_export. You then export that pre-processed collection with mongoexport.

Method Comparison

Method               | Speed     | Complexity | Best For
---------------------|-----------|------------|---------------------------------------------
mongoexport          | Fast      | Low        | One-off exports, small to medium datasets
mongodump + bsondump | Fastest   | Medium     | Very large datasets, raw performance needed
Aggregation + $out   | Moderate  | High       | Complex transformations before export
Change Streams       | Real-time | High       | Ongoing sync, incremental updates

Preparing the View for Export

Meilisearch requires each document to have a primary key whose value contains only alphanumeric characters, hyphens, and underscores ([0-9a-zA-Z_-]). MongoDB’s _id is an ObjectId that mongoexport serializes with $oid syntax, which includes $ characters that Meilisearch rejects.
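In a transformation script it can help to validate or sanitize primary-key values against that character class before upload. This is a sketch, not part of any official client:

```python
import re

# Mirrors Meilisearch's allowed primary-key character class.
VALID_KEY = re.compile(r"^[0-9a-zA-Z_-]+$")


def safe_primary_key(value):
    """Return a Meilisearch-safe primary key: keep valid values as-is,
    otherwise replace every disallowed character with an underscore."""
    value = str(value)
    if VALID_KEY.match(value):
        return value
    return re.sub(r"[^0-9a-zA-Z_-]", "_", value)
```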

Create a MongoDB view that converts _id to a string id and removes the original field:

var pipeline = [
  { $addFields: { id: { $toString: "$_id" } } },
  { $project: { _id: 0 } }
];

db.createView("users_export_view", "users", pipeline);

Now export the view directly:

mongoexport \
  --uri="mongodb://localhost:27017" \
  --db=my_db \
  --collection=users_export_view \
  --out=users_export.json \
  --jsonArray

Handling Nested Documents

MongoDB stores sub-documents and arrays freely. Meilisearch accepts nested JSON, but flat structures perform better.

Flatten nested objects in the view pipeline:

var pipeline = [
  { $addFields: {
      id: { $toString: "$_id" },
      "address_city": "$address.city",
      "address_zip": "$address.zip"
  }},
  { $project: { _id: 0, address: 0 } }
];

Handling Arrays

Arrays can appear as-is if Meilisearch should index each element as a searchable string. If you need to join array elements into a single string:

var pipeline = [
  { $addFields: {
      id: { $toString: "$_id" },
      tags: { $reduce: {
          input: "$tags",
          initialValue: "",
          in: { $cond: {
              if: { $eq: ["$$value", ""] },
              then: "$$this",
              else: { $concat: ["$$value", ", ", "$$this"] }
          }}
      }}
  }},
  { $project: { _id: 0 } }
];

Handling Dates

Meilisearch accepts ISO 8601 date strings. Convert ISODate fields with $toString:

var pipeline = [
  { $addFields: {
      id: { $toString: "$_id" },
      created_at: { $toString: "$created_at" },
      updated_at: { $toString: "$updated_at" }
  }},
  { $project: { _id: 0 } }
];

Data Transformation Pipeline

When the view approach is insufficient — for example when you need to combine data from multiple collections, or apply logic that MongoDB aggregation does not handle well — you write a transformation script.

ObjectId to String

Meilisearch rejects {"$oid": "..."}. Every _id must become a plain string:

if (doc._id && doc._id.$oid) {
  doc.id = doc._id.$oid;
  delete doc._id;
}

ISODate to String

Dates exported via mongoexport appear as {"$date": "2024-01-15T10:30:00Z"}. Convert them:

if (doc.created_at && doc.created_at.$date) {
  doc.created_at = doc.created_at.$date;
}
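The two conversions above can be combined into one recursive pass over each exported document. This sketch assumes mongoexport’s relaxed Extended JSON, where $date wraps a plain ISO string (canonical mode wraps a $numberLong instead):

```python
def normalize_extended_json(value):
    """Recursively unwrap mongoexport's {"$oid": ...} and {"$date": ...}
    wrappers into plain strings, anywhere in the document tree."""
    if isinstance(value, dict):
        if set(value) == {"$oid"}:
            return value["$oid"]
        if set(value) == {"$date"}:
            return value["$date"]
        return {k: normalize_extended_json(v) for k, v in value.items()}
    if isinstance(value, list):
        return [normalize_extended_json(v) for v in value]
    return value


def normalize_document(doc):
    """Unwrap Extended JSON and rename _id to id at the top level."""
    doc = normalize_extended_json(doc)
    if "_id" in doc:
        doc["id"] = doc.pop("_id")
    return doc
```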

Flatten Embedded Documents

Meilisearch handles nested objects but flat structures are simpler to search and sort:

def flatten(doc, parent_key="", sep="_"):
    items = []
    for k, v in doc.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)

Remove Unsupported Types

MongoDB documents may contain ObjectId, DBRef, BinData, or Timestamp types that do not survive JSON export cleanly. Strip or convert them before sending to Meilisearch.
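A sketch of that cleanup for mongoexport output: it drops top-level fields wrapped in $binary or $timestamp and reduces a DBRef to the referenced id. The wrapper names follow Extended JSON; adjust them for your export mode:

```python
# Extended JSON wrappers that have no useful text representation for search.
DROP_WRAPPERS = {"$binary", "$timestamp"}


def strip_unsupported(doc):
    """Shallow pass over top-level fields: drop binary/timestamp wrappers
    and flatten DBRefs ({"$ref": ..., "$id": ...}) to the referenced id."""
    clean = {}
    for key, value in doc.items():
        if isinstance(value, dict):
            if DROP_WRAPPERS & set(value):
                continue  # drop BinData / internal Timestamp fields entirely
            if "$ref" in value and "$id" in value:
                ref_id = value["$id"]
                # a DBRef's $id is itself usually an {"$oid": ...} wrapper
                clean[key] = ref_id.get("$oid", str(ref_id)) if isinstance(ref_id, dict) else str(ref_id)
                continue
        clean[key] = value
    return clean
```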

Importing Data into Meilisearch

Adjust Payload Size Limit

Meilisearch has a default payload limit of 100 MB. Large exports exceed this. Raise it at startup:

meilisearch \
  --http-addr 0.0.0.0:7700 \
  --master-key="your-secure-key" \
  --http-payload-size-limit=100Gb

Upload Documents

curl \
  -X POST 'http://localhost:7700/indexes/users/documents' \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer your-secure-key' \
  --data-binary @users_export.json

Check Task Status

Meilisearch processes document additions asynchronously. Poll the returned task UID:

curl \
  -H 'Authorization: Bearer your-secure-key' \
  -X GET 'http://localhost:7700/tasks/42'

Example success response:

{
  "uid": 42,
  "indexUid": "users",
  "status": "succeeded",
  "type": "documentAdditionOrUpdate",
  "details": {
    "receivedDocuments": 500000,
    "indexedDocuments": 500000
  }
}
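A small polling helper makes the check scriptable. It takes a fetch function so the HTTP layer stays pluggable; wire it to GET /tasks/{uid} as in the curl call above:

```python
import time


def wait_for_task(fetch_task, task_uid, timeout=300, interval=1):
    """Poll a Meilisearch task until it reaches a terminal state.
    fetch_task(task_uid) must return the parsed JSON of GET /tasks/{uid}."""
    deadline = time.time() + timeout
    while time.time() < deadline:
        task = fetch_task(task_uid)
        if task["status"] in ("succeeded", "failed", "canceled"):
            return task
        time.sleep(interval)
    raise TimeoutError(f"Task {task_uid} still processing after {timeout}s")
```

With requests, the fetch function is one line: `lambda uid: requests.get(f"{host}/tasks/{uid}", headers=headers).json()`.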

Validating After Import

Compare document counts between MongoDB and Meilisearch to confirm nothing was lost:

# MongoDB document count
mongosh --quiet --eval 'db.users.countDocuments({active: true})' my_db

# Meilisearch document count
curl -s \
  -H 'Authorization: Bearer your-secure-key' \
  -X GET 'http://localhost:7700/indexes/users/stats' | jq '.numberOfDocuments'

If the counts do not match, inspect the Meilisearch task log for errors and re-run the failed batches.

Large Dataset Handling

Chunked Exports

For collections with millions of documents, export in chunks using --skip and --limit, with a stable sort so chunk boundaries do not shift between runs:

mongoexport --db=my_db --collection=users --sort='{_id: 1}' \
  --skip=0 --limit=100000 --out=users_0.json --jsonArray

mongoexport --db=my_db --collection=users --sort='{_id: 1}' \
  --skip=100000 --limit=100000 --out=users_1.json --jsonArray
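A driver script can compute the skip/limit pairs and shell out to mongoexport. The chunk_ranges helper is the testable core; export_chunks and its flags mirror the commands above (count the collection first, e.g. with countDocuments):

```python
import subprocess


def chunk_ranges(total_docs, chunk_size):
    """Yield (skip, limit) pairs covering a collection of total_docs documents."""
    for skip in range(0, total_docs, chunk_size):
        yield skip, min(chunk_size, total_docs - skip)


def export_chunks(total_docs, chunk_size=100_000, db="my_db", collection="users"):
    """Run one mongoexport per chunk, sorted on _id for stable boundaries."""
    for i, (skip, limit) in enumerate(chunk_ranges(total_docs, chunk_size)):
        subprocess.run([
            "mongoexport", f"--db={db}", f"--collection={collection}",
            "--sort={_id: 1}", f"--skip={skip}", f"--limit={limit}",
            f"--out={collection}_{i}.json", "--jsonArray",
        ], check=True)
```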

Batch Uploads to Meilisearch

Note that split -l works on line-delimited files, not on a single JSON array. Convert the export to NDJSON (one document per line) first, then split it into 50,000-line chunks and upload each batch sequentially with the NDJSON content type:

split -l 50000 users_export.ndjson chunk_
for f in chunk_*; do
  echo "Uploading $f..."
  curl -X POST 'http://localhost:7700/indexes/users/documents' \
    -H 'Content-Type: application/x-ndjson' \
    -H 'Authorization: Bearer your-secure-key' \
    --data-binary "@$f"
done

Streaming Imports with Node.js

Avoid writing intermediate files entirely by streaming from MongoDB directly into Meilisearch:

const { MongoClient } = require("mongodb");
const { MeiliSearch } = require("meilisearch");

const mongo = new MongoClient("mongodb://localhost:27017");
const meili = new MeiliSearch({ host: "http://localhost:7700", apiKey: "your-secure-key" });

async function streamImport() {
  await mongo.connect();
  const db = mongo.db("my_db");
  const cursor = db.collection("users").find({ active: true });
  const index = meili.index("users");

  let batch = [];
  for await (const doc of cursor) {
    const transformed = {
      id: doc._id.toString(),
      name: doc.name,
      email: doc.email,
      created_at: doc.created_at?.toISOString(),
    };
    batch.push(transformed);
    if (batch.length >= 10000) {
      await index.addDocuments(batch);
      batch = [];
    }
  }
  if (batch.length > 0) {
    await index.addDocuments(batch);
  }
  await mongo.close();
}

streamImport().catch(console.error);

This script streams documents from a MongoDB cursor, transforms each one, and sends 10,000-document batches to Meilisearch. No intermediate file needed.

Incremental Sync Strategies

A one-time import gets stale. Choose an incremental sync approach that matches your tolerance for latency and complexity.

Change Streams from MongoDB to Meilisearch

MongoDB change streams emit every insert, update, and delete in real time. Watch the oplog and reflect changes into Meilisearch:

const { MongoClient } = require("mongodb");
const { MeiliSearch } = require("meilisearch");

const mongo = new MongoClient("mongodb://localhost:27017");
const meili = new MeiliSearch({ host: "http://localhost:7700", apiKey: "your-secure-key" });

async function watchCollection() {
  await mongo.connect();
  const collection = mongo.db("my_db").collection("users");
  const index = meili.index("users");
  const pipeline = [{ $match: { operationType: { $in: ["insert", "update", "delete"] } } }];
  const changeStream = collection.watch(pipeline, { fullDocument: "updateLookup" });

  changeStream.on("change", async (change) => {
    try {
      if (change.operationType === "delete") {
        await index.deleteDocument(change.documentKey._id.toString());
      } else {
        const doc = change.fullDocument;
        const transformed = {
          id: doc._id.toString(),
          name: doc.name,
          email: doc.email,
          created_at: doc.created_at?.toISOString(),
          updated_at: new Date().toISOString(),
        };
        await index.addDocuments([transformed]);
      }
      console.log(`Synced ${change.operationType}: ${change.documentKey._id}`);
    } catch (err) {
      console.error(`Failed to sync document ${change.documentKey._id}:`, err.message);
    }
  });
}

watchCollection();

Change streams require a replica set. Use mongod --replSet rs0 in development or connect to a replica set in production.

Scheduled Re-Indexing

When real-time sync is over-engineering, run a full re-import on a cron schedule:

0 3 * * * /usr/local/bin/mongo-to-meili.sh

The script below runs the export, transform, and import, then compares MongoDB and Meilisearch document counts and alerts on mismatch:

#!/bin/bash
set -e

MONGO_DB="my_db"
MONGO_COLLECTION="users"
MEILI_HOST="http://localhost:7700"
MEILI_KEY="your-secure-key"

mongoexport --db="$MONGO_DB" --collection="$MONGO_COLLECTION" \
  --query='{"active": true}' --out=/tmp/export.json --jsonArray

python3 transform.py /tmp/export.json /tmp/transformed.json

task_uid=$(curl -s -X POST "$MEILI_HOST/indexes/users/documents" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer $MEILI_KEY" \
  --data-binary @/tmp/transformed.json | jq '.taskUid')

# Indexing is asynchronous: wait for the task to settle before comparing counts
while :; do
  status=$(curl -s -H "Authorization: Bearer $MEILI_KEY" "$MEILI_HOST/tasks/$task_uid" | jq -r '.status')
  [ "$status" = "succeeded" ] && break
  [ "$status" = "failed" ] && { echo "Import task failed"; exit 1; }
  sleep 5
done

mongo_count=$(mongosh --quiet --eval "db.$MONGO_COLLECTION.countDocuments({active: true})" "$MONGO_DB")
meili_count=$(curl -s -H "Authorization: Bearer $MEILI_KEY" \
  "$MEILI_HOST/indexes/users/stats" | jq '.numberOfDocuments')

if [ "$mongo_count" -ne "$meili_count" ]; then
  echo "WARNING: Count mismatch. MongoDB: $mongo_count, Meilisearch: $meili_count"
  exit 1
fi

echo "Sync complete. MongoDB: $mongo_count, Meilisearch: $meili_count"

CDC with Debezium

For high-scale production pipelines, Debezium captures MongoDB oplog events and publishes them to Kafka. A Kafka Connect sink (community-built or custom, since there is no official Meilisearch sink) then writes into the search index. This decouples source and sink, adds buffering, and gives at-least-once delivery; keying sink writes on the document id makes replays idempotent.
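Registering the Debezium source is one POST to the Kafka Connect REST API. The connector class is Debezium’s real MongoDB connector; the connector name, topic prefix, connection string, and Connect URL below are placeholders for your deployment:

```python
import json
import urllib.request

# Connector name, topic prefix, and connection string are deployment-specific
# placeholders; only the connector class is fixed by Debezium.
connector_config = {
    "name": "mongo-users-source",
    "config": {
        "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
        "mongodb.connection.string": "mongodb://localhost:27017/?replicaSet=rs0",
        "topic.prefix": "meili",
        "collection.include.list": "my_db.users",
    },
}


def register_connector(connect_url="http://localhost:8083/connectors"):
    """POST the connector config to Kafka Connect's REST API."""
    req = urllib.request.Request(
        connect_url,
        data=json.dumps(connector_config).encode(),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```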

Automation Scripts

Python: Full Export, Transform, and Import

#!/usr/bin/env python3
"""Export MongoDB collection, transform documents, import into Meilisearch."""

import json
import sys
import time
from pathlib import Path

import requests
from pymongo import MongoClient


MONGO_URI = "mongodb://localhost:27017"
MONGO_DB = "my_db"
MONGO_COLLECTION = "users"
MEILI_HOST = "http://localhost:7700"
MEILI_KEY = "your-secure-key"
MEILI_INDEX = "users"
BATCH_SIZE = 10000


def fetch_documents():
    # Loads the whole result set into memory; for very large collections,
    # iterate the cursor in batches instead (see the Node.js streaming example).
    client = MongoClient(MONGO_URI)
    db = client[MONGO_DB]
    return list(db[MONGO_COLLECTION].find({"active": True}))


def transform(doc):
    return {
        "id": str(doc["_id"]),
        "name": doc.get("name", ""),
        "email": doc.get("email", ""),
        "created_at": doc.get("created_at").isoformat() if doc.get("created_at") else None,
    }


def upload_batch(docs, retries=3):
    url = f"{MEILI_HOST}/indexes/{MEILI_INDEX}/documents"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {MEILI_KEY}",
    }
    for attempt in range(retries):
        try:
            resp = requests.post(url, json=docs, headers=headers, timeout=120)
            resp.raise_for_status()
            task = resp.json()
            print(f"Uploaded {len(docs)} docs, task UID: {task['taskUid']}")
            return
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}", file=sys.stderr)
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise


def main():
    docs = fetch_documents()
    print(f"Fetched {len(docs)} documents from MongoDB")
    transformed = [transform(d) for d in docs]

    for i in range(0, len(transformed), BATCH_SIZE):
        batch = transformed[i:i + BATCH_SIZE]
        upload_batch(batch)

    print("Import complete")


if __name__ == "__main__":
    main()

Go: Batch Processor

For higher throughput, use a compiled Go program that streams the cursor and uploads in batches (fanning uploads out across goroutines is a natural extension if a single stream is not fast enough):

package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/meilisearch/meilisearch-go"
	"go.mongodb.org/mongo-driver/bson"
	"go.mongodb.org/mongo-driver/bson/primitive"
	"go.mongodb.org/mongo-driver/mongo"
	"go.mongodb.org/mongo-driver/mongo/options"
)

type User struct {
	ID   string `json:"id"`
	Name string `json:"name"`
}

// toID converts a MongoDB _id to a Meilisearch-safe string: ObjectIds
// become their 24-character hex form, anything else is stringified.
func toID(v interface{}) string {
	if oid, ok := v.(primitive.ObjectID); ok {
		return oid.Hex()
	}
	return fmt.Sprintf("%v", v)
}

func main() {
	ctx := context.Background()

	mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
	if err != nil {
		log.Fatal(err)
	}
	meiliClient := meilisearch.NewClient(meilisearch.ClientConfig{
		Host:   "http://localhost:7700",
		APIKey: "your-secure-key",
	})

	coll := mongoClient.Database("my_db").Collection("users")
	cursor, err := coll.Find(ctx, bson.M{"active": true})
	if err != nil {
		log.Fatal(err)
	}
	defer cursor.Close(ctx)

	index := meiliClient.Index("users")
	var batch []User

	for cursor.Next(ctx) {
		var raw bson.M
		if err := cursor.Decode(&raw); err != nil {
			log.Fatal(err)
		}
		batch = append(batch, User{
			ID:   toID(raw["_id"]),
			Name: fmt.Sprintf("%v", raw["name"]),
		})
		if len(batch) >= 10000 {
			task, err := index.AddDocuments(batch)
			if err != nil {
				log.Fatal(err)
			}
			fmt.Printf("Sent batch, task: %d\n", task.TaskUID)
			batch = batch[:0]
			time.Sleep(100 * time.Millisecond)
		}
	}
	if err := cursor.Err(); err != nil {
		log.Fatal(err)
	}
	if len(batch) > 0 {
		if _, err := index.AddDocuments(batch); err != nil {
			log.Fatal(err)
		}
	}
}

Docker Compose Setup

Run MongoDB and Meilisearch locally for development or CI:

version: "3.8"
services:
  mongodb:
    image: mongo:7
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db

  meilisearch:
    image: getmeili/meilisearch:v1.12
    ports:
      - "7700:7700"
    environment:
      - MEILI_MASTER_KEY=your-secure-key
      - MEILI_HTTP_PAYLOAD_SIZE_LIMIT=100Gb
    volumes:
      - meili_data:/meili_data

volumes:
  mongo_data:
  meili_data:

Start with docker compose up -d and run your import scripts against localhost:27017 and localhost:7700. To test change streams, add command: ["--replSet", "rs0"] to the mongodb service and run rs.initiate() once inside the container.

Error Handling and Retry Logic

Network failures, timeouts, and Meilisearch rate limits interrupt imports. Handle them with exponential backoff:

import time
from functools import wraps

import requests


def retry(max_attempts=5, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    delay = base_delay * (2 ** attempt)
                    print(f"Retry {attempt + 1}/{max_attempts} after {delay}s: {e}")
                    time.sleep(delay)
            raise last_error
        return wrapper
    return decorator


# url and headers are the endpoint and auth headers from the upload examples above
@retry(max_attempts=3, base_delay=2)
def upload_to_meilisearch(docs):
    resp = requests.post(url, json=docs, headers=headers, timeout=60)
    resp.raise_for_status()
    return resp.json()

Common error scenarios and how to handle them:

Error                   | Cause                        | Mitigation
------------------------|------------------------------|-------------------------------------
413 Payload Too Large   | Document batch exceeds limit | Reduce batch size
503 Service Unavailable | Meilisearch is indexing      | Retry with backoff
Timeout                 | Network or large payload     | Increase timeout, reduce batch size
Invalid document        | Field type not supported     | Validate schema before upload

Additional Tips

  • Ensure MongoDB is running and accessible on the specified port.
  • Install and run Meilisearch before importing. Download from the Meilisearch releases page.
  • Use a secure master key in production and load it from environment variables, never hardcode it.
  • Verify the index after import by searching a known document.
  • For large datasets, monitor system resources — CPU, memory, and disk I/O — during indexing.
  • If using CSV input to Meilisearch, include a header row with an id column that Meilisearch can use as the primary key.
  • Set MEILI_MAX_INDEXING_MEMORY if the server has limited RAM.
  • Run a dry-run import with a small subset (1000 documents) before committing to a full migration.
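One of the tips above suggests verifying the index by searching for a known document. A minimal sketch using only the standard library (host, key, and index names are the placeholders used throughout this guide):

```python
import json
import urllib.request


def build_search_request(host, api_key, index_uid, query):
    """Build the POST request for Meilisearch's /indexes/{uid}/search endpoint."""
    return urllib.request.Request(
        f"{host}/indexes/{index_uid}/search",
        data=json.dumps({"q": query}).encode(),
        headers={
            "Content-Type": "application/json",
            "Authorization": f"Bearer {api_key}",
        },
        method="POST",
    )


def search_index(host, api_key, index_uid, query):
    """Run the search and return the hits list from the response."""
    req = build_search_request(host, api_key, index_uid, query)
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)["hits"]


# search_index("http://localhost:7700", "your-secure-key", "users", "known name")
```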
