Aggregation Pipeline

MongoDB lets you form a so-called aggregation pipeline, one of the basic methods of its aggregation framework.
Mongo Streams provides simple syntactic sugar for these aggregation pipeline queries and commands.
The supported commands are:

  • $match (query and only syntax)
  • $project
  • $limit
  • $skip
  • $unwind
  • $group
  • $sort

The syntax of each supported command is described below.

Limiting results with query and only

Each aggregation command starts with an initial query, which becomes the very first $match stage. For example,
query("key" -> "value") |>> limit(1) translates to an aggregation command with the JSON stages
{ "$match" : { "key" : "value" } } and { "$limit" : 1 }.

Additionally, multiple queries may be chained together with the only syntax
(note that the collectionSyntax prefix is not required):

 query("key" -> "value") |>> ( limit(5) |>> collectionSyntax.only("key2" === 3))

The only syntax is converted to a $match stage in the aggregation pipeline.
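The ordering of the chained stages matters: a limit placed before an only caps the documents first and filters afterwards. The following is a minimal sketch of that behaviour on plain Scala collections. It does not use Mongo Streams; `MatchChainSketch`, `matchStage`, `limitStage` and the `Doc` alias are hypothetical names introduced only for illustration.

```scala
object MatchChainSketch {
  // A document is modelled as a plain map (an assumption, not the library's DBObject).
  type Doc = Map[String, Any]

  // Model of a $match stage: keep documents whose field equals the expected value.
  def matchStage(key: String, value: Any): Seq[Doc] => Seq[Doc] =
    docs => docs.filter(doc => doc.get(key).contains(value))

  // Model of a $limit stage: keep at most n documents.
  def limitStage(n: Int): Seq[Doc] => Seq[Doc] =
    docs => docs.take(n)

  // Mirrors query("key" -> "value") |>> (limit(5) |>> only("key2" === 3)):
  // first match, then limit, then the second match.
  def run(docs: Seq[Doc]): Seq[Doc] = {
    val pipeline = matchStage("key", "value") andThen limitStage(5) andThen matchStage("key2", 3)
    pipeline(docs)
  }
}
```

Because the limit runs before the second match, at most five documents reach the only filter, so the result may contain fewer than five documents even when more would match both conditions.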

Grouping documents

The basic aggregation pipeline command is group. It lets you define an id for the documents, which may consist
of multiple keys found in the document. Documents that share the same id are grouped together, and the
aggregation expressions are computed over the documents of each group.

      // Creates an aggregation pipeline that first keeps all documents where `key == value`,
      // then groups the returned documents by the content of `key1` and, for each group of
      // documents with the same `key1`, stores the sum of the `key3` values under `sumKey`.
      query("key" -> "value") |>> (group("grouped" -> "$key1") compute ("sumKey" sum "$key3"))

      // Multiple computations may be specified in one group:
      (query("key" -> "value") |>> (group("grouped" -> "$key1") compute ("sumKey" sum "$key3", "avgKey" avg "$key4"))): ChannelResult[DBCollection, DBObject]

The group command supports the following grouping expressions:

  • sum :
 query() |>> (group("g" -> "$k") compute ("sumKey" sum "$key3"))
  • avg :
 query() |>> (group("g" -> "$k") compute ("avgKey" avg "$key3"))
  • min :
 query() |>> (group("g" -> "$k") compute ("minKey" min "$key3"))
  • max :
 query() |>> (group("g" -> "$k") compute ("maxKey" max "$key3"))
  • last :
 query() |>> (group("g" -> "$k") compute ("lastKey" last "$key3"))
  • first :
 query() |>> (group("g" -> "$k") compute ("firstKey" first "$key3"))
  • addToSet :
 query() |>> (group("g" -> "$k") compute ("setKey" addToSet "$key3"))
  • push :
 query() |>> (group("g" -> "$k") compute ("listKey" push "$key3"))
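To make the semantics of grouping concrete, here is a rough model of a group with sum and avg computations on plain Scala collections. It is only a sketch and does not call Mongo Streams or MongoDB. The names `GroupSketch`, `num` and `groupSumAvg` are hypothetical, and the flat shape of the result documents is an assumption made for readability, not the shape MongoDB returns.

```scala
object GroupSketch {
  // A document is modelled as a plain map (an assumption, not the library's DBObject).
  type Doc = Map[String, Any]

  // Reads a numeric field as Double; missing or non-numeric fields count as 0.0 in this sketch.
  private def num(doc: Doc, key: String): Double = doc.get(key) match {
    case Some(n: Int)    => n.toDouble
    case Some(n: Long)   => n.toDouble
    case Some(n: Double) => n
    case _               => 0.0
  }

  // Models group("grouped" -> "$key1") compute ("sumKey" sum "$key3", "avgKey" avg "$key4"):
  // documents with the same `key1` form one group, and one result document is built per group.
  def groupSumAvg(docs: Seq[Doc]): Seq[Doc] =
    docs.groupBy(doc => doc.getOrElse("key1", null)).toSeq.map { case (id, members) =>
      val sum = members.map(num(_, "key3")).sum
      val avg = members.map(num(_, "key4")).sum / members.size
      Map[String, Any]("grouped" -> id, "sumKey" -> sum, "avgKey" -> avg)
    }
}
```

The other expressions follow the same pattern over the members of each group: min and max pick an extreme value, first and last pick by position, addToSet collects distinct values and push collects all values.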

Sorting documents

As with plain queries, the aggregation pipeline can sort documents. This is done
with the sort syntax. Note that sorting the documents before the aggregation pipeline has no effect.

 query("key" -> "value") |>> sort("key1" Ascending)

Limiting and skipping documents

To limit the results or skip unwanted ones, use the limit and skip pipeline commands. The following example
skips the first 10 results and then limits the output to the next 20 results.

 query("key" -> "value") |>> (skip(10) |>> limit(20))
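The order of skip and limit changes the outcome. A small sketch on plain Scala sequences illustrates this, assuming the stages behave like `drop` and `take`; `SkipLimitSketch` and its methods are hypothetical names and not part of Mongo Streams.

```scala
object SkipLimitSketch {
  // Models skip(n) |>> limit(m): drop the first n documents, then keep the next m.
  def skipThenLimit[A](docs: Seq[A], skip: Int, limit: Int): Seq[A] =
    docs.drop(skip).take(limit)

  // Models limit(m) |>> skip(n): keep the first m documents, then drop n of those.
  def limitThenSkip[A](docs: Seq[A], skip: Int, limit: Int): Seq[A] =
    docs.take(limit).drop(skip)
}
```

With 100 input documents, skipping 10 and then limiting to 20 yields documents 11 to 30, while limiting to 20 and then skipping 10 yields only documents 11 to 20.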

Unwinding the documents

If a document contains a field of list type (whose entries may themselves be documents), the unwind syntax
converts that document into one document per list entry. Each resulting document holds a single entry from the list
under that key, together with all other keys of the original document.

 query("key" -> "value") |>> unwind("key2")
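The effect of unwind can be modelled with a `flatMap` over plain Scala collections. This is a sketch of the semantics only, not the library's implementation; `UnwindSketch` and `unwind` are hypothetical names, and dropping documents that have no list under the field is a simplifying assumption of this sketch.

```scala
object UnwindSketch {
  // A document is modelled as a plain map (an assumption, not the library's DBObject).
  type Doc = Map[String, Any]

  // Emits one copy of the document per list entry, with the list replaced by that entry.
  def unwind(field: String)(docs: Seq[Doc]): Seq[Doc] =
    docs.flatMap { doc =>
      doc.get(field) match {
        case Some(items: Seq[_]) => items.map(item => doc.updated(field, item))
        case _                   => Seq.empty[Doc] // assumption: no list, no output
      }
    }
}
```

For instance, a document with `key2` holding a two-element list becomes two documents that differ only in the value of `key2`.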

Projecting the result documents

The pipeline can transform the resulting documents with the project syntax, which lets you include or exclude certain keys from each document:

query("key" -> "value") |>> project("key2" include, "key3" exclude)

It is also possible to add entirely new fields with their values, or to rename fields:

  • renaming the fields :
 query() |>> (project("key2" include, "nkey2" setTo "$key2"))
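As an illustration of what such a projection does to a single document, the sketch below models include and setTo on a plain Scala map. It is not Mongo Streams code: `ProjectSketch` and `project` are hypothetical names, and treating a `$`-prefixed string as a reference to another field of the same document is an assumption of this sketch.

```scala
object ProjectSketch {
  // A document is modelled as a plain map (an assumption, not the library's DBObject).
  type Doc = Map[String, Any]

  // Keeps the included keys and adds each new key with the value of the field it refers to.
  // `setTo` maps a new key to a field reference such as "$key2".
  def project(include: Set[String], setTo: Map[String, String])(doc: Doc): Doc = {
    val kept = doc.filter { case (key, _) => include(key) }
    val added = setTo.collect {
      case (newKey, ref) if doc.contains(ref.stripPrefix("$")) =>
        newKey -> doc(ref.stripPrefix("$"))
    }
    kept ++ added
  }
}
```

Calling `ProjectSketch.project(Set("key2"), Map("nkey2" -> "$key2"))` on a document mirrors the example above: `key2` is kept, its value is also exposed as `nkey2`, and every other key is dropped.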
