MongoDB for JavaScript Developers: Read Concerns, Bulk Operations & $lookup

MongoDB for JavaScript Developers (Chapter 3 of 4)

Created: June 12, 2021 · 11 min read

Introduction

Building production MongoDB applications requires more than basic CRUD. You need to understand how consistent your reads are, how to join related collections without sacrificing performance, how to batch writes for throughput, how to react to live data changes, and how to keep your data clean. This article covers five interconnected topics that every Node.js developer using MongoDB should know: read concerns, write concerns, the $lookup and $graphLookup aggregation stages, the bulkWrite() API, and change streams.

Read Concerns

Read concerns control the consistency and isolation properties of the data that a query returns. They answer the question: how many replica set members must have acknowledged a write before a read will return that data?

MongoDB offers five read concern levels: local, available, majority, linearizable, and snapshot. Each trades off between consistency guarantees and performance.

Read Concern Local (Default)

"local" returns the most recent data on the node that handles the query, regardless of whether that data has been replicated to other nodes. This is the default for all reads against the primary. It offers the lowest isolation — a subsequent failover could roll back the data you just read.

const { MongoClient } = require("mongodb");

async function readLocal() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("shop");
  const orders = db.collection("orders");

  const doc = await orders.findOne(
    { _id: "order-1001" },
    { readConcern: { level: "local" } }
  );
  console.log("Read with local concern:", doc);

  await client.close();
}

Use local for non-critical reads where a millisecond of latency matters more than theoretical rollback — real-time analytics, dashboards, or reporting queries.

Read Concern Majority

"majority" returns data that has been committed to a majority of replica set members. This guarantees the data will not be rolled back. The trade-off is that majority reads may return slightly stale data because they must wait for replication to a majority.

async function readMajority() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("banking");
  const accounts = db.collection("accounts");

  const balance = await accounts.findOne(
    { accountId: "ACC-4421" },
    { readConcern: { level: "majority" } }
  );
  console.log("Read with majority concern:", balance);

  await client.close();
}

Use majority for mission-critical reads — account balances, payment statuses, or any data where reading a rolled-back value would cause a business problem.

Read Concern Linearizable

"linearizable" provides the strongest guarantee: the read returns the most recent write acknowledged by a majority of the replica set. Linearizable reads are serialized with other operations, so they see a consistent snapshot of the data as of the moment the read completes.

async function readLinearizable() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("inventory");
  const items = db.collection("items");

  const item = await items.findOne(
    { sku: "WIDGET-001" },
    { readConcern: { level: "linearizable" }, maxTimeMS: 5000 }
  );
  console.log("Linearizable read:", item);

  await client.close();
}

Linearizable reads are slow: they must confirm with a majority of replica set members before returning, and you should always pair them with maxTimeMS so they fail fast instead of blocking indefinitely when a majority is unreachable. Use them only when external consistency is critical, such as leader-election logic or distributed locking.

Read Concern Snapshot

Available on WiredTiger replica sets, "snapshot" returns data from a point-in-time snapshot of the majority-committed data. It is used with multi-document transactions that require a consistent view across multiple collections. Note that the read concern belongs on the transaction itself, not on the individual operations inside it.

async function readSnapshot(client) {
  const session = client.startSession();
  let result;
  try {
    // Snapshot read concern is set on the transaction, not per operation
    await session.withTransaction(async () => {
      const db = client.db("orders");
      const order = await db.collection("orders").findOne(
        { orderId: "ORD-778" },
        { session }
      );
      const log = await db.collection("auditLog").findOne(
        { refId: "ORD-778" },
        { session }
      );
      result = { order, log };
    }, { readConcern: { level: "snapshot" } });
  } finally {
    await session.endSession();
  }
  return result;
}

Read Concern Comparison

Level        | Guarantee                                                                            | Performance                      | Use Case
-------------|--------------------------------------------------------------------------------------|----------------------------------|------------------------------------
local        | Latest data on the queried node; may be rolled back                                   | Fastest                          | Dashboards, non-critical reads
available    | Latest data on the queried node; may return orphaned documents on sharded clusters    | Fastest                          | Secondary reads on sharded clusters
majority     | Committed to a majority of members; cannot be rolled back                             | Moderate latency                 | Account balances, payments
linearizable | Most recent majority-committed write; serialized order                                | Highest latency (use maxTimeMS)  | Leader election, distributed locks
snapshot     | Point-in-time majority-committed snapshot                                             | Transaction overhead             | Multi-document transactions

Write Concerns

Write concerns control the acknowledgment a driver receives from the server. They pair naturally with read concerns: a w: "majority" write followed by a readConcern: "majority" read in the same causally consistent session guarantees you read what you just wrote.

async function writeWithConcern() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("ecommerce");
  const orders = db.collection("orders");

  const result = await orders.insertOne(
    { item: "laptop", qty: 1, price: 1299 },
    { writeConcern: { w: "majority", j: true, wtimeout: 5000 } }
  );
  console.log("Write ack:", result.insertedId);

  await client.close();
}
  • w: 1 — acknowledge after primary write (default).
  • w: "majority" — acknowledge after majority of voting members.
  • j: true — acknowledge after journal write.
  • wtimeout — return an error if acknowledgment takes longer than the given milliseconds; the write itself may still complete on the server.

When you set w: "majority" on writes and readConcern: "majority" on reads within a causally consistent session, you get read-your-writes consistency without paying for linearizable isolation.
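A minimal sketch of that pattern, assuming a replica set behind the connection string (the collection and field names are illustrative):

// Read-your-writes with majority/majority in a causally consistent session
async function readYourWrites(client) {
  const session = client.startSession({ causalConsistency: true });
  try {
    const orders = client.db("ecommerce").collection("orders");
    await orders.insertOne(
      { item: "keyboard", qty: 2 },
      { session, writeConcern: { w: "majority" } }
    );
    // Guaranteed to observe the insert above, even if routed to a secondary
    return await orders.findOne(
      { item: "keyboard" },
      { session, readConcern: { level: "majority" } }
    );
  } finally {
    await session.endSession();
  }
}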

$lookup: Joining Collections in Aggregation

MongoDB is a document database, but real applications often need data from multiple collections. The $lookup stage performs a left outer join with another collection.

Equi-Join $lookup

The simplest form matches a local field to a foreign field.

const pipeline = [
  {
    $lookup: {
      from: "reviews",
      localField: "productId",
      foreignField: "productId",
      as: "reviews"
    }
  },
  { $limit: 5 }
];

const results = await db.collection("products").aggregate(pipeline).toArray();
results.forEach(p => {
  console.log(`${p.name} has ${p.reviews.length} reviews`);
});

Pipeline $lookup

For more control (filtering, sorting, limiting the joined documents), use the pipeline syntax.

const pipeline = [
  {
    $lookup: {
      from: "reviews",
      let: { productId: "$productId" },
      pipeline: [
        { $match: { $expr: { $eq: ["$productId", "$$productId"] } } },
        { $sort: { createdAt: -1 } },
        { $limit: 3 },
        { $project: { rating: 1, text: 1, createdAt: 1 } }
      ],
      as: "topReviews"
    }
  }
];

const products = await db.collection("products").aggregate(pipeline).toArray();

Correlated Subquery with $lookup

Use $expr and $match inside the pipeline for correlated subqueries — for example, joining only documents where a condition holds true.

const pipeline = [
  {
    $lookup: {
      from: "inventory",
      let: { prodId: "$_id", minStock: 5 },
      pipeline: [
        {
          $match: {
            $expr: {
              $and: [
                { $eq: ["$productId", "$$prodId"] },
                { $lte: ["$stock", "$$minStock"] }
              ]
            }
          }
        }
      ],
      as: "lowStock"
    }
  },
  { $match: { "lowStock.0": { $exists: true } } }
];

const lowStockProducts = await db.collection("products")
  .aggregate(pipeline)
  .toArray();

$graphLookup: Recursive Traversals

When your data forms a graph — employee hierarchies, social networks, dependency trees — $graphLookup performs a recursive search.

// Collection: employees
// { _id: 1, name: "Alice", reportsTo: null }
// { _id: 2, name: "Bob",   reportsTo: 1 }
// { _id: 3, name: "Carol", reportsTo: 2 }
// { _id: 4, name: "Dave",  reportsTo: 2 }

const pipeline = [
  { $match: { name: "Alice" } },
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "reportsTo",
      as: "subordinates",
      maxDepth: 10,
      depthField: "level"
    }
  }
];

const result = await db.collection("employees")
  .aggregate(pipeline)
  .next();

console.log(`${result.name} manages ${result.subordinates.length} people`);
result.subordinates.forEach(s => {
  console.log(`  Level ${s.level}: ${s.name}`);
});

$graphLookup supports restrictSearchWithMatch to filter nodes during traversal and maxDepth to cap recursion depth. Capping depth matters: the stage must stay within the aggregation pipeline's 100 MB memory limit, and deep hierarchies get expensive quickly.
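For example, the traversal below skips nodes as it goes; it assumes a hypothetical active flag on each employee document:

const activePipeline = [
  { $match: { name: "Alice" } },
  {
    $graphLookup: {
      from: "employees",
      startWith: "$_id",
      connectFromField: "_id",
      connectToField: "reportsTo",
      as: "subordinates",
      maxDepth: 10,
      // Only traverse employees whose (assumed) active flag is set
      restrictSearchWithMatch: { active: true }
    }
  }
];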

Bulk Operations

The bulkWrite() method sends multiple write operations in a single network round trip, dramatically improving throughput. It supports insertOne, updateOne, updateMany, deleteOne, deleteMany, and replaceOne.

Ordered Bulk Write

By default, bulkWrite() runs operations ordered (ordered: true). If any operation fails, MongoDB stops processing the remaining operations. The error reports which operation index failed.

async function orderedBulk() {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const products = client.db("shop").collection("products");

  try {
    const result = await products.bulkWrite([
      { insertOne: { document: { name: "Widget", price: 9.99 } } },
      { updateOne: {
          filter: { name: "Gadget" },
          update: { $inc: { stock: 5 } }
      }},
      { deleteOne: { filter: { name: "Discontinued" } } },
      { insertOne: { document: { name: "Doohickey", price: 14.99 } } }
    ]);
    console.log(`${result.insertedCount} inserted, ${result.modifiedCount} updated`);
  } catch (err) {
    // In an ordered bulk write the first failure stops the batch;
    // its position is reported on the MongoBulkWriteError
    console.error("Bulk write failed at op index",
      err.result?.getWriteErrors()[0]?.index);
  }

  await client.close();
}

Unordered Bulk Write

With ordered: false, all operations are attempted regardless of individual failures. Use this when operations are independent and you want maximum throughput.

async function unorderedBulk(logEntries) {
  const client = await MongoClient.connect("mongodb://localhost:27017");
  const entries = client.db("logs").collection("entries");

  const result = await entries.bulkWrite(
    logEntries.map(entry => ({
      insertOne: { document: entry }
    })),
    { ordered: false }
  );

  console.log(`${result.insertedCount} inserted`);
  await client.close();
}

BulkWrite Error Handling

Always handle errors explicitly. In the Node.js driver, bulkWrite() rejects with a MongoBulkWriteError when any operation fails; in unordered mode every operation is still attempted first, and the individual failures are collected on the error's result.

try {
  await collection.bulkWrite(ops, { ordered: false });
} catch (err) {
  // err is a MongoBulkWriteError; its partial result carries the details
  if (err.result?.hasWriteErrors()) {
    for (const writeError of err.result.getWriteErrors()) {
      console.error(`Op index ${writeError.index}: ${writeError.errmsg}`);
    }
  }
}

Data Cleanup Patterns

Real-world collections accumulate stale data. Combining targeted queries, bulk operations, and aggregation keeps cleanup efficient.

Archive and Delete Pattern

async function archiveOldOrders(daysOld) {
  const db = (await MongoClient.connect("mongodb://localhost:27017"))
    .db("ecommerce");
  const orders = db.collection("orders");
  const archive = db.collection("orders_archive");
  const cutoff = new Date(Date.now() - daysOld * 86400000);

  // 1. Find documents to archive
  const staleOrders = await orders
    .find({ createdAt: { $lt: cutoff } })
    .toArray();

  if (staleOrders.length === 0) return;

  // 2. Bulk insert into archive
  await archive.bulkWrite(
    staleOrders.map(doc => ({ insertOne: { document: doc } })),
    { ordered: false }
  );

  // 3. Bulk delete exactly the documents that were archived (re-running
  //    the date filter could delete orders that were never copied)
  const deleteResult = await orders.deleteMany({
    _id: { $in: staleOrders.map(doc => doc._id) }
  });
  console.log(`Archived ${staleOrders.length}, deleted ${deleteResult.deletedCount}`);
}
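When the collections are large, round-tripping every document through the application is wasteful. On MongoDB 4.2+, the copy step can run server-side with the aggregation $merge stage; a sketch, reusing the cutoff computed above:

await orders.aggregate([
  { $match: { createdAt: { $lt: cutoff } } },
  // $merge writes the matching documents straight into the archive
  { $merge: { into: "orders_archive", whenMatched: "replace", whenNotMatched: "insert" } }
]).toArray(); // the cursor must be exhausted for $merge to run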

Schema Migration with Bulk Write

async function migrateProductSchema() {
  const db = (await MongoClient.connect("mongodb://localhost:27017"))
    .db("shop");
  const products = db.collection("products");

  const cursor = products.find({ price: { $type: "string" } });
  const ops = [];

  for await (const doc of cursor) {
    ops.push({
      updateOne: {
        filter: { _id: doc._id },
        update: {
          $set: { price: parseFloat(doc.price) },
          $unset: { priceStr: "" }
        }
      }
    });

    // Flush in batches of 500
    if (ops.length >= 500) {
      await products.bulkWrite(ops);
      ops.length = 0;
    }
  }

  if (ops.length > 0) await products.bulkWrite(ops);
  console.log("Schema migration complete");
}

Change Streams for Real-Time Applications

Change streams allow applications to watch collection, database, or cluster-level changes in real time. They are built on the aggregation framework and read from the oplog.
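The watch() method also exists on Db and MongoClient objects; a quick sketch, assuming a connected client as in the examples below:

const dbStream = client.db("ecommerce").watch(); // every collection in the database
const clusterStream = client.watch();            // every database in the deployment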

async function watchOrders() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("ecommerce");
  const orders = db.collection("orders");

  const changeStream = orders.watch(
    // Match both statuses the handlers below care about
    [{ $match: { "fullDocument.status": { $in: ["pending", "shipped"] } } }],
    { fullDocument: "updateLookup" }
  );

  changeStream.on("change", (change) => {
    console.log("Order changed:", change.operationType, change.documentKey);

    if (change.operationType === "insert") {
      sendConfirmationEmail(change.fullDocument);
    }

    if (change.operationType === "update" &&
        change.fullDocument.status === "shipped") {
      triggerFulfillment(change.fullDocument);
    }
  });

  // Keep the process alive
  await new Promise(() => {});
}

Resumable Change Streams

Change streams automatically resume from the last seen token if they disconnect. You can store the resume token in a collection for durability across application restarts.

async function resumableWatch() {
  const client = new MongoClient("mongodb://localhost:27017");
  await client.connect();
  const db = client.db("ecommerce");
  const tokens = db.collection("resumeTokens");
  const orders = db.collection("orders");

  // Load the last resume token
  const last = await tokens.findOne({ _id: "ordersStream" });
  const resumeAfter = last?.resumeToken;

  const changeStream = orders.watch(
    [{ $match: { operationType: { $in: ["insert", "update"] } } }],
    resumeAfter ? { resumeAfter } : {}
  );

  for await (const change of changeStream) {
    console.log(change);

    // Persist resume token
    await tokens.updateOne(
      { _id: "ordersStream" },
      { $set: { resumeToken: change._id } },
      { upsert: true }
    );
  }
}

Best Practices for Production MongoDB & JavaScript Applications

Always set read and write concerns explicitly. Relying on defaults works for development, but production applications need consistency guarantees documented in code. Set readConcern and writeConcern at the client, database, or operation level depending on your needs.
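For instance, a minimal sketch of declaring both concerns once at the client level (the values are illustrative):

const client = new MongoClient("mongodb://localhost:27017", {
  readConcern: { level: "majority" },
  writeConcern: { w: "majority", wtimeout: 5000 }
});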

Prefer pipeline $lookup over equi-join. The pipeline syntax lets you filter, sort, and limit joined documents, reducing network overhead in sharded clusters. It also supports correlated subqueries with $expr.

Use bulkWrite() over individual operations. Batching 500–1000 operations per call reduces round trips by orders of magnitude. Use ordered: false when operations are independent to avoid a single failure blocking the batch.

Index the fields used in $lookup and $graphLookup. Without indexes, these stages read entire collections. For $graphLookup, index both connectFromField and connectToField.
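A sketch of the indexes the examples in this article would want:

// Index the join keys: the $lookup's foreignField and the $graphLookup's
// connectToField (_id, the connectFromField here, is indexed by default)
await db.collection("reviews").createIndex({ productId: 1 });
await db.collection("employees").createIndex({ reportsTo: 1 });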

Store change stream resume tokens. If your application restarts, resuming from the last stored token prevents missing events. Without a stored token, a restart creates a brand-new stream starting from the current oplog time.

Match read concern to operation criticality. Use majority for reads that affect business decisions; use local for dashboards and logging. Linearizable reads should be reserved for operations that require strict serializability and are tolerant of higher latency.

Handle bulk write errors gracefully. In unordered mode, catch the MongoBulkWriteError, inspect err.result.getWriteErrors(), and log or retry the failed operations. Never assume a bulk write fully succeeded without inspecting the outcome.
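One possible shape for a retry helper; this sketch assumes failures are transient and retries only the operations that failed:

async function bulkWriteWithRetry(collection, ops, attempts = 3) {
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await collection.bulkWrite(ops, { ordered: false });
    } catch (err) {
      const writeErrors = err.result?.getWriteErrors?.() ?? [];
      if (writeErrors.length === 0 || attempt === attempts) throw err;
      // Narrow the batch to the failed operations and try again
      ops = writeErrors.map(we => ops[we.index]);
    }
  }
}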

Test replica set failover scenarios. Read concerns only matter in a replica set. Set up a three-node replica set locally with mongod instances and simulate a primary failure to verify your application handles rollback correctly.
