Enrichment Nirvana — using $lookup with Atlas Stream Processing

May 24, 2024

How using $lookup makes enriching streams of data a no-brainer

Enriching streams with data persisted in a database is a key capability for anyone building stream processors. It shows up in a wide range of use cases: enriching streams of manufacturing data with historical aggregations, fetching customer details to enrich streams of e-commerce actions, and so on. These use cases all work the same way: the stream isn’t the whole story, and enriching it with data from another source completes it.

The notion of a join may not be familiar in MongoDB, since it isn’t a concept that has traditionally been used there. However, the ability to perform a logical join has been in MongoDB for many years, provided by the $lookup aggregation stage. With Atlas Stream Processing, we tackled the most common, and frankly simplest, use cases first by adding the ability to enrich streams using $lookup. At the top of our list of concerns was that joining data is a source of developer frustration: it’s hard to visualize how a join works and why the results look the way they do. The MongoDB interface was already good, but could we adapt it to work with streaming systems?

A simple example

Let’s take a simple example to illustrate how it works. Say we are building an application for solar farm maintenance workers. The application must show a stream of panel performance numbers (the event stream) alongside each panel’s manufacturer and model (persisted in a database). This helps the worker understand underperforming panels and look up technical details, warranty, and support paths for each panel type.

For this example, we can use the built-in sample_stream_solar data source. For each message, we will perform a lookup on the solar_devices collection using the source key (localField) device_id and the lookup key (foreignField) panel_id. The as field names the output key, panel_detail, which holds the matching documents as an array.

// create a source using the sample_stream_solar source  
let s = { '$source': { connectionName: 'sample_stream_solar' } }  
  
// now let's create a lookup from the database  
let l = {  
  '$lookup': {  
    from: { connectionName: 'Cluster0', db: 'demo', coll: 'solar_devices' },  
    localField: 'device_id',  
    foreignField: 'panel_id',  
    as: 'panel_detail'  
  }  
}  
  
// create a processor with these two stages  
let p = [s, l]  
  
// run the pipeline and look at the results
sp.process(p)

// sample output:
{  
  device_id: 'device_3',  
  group_id: 3,  
  timestamp: '2024-05-24T14:46:53.359+00:00',  
  max_watts: 250,  
  event_type: 0,  
  obs: {  
    watts: 170,  
    temp: 24  
  },  
  _ts: ISODate("2024-05-24T14:46:53.359Z"),  
  panel_detail: [  
    {  
      _id: ObjectId("6650a6ca277691637d5d34e6"),  
      panel_id: 'device_3',  
      panel_type: 'Panasonic',  
      panel_model: 'PNS-400-EVP132GL'  
    }  
  ]  
}  
{  
  device_id: 'device_1',  
  group_id: 2,  
  timestamp: '2024-05-24T14:46:53.359+00:00',  
  max_watts: 250,  
  event_type: 0,  
  obs: {  
    watts: 121,  
    temp: 14  
  },  
  _ts: ISODate("2024-05-24T14:46:53.359Z"),  
  panel_detail: [  
    {  
      _id: ObjectId("6650a66a277691637d5d34e4"),  
      panel_id: 'device_1',  
      panel_type: 'Panasonic',  
      panel_model: 'PNS-EVPV400HK'  
    }  
  ]  
}

Now we have a data stream (max_watts, obs.watts, obs.temp) enriched with data from the collection (panel_detail.panel_type, panel_detail.panel_model). Technically speaking, it’s a left outer join using an equality match against a single collection: events with no match in the lookup collection are still returned, with panel_detail as an empty array.
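
If the left outer join behavior is hard to picture, here is a rough model of it in plain JavaScript, with no MongoDB involved. The helper lookupEquality and the unmatched device_9 event are made up for illustration, and the real $lookup stage has extra matching rules (for null, missing, and array values) that this sketch ignores.

```javascript
// Plain-JavaScript model of an equality-match $lookup (left outer join).
// lookupEquality is a hypothetical helper, not part of any MongoDB API.
function lookupEquality(events, collection, localField, foreignField, as) {
  return events.map((event) => ({
    ...event,
    // collect every matching document into an array; no match yields []
    [as]: collection.filter((doc) => doc[foreignField] === event[localField]),
  }));
}

// stand-in for the solar_devices collection
const solarDevices = [
  { panel_id: 'device_1', panel_type: 'Panasonic', panel_model: 'PNS-EVPV400HK' },
];

// stand-in for the event stream
const events = [
  { device_id: 'device_1', obs: { watts: 121, temp: 14 } },
  { device_id: 'device_9', obs: { watts: 98, temp: 17 } }, // invented: not in the collection
];

const enriched = lookupEquality(events, solarDevices, 'device_id', 'panel_id', 'panel_detail');
console.log(JSON.stringify(enriched, null, 2));
```

Both events come out the other side. The unmatched one is not dropped; it just carries an empty panel_detail array.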

Pro Tip

You might notice in the example that the enriched data comes back nested, as an array under panel_detail. You can promote the nested document to the top level using $replaceRoot with $mergeObjects, then use $project to remove the keys you no longer need.

// the same lookup as above  
let l = {  
  '$lookup': {  
    from: { connectionName: 'Cluster0', db: 'demo', coll: 'solar_devices' },  
    localField: 'device_id',  
    foreignField: 'panel_id',  
    as: 'panel_detail'  
  }  
}  
  
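// move the data returned from $lookup to the root level
let r = {
  $replaceRoot: { newRoot: { $mergeObjects: [ { $arrayElemAt: [ "$panel_detail", 0 ] }, "$$ROOT" ] } }
}

// remove the data we don't need anymore
// (named proj so it is not confused with the pipeline variable p from the first example)
let proj = {
  $project: { panel_detail: 0, _id: 0, panel_id: 0 }
}

// run the full pipeline, reusing the source stage s from the first example
sp.process([s, l, r, proj])
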
// This would output a document that looks like this:  
  
{  
  _ts: ISODate("2024-05-24T14:46:53.359Z"),    
  device_id: 'device_1',  
  group_id: 2,  
  timestamp: '2024-05-24T14:46:53.359+00:00',  
  max_watts: 250,  
  event_type: 0,  
  obs: {  
    watts: 121,  
    temp: 14  
  },  
  panel_type: 'Panasonic',  
  panel_model: 'PNS-EVPV400HK'  
}
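
To see what the three operators are doing, here is the same flattening step modeled in plain JavaScript. This is only a sketch: flattenLookup is a hypothetical helper, the _id value is a placeholder, and it assumes (as $mergeObjects does) that later objects win on conflicting keys and that a missing first element contributes nothing.

```javascript
// Plain-JavaScript model of $replaceRoot + $mergeObjects + $project.
// flattenLookup is a hypothetical helper, not part of any MongoDB API.
function flattenLookup(doc, as, drop = []) {
  // like $arrayElemAt: ["$panel_detail", 0]; an empty array contributes nothing
  const first = (doc[as] || [])[0] || {};
  // like $mergeObjects: [first, "$$ROOT"]; the event's own fields win on conflicts
  const merged = { ...first, ...doc };
  // like a $project that excludes the nested key and any other unwanted keys
  for (const key of [as, ...drop]) delete merged[key];
  return merged;
}

const flatMatched = flattenLookup(
  {
    device_id: 'device_1',
    obs: { watts: 121, temp: 14 },
    panel_detail: [
      { _id: 'placeholder-id', panel_id: 'device_1', panel_type: 'Panasonic', panel_model: 'PNS-EVPV400HK' },
    ],
  },
  'panel_detail',
  ['_id', 'panel_id']
);

// an event with no match keeps its own fields and simply gains nothing
const flatUnmatched = flattenLookup(
  { device_id: 'device_9', obs: { watts: 98, temp: 17 }, panel_detail: [] },
  'panel_detail',
  ['_id', 'panel_id']
);

console.log(flatMatched, flatUnmatched);
```

Putting "$$ROOT" last in the merge is a design choice: if the event and the looked-up document ever share a field name, the value from the stream is the one that survives.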

Going further

The $lookup syntax supports more sophisticated capabilities, like correlated subqueries using a nested pipeline, iterating over arrays, and more. We could also introduce a $lookup that runs at window boundaries (when a window closes) rather than message by message. Should we? We want to hear from you.
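
For a feel of the correlated subquery form, here is roughly how the solar lookup could be written with a nested pipeline. Treat it as a sketch that has not been run against a stream processing instance: it assumes $lookup in Atlas Stream Processing accepts let and pipeline in the same shape as the standard aggregation stage. It reuses the Cluster0, demo, and solar_devices names from the example above, and the names lookupWithPipeline and device are just illustrative.

```javascript
// Sketch only: assumes the stream processing $lookup accepts let/pipeline
// in the same form as the standard aggregation $lookup stage.
const lookupWithPipeline = {
  $lookup: {
    from: { connectionName: 'Cluster0', db: 'demo', coll: 'solar_devices' },
    // expose the event's device_id to the nested pipeline as $$device
    let: { device: '$device_id' },
    pipeline: [
      // correlated match: compare the collection's panel_id to the event's device_id
      { $match: { $expr: { $eq: ['$panel_id', '$$device'] } } },
      // return only the fields the application needs
      { $project: { _id: 0, panel_type: 1, panel_model: 1 } },
    ],
    as: 'panel_detail',
  },
};

console.log(JSON.stringify(lookupWithPipeline, null, 2));
```

Compared with the localField/foreignField form, the nested pipeline lets you shape the looked-up documents before they are attached to the event, which can save a later $project.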

Join the conversation on the MongoDB Community Forums. Just select ‘Atlas Stream Processing’ as the category.

Lastly, check out the GitHub repo for this example. It will guide you through running the complete example yourself.


Enrichment Nirvana — using $lookup with Atlas Stream Processing was originally published in MongoDB on Medium, where people are continuing the conversation by highlighting and responding to this story.