User defined functions in Atlas Stream Processing — $function

December 12, 2025

User defined functions in Atlas Stream Processing — $function

Transforming data in ways that are unique to your business

Photo by Kenny Gorman on Unsplash

First some background

UDFs have been foundational to stream processing since the early days.** from Apache Storm’s “bolts” (custom processing logic) in the early 2010s, to Azure Stream Analytics adding JavaScript UDFs, to Apache Flink’s comprehensive UDF framework supporting scalar, table, and aggregate functions across Java, Scala, and Python. As streaming systems evolved from first-generation frameworks (Storm, Spark Streaming) to modern platforms (Flink, Kafka Streams), UDF support became increasingly sophisticated, enabling developers to extend built-in operators with custom business logic, complex transformations, and domain-specific calculations that can’t be expressed through SQL alone.

Now consider MongoDB Aggregation Pipelines. MongoDB introduced the Aggregation Framework in August 2012 with version 2.2 as a powerful replacement for Map-Reduce. An aggregation pipeline is a multi-stage data processing framework where documents flow through sequential stages (like $match, $group, $sort, $project) that filter, transform, group, and reshape data. Each stage performs a specific operation and passes its output to the next stage, allowing you to build complex data transformations and analytics directly in the database without moving data elsewhere. The aggregation framework is the backbone of Atlas Stream Processing, they work in exactly the same way. You see where this is going.

In July 2020, MongoDB 4.4 added the $function operator, which allows developers to define custom JavaScript functions within aggregation pipelines. This was a significant evolution — for the first time, you could write arbitrary JavaScript logic inline when built-in aggregation operators couldn’t fulfill your needs. While $function comes with performance considerations (JavaScript execution is slower than native operators), it provided essential flexibility for complex business logic, custom calculations, and edge cases that standard operators couldn’t handle.

Now this same capability is available in Atlas Stream Processing, giving you consistent UDF support whether you’re working with static data in the database or streaming data.

An example

Let’s dive into a more concrete use case to illustrate the power of $function. Take an input message in this form. A simple weather message:

{  
  "temp": 98,  
  "loc": "Phoenix, AZ",  
  "humidity": 45,  
  "timestamp": "2025-12-12T14:30:00Z"  
}

Now let’s create a stream processor that looks for high temperatures and generates a conditional warning. This is obviously a very simple example to show the functionality.

{  
  "name": "simple_weather_alert",  
  "pipeline": [  
    {  
      "$source": {  
        "connectionName": "sample_stream_solar"  
      }  
    },  
    {  
      "$addFields": {  
        "heat_alert": {  
          "$function": {  
            "body": "function(temp, loc) {  
                        if (temp > 95) {  
                          return { alert: 'HEAT_WARNING', temp, loc };  
                          }  
                        return null;  
                     }",  
            "args": [  
              "$temp",  
              "$loc"  
            ],  
            "lang": "js"  
          }  
        }  
      }  
    },  
    {  
      "$match": {  
        "heat_alert": {  
          "$ne": null  
        }  
      }  
    },  
    {  
      "$merge": {  
        "into": {  
          "connectionName": "Cluster01",  
          "db": "weather_monitoring",  
          "coll": "heat_alerts"  
        }  
      }  
    }  
  ]  
}

Because the temperature is > 95, the output looks like this:

{  
  "alert": "HEAT_WARNING",  
  "temp": 98,  
  "loc": "Phoenix, AZ"  
}

Some important tips

  • Atlas Stream Processing supports running processors on different tiers with different memory and compute profiles. $function does use more resources so depending on the program, it may need to be moved to a higher tier.
  • Atlas Stream Processing uses a sandboxed JavaScript environment that supports modern ES6+ syntax including arrow functions, const/let declarations, and template literals. The runtime provides access to essential built-in objects like Math, JSON, and Date, but deliberately excludes Node.js-specific APIs like the process object, file system access, and global scope manipulation for security reasons.
  • In the MongoDB shell, you can monitor the stream processor’s JavaScript function performance by connecting to the Atlas Stream Processing instance and running sp.processor.stats(“processor_name”) to view basic metrics, or sp.processor.stats(“processor_name”, {verbose: true}) for detailed per-operator statistics that include memory usage in bytes and latency percentiles (p50, p99) for each pipeline stage. The verbose output provides granular performance data showing exactly how much time is spent in JavaScript execution versus other pipeline operations, allowing you to identify bottlenecks and optimize.

Get Started

Ready to build custom stream processing logic? Check out our documentation to learn more about JavaScript functions in Atlas Stream Processing.


User defined functions in Atlas Stream Processing — $function was originally published in Towards Data Engineering on Medium, where people are continuing the conversation by highlighting and responding to this story.