Mongo Streams can be used in various ways. It is designed to inject the actual dependency on a collection as late as possible, and it provides
a powerful DSL with combinator syntax that gives you an expressive, readable, functional way to interact with MongoDB.
Any operation on a collection is implemented as ChannelResult[A]. The type parameter A may be either DBObject (when querying documents from
the collection) or WriteResult (when performing save, insert or update). Additional types depend on the individual operation.
To start with Mongo Streams, include the generic syntax import:
import scalaz.stream.mongodb.collectionSyntax._
This brings the implicits required by the Mongo Streams DSL into scope.
Now you can easily create simple queries. The basic directive for this is query:
query("key1" === "123")
This returns a Query object that can be used to perform the actual query on a collection, as shown below:
def queryOnlyActive = query("active" === true)
val allActiveUsers = (users through queryOnlyActive).runLog.run
Note that you may apply process combinators to the query BEFORE the query is actually combined with a connection:
def queryOnlyFirst50Active = query("active" === true) |> take(50)
val first50ActiveUsers = (users through queryOnlyFirst50Active).runLog.run
In the example above we also piped (|>) the result to another process, which further transformed the data from MongoDB. This
allows you to combine queries and build a library of your own queries that can be reused and applied to a collection
when necessary.
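The idea of binding the collection as late as possible can be illustrated with a small toy model. This is only a sketch: Doc, Collection, Query and the two-argument query and through helpers below are hypothetical stand-ins built on plain Scala collections, not the Mongo Streams types or API.

```scala
// Toy model only: these names are hypothetical stand-ins, not Mongo Streams types.
object LateBindingSketch {
  type Doc = Map[String, Any]
  type Collection = List[Doc]

  // A query is only a description: a function that waits for a collection.
  final case class Query(run: Collection => List[Doc]) {
    // Post-process the results before any collection is known (in the spirit of |> take(n)).
    def take(n: Int): Query = Query(c => run(c).take(n))
  }

  // Matches documents whose field `key` equals `value`.
  def query(key: String, value: Any): Query =
    Query(_.filter(_.get(key).contains(value)))

  // The collection is supplied only here, at the very end.
  def through(c: Collection, q: Query): List[Doc] = q.run(c)

  // Reusable query definitions, written without any collection in sight.
  val onlyActive: Query = query("active", true)
  val firstActive: Query = onlyActive.take(1)

  // Sample data used to exercise the queries.
  val users: Collection = List(
    Map("name" -> "luke", "active" -> true),
    Map("name" -> "yoda", "active" -> false),
    Map("name" -> "leia", "active" -> true)
  )
}
```

Because onlyActive and firstActive never mention a collection, the same definitions can be run against test data and production data alike.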
The standard output of a query operation in Mongo Streams is the DBObject from the Java mongo driver.
Essentially, two types of update operations can be performed against a mongo collection.
Query-based updates are the equivalents of MongoDB's findAndModify, findAndRemove and update commands.
This sets the ship property to falcon in all documents with name == luke:
query("name" -> "luke") and update("ship" := "falcon")
This removes all documents with name == yoda from the collection (equivalent to remove in MongoDB):
query("name" -> "yoda") and remove
This modifies the first document with name == yoda in the collection (equivalent to findAndModify in MongoDB). In contrast with
update, you may receive the original of the updated document:
query("name" -> "yoda") and updateOne("ship" := None)
query("name" -> "yoda") and updateOne("ship" := None).returnNew(true)
query("name" -> "yoda").sort("last" Ascending) and updateOne(document)
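The difference between receiving the original document and receiving the modified one can be sketched on plain Scala collections. This is a toy model under stated assumptions: the updateOne function below, its parameters and the Doc alias are hypothetical and only mimic the behaviour described above; they are not the library's implementation.

```scala
// Toy model only: a hypothetical updateOne on an in-memory list of documents.
object FindAndModifySketch {
  type Doc = Map[String, Any]

  // Updates the first document whose field `key` equals `value`.
  // Returns the new list plus the document handed back to the caller:
  // the original by default, or the modified one when returnNew is true.
  def updateOne(
      docs: List[Doc],
      key: String,
      value: Any,
      set: (String, Any),
      returnNew: Boolean = false
  ): (List[Doc], Option[Doc]) = {
    val idx = docs.indexWhere(_.get(key).contains(value))
    if (idx < 0) (docs, None) // nothing matched, nothing to return
    else {
      val original = docs(idx)
      val modified = original + set
      (docs.updated(idx, modified), Some(if (returnNew) modified else original))
    }
  }
}
```

Only the first match is touched, which is what separates this from the multi-document update shown earlier.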
This removes the first document with name == yoda from the collection (equivalent to findAndModify with remove == true in MongoDB). In contrast with
remove, you may receive the original document that was removed, if present:
query("name" -> "yoda") and removeOne
query("name" -> "yoda").sort("last" Ascending) and removeOne
New documents may be inserted into the MongoDB collection with either save or insert:
insert(document)
save(document).ensure(WriteConcern.REPLICA_ACKNOWLEDGED)
All actions in Mongo Streams can be combined via process combinators.
Mongo Streams actions are monads, therefore they can be used in for comprehensions. The code below demonstrates one
possible usage, where we first query all skywalkers and then look up all friends of the skywalkers:
def findAllSkyWalkersWithFriends =
for {
skywalker <- query("name" -> "luke")
friend <- query("friendOf" -> skywalker.as[String]("name"))
} yield (skywalker, friend)
starWars through findAllSkyWalkersWithFriends
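Why a for comprehension works here can be seen in a toy model: any type with map and flatMap can be used this way. The sketch below is an assumption-laden illustration; Action, Doc, Collection and the two-argument query helper are hypothetical stand-ins, not the Mongo Streams types.

```scala
// Toy model only: a hypothetical Action with map and flatMap, not the library's type.
object MonadicQuerySketch {
  type Doc = Map[String, Any]
  type Collection = List[Doc]

  final case class Action[A](run: Collection => List[A]) {
    def map[B](f: A => B): Action[B] = Action(c => run(c).map(f))
    // Runs the follow-up action against the same collection for every result.
    def flatMap[B](f: A => Action[B]): Action[B] =
      Action(c => run(c).flatMap(a => f(a).run(c)))
  }

  // Matches documents whose field `key` equals `value`.
  def query(key: String, value: Any): Action[Doc] =
    Action(_.filter(_.get(key).contains(value)))

  // The second query depends on each result of the first one.
  val lukeWithFriends: Action[(Doc, Doc)] =
    for {
      luke   <- query("name", "luke")
      friend <- query("friendOf", luke("name"))
    } yield (luke, friend)

  // Sample data used to exercise the action.
  val starWars: Collection = List(
    Map("name" -> "luke"),
    Map("name" -> "han", "friendOf" -> "luke"),
    Map("name" -> "vader", "friendOf" -> "palpatine")
  )
}
```

The for comprehension desugars to one flatMap and one map, so the collection is still supplied only when run is finally called.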
Now this is great if we only want to interact with mongo collections, but what if we want to query all skywalkers and then read
their blogs from the internet? We can still combine a mongo stream with a process that fetches the blogs from a web site, as shown below:
def findBlog(sw: DBObject): Process[Task, String] = ???
def skyWalkersBlogs =
query("name" -> "luke").flatMapProcess(sw=>
findBlog(sw).map(blog => (sw,blog))
)
starWars through skyWalkersBlogs
Another common need is to query data from one collection and use the results to query another collection.
This can also be achieved (again, without knowing upfront which mongo collections it will run on):
def readFrom2(col1: DBCollection, col2: DBCollection): Process[Task, (DBObject, DBObject)] = {
for {
sw <- col1 through query("name" -> "luke")
ship <- col2 through query("ship" -> sw.getAs[String]("like"))
} yield (sw, ship)
}
// then later somewhere in your code or in test
(readFrom2(collA, collB)): Process[Task, (DBObject, DBObject)]
Mongo Streams actions have combinators similar to the ones you will find on scalaz-stream processes. They allow you to reuse syntax from streams to
further combine actions with processes and processes with actions:
//queries first all lukes and then all chewaccas
def lukeAndChewacca =
query("name" -> "luke") ++ query("name" -> "chewacca")
//saves all lukes and chewaccas to a file
def flatFileStore: Sink[Task, DBObject] = ???
(starWars through lukeAndChewacca) to flatFileStore
//or
def exportLukeAndChewacca2File = lukeAndChewacca to flatFileStore
starWars through exportLukeAndChewacca2File
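The append and sink combinators can also be pictured with a toy model on plain collections. As before, this is only a sketch under stated assumptions: Action, its ++ and to methods and the two-argument query helper are hypothetical and do not reproduce the scalaz-stream or Mongo Streams signatures.

```scala
// Toy model only: hypothetical ++ and to combinators on an in-memory action.
object AppendSketch {
  type Doc = Map[String, Any]
  type Collection = List[Doc]

  final case class Action(run: Collection => List[Doc]) {
    // All results of this action, followed by all results of `that`.
    def ++(that: Action): Action = Action(c => run(c) ++ that.run(c))
    // Describes feeding every result into a sink; nothing runs until a collection is supplied.
    def to(sink: Doc => Unit): Collection => Unit = c => run(c).foreach(sink)
  }

  // Matches documents whose field `key` equals `value`.
  def query(key: String, value: Any): Action =
    Action(_.filter(_.get(key).contains(value)))

  val lukeAndChewacca: Action =
    query("name", "luke") ++ query("name", "chewacca")
}
```

In this model the order of results follows the order of the appended queries, not the order of documents in the collection.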