Establishing Connection
The CassandraCluster
interface exposes a way to create a CassandraSession
via its session
method defined as follow:
def session: Stream[F, CassandraSession[F]]
It acquires a single session, that can be used to execute statements. Note that this emits only once, and session is closed when the resulting process terminates.
NOTE: It is recommended to create a single Session
and pass it as a parameter to any components that might need it.
Example
Given a CassandraSession
execute a statement (eg. insert statements).
import cats.effect.IO
import com.datastax.driver.core.Cluster
import fs2.Stream
import spinoco.fs2.cassandra.{CassandraCluster, CassandraSession}
import scala.concurrent.ExecutionContext.Implicits.global
val config = Cluster.builder().addContactPoints("127.0.0.1")
def doSomething(session: CassandraSession[IO]) = {
session.queryCql("SELECT * FROM ks.test") // or something else
}
val program: Stream[IO, Unit] =
for {
cluster <- CassandraCluster[IO](config, None)
session <- cluster.session
_ <- doSomething(session)
} yield ()