Introduction
Building production MongoDB applications requires more than basic CRUD. You need to understand how consistent your reads are, how to join related collections without sacrificing performance, how to batch writes for throughput, how to react to live data changes, and how to keep your data clean. This article covers five interconnected topics that every Node.js developer using MongoDB should know: read concerns, write concerns, the $lookup and $graphLookup aggregation stages, the bulkWrite() API, and change streams.
Read Concerns
Read concerns control the consistency and isolation properties of the data that a query returns. They answer the question: what durability guarantee must data have (for example, replication to a majority of replica set members) before the server may return it to my query?
MongoDB offers five read concern levels: local, available, majority, linearizable, and snapshot. Each trades off between consistency guarantees and performance.
Read Concern Local (Default)
"local" returns the most recent data on the node that handles the query, regardless of whether that data has been replicated to other nodes. This is the default for all reads against the primary. It offers the lowest isolation — a subsequent failover could roll back the data you just read.
const { MongoClient } = require("mongodb");

async function readLocal() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("shop");
  const orders = db.collection("orders");
  const doc = await orders.findOne(
    { _id: "order-1001" },
    { readConcern: { level: "local" } }
  );
  console.log("Read with local concern:", doc);
  await client.close();
}
Use local for non-critical reads where a millisecond of latency matters more than theoretical rollback — real-time analytics, dashboards, or reporting queries.
Read Concern Majority
"majority" returns data that has been committed to a majority of replica set members. This guarantees the data will not be rolled back. The trade-off is that majority reads may return slightly stale data because they must wait for replication to a majority.
async function readMajority() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("banking");
  const accounts = db.collection("accounts");
  const balance = await accounts.findOne(
    { accountId: "ACC-4421" },
    { readConcern: { level: "majority" } }
  );
  console.log("Read with majority concern:", balance);
  await client.close();
}
Use majority for mission-critical reads — account balances, payment statuses, or any data where reading a rolled-back value would cause a business problem.
Read Concern Linearizable
"linearizable" provides the strongest guarantee: the read reflects all majority-acknowledged writes that completed before the read began. Linearizable reads must target the primary, and the guarantee only applies when the query filter uniquely identifies a single document.
async function readLinearizable() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("inventory");
  const items = db.collection("items");
  const item = await items.findOne(
    { sku: "WIDGET-001" },
    { readConcern: { level: "linearizable" }, maxTimeMS: 5000 }
  );
  console.log("Linearizable read:", item);
  await client.close();
}
Linearizable reads are slow: they must confirm the read with a majority of nodes before returning, and without maxTimeMS they can block indefinitely if a majority is unreachable, so always set a timeout. Use them only when external consistency is critical, such as leader-election logic or distributed locking.
Read Concern Snapshot
Available on the WiredTiger storage engine, "snapshot" returns data from a point-in-time snapshot of the majority-committed data. It is used with multi-document transactions that require a consistent view across multiple collections; the read concern is set on the transaction as a whole, not on the individual operations inside it.
async function readSnapshot(client) {
  const session = client.startSession();
  try {
    // Snapshot read concern is declared when the transaction starts;
    // specifying it per-operation inside a transaction is an error.
    session.startTransaction({ readConcern: { level: "snapshot" } });
    const db = client.db("orders");
    const order = await db.collection("orders").findOne(
      { orderId: "ORD-778" },
      { session }
    );
    const log = await db.collection("auditLog").findOne(
      { refId: "ORD-778" },
      { session }
    );
    await session.commitTransaction();
    return { order, log };
  } catch (err) {
    await session.abortTransaction();
    throw err;
  } finally {
    await session.endSession();
  }
}
Read Concern Comparison
| Level | Guarantee | Performance | Use Case |
|---|---|---|---|
| local | Current data on the queried node; may be rolled back | Fastest | Dashboards, non-critical reads |
| available | Current data on the queried node; on sharded clusters may return orphaned documents | Fastest | Sharded clusters with secondary reads |
| majority | Committed to a majority; cannot be rolled back | Moderate latency | Account balances, payments |
| linearizable | Most recent majority-committed write; serialized order | Highest latency (pair with maxTimeMS) | Leader election, distributed locks |
| snapshot | Point-in-time majority-committed snapshot | Transaction overhead | Multi-document transactions |
Write Concerns
Write concerns control the acknowledgment a driver receives from the server. They pair naturally with read concerns: a w: "majority" write followed by a readConcern: "majority" read, issued within a causally consistent session, guarantees you read what you just wrote.
async function writeWithConcern() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("ecommerce");
  const orders = db.collection("orders");
  const result = await orders.insertOne(
    { item: "laptop", qty: 1, price: 1299 },
    { writeConcern: { w: "majority", j: true, wtimeout: 5000 } }
  );
  console.log("Write ack:", result.insertedId);
  await client.close();
}
- w: 1 acknowledges after the primary applies the write (default).
- w: "majority" acknowledges after a majority of voting members have the write.
- j: true acknowledges only after the write reaches the on-disk journal.
- wtimeout raises an error if acknowledgment takes longer than the given milliseconds; note the write itself may still complete on the server.
When you set w: "majority" on writes and readConcern: "majority" on reads within a causally consistent session, you get read-your-writes consistency without needing linearizable isolation.
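One way to keep these pairings consistent across a codebase is to centralize them in a small helper and spread the result into each operation's options. This is a sketch; concernsFor and its criticality labels are hypothetical application conventions, not part of the driver:

```javascript
// Hypothetical helper: map an application-level criticality label to the
// read/write concern pairing recommended above.
function concernsFor(criticality) {
  switch (criticality) {
    case "critical": // balances, payments: no rollbacks, read-your-writes
      return {
        readConcern: { level: "majority" },
        writeConcern: { w: "majority", j: true, wtimeout: 5000 }
      };
    case "dashboard": // latency-sensitive, rollback-tolerant reads
      return { readConcern: { level: "local" }, writeConcern: { w: 1 } };
    default:
      throw new Error(`Unknown criticality: ${criticality}`);
  }
}

// Usage (with a connected collection):
// await accounts.findOne({ accountId: "ACC-4421" }, concernsFor("critical"));
```

Centralizing the choice means the consistency policy is documented in one place instead of scattered across every query.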
$lookup: Joining Collections in Aggregation
MongoDB is a document database, but real applications often need data from multiple collections. The $lookup stage performs a left outer join with another collection.
Equi-Join $lookup
The simplest form matches a local field to a foreign field.
const pipeline = [
  {
    $lookup: {
      from: "reviews",
      localField: "productId",
      foreignField: "productId",
      as: "reviews"
    }
  },
  { $limit: 5 }
];
const results = await db.collection("products").aggregate(pipeline).toArray();
results.forEach(p => {
  console.log(`${p.name} has ${p.reviews.length} reviews`);
});
Pipeline $lookup
For more control (filtering, sorting, limiting the joined documents), use the pipeline syntax.
const pipeline = [
  {
    $lookup: {
      from: "reviews",
      let: { productId: "$productId" },
      pipeline: [
        { $match: { $expr: { $eq: ["$productId", "$$productId"] } } },
        { $sort: { createdAt: -1 } },
        { $limit: 3 },
        { $project: { rating: 1, text: 1, createdAt: 1 } }
      ],
      as: "topReviews"
    }
  }
];
const products = await db.collection("products").aggregate(pipeline).toArray();
Correlated Subquery with $lookup
Use $expr and $match inside the pipeline for correlated subqueries — for example, joining only documents where a condition holds true.
const pipeline = [
  {
    $lookup: {
      from: "inventory",
      let: { prodId: "$_id", minStock: 5 },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ["$productId", "$$prodId"] },
                { $lte: ["$stock", "$$minStock"] }
              ]
            }
          }
        }
      ],
      as: "lowStock"
    }
  },
  { $match: { "lowStock.0": { $exists: true } } }
];
const lowStockProducts = await db.collection("products")
  .aggregate(pipeline)
  .toArray();
$graphLookup: Recursive Traversals
When your data forms a graph — employee hierarchies, social networks, dependency trees — $graphLookup performs a recursive search.
// Collection: employees
// { _id: 1, name: "Alice", reportsTo: null }
// { _id: 2, name: "Bob", reportsTo: 1 }
// { _id: 3, name: "Carol", reportsTo: 2 }
// { _id: 4, name: "Dave", reportsTo: 2 }
const pipeline = [
  { $match: { name: "Alice" } },
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "reportsTo",
      as: "subordinates",
      maxDepth: 10,
      depthField: "level"
    }
  }
];
const result = await db.collection("employees")
  .aggregate(pipeline)
  .next();
console.log(`${result.name} manages ${result.subordinates.length} people`);
result.subordinates.forEach(s => {
  console.log(`  Level ${s.level}: ${s.name}`);
});
$graphLookup supports restrictSearchWithMatch to filter nodes during traversal and maxDepth to cap recursion depth. The stage must stay within the aggregation pipeline's 100 MB memory limit and ignores allowDiskUse, so cap the depth when traversing large hierarchies.
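Building on the employees example above, here is a sketch of a traversal restricted to a subset of nodes; the active field is assumed for illustration and is not part of the earlier sample documents:

```javascript
// $graphLookup that only traverses through documents matching a filter.
// "active" is a hypothetical example field.
const orgChartPipeline = [
  { $match: { name: "Alice" } },
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "reportsTo",
      as: "activeSubordinates",
      maxDepth: 10,                             // cap recursion depth
      depthField: "level",
      restrictSearchWithMatch: { active: true } // skip inactive employees
    }
  }
];

// Usage (with a connected db):
// const result = await db.collection("employees").aggregate(orgChartPipeline).next();
```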
Bulk Operations
The bulkWrite() method sends multiple write operations in a single network round trip, dramatically improving throughput. It supports insertOne, updateOne, updateMany, deleteOne, deleteMany, and replaceOne.
Ordered Bulk Write
By default, bulkWrite() runs operations ordered (ordered: true). If any operation fails, MongoDB stops processing the remaining operations. The error reports which operation index failed.
async function orderedBulk() {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const products = client.db("shop").collection("products");
  try {
    const result = await products.bulkWrite([
      { insertOne: { document: { name: "Widget", price: 9.99 } } },
      { updateOne: {
          filter: { name: "Gadget" },
          update: { $inc: { stock: 5 } }
      }},
      { deleteOne: { filter: { name: "Discontinued" } } },
      { insertOne: { document: { name: "Doohickey", price: 14.99 } } }
    ]);
    console.log(`${result.insertedCount} inserted, ${result.modifiedCount} updated`);
  } catch (err) {
    // Ordered mode stops at the first failure; its position is reported
    // in the error's writeErrors array.
    console.error("Bulk write failed at op index", err.writeErrors?.[0]?.index);
  } finally {
    await client.close();
  }
}
Unordered Bulk Write
With ordered: false, all operations are attempted regardless of individual failures. Use this when operations are independent and you want maximum throughput.
async function unorderedBulk(logEntries) {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const entries = client.db("logs").collection("entries");
  try {
    const result = await entries.bulkWrite(
      logEntries.map(entry => ({
        insertOne: { document: entry }
      })),
      { ordered: false }
    );
    console.log(`${result.insertedCount} inserted`);
  } catch (err) {
    // Unordered mode attempts every operation before reporting failures.
    console.error(`${err.result.insertedCount} inserted, ${err.writeErrors.length} failed`);
  } finally {
    await client.close();
  }
}
BulkWrite Error Handling
In the Node.js driver, a failed bulkWrite rejects with a MongoBulkWriteError even in unordered mode; the partial result and the individual write errors are attached to the error object, so always catch and inspect it.
try {
  await collection.bulkWrite(ops, { ordered: false });
} catch (err) {
  for (const writeError of err.writeErrors ?? []) {
    console.error(`Op index ${writeError.index}: ${writeError.errmsg}`);
  }
}
Data Cleanup Patterns
Real-world collections accumulate stale data. Combine bulk operations with aggregation to clean efficiently.
Archive and Delete Pattern
async function archiveOldOrders(daysOld) {
  const db = (await MongoClient.connect("mongodb://localhost:27017"))
    .db("ecommerce");
  const orders = db.collection("orders");
  const archive = db.collection("orders_archive");
  const cutoff = new Date(Date.now() - daysOld * 86400000);
  // 1. Find documents to archive
  const staleOrders = await orders
    .find({ createdAt: { $lt: cutoff } })
    .toArray();
  if (staleOrders.length === 0) return;
  // 2. Bulk insert into archive
  await archive.bulkWrite(
    staleOrders.map(doc => ({ insertOne: { document: doc } })),
    { ordered: false }
  );
  // 3. Bulk delete from source
  const deleteResult = await orders.deleteMany({ createdAt: { $lt: cutoff } });
  console.log(`Archived ${staleOrders.length}, deleted ${deleteResult.deletedCount}`);
}
Schema Migration with Bulk Write
async function migrateProductSchema() {
  const db = (await MongoClient.connect("mongodb://localhost:27017"))
    .db("shop");
  const products = db.collection("products");
  const cursor = products.find({ price: { $type: "string" } });
  const ops = [];
  for await (const doc of cursor) {
    ops.push({
      updateOne: {
        filter: { _id: doc._id },
        update: {
          $set: { price: parseFloat(doc.price) },
          $unset: { priceStr: "" } // drop the legacy field, if present
        }
      }
    });
    // Flush in batches of 500
    if (ops.length >= 500) {
      await products.bulkWrite(ops);
      ops.length = 0;
    }
  }
  if (ops.length > 0) await products.bulkWrite(ops);
  console.log("Schema migration complete");
}
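The flush-every-500 pattern above generalizes into a small helper. This is a sketch; bulkWriteInBatches is not a driver method, and it assumes the collection argument exposes the standard bulkWrite:

```javascript
// Send an array of write operations through collection.bulkWrite() in
// fixed-size batches; returns the number of batches flushed.
async function bulkWriteInBatches(collection, ops, batchSize = 500) {
  let batches = 0;
  for (let i = 0; i < ops.length; i += batchSize) {
    await collection.bulkWrite(ops.slice(i, i + batchSize), { ordered: false });
    batches += 1;
  }
  return batches;
}

// Usage (with a connected collection): 1200 ops become 3 round trips.
// await bulkWriteInBatches(products, ops);
```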
Change Streams for Real-Time Applications
Change streams allow applications to watch collection, database, or cluster-level changes in real time. They are built on the aggregation framework and read from the oplog.
async function watchOrders() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("ecommerce");
  const orders = db.collection("orders");
  // Match both statuses the handlers below care about; matching only
  // "pending" would filter out the "shipped" updates.
  const changeStream = orders.watch(
    [{ $match: { "fullDocument.status": { $in: ["pending", "shipped"] } } }],
    { fullDocument: "updateLookup" }
  );
  changeStream.on("change", (change) => {
    console.log("Order changed:", change.operationType, change.documentKey);
    if (change.operationType === "insert") {
      sendConfirmationEmail(change.fullDocument);
    }
    if (change.operationType === "update" &&
        change.fullDocument.status === "shipped") {
      triggerFulfillment(change.fullDocument);
    }
  });
  // Keep the process alive
  await new Promise(() => {});
}
Resumable Change Streams
The driver automatically resumes a change stream after transient network errors using the last token it saw. To survive application restarts, store the resume token in a collection yourself and pass it back via resumeAfter.
async function resumableWatch() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("ecommerce");
  const tokens = db.collection("resumeTokens");
  const orders = db.collection("orders");
  // Load the last resume token
  const last = await tokens.findOne({ _id: "ordersStream" });
  const resumeAfter = last?.resumeToken;
  const changeStream = orders.watch(
    [{ $match: { operationType: { $in: ["insert", "update"] } } }],
    resumeAfter ? { resumeAfter } : {}
  );
  for await (const change of changeStream) {
    console.log(change);
    // Persist resume token
    await tokens.updateOne(
      { _id: "ordersStream" },
      { $set: { resumeToken: change._id } },
      { upsert: true }
    );
  }
}
}
Best Practices for Production MongoDB & JavaScript Applications
Always set read and write concerns explicitly. Relying on defaults works for development, but production applications need consistency guarantees documented in code. Set readConcern and writeConcern at the client, database, or operation level depending on your needs.
Prefer pipeline $lookup over equi-join. The pipeline syntax lets you filter, sort, and limit joined documents, reducing network overhead in sharded clusters. It also supports correlated subqueries with $expr.
Use bulkWrite() over individual operations. Batching 500–1000 operations per call reduces round trips by orders of magnitude. Use ordered: false when operations are independent to avoid a single failure blocking the batch.
Index the fields used in $lookup and $graphLookup. Without indexes, these stages read entire collections. For $graphLookup, index both connectFromField and connectToField.
Store change stream resume tokens. If your application restarts, resuming from the last stored token prevents missing events. Without a stored token, a restart creates a brand-new stream starting from the current oplog time.
Match read concern to operation criticality. Use majority for reads that affect business decisions; use local for dashboards and logging. Linearizable reads should be reserved for operations that require strict serializability and are tolerant of higher latency.
Handle bulk write errors gracefully. In unordered mode, always check result.hasWriteErrors() and log or retry failed operations. Never assume a bulk write fully succeeded without inspecting the result.
Test replica set failover scenarios. Read concerns only matter in a replica set. Set up a three-node replica set locally with mongod instances and simulate a primary failure to verify your application handles rollback correctly.
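As a concrete sketch of the indexing advice above, these are the index specifications the joins in this article would benefit from. They are plain specification objects; apply each with createIndex() on a live connection:

```javascript
// Index specs supporting the $lookup and $graphLookup examples above.
// Apply with, e.g.:
//   await db.collection("reviews").createIndex(reviewsByProduct);
const reviewsByProduct = { productId: 1 };   // foreignField of the equi-join $lookup
const inventoryByProduct = { productId: 1 }; // join key of the correlated subquery
const employeesByManager = { reportsTo: 1 }; // connectToField of $graphLookup
```

The _id side of the $graphLookup (connectFromField) is already covered by the default _id index.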
Resources
- MongoDB Read Concern Specification
- MongoDB Write Concern Specification
- MongoDB $lookup Stage
- MongoDB $graphLookup Stage
- MongoDB bulkWrite() Driver Method
- MongoDB Change Streams
- MongoDB University: M220JS