Mongodb filesystem usage

Mongodb filesystem usage

Mongo streams allow simple usage of mongodb GridFS. Files can be listed, read, written and combined with
other scalaz-stream processes.

Similarly with collection queries, the binding of GridFS to actual database where the files are stored is
delayed up to the very last time. This allows for great reuse of code and nice abstraction.


Initial construct of Mongo Streams is the filesystem construct. It references to mongodbs filesystem (or bucket). Default name of this filesystem isfs`. To specify filesystem following constructs are available :

val defaultFilesystem = filesystem(filesDb)

    val namedFileSystem = filesystem(filesDb, filesystemName = "fsName")

Listing files

Basic operation on mongodb is to list files in filesystem. To list files
in filesystem use list() directive as shown below:

val listAllFiles = list files()

    val listAllFilesNamedGoo = list named ("goo")

    //running the queries 

    (filesystem(filesDb) through listAllFiles): Process[Task, MongoFileRead]
    (filesystem(filesDb) through listAllFilesNamedGoo): Process[Task, MongoFileRead]

    //combining the results 

    val listEverything = listAllFiles ++ listAllFilesNamedGoo

    //then running them

    (filesystem(filesDb) through listEverything): Process[Task, MongoFileRead]

List command supports additional queries to select particular files with similar syntax as Mongo Streams query() :

val fileId: ObjectId = ???

    val filesMatchingFoo = list named ("foo")
    val filesWithSuppliedId = list withId (fileId)

    //complex queries 

    val filesWithRegex = list files ("filename" regex "report.*")
    val olderFiles = list files ("uploadDate" <= new Date)

Please see more in


Reading from files

Reading is an action on the returned list of files. There are two possible variants of reading. Files are either
read directly (first file matching the list) or process with result of reading the files is returned to allow for
reading multiple files:

val fileId: ObjectId = ???

    val readSingleFile = list withId (fileId) and readFile()

    val readAllFiles = list files() foreach readFile()

    //run the reads  
    (filesystem(filesDb) through readSingleFile): Process[Task, Bytes]

    //run the multiple Files, please not the different type
    val allFiles = ((filesystem(filesDb)) through readAllFiles): Process[Task, (MongoFileRead, Process[Task, Bytes])]

    //now to read every file that has size >= 100 bytes and concatenate their output
    ((allFiles |> filter { case (file, _) => file.length > 100 }).map(v => v._2).join): Process[Task, Bytes]

Please note that read operation recycles the inner array object on every read attempt. The object is encapsulated
within primitive and underlying array must not be mutated. It is safe however to call at
any time toArray method that will allocate Array[Byte].

If there is need to specify custom size of read buffer (default is == com.mongodb.gridfs.GridFS.DEFAULT_CHUNKSIZE)
you can pass this to read command as argument:

//specify read buffer of 1024 bytes
    val customRead = filesystem(filesDb) through (list files() and readFile(1024))

Please see more in


Writing to files

In order to write file to filesystem, the Mongo Streams library builds the Sink, that can be used to write any source of
Bytes. File to which data will be written needs to be specified with file() command :

//creates file named `foo.txt` with supplied ObjectId. Please note `id` must be unique in given filesystem
    file("foo.txt", id = new ObjectId)

    //creates file named `foo.txt` with metadata setting `owner` key to `luke` value
    file("foo.txt", meta = Some(BSONObject("owner" -> "luke")))

To write into such created file simply use the write syntax

val fileId = new ObjectId

    //write to single file
    val writeToFile = write file("foo.txt", id = fileId)

    val fileSource: InputStream = ???
    val readBuffer = new Array[Byte](1024 * 1024)

    //read from one file (inputstream) and write to single file 
    (Process.eval( through unsafeChunkR(fileSource)).map(Bytes(_)) to (filesystem(filesDb) using writeToFile)

Please see more in


Deleting files

Files can be removed from filesystem by using delete command.

// deletes all files that matches the query
    filesystem(filesDb) through (list named ("old") and removeFile)

    // provides process that allows to run delete for each file in query
    filesystem(filesDb) through (list named ("old") foreach removeFile)

Please see more in


Total for specification GridFSUsageSpec
Finished in11 seconds, 408 ms
Results16 examples, 0 failure, 0 error, 1 pending