Enriching via API — the $https operator in MongoDB Atlas Stream Processing
Like a tributary flowing to the mainstem, the new Atlas Stream Processing operator called $https enriches the pipeline in important and interesting ways. Let’s dive in and see how easy it is to use.
Just to review a bit — Atlas Stream Processing is built on the foundations of the MongoDB Aggregation Language. A pipeline is an array of stages built from operators that pass data from stage to stage, processing the data as it goes. It’s a simple yet powerful paradigm for stream processing built right into the MongoDB ecosystem.
The new $https operator builds on this paradigm and allows the user to enrich and expand the capabilities of the pipeline by interacting with REST endpoints internal or external to the stream processor. In line with other features of Atlas Stream Processing, it has two components:
- A connection registry entry that specifies the base URL and credentials to be used to access it.
- A pipeline stage that references this entry by name, sending data to and capturing results from the RESTful call.
It’s that easy.
This unlocks a wide range of use cases, such as triggering PagerDuty when an IoT sensor detects anomalies, sending SMS alerts via Twilio when data stops flowing, or enriching streams with public APIs to retrieve ZIP codes, solar yield, stock prices, and used car values. Additionally, it can be used to interact with internal microservices via RESTful interfaces for tasks like verifying frequent flyer status or detecting fraud, waste, and abuse.
Let’s walk through a simple example to demonstrate the power of this operator. In an intrusion detection system, identifying malicious traffic involves geo-tagging incoming requests to detect patterns. To enrich the data with geolocation information, we’ll process a stream of IP addresses and use the $https operator to retrieve location data, such as latitude, longitude, and ZIP code, via the ipinfo.io API.
Step 1: Create the Connection Registry entry
For this step, we will be using the MongoDB Atlas Admin API. For ease of use, I will use Python to call the endpoint. This is a one-time step that registers IPinfo.io as a named connection.
import os
import requests
from requests.auth import HTTPDigestAuth
import pprint

# Environment variables
ATLAS_USER = os.environ["ATLAS_USER"]
ATLAS_USER_KEY = os.environ["ATLAS_USER_KEY"]
INSTANCE = os.environ["ATLAS_INSTANCE"]
GROUP = os.environ["ATLAS_GROUP"]
IPKEY = os.environ["IPKEY"]

# Base URL for the Atlas Admin API
base_url = "https://cloud.mongodb.com/api/atlas/v2/"

# Digest authentication with the Atlas programmatic API key
auth = HTTPDigestAuth(ATLAS_USER, ATLAS_USER_KEY)

# Headers
headers = {
    'Accept': 'application/vnd.atlas.2024-05-30+json',
    'Content-Type': 'application/json'
}

# GET request: list the existing connections for this instance
url = f"groups/{GROUP}/streams/{INSTANCE}/connections"
print(url)
response = requests.get(base_url + url, auth=auth, headers=headers)
pprint.pprint(response.json())

# Headers the stream processor will send to ipinfo.io
send_headers = {
    "Content-Type": "application/json",
    "Authorization": f"Bearer {IPKEY}"
}

# Connection registry entry payload
data = {
    'name': 'IPinfo',
    'type': 'Https',
    'headers': send_headers,
    'url': 'https://ipinfo.io/'
}

# POST request: create the connection
response = requests.post(base_url + url, auth=auth, headers=headers, json=data)
pprint.pprint(response.json())
Note: You will need to set environment variables on your operating system for the various credentials read at the top of the script. Check out the Atlas documentation for how to obtain them.
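If one of those variables is missing, the script dies with an opaque KeyError. A small, hypothetical helper (not part of the script above) can fail fast with a clearer message:

```python
import os

def missing_env(required, env=os.environ):
    """Return the names in `required` that are not set in the mapping `env`."""
    return [name for name in required if name not in env]

# The five variables the Step 1 script expects
REQUIRED = ["ATLAS_USER", "ATLAS_USER_KEY", "ATLAS_INSTANCE", "ATLAS_GROUP", "IPKEY"]

problems = missing_env(REQUIRED)
if problems:
    print("Missing environment variables:", ", ".join(problems))
```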
Running this from the command line gives us output similar to this:
{
  'results': [
    {'name': 'IPinfo',
     'type': 'Https',
     'url': 'https://ipinfo.io/'}
  ]
}
Step 2: Create and run the processor
Now, we can create a processor that uses this connection to address our use case. For this example, we will use the $documents stage to generate sample data, but in your case, maybe it’s a stream of network events coming from Apache Kafka or something similar.
Let’s define some sample data as a source for the pipeline that looks like IP traffic:
{
  $source: {
    documents: [
      { "_id": 1, "ip": "192.168.1.10", "request_type": "GET", "status": 200, "bytes": 512 },
      { "_id": 2, "ip": "203.0.113.25", "request_type": "POST", "status": 201, "bytes": 1042 },
      { "_id": 3, "ip": "8.8.8.8", "request_type": "GET", "status": 404, "bytes": 0 },
      { "_id": 4, "ip": "198.51.100.42", "request_type": "PUT", "status": 500, "bytes": 256 },
      { "_id": 5, "ip": "123.45.67.89", "request_type": "DELETE", "status": 403, "bytes": 128 }
    ]
  }
},
The $https stage itself is simple; it looks like this:
{
  $https: {
    connectionName: "IPinfo", // reference the connection by name
    method: "GET",            // a GET request
    as: "apiResults",         // the subdocument to return the response into
    path: "$ip"               // append the ip field to the URL (https://ipinfo.io/198.51.100.42)
  }
},
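Conceptually, for each document the stage appends the value of the ip field to the connection’s base URL, issues the request with the registered headers, and places the parsed response under apiResults. A rough Python sketch of that per-document behavior (build_url and enrich_ip are illustrative helpers, not Atlas internals):

```python
import requests

def build_url(base, ip):
    """Mimic the `path: "$ip"` behavior: append the document's ip field
    to the connection's base URL."""
    return base.rstrip("/") + "/" + ip

def enrich_ip(ip, token):
    """Roughly what the stage does for one document: GET the per-IP URL
    with the registered bearer token and return the parsed JSON body."""
    resp = requests.get(
        build_url("https://ipinfo.io/", ip),
        headers={"Authorization": f"Bearer {token}"},
        timeout=5,
    )
    resp.raise_for_status()
    return resp.json()  # lands in the document as the apiResults subdocument
```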
Let’s put it together into something we can run end-to-end:
p = [
  {
    $source: {
      documents: [
        { "_id": 1, "ip": "192.168.1.10", "request_type": "GET", "status": 200, "bytes": 512 },
        { "_id": 2, "ip": "203.0.113.25", "request_type": "POST", "status": 201, "bytes": 1042 },
        { "_id": 3, "ip": "8.8.8.8", "request_type": "GET", "status": 404, "bytes": 0 },
        { "_id": 4, "ip": "198.51.100.42", "request_type": "PUT", "status": 500, "bytes": 256 },
        { "_id": 5, "ip": "123.45.67.89", "request_type": "DELETE", "status": 403, "bytes": 128 }
      ]
    }
  },
  {
    $https: {
      connectionName: "IPinfo", // reference the connection by name
      method: "GET",            // a GET request
      as: "apiResults",         // the subdocument to return the response into
      path: "$ip"               // append the ip field to the URL (https://ipinfo.io/198.51.100.42)
    }
  }
];

sp.process(p);
And our output is now enriched: we can see lat/lon along with other, richer geo-data. Notice we are using .process() to see the output; check out the docs on how sp.process() is used.
{
  _id: 5,
  ip: '123.45.67.89',
  request_type: 'DELETE',
  status: 403,
  bytes: 128,
  _ts: ISODate("2025-03-04T16:41:36.208Z"),
  _stream_meta: {
    source: {
      type: 'generated'
    },
    https: {
      url: 'https://ipinfo.io/123.45.67.89',
      method: 'GET',
      httpStatusCode: 200,
      responseTimeMs: 35
    }
  },
  apiResults: {          // enriched object
    ip: '123.45.67.89',
    city: 'Seoul',
    region: 'Seoul',
    country: 'KR',
    loc: '37.5660,126.9784', // lat-lon data
    postal: '03141',
    timezone: 'Asia/Seoul'
  }
}
We could now write this to MongoDB via $merge, or send it back out to Apache Kafka using $emit.
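As a sketch of that sink step, a $merge stage appended to the pipeline would have the shape below (shown here as a Python dict; "myAtlasCluster" is a placeholder for an Atlas database connection you would register in the connection registry, just as "IPinfo" was registered for HTTPS, and the database and collection names are examples):

```python
# Hypothetical $merge stage appended after $https to persist enriched docs.
merge_stage = {
    "$merge": {
        "into": {
            "connectionName": "myAtlasCluster",  # placeholder registry entry
            "db": "netlogs",                     # example database name
            "coll": "enriched_ips",              # example collection name
        }
    }
}
```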
If you want to dig deeper, check out sp.stats() for monitoring the performance of these calls, and also read up on the docs for the $https stage to understand the nuances of passing and returning data with REST endpoints.
To get started or try this example, go to Atlas and log in. You can also use this as a companion to get started: https://medium.com/mongodb/mongodb-atlas-stream-processing-your-first-steps-bcb2814034ca
As a parting shot, if you are like me, you immediately start thinking: hey, can I use this to plug into a **large language model**, perhaps calling an API to get probabilistic results based on some text input? The answer is YES! Stay tuned for some unbelievably cool use cases coming from this type of pipeline design.
Enriching via API — the $https operator in MongoDB Atlas Stream Processing was originally published in Towards Data Engineering on Medium, where people are continuing the conversation by highlighting and responding to this story.