Practical analytics with Spark and Cassandra

At Heuritech we work with huge quantities of Apache logs. This post explores how we use Apache Spark and Cassandra to analyze them.

The official tutorial for the Spark Cassandra connector does not cover our use case: large Spark-based processing pipelines with frequent updates of multiple Cassandra tables.

Let’s explain how we do it with an example Spark job, which is run periodically to process new log files.

Setting things up

We use Spark 1.2.1 and Cassandra 2.1.
Create a Scala project with SBT and add the following library dependencies in build.sbt:

val sparkVersion = "1.2.1"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % sparkVersion % "provided" exclude("com.google.guava", "guava"),
  "org.apache.spark" % "spark-sql_2.10" % sparkVersion % "provided" exclude("com.google.guava", "guava"),
  "com.datastax.spark" %% "spark-cassandra-connector" % "1.2.+" exclude("com.google.guava", "guava"),
  "com.google.guava" % "guava" % "16.0.1"
)

Note that we force a specific guava version to avoid conflicts.
Spark dependencies are marked as "provided" since our Spark cluster already ships them. If you run the job through sbt run, you can remove this qualifier.

Parsing and aggregating with Spark

First, we define a parseLine function that takes a line and returns an Either[ParsingError, LogData]:

import java.util.Date

case class ParsingError(msg: String)

case class LogData(
  time: Date,
  cookie: String,
  remoteHost: String,
  url: String,
  userAgent: String)

def parseLine(line: String): Either[ParsingError, LogData] = {
  // ...
}

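For illustration, here is a minimal sketch of what such a parser could look like. It assumes a hypothetical tab-separated log format rather than our real Apache format, and repeats the case classes so it stands alone:

```scala
import java.text.SimpleDateFormat
import java.util.Date

// Same case classes as above, repeated so the sketch compiles on its own
case class ParsingError(msg: String)

case class LogData(
  time: Date,
  cookie: String,
  remoteHost: String,
  url: String,
  userAgent: String)

// Hypothetical format: "time<TAB>cookie<TAB>host<TAB>url<TAB>agent"
def parseLine(line: String): Either[ParsingError, LogData] =
  line.split('\t') match {
    case Array(time, cookie, host, url, agent) =>
      try {
        val date = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss").parse(time)
        Right(LogData(date, cookie, host, url, agent))
      } catch {
        case _: java.text.ParseException => Left(ParsingError(s"bad timestamp: $time"))
      }
    case _ => Left(ParsingError(s"malformed line: $line"))
  }
```

The real format is more involved (quoted fields, escaped characters), but the shape of the function stays the same: one Either per line, never an exception escaping.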
Some code for this parsing is available in that blog post. We start our program by opening the log files and mapping our parseLine function on each line:

val parsingResult: RDD[Either[ParsingError, LogData]] =
  sc.textFile(logsPath).map(parseLine) // logsPath (hypothetical): where the new log files live

The next step is to create an RDD that contains only LogData, i.e. the lines we could parse correctly.
How do we keep just the Right part of our Either type?
Well, as Scala developers like to say, just flatMap that sh**!

val validLines: RDD[LogData] =
  parsingResult.flatMap(_.right.toOption)

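To see why flatMap does the filtering, here is the same trick on a plain Scala List, no Spark needed. Converting an Either to an Option turns it into a zero-or-one-element collection, so flatMap drops the Lefts and unwraps the Rights in one pass:

```scala
val results: List[Either[String, Int]] =
  List(Right(1), Left("parse error"), Right(2))

// Left values disappear, Right values are unwrapped
val valid: List[Int] = results.flatMap(_.right.toOption)
// valid == List(1, 2)
```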
Now let's compute some statistics on this data, for example aggregates over cookies. Even better, let's count the number of page views per cookie and domain.

// Our key is the cookie-domain pair
type CookieDomain = (String, String)

val pageViews: RDD[(CookieDomain, Long)] =
  validLines
    .map(logData => ((logData.cookie, extractDomain(logData.url)), 1L))
    .reduceByKey(_ + _)

It is basically a word count, with cookie-domain pairs instead of words.
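On a plain Scala collection, the same aggregation can be written with groupBy; reduceByKey does this per key, distributed across partitions (the cookie and domain values below are made up for illustration):

```scala
type CookieDomain = (String, String)

val events: Seq[(CookieDomain, Long)] = Seq(
  (("cookie1", "example.com"), 1L),
  (("cookie1", "example.com"), 1L),
  (("cookie2", "example.com"), 1L))

// Local equivalent of reduceByKey(_ + _)
val counts: Map[CookieDomain, Long] =
  events.groupBy(_._1).map { case (key, group) => key -> group.map(_._2).sum }
// counts(("cookie1", "example.com")) == 2L
```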

Updating analytics in Cassandra

Our goal here is to store this stat in Cassandra and update it regularly.
Run cqlsh, the shell for Cassandra Query Language (CQL), and create a test keyspace and a table with the following commands:

CREATE KEYSPACE test
WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

CREATE TABLE test.stats (
  cookie TEXT,
  domain TEXT,
  nviews BIGINT,
  PRIMARY KEY (cookie, domain));

The Spark Cassandra connector allows you to manipulate a Cassandra table as an RDD and save it back in a table:

  • sc.cassandraTable(keyspace, table) returns an RDD that contains the rows of the table.
  • rdd.saveToCassandra(keyspace, table) writes the content of rdd into Cassandra.

In Cassandra, writing means inserting or replacing. What about updates then?
If you are familiar with CQL, the SQL-like query language of Cassandra, you may know that UPDATE is actually an upsert: it writes the new value without reading the old one. So to add new counts to existing ones, we have to read the current values ourselves, compute the sums, and write them back.

With the Spark Cassandra connector, there are two ways to perform a batch update.

Joining RDDs

We can join the stats table with the RDD of updates, add them up and save the result back to Cassandra.

case class Stats(
  cookie: String,
  domain: String,
  nviews: Long) // Long, since the nviews column is a BIGINT

val statsTable: RDD[(CookieDomain, Stats)] =
  sc.cassandraTable[Stats]("test", "stats")
    .keyBy(stats => (stats.cookie, stats.domain))

val updatedStatsTable: RDD[(CookieDomain, Long)] =
  statsTable
    .rightOuterJoin(pageViews)
    .mapValues { case (oldStats, newViews) =>
      oldStats.map(_.nviews.toLong).getOrElse(0L) + newViews
    }

updatedStatsTable
  .map(x => (x._1._1, x._1._2, x._2))
  .saveToCassandra("test", "stats", SomeColumns("cookie", "domain", "nviews"))

Note the use of rightOuterJoin with the new data as the right side, since some cookie-domain pairs may not be in the database yet.
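The semantics of rightOuterJoin can be checked on small local maps (hypothetical values, no Spark involved): every key of the right side is kept, paired with an Option of the left side's value.

```scala
val oldCounts = Map("a" -> 10L)            // what is already in Cassandra
val newCounts = Map("a" -> 2L, "b" -> 3L)  // this batch's page views

// Local equivalent of statsTable.rightOuterJoin(pageViews)
val joined: Map[String, (Option[Long], Long)] =
  newCounts.map { case (k, v) => k -> (oldCounts.get(k), v) }

// Missing left values default to 0
val updated: Map[String, Long] =
  joined.map { case (k, (old, fresh)) => k -> (old.getOrElse(0L) + fresh) }
// updated == Map("a" -> 12L, "b" -> 3L)
```

With a plain join instead, the pair ("b", 3L) would have been silently dropped.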

Using the COUNTER type

If the stat to be updated is a counter, like the page views, things become much simpler:
Cassandra has a special COUNTER type: it starts at 0 and is modified by writing increments (which may be negative).
In other words, we only need to save an RDD of increments to Cassandra.
This also ensures consistency if your batches overlap!
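The consistency claim rests on the fact that increments commute: applying batches of increments in any order yields the same totals, whereas overlapping read-then-write updates can lose counts. A local sketch of this property:

```scala
val batch1 = Seq("a" -> 1L, "b" -> 2L)
val batch2 = Seq("a" -> 3L)

// Apply a batch of increments to a counter table modeled as a Map
def applyIncrements(state: Map[String, Long], incs: Seq[(String, Long)]): Map[String, Long] =
  incs.foldLeft(state) { case (s, (k, v)) => s.updated(k, s.getOrElse(k, 0L) + v) }

val order1 = applyIncrements(applyIncrements(Map.empty, batch1), batch2)
val order2 = applyIncrements(applyIncrements(Map.empty, batch2), batch1)
// Both orders give Map("a" -> 4L, "b" -> 2L)
```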

In cqlsh, create the stats_counter table as follows:

CREATE TABLE test.stats_counter (
  cookie TEXT,
  domain TEXT,
  nviews COUNTER,
  PRIMARY KEY (cookie, domain));

And here’s the Scala code:

pageViews
  .map(x => (x._1._1, x._1._2, x._2))
  .saveToCassandra("test", "stats_counter", SomeColumns("cookie", "domain", "nviews"))

That was easy!

Note however that using the COUNTER type comes with some restrictions: it must be the only column that is not part of the primary key. For instance, we cannot store within the same table a timestamp that would indicate the last time we performed an update.

Cassandra has a specific cache for counters, with a default size bounded at 50 MB. If you choose the counter approach, be sure to tune the counter_cache_size_in_mb parameter in cassandra.yaml for maximum performance!
