Background
MongoDB stores data as JSON-like documents in a schema-less structure. Each document has a unique _id field, typically an ObjectId.
Meilisearch is an open-source search engine written in Rust. It provides typo-tolerant full-text search with instant responses. Meilisearch accepts JSON, NDJSON, and CSV input and requires every document to carry a unique primary key (a string or integer id).
This guide walks through the full pipeline: exporting data from MongoDB, transforming it into a Meilisearch-compatible format, and importing it at scale.
Architecture Overview
The import pipeline moves data through four stages:
┌──────────┐    ┌───────────────┐    ┌────────────────┐    ┌─────────────┐
│ MongoDB  │───▶│ Export Method │───▶│ Transform      │───▶│ Meilisearch │
│ Database │    │ (mongoexport, │    │ (normalize     │    │ Index       │
│          │    │  aggregation, │    │  types, flatten│    │             │
│          │    │ change stream)│    │  map fields)   │    │             │
└──────────┘    └───────────────┘    └────────────────┘    └─────────────┘
                        │
                        ▼
                ┌──────────────────┐
                │ Staging (JSON    │
                │ files, memory,   │
                │ or message queue)│
                └──────────────────┘
MongoDB is the source of truth. You export documents, transform fields (ObjectId to string, ISODate to string, flatten nested objects), and push the result to Meilisearch. For one-time bulk loads you export to a file. For ongoing syncs you use change streams.
MongoDB Data Export Methods
Three approaches cover most use cases. Each has different trade-offs in speed, complexity, and data fidelity.
mongoexport (Simple, File-Based)
mongoexport is a command-line tool included with MongoDB. It writes collections to JSON or CSV files.
mongoexport \
--uri="mongodb://localhost:27017" \
--db=my_db \
--collection=users \
--out=users.json \
--jsonArray
The --jsonArray flag wraps documents in an array, which is what the Meilisearch JSON endpoint expects. Without it every document appears on its own line (JSONL/NDJSON format), which Meilisearch only accepts when the payload is sent with the application/x-ndjson content type.
mongodump / mongorestore (Binary, Fastest)
mongodump creates a binary BSON dump. It is faster than mongoexport for large collections but requires an extra conversion step to get plain JSON.
mongodump \
--uri="mongodb://localhost:27017" \
--db=my_db \
--collection=users \
--out=./dump
To convert BSON to JSON you use bsondump:
bsondump ./dump/my_db/users.bson > users.json
This produces JSONL format (one document per line). You still need to wrap it in an array, or send the file as-is with the application/x-ndjson content type if your Meilisearch version supports NDJSON uploads.
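If you prefer to stay with the plain JSON endpoint, a short script can wrap the JSONL output into an array. A minimal sketch in Python, assuming the bsondump output is a file named users.json with one complete JSON document per line:

#!/usr/bin/env python3
"""Wrap a JSONL export (one JSON document per line) into a single JSON array."""
import json

# Assumed file names; adjust to match your export.
with open("users.json", "r", encoding="utf-8") as src:
    docs = [json.loads(line) for line in src if line.strip()]

with open("users_array.json", "w", encoding="utf-8") as dst:
    json.dump(docs, dst)

print(f"Wrapped {len(docs)} documents into users_array.json")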
Aggregation Pipeline with $out (Most Flexible)
Running an aggregation pipeline inside MongoDB gives you full control over the output. You can rename fields, cast types, filter, and flatten inside the database before the data ever leaves.
db.users.aggregate([
  { $match: { active: true } },
  { $addFields: {
      id: { $toString: "$_id" },
      registered_at: { $toString: "$created_at" }
  }},
  { $project: { _id: 0, created_at: 0 } },
  { $out: "users_for_export" }
])
This writes the transformed documents to a new collection users_for_export. You then export that pre-processed collection with mongoexport.
Method Comparison
| Method | Speed | Complexity | Best For |
|---|---|---|---|
| mongoexport | Fast | Low | One-off exports, small to medium datasets |
| mongodump + bsondump | Fastest | Medium | Very large datasets, raw performance needed |
| Aggregation + $out | Moderate | High | Complex transformations before export |
| Change Streams | Real-time | High | Ongoing sync, incremental updates |
Preparing the View for Export
Meilisearch requires the primary key to be an integer or a string containing only alphanumeric characters, hyphens, and underscores ([0-9a-zA-Z_-]). MongoDB’s _id is an ObjectId that exports as an {"$oid": "..."} object, which Meilisearch rejects as a document id.
Create a MongoDB view that converts _id to a string id and removes the original field:
var pipeline = [
{ $addFields: { id: { "$toString": "$_id" } } },
{ $project: { _id: 0 } }
];
db.createView("users_export_view", "users", pipeline);
Now export the view directly:
mongoexport \
--uri="mongodb://localhost:27017" \
--db=my_db \
--collection=users_export_view \
--out=users_export.json \
--jsonArray
Handling Nested Documents
MongoDB stores sub-documents and arrays freely. Meilisearch accepts nested JSON, but flat fields are simpler to configure as searchable, filterable, and sortable attributes.
Flatten nested objects in the view pipeline:
var pipeline = [
  { $addFields: {
      id: { $toString: "$_id" },
      "address_city": "$address.city",
      "address_zip": "$address.zip"
  }},
  { $project: { _id: 0, address: 0 } }
];
Handling Arrays
Arrays can appear as-is if Meilisearch should index each element as a searchable string. If you need to join array elements into a single string:
var pipeline = [
  { $addFields: {
      id: { $toString: "$_id" },
      tags: { $reduce: {
          input: "$tags",
          initialValue: "",
          in: { $cond: {
              if: { $eq: ["$$value", ""] },
              then: "$$this",
              else: { $concat: ["$$value", ", ", "$$this"] }
          }}
      }}
  }},
  { $project: { _id: 0 } }
];
Handling Dates
Meilisearch stores dates as plain strings. Convert ISODate fields to ISO 8601 strings with $toString; if you also need range filtering on dates, store a numeric Unix timestamp alongside the string, since comparison filters only apply to numbers:
var pipeline = [
  { $addFields: {
      id: { $toString: "$_id" },
      created_at: { $toString: "$created_at" },
      updated_at: { $toString: "$updated_at" }
  }},
  { $project: { _id: 0 } }
];
Data Transformation Pipeline
When the view approach is insufficient — for example when you need to combine data from multiple collections, or apply logic that MongoDB aggregation does not handle well — you write a transformation script.
ObjectId to String
Meilisearch rejects {"$oid": "..."}. Every _id must become a plain string:
if (doc._id && doc._id.$oid) {
  doc.id = doc._id.$oid;
  delete doc._id;
}
ISODate to String
Dates exported via mongoexport appear as {"$date": "2024-01-15T10:30:00Z"}. Convert them:
if (doc.created_at && doc.created_at.$date) {
  doc.created_at = doc.created_at.$date;
}
Flatten Embedded Documents
Meilisearch handles nested objects but flat structures are simpler to search and sort:
def flatten(doc, parent_key="", sep="_"):
    """Recursively flatten nested dictionaries into underscore-separated keys."""
    items = []
    for k, v in doc.items():
        new_key = f"{parent_key}{sep}{k}" if parent_key else k
        if isinstance(v, dict):
            # Recurse into the sub-document and merge its flattened keys
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
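For example, applied to a document with an embedded address object (a made-up shape for illustration):

flatten({"id": "65a1b2", "address": {"city": "Berlin", "zip": "10115"}})
# {'id': '65a1b2', 'address_city': 'Berlin', 'address_zip': '10115'}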
Remove Unsupported Types
MongoDB documents may contain ObjectId, DBRef, BinData, or Timestamp types that do not survive JSON export cleanly. Strip or convert them before sending to Meilisearch.
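A minimal clean-up sketch in Python, assuming the documents carry mongoexport’s extended JSON wrappers; the exact wrapper shapes vary with export mode and driver version, so extend the checks to match your data:

def clean_extended_json(doc):
    """Convert or drop extended JSON type wrappers so the document is plain JSON."""
    cleaned = {}
    for key, value in doc.items():
        if isinstance(value, dict):
            if "$oid" in value:
                cleaned[key] = value["$oid"]  # ObjectId -> plain string
            elif "$date" in value:
                cleaned[key] = value["$date"]  # ISODate -> ISO 8601 string (or millis)
            elif "$numberDecimal" in value:
                cleaned[key] = float(value["$numberDecimal"])  # Decimal128 -> float
            elif "$binary" in value or "$timestamp" in value or "$ref" in value:
                continue  # drop BinData, Timestamp, and DBRef fields
            else:
                cleaned[key] = clean_extended_json(value)  # ordinary sub-document
        else:
            cleaned[key] = value
    return cleaned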
Importing Data into Meilisearch
Adjust Payload Size Limit
Meilisearch enforces a default payload size limit of 100 MB, which large exports can exceed. Raise it at startup:
meilisearch \
--http-addr 0.0.0.0:7700 \
--master-key="your-secure-key" \
--http-payload-size-limit=100Gb
Upload Documents
curl \
-X POST 'http://localhost:7700/indexes/users/documents' \
-H 'Content-Type: application/json' \
-H 'Authorization: Bearer your-secure-key' \
--data-binary @users_export.json
Check Task Status
Meilisearch processes document additions asynchronously. Poll the returned task UID:
curl \
-H 'Authorization: Bearer your-secure-key' \
-X GET 'http://localhost:7700/tasks/42'
Example success response:
{
  "uid": 42,
  "indexUid": "users",
  "status": "succeeded",
  "type": "documentAdditionOrUpdate",
  "details": {
    "receivedDocuments": 500000,
    "indexedDocuments": 500000
  }
}
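In scripts you can poll the endpoint until the task leaves the enqueued and processing states. A small helper sketch using Python’s requests (host, key, and polling interval are assumptions):

import time
import requests

def wait_for_task(task_uid, host="http://localhost:7700", key="your-secure-key", interval=1):
    """Poll the Meilisearch task endpoint until the task reaches a terminal status."""
    headers = {"Authorization": f"Bearer {key}"}
    while True:
        task = requests.get(f"{host}/tasks/{task_uid}", headers=headers, timeout=30).json()
        if task["status"] in ("succeeded", "failed", "canceled"):
            return task
        time.sleep(interval)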
Validating After Import
Compare document counts between MongoDB and Meilisearch to confirm nothing was lost:
# MongoDB document count
mongosh --quiet --eval 'db.users.countDocuments({active: true})' my_db
# Meilisearch document count
curl -s \
-H 'Authorization: Bearer your-secure-key' \
-X GET 'http://localhost:7700/indexes/users/stats' | jq '.numberOfDocuments'
If the counts do not match, inspect the Meilisearch task log for errors and re-run the failed batches.
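One way to find the failing batches is the task-listing API, sketched here with Python’s requests; the statuses and indexUids filters assume a recent Meilisearch v1.x release:

import requests

headers = {"Authorization": "Bearer your-secure-key"}
resp = requests.get(
    "http://localhost:7700/tasks",
    params={"statuses": "failed", "indexUids": "users"},
    headers=headers,
    timeout=30,
)
for task in resp.json()["results"]:
    # Failed tasks carry an error object with a code and a human-readable message
    print(task["uid"], task["error"]["code"], task["error"]["message"])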
Large Dataset Handling
Chunked Exports
For collections with millions of documents, export in chunks using --skip and --limit (add --sort so chunk boundaries stay stable between runs):
mongoexport --db=my_db --collection=users \
--skip=0 --limit=100000 --out=users_0.json --jsonArray
mongoexport --db=my_db --collection=users \
--skip=100000 --limit=100000 --out=users_1.json --jsonArray
Batch Uploads to Meilisearch
Splitting a single JSON array file by lines breaks its validity, so for chunked uploads export without --jsonArray (JSONL, one document per line), split on a line count that keeps each chunk under the payload limit, and send every chunk with the application/x-ndjson content type:
split -l 50000 users_export.jsonl chunk_
for f in chunk_*; do
  echo "Uploading $f..."
  curl -X POST 'http://localhost:7700/indexes/users/documents' \
    -H 'Content-Type: application/x-ndjson' \
    -H 'Authorization: Bearer your-secure-key' \
    --data-binary "@$f"
done
Streaming Imports with Node.js
Avoid writing intermediate files entirely by streaming from MongoDB directly into Meilisearch:
const { MongoClient } = require("mongodb");
const { MeiliSearch } = require("meilisearch");

const mongo = new MongoClient("mongodb://localhost:27017");
const meili = new MeiliSearch({ host: "http://localhost:7700", apiKey: "your-secure-key" });

async function streamImport() {
  await mongo.connect();
  const db = mongo.db("my_db");
  const cursor = db.collection("users").find({ active: true });
  const index = meili.index("users");

  let batch = [];
  for await (const doc of cursor) {
    const transformed = {
      id: doc._id.toString(),
      name: doc.name,
      email: doc.email,
      created_at: doc.created_at?.toISOString(),
    };
    batch.push(transformed);
    if (batch.length >= 10000) {
      await index.addDocuments(batch);
      batch = [];
    }
  }
  if (batch.length > 0) {
    await index.addDocuments(batch);
  }
  await mongo.close();
}

streamImport().catch(console.error);
This script streams documents from a MongoDB cursor, transforms each one, and sends 10,000-document batches to Meilisearch. No intermediate file needed.
Incremental Sync Strategies
A one-time import gets stale. Choose an incremental sync approach that matches your tolerance for latency and complexity.
Change Streams from MongoDB to Meilisearch
MongoDB change streams emit every insert, update, and delete in real time. Watch the oplog and reflect changes into Meilisearch:
const { MongoClient } = require("mongodb");
const { MeiliSearch } = require("meilisearch");

const mongo = new MongoClient("mongodb://localhost:27017");
const meili = new MeiliSearch({ host: "http://localhost:7700", apiKey: "your-secure-key" });

async function watchCollection() {
  await mongo.connect();
  const collection = mongo.db("my_db").collection("users");
  const index = meili.index("users");

  const pipeline = [{ $match: { operationType: { $in: ["insert", "update", "delete"] } } }];
  const changeStream = collection.watch(pipeline, { fullDocument: "updateLookup" });

  changeStream.on("change", async (change) => {
    try {
      if (change.operationType === "delete") {
        await index.deleteDocument(change.documentKey._id.toString());
      } else {
        const doc = change.fullDocument;
        const transformed = {
          id: doc._id.toString(),
          name: doc.name,
          email: doc.email,
          created_at: doc.created_at?.toISOString(),
          updated_at: new Date().toISOString(),
        };
        await index.addDocuments([transformed]);
      }
      console.log(`Synced ${change.operationType}: ${change.documentKey._id}`);
    } catch (err) {
      console.error(`Failed to sync document ${change.documentKey._id}:`, err.message);
    }
  });
}

watchCollection();
Change streams require a replica set. In development, start mongod with --replSet rs0 and run rs.initiate() once in the shell; in production, connect to your existing replica set.
Scheduled Re-Indexing
When real-time sync is over-engineering, run a full re-import on a cron schedule:
0 3 * * * /usr/local/bin/mongo-to-meili.sh
The script below compares counts before and after, and alerts on mismatch:
#!/bin/bash
set -e
MONGO_DB="my_db"
MONGO_COLLECTION="users"
MEILI_HOST="http://localhost:7700"
MEILI_KEY="your-secure-key"
mongoexport --db="$MONGO_DB" --collection="$MONGO_COLLECTION" \
--query='{"active": true}' --out=/tmp/export.json --jsonArray
python3 transform.py /tmp/export.json /tmp/transformed.json
curl -X POST "$MEILI_HOST/indexes/users/documents" \
-H "Content-Type: application/json" \
-H "Authorization: Bearer $MEILI_KEY" \
--data-binary @/tmp/transformed.json
mongo_count=$(mongosh --quiet --eval "db.$MONGO_COLLECTION.countDocuments({active: true})" "$MONGO_DB")
meili_count=$(curl -s -H "Authorization: Bearer $MEILI_KEY" \
"$MEILI_HOST/indexes/users/stats" | jq '.numberOfDocuments')
if [ "$mongo_count" -ne "$meili_count" ]; then
echo "WARNING: Count mismatch. MongoDB: $mongo_count, Meilisearch: $meili_count"
exit 1
fi
echo "Sync complete. MongoDB: $mongo_count, Meilisearch: $meili_count"
CDC with Debezium
For high-scale production pipelines, Debezium captures MongoDB oplog events and publishes them to Kafka. A Kafka Connect sink or a small consumer service then writes the events into the Meilisearch index. This decouples source and sink, adds buffering, and provides at-least-once delivery; because Meilisearch upserts documents by id, replayed events are harmless.
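A minimal consumer sketch, assuming a Debezium MongoDB connector publishing change events to a my_db.users topic and the kafka-python client; the exact envelope (whether a payload wrapper is present, and the fact that after arrives as an extended-JSON string) depends on connector version, converter settings, and capture mode:

import json
import requests
from kafka import KafkaConsumer

MEILI_URL = "http://localhost:7700/indexes/users/documents"
HEADERS = {"Authorization": "Bearer your-secure-key"}

# Topic name and broker address are assumptions; check your connector configuration.
consumer = KafkaConsumer("my_db.users", bootstrap_servers="localhost:9092")

for message in consumer:
    if message.value is None:
        continue  # tombstone record
    event = json.loads(message.value)
    payload = event.get("payload", event)  # "payload" wrapper is present only when schemas are enabled
    if payload.get("op") in ("c", "u", "r"):  # create, update, snapshot read
        # The MongoDB connector delivers the document as an extended-JSON string in "after"
        doc = json.loads(payload["after"])
        oid = doc.pop("_id", None)
        doc["id"] = oid.get("$oid") if isinstance(oid, dict) else str(oid)
        requests.post(MEILI_URL, json=[doc], headers=HEADERS, timeout=30)
    # Delete events carry only the document key; route them to the Meilisearch delete endpoint.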
Automation Scripts
Python: Full Export, Transform, and Import
#!/usr/bin/env python3
"""Export MongoDB collection, transform documents, import into Meilisearch."""
import sys
import time

import requests
from pymongo import MongoClient

MONGO_URI = "mongodb://localhost:27017"
MONGO_DB = "my_db"
MONGO_COLLECTION = "users"
MEILI_HOST = "http://localhost:7700"
MEILI_KEY = "your-secure-key"
MEILI_INDEX = "users"
BATCH_SIZE = 10000


def fetch_documents():
    client = MongoClient(MONGO_URI)
    db = client[MONGO_DB]
    return list(db[MONGO_COLLECTION].find({"active": True}))


def transform(doc):
    return {
        "id": str(doc["_id"]),
        "name": doc.get("name", ""),
        "email": doc.get("email", ""),
        "created_at": doc.get("created_at").isoformat() if doc.get("created_at") else None,
    }


def upload_batch(docs, retries=3):
    url = f"{MEILI_HOST}/indexes/{MEILI_INDEX}/documents"
    headers = {
        "Content-Type": "application/json",
        "Authorization": f"Bearer {MEILI_KEY}",
    }
    for attempt in range(retries):
        try:
            resp = requests.post(url, json=docs, headers=headers, timeout=120)
            resp.raise_for_status()
            task = resp.json()
            print(f"Uploaded {len(docs)} docs, task UID: {task['taskUid']}")
            return
        except requests.exceptions.RequestException as e:
            print(f"Attempt {attempt + 1} failed: {e}", file=sys.stderr)
            if attempt < retries - 1:
                time.sleep(2 ** attempt)
            else:
                raise


def main():
    docs = fetch_documents()
    print(f"Fetched {len(docs)} documents from MongoDB")
    transformed = [transform(d) for d in docs]
    for i in range(0, len(transformed), BATCH_SIZE):
        batch = transformed[i:i + BATCH_SIZE]
        upload_batch(batch)
    print("Import complete")


if __name__ == "__main__":
    main()
Go: Batch Processor
For higher raw throughput you can run the batch loop in Go. The version below uploads batches sequentially; wrap the upload call in goroutines if you need concurrent uploads:
package main
import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/meilisearch/meilisearch-go"
    "go.mongodb.org/mongo-driver/bson"
    "go.mongodb.org/mongo-driver/bson/primitive"
    "go.mongodb.org/mongo-driver/mongo"
    "go.mongodb.org/mongo-driver/mongo/options"
)

type User struct {
    ID   string `json:"id"`
    Name string `json:"name"`
}
func main() {
    ctx := context.Background()

    mongoClient, err := mongo.Connect(ctx, options.Client().ApplyURI("mongodb://localhost:27017"))
    if err != nil {
        log.Fatal(err)
    }
    defer mongoClient.Disconnect(ctx)

    meiliClient := meilisearch.NewClient(meilisearch.ClientConfig{
        Host:   "http://localhost:7700",
        APIKey: "your-secure-key",
    })

    coll := mongoClient.Database("my_db").Collection("users")
    cursor, err := coll.Find(ctx, bson.M{"active": true})
    if err != nil {
        log.Fatal(err)
    }
    defer cursor.Close(ctx)

    index := meiliClient.Index("users")
    var batch []User

    for cursor.Next(ctx) {
        var raw bson.M
        if err := cursor.Decode(&raw); err != nil {
            log.Fatal(err)
        }
        // The ObjectId must become a plain hex string to be a valid Meilisearch document id
        id, _ := raw["_id"].(primitive.ObjectID)
        batch = append(batch, User{
            ID:   id.Hex(),
            Name: fmt.Sprintf("%v", raw["name"]),
        })
        if len(batch) >= 10000 {
            task, err := index.AddDocuments(batch)
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("Sent batch, task: %d\n", task.TaskUID)
            batch = batch[:0]
            time.Sleep(100 * time.Millisecond)
        }
    }
    if len(batch) > 0 {
        if _, err := index.AddDocuments(batch); err != nil {
            log.Fatal(err)
        }
    }
}
Docker Compose Setup
Run MongoDB and Meilisearch locally for development or CI:
version: "3.8"
services:
  mongodb:
    image: mongo:7
    ports:
      - "27017:27017"
    volumes:
      - mongo_data:/data/db
  meilisearch:
    image: getmeili/meilisearch:v1.12
    ports:
      - "7700:7700"
    environment:
      - MEILI_MASTER_KEY=your-secure-key
      - MEILI_HTTP_PAYLOAD_SIZE_LIMIT=100Gb
    volumes:
      - meili_data:/meili_data
volumes:
  mongo_data:
  meili_data:
Start with docker compose up -d and run your import scripts against localhost:27017 and localhost:7700.
Error Handling and Retry Logic
Network failures, timeouts, and transient Meilisearch errors (such as 503 responses while the engine is busy indexing) can interrupt imports. Handle them with exponential backoff:
import time
from functools import wraps

import requests


def retry(max_attempts=5, base_delay=1):
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            last_error = None
            for attempt in range(max_attempts):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    last_error = e
                    if attempt < max_attempts - 1:
                        delay = base_delay * (2 ** attempt)
                        print(f"Retry {attempt + 1}/{max_attempts} after {delay}s: {e}")
                        time.sleep(delay)
            raise last_error
        return wrapper
    return decorator


@retry(max_attempts=3, base_delay=2)
def upload_to_meilisearch(docs):
    # url and headers as defined in the import script above
    resp = requests.post(url, json=docs, headers=headers, timeout=60)
    resp.raise_for_status()
    return resp.json()
Common error scenarios and how to handle them:
| Error | Cause | Mitigation |
|---|---|---|
| 413 Payload Too Large | Document batch exceeds limit | Reduce batch size |
| 503 Service Unavailable | Meilisearch is indexing | Retry with backoff |
| Timeout | Network or large payload | Increase timeout, reduce batch size |
| Invalid document | Field type not supported | Validate schema before upload |
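For the 413 case specifically, a simple recovery strategy is to split the failing batch in half and retry each part, reusing the upload_to_meilisearch helper above; a sketch:

def upload_with_split(docs, min_size=100):
    """Upload a batch; on 413 Payload Too Large, split it in half and retry each half."""
    try:
        upload_to_meilisearch(docs)
    except requests.exceptions.HTTPError as e:
        if e.response is not None and e.response.status_code == 413 and len(docs) > min_size:
            mid = len(docs) // 2
            upload_with_split(docs[:mid], min_size)
            upload_with_split(docs[mid:], min_size)
        else:
            raise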
Additional Tips
- Ensure MongoDB is running and accessible on the specified port.
- Install and run Meilisearch before importing. Download from the Meilisearch releases page.
- Use a secure master key in production and load it from environment variables, never hardcode it.
- Verify the index after import by searching for a known document (see the sketch after this list).
- For large datasets, monitor system resources — CPU, memory, and disk I/O — during indexing.
- If using CSV input to Meilisearch, ensure the first column is the id field.
- Set MEILI_MAX_INDEXING_MEMORY if the server has limited RAM.
- Run a dry-run import with a small subset (1,000 documents) before committing to a full migration.
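A quick verification sketch against the search endpoint, assuming the users index and a query term you know exists in the source data:

import requests

resp = requests.post(
    "http://localhost:7700/indexes/users/search",
    json={"q": "alice", "limit": 1},
    headers={"Authorization": "Bearer your-secure-key"},
    timeout=30,
)
print(resp.json()["hits"])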
Resources
- MongoDB Documentation: Views
- MongoDB Documentation: Change Streams
- MongoDB Documentation: Aggregation Pipeline
- Meilisearch Documentation: Adding Documents
- Meilisearch Documentation: Task Management
- Meilisearch GitHub Repository
- Debezium MongoDB Connector