Locating tacos with Atlas Stream Processing (or streaming ETL with MongoDB)

September 30, 2024

Photo by Tai’s Captures on Unsplash

Extract, Transform, and Load (ETL) has been an industry acronym for decades. It typically describes a system where data is moved from one location to another and mutated for business-specific purposes. Streaming ETL takes this further— by implying that data is more up-to-date than a batch cycle would typically permit, being continuously processed as data arrives.

Use cases vary, but most of the time, the data consumer needs that data to be as fresh as possible. The data could drive a real-time dashboard of tornado locations, alerts on pressures inside nuclear reactors, or persistent storage for displaying the location of those tacos you just ordered.

Using MongoDB Atlas Stream Processing, these use cases become almost too easy. But before we dive in, let’s understand a pipeline in Atlas Stream Processing. If you’ve used the MongoDB aggregation language, you are already very close to an expert in creating ETL pipelines in MongoDB — it’s the same syntax with a bit more sugar. ETL pipelines are written in Javascript and use the Aggregation Language in the shell, VS Code, or other tools (MongoDB Compass support is coming!)

Stream Processors follow the following pattern:

// create a source for the continuous extract  
let extract = {$source: {...}}  
  
// continuously transform the data, in this case a filter ($match)  
let transform = {$match: {...}}  
  
// load the data, keep the target continuously up to date. In this case  
// the MongoDB database using $merge  
let load = {$merge: {...}}  
  
// create an array of stages that define the processor  
let processor = [extract, transform, load]  
  
// start the processor  
sp.process(processor);

Yes, it’s that simple.

The magic with Atlas Stream Processing is that it’s designed to run continuously, pulling data from a source, transforming it, and writing it to a sink. It sounds a lot like ETL, right? Because it does this continuously, it always performs this action based on the freshest data and keeps that chart of taco locations up-to-date.

Let’s break down the traditional Extract, Transform, and Load paradigm into its equivalents in Atlas Stream Processing:

Extract

Extracting data is different in streaming ETL. It’s not a batch activity like a Create Table as Select (CTAS) or export function from a database. It’s a connection to the source system that consumes events one by one. Examples of these messages are change events from a database or messages from Kafka topics.

The $source stage is used with Atlas Stream Processing to consume these messages. It can only be specified as the first stage in a pipeline, and it can be a MongoDB change stream or a Kafka topic (and other sources are coming). Once a $source is configured, the stream processor continuously runs on the boundless data, processing it event by event or message by message.

// Continuously extract data from the source Atlas Cluster, where raw taco  
// location events get stored  
let extract = {  
  $source: {  
    connectionName: 'mysourceAtlasCluster01',  
    db: 'tacoData',  
    coll: 'tacos',  
    config: {  
      fullDocument: 'whenAvailable',  
      fullDocumentBeforeChange: 'whenAvailable',  
    }  
  }  
};

Transform

The transform stage is where the magic happens in streaming ETL pipelines. In this phase, data is filtered, aggregated, enriched, joined, masked out, or otherwise mutated to match the business use case. This is done using the MongoDB aggregation language, so stages like $project, $match, $lookup, and $$ROOT are among the language’s vast library of capabilities to transform data from its source to its destination form. For tacos, we don’t want to see the entire path the tacos took. That would be too much data.

This is the stage where we aggregate the stream of location points for my tacos into the latest place they were seen.

// transform the data by aggregating over a tumbling window every 1 minute  
// returning the latest distance from the destination by delivery ID (orderID)  
let transform = {  
  $tumblingWindow: {  
    interval: {  
      size: 1,  
      unit: "minute",  
    },  
    pipeline: [  
      {  
        $group: {  
          _id: { deliveryID: "$delivery_id" },  
          distanceFromDestination: { $last: "$dist" },  
        },  
      },  
    ],  
  },  
};

Load

Like extract, the loading phase in a streaming ETL pipeline isn’t a batch-type activity; instead, it is a stream of writes to some sink. In Atlas Stream Processing, that can be the MongoDB database (via $merge) or Kafka topics (via $emit) or even S3 (coming soon!). In any case, the data is processed continuously. In the case of the database, the $merge statement keeps the specified collection up to date with the latest changes coming from the stream. This is where our taco locations are stored — a $merge with upsert (conditional update if exists, else insert) for each taco in the ‘in transit’ stage.

// Continuously merge (load) the data into the target collection  
let load = {  
  $merge: {  
    into: {  
      connectionName: 'tacoDashboard',  
      db: 'dashboardDB01',  
      coll: 'tacosTimetoDestCollection',  
    },  
    on: ['customerID'],  
  },  
};

Putting it together

To build a complete end-to-end streaming ETL pipeline with Atlas Stream Processing, we can assemble the stages above into a final solution for taco tracking. Note that this processor is run within MongoDB Atlas and doesn’t require extensive setup or configuration.

  1. Create a stream processing instance. See the docs here.
  2. Create a source and a sink in the Connection Registry. See docs here.
  3. Connect via the MongoDB shell or VS Code. See docs here.
  4. Use the above code to define your stream processing stages for Extract Transform and Load stages, assemble them into a processing pipeline, and run it!
// create a processor with the above stages—order matters.  
// multiple stages for transform could exist, not just one like this example  
let processor = [extract, transform, load]  
// create the stream processor and give it a name  
sp.createStreamProcessor('tacoProcessor', processor);  
// start it  
sp.tacoProcessor.start();  
// check up on it!  
sp.tacoProcessor.stats();

Go here next:

Get Started using Atlas Stream Processing

Further reading

To take your Streaming ETL game to the next level, check out the docs, jump in, and join the community. We want to hear from you!


Locating tacos with Atlas Stream Processing (or streaming ETL with MongoDB) was originally published in MongoDB on Medium, where people are continuing the conversation by highlighting and responding to this story.