Writing Spark DataFrame to HBase

Have you ever wondered how you could write your DataFrame to HBase? You are in luck. This post shows you how to do it.

You can find the code in this repo if you don’t feel like following along. You can also find the breakdown of the unit testing part of that code in this post.

First, you should start by setting up your environment.

Getting HBase up & running

To prepare the HBase environment, use the HBase Docker image from Dajobe.

docker run -d --name hbase -p 2181:2181 -p 8080:8080 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 -p 16020:16020 --hostname hbase dajobe/hbase

The preceding command starts the HBase container. Check the status of the container by running docker ps.

To open the HBase shell inside the container, run the following command:

docker exec -it hbase hbase shell

List the tables in HBase:

hbase(main):006:0> list
TABLE
0 row(s)
Took 0.4777 seconds
=> []

To create a test table in HBase with column family cf:

hbase(main):006:0> create 'test', 'cf'
Created table test
Took 0.2019 seconds
=> Hbase::Table - test

With the HBase container up and running, it’s time to build the HBase connector for Spark.

Building the HBase Connector

Building, you said? What about the Maven repository?

The latest version available in the Maven repository hasn’t received an update in a while and doesn’t contain the latest fixes. Hence, building the jar yourself.

First, clone the hbase-connectors repo:

git clone https://github.com/apache/hbase-connectors.git
cd hbase-connectors/spark

Per the documentation, launch the Maven command with the following parameters:

# hbase-connectors/spark
mvn -D spark.version=2.4.8 -Dscala.version=2.11.12 -Dhadoop.profile=2.0 -Dscala.binary.version=2.11 -Dhbase.version=2.3.0 clean install -DskipTests

The -DskipTests flag sneaked into the command. Remove it if you want to run the tests.

Even though you use HBase 2.1.2 in the container, compiling the connector with HBase 2.3.0 works.

If you run into the following error:

An API incompatibility was encountered while executing net.alchim31.maven:scala-maven-plugin:4.3.0:compile: java.lang.NoSuchMethodError: org.fusesource.jansi.AnsiConsole.wrapOutputStream(Ljava/io/OutputStream;)Ljava/io/OutputStream;

Bump the version of the scala-maven-plugin to 4.7.2 in the pom.xml file inside the hbase-connectors/spark/hbase-spark folder.

<!--hbase-connectors/spark/hbase-spark/pom.xml-->
...
  <groupId>net.alchim31.maven</groupId>
  <artifactId>scala-maven-plugin</artifactId>
  <version>4.7.2</version>
...

If you run into the following error:

(checkstyle) on project hbase-spark: Failed during checkstyle configuration: cannot initialize module TreeWalker - TreeWalker is not allowed as a parent of LineLength Please review 'Parent Module' section for this Check in web documentation if Check is standard.

Remove the following lines from the file hbase-connectors/spark/hbase-spark/pom.xml:

<!--hbase-connectors/spark/hbase-spark/pom.xml-->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-checkstyle-plugin</artifactId>
</plugin>

Now, you should have the hbase-spark-1.0.1-SNAPSHOT.jar in the hbase-connectors/spark/hbase-spark/target folder. On to the Spark code.

Spark job

Clone the following project and open it in your favorite IDE, or create a project from scratch and pull the necessary code and dependencies from the repo.

git clone https://github.com/ayblbd/spark-boilerplate.git

Add the connector to your project by pasting the compiled jar spark/hbase-spark/target/hbase-spark-1.0.1-SNAPSHOT.jar into the lib folder at the root of the project. sbt automatically adds it to the CLASSPATH.

Then add the following dependency to your build.sbt file.

libraryDependencies ++= Seq(
  "org.apache.hbase" % "hbase-client" % "2.3.0",
  "org.apache.hbase" % "hbase-server" % "2.3.0",
  "org.apache.hbase" % "hbase-mapreduce" % "2.3.0"
)

The following unit test creates a DataFrame and writes it to HBase:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
import org.apache.spark.sql.SparkSession

class HBaseWriterTest extends BaseTest {
  "dataframe" should "be written to hbase" in {
    import spark.implicits._

    val df = Seq(
      ("KEY-1", "foo"),
      ("KEY-2", "bar"),
      ("KEY-3", "baz")
    ).toDF("key", "name")

    val options = Map(
      ("hbase.table", "test")
    )

    df.write
      .format("org.apache.hadoop.hbase.spark")
      .option("hbase.spark.use.hbasecontext", value = false)
      .option(
        "hbase.columns.mapping",
        f"key STRING :key, name STRING cf:name"
      )
      .options(options)
      .save()
  }

}

The hbase.columns.mapping option maps the columns of the DataFrame to the columns of the HBase table.

Each entry has the form dataframeColumn TYPE columnFamily:qualifier. In the mapping above, name STRING cf:name stores the DataFrame column name under the qualifier name in the column family cf. The row key uses the special :key form, which takes no column family.
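For illustration, suppose the DataFrame also had an age column of type integer (a hypothetical column, not part of the test above). The mapping would read as follows:

// Hypothetical mapping: "age" is an assumed extra column, shown for illustration only.
// The row key entry (:key) has no column family; every other entry is columnFamily:qualifier.
val mapping = "key STRING :key, name STRING cf:name, age INTEGER cf:age"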

Run the test from your IDE, or with sbt test in the terminal.

Note

If you run into this error: Can not resolve hbase, please check your network.
Add the line '127.0.0.1 hbase' to your /etc/hosts file. hbase is the hostname of your HBase container, as specified in the docker run command.

After running the test, verify the content of the HBase table:

hbase(main):001:0> scan 'test'
ROW                                      COLUMN+CELL
KEY-1                                    column=cf:name, timestamp=1636700000000, value=foo
KEY-2                                    column=cf:name, timestamp=1636700000000, value=bar
KEY-3                                    column=cf:name, timestamp=1636700000000, value=baz
3 row(s)
Took 0.0110 seconds

And you just did it. You have written your DataFrame to HBase.

You can also read the data back from HBase. Change the test to read the data after writing it by adding the following lines:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
...

df.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.spark.use.hbasecontext", value = false)
  .option(
    "hbase.columns.mapping",
    f"key STRING :key, name STRING cf:name"
  )
  .options(options)
  .save()

val actual = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option(
    "hbase.columns.mapping",
    f"key STRING :key, name STRING cf:name"
  )
  .options(options)
  .load()

assertDataFrameEquality(actual, df)

...

Run the test and it should pass.

The most important detail to note here is that the hbase.columns.mapping option is the same for both the write and the read operations.

.option(
  "hbase.columns.mapping",
  f"key STRING :key, name STRING cf:name"
)
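Since the mapping is identical on both sides, one small refactor of the test above is to factor it out into a val and reuse it:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
// Sketch: share the same mapping between the write and the read
val columnsMapping = "key STRING :key, name STRING cf:name"

df.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.spark.use.hbasecontext", value = false)
  .option("hbase.columns.mapping", columnsMapping)
  .options(options)
  .save()

val actual = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.columns.mapping", columnsMapping)
  .options(options)
  .load()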

You have just written a DataFrame to HBase and read it back. But you can make your code more generic so that it can write any DataFrame to HBase.

Optimizations

To make the code more generic, generate the hbase.columns.mapping option dynamically from the DataFrame schema:

// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
package me.ayoublabiad.io.hbase

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame

object HBaseWriter {

  // Build the hbase.columns.mapping entries for every column except the row key
  def hbaseColumnsMapping(df: DataFrame, cf: String): String =
    df.schema.fields
      .filter(_.name != "key") // the key column is mapped separately as the row key
      .map {
        case StructField(c: String, _: DoubleType, _, _) =>
          f"""$c DOUBLE $cf:$c"""
        case StructField(c: String, _: IntegerType, _, _) =>
          f"""$c INTEGER $cf:$c"""
        case StructField(c: String, _: LongType, _, _) =>
          f"""$c LONG $cf:$c"""
        case StructField(c: String, _: BooleanType, _, _) =>
          f"""$c BOOLEAN $cf:$c"""
        case sf @ (StructField(_: String, _: ArrayType, _, _) | StructField(_: String, _: ByteType, _, _)) =>
          f"""${sf.name} BINARY $cf:${sf.name}"""
        case StructField(c: String, _: StringType, _, _) =>
          f"""$c STRING $cf:$c"""
        case StructField(c: String, _: FloatType, _, _) =>
          f"""$c FLOAT $cf:$c"""
        case StructField(c: String, _: ShortType, _, _) =>
          f"""$c SHORT $cf:$c"""
        case StructField(c: String, _: DateType, _, _) =>
          f"""$c DATE $cf:$c"""
        case StructField(c: String, _: TimestampType, _, _) =>
          f"""$c TIMESTAMP $cf:$c"""
        case StructField(c: String, _: DecimalType, _, _) =>
          f"""$c STRING $cf:$c"""
      }
      .mkString(",")
}

Warning

DecimalType columns are mapped to STRING to avoid precision loss.

Now, you can use the hbaseColumnsMapping method to generate the hbase.columns.mapping option. The key column becomes the row key of the HBase table. The assumption is that the DataFrame has a key column of type StringType.
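For instance, with a hypothetical DataFrame holding a key (string), a name (string) and an amount (double) column, the method would produce something like this:

// Hypothetical schema: key STRING, name STRING, amount DOUBLE
hbaseColumnsMapping(df, "cf")
// => "name STRING cf:name,amount DOUBLE cf:amount" (the key column is skipped)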

Change the test to use the hbaseColumnsMapping method:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
...

df.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.spark.use.hbasecontext", value = false)
  .option(
    "hbase.columns.mapping",
    s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
  )
  .options(options)
  .save()

val actual = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option(
    "hbase.columns.mapping",
    s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
  )
  .options(options)
  .load()
...

You can sense the power of the hbaseColumnsMapping method by adding new columns to the DataFrame and writing it to HBase.

Notice that the column key is present in the DataFrame as a string.

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
val df = createDataFrame(
  Map(
    "key" -> "KEY-1",
    "integer" -> 1,
    "long" -> 2L,
    "boolean" -> true,
    "double" -> 5555.5555d,
    "float" -> 6666.7777f,
    "string" -> "string",
    "date" -> "2012-12-12",
    "timestamp" -> "2012-12-13 00:10:00",
    "decimal" -> "88888888888888888888888.88888888888"
  )
).withColumn("integer", col("integer").cast(IntegerType))
  .withColumn("date", col("date").cast(DateType))
  .withColumn("timestamp", col("timestamp").cast(TimestampType))
  .withColumn("decimal", col("decimal").cast(DecimalType(38, 18)))

What if you want to do something like df.toHbase() instead of duplicating the write code all over the place?

You can use the Pimp my Library pattern to achieve that.

// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
import ...

object HBaseWriter {
  implicit class HBaseWriterOps(df: DataFrame) {
    def toHbase(cf: String = "d", options: Map[String, String]): Unit =
      df.write
        .format("org.apache.hadoop.hbase.spark")
        .option("hbase.spark.use.hbasecontext", value = false)
        .option(
          "hbase.columns.mapping",
          f"key STRING :key, ${hbaseColumnsMapping(df, cf)}"
        )
        .options(options)
        .save()
  }

  def hbaseColumnsMapping(df: DataFrame, cf: String): String =
    ...
}

This way you can shave off some lines from your test:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
df.write
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.spark.use.hbasecontext", value = false)
  .option(
    "hbase.columns.mapping",
    s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
  )
  .options(options)
  .save()

becomes:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
import me.ayoublabiad.io.hbase.HBaseWriter._
df.toHbase("cf", options)
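Note that cf defaults to "d". Since options has no default value, you have to pass it as a named argument if you want to rely on that default:

// Uses the default column family "d" by passing options by name
df.toHbase(options = options)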

As explained in the earlier section, the assumption is that the column key exists and is of type StringType. You can make building the key column easier with a helper like df.buildKey("id", "CONSTANT").

The changes to the code should look like this:

// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
import ...
object HBaseWriter {
  implicit class HBaseWriterOps(df: DataFrame) {
    def buildKey(columns: String*): DataFrame =
      df.withColumn(
        "key",
        concat_ws(
          "-",
          stringToColumn(df.columns, columns): _*
        )
      )
      
    def toHbase(cf: String = "d", options: Map[String, String]): Unit = ...
  }
  ...
}

This function adds a new column to the DataFrame with the name key and the value {id}-CONSTANT.

The stringToColumn function converts a Seq[String] into a Seq[Column]. If the column exists in the DataFrame, it uses the col function to get the Column object. Otherwise, it uses the lit function to create a Column with that literal value. This lets you build the key column from column values, constant values, or a mix of both.

You can implement the stringToColumn function as follows:

// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
 def stringToColumn(
    dfColumns: Seq[String],
    columns: Seq[String]
  ): Seq[Column] =
    columns.map(c => if (dfColumns.contains(c)) col(c) else lit(c))
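A quick illustration with made-up values:

// "id" exists in the DataFrame, "CONSTANT" does not
stringToColumn(Seq("id", "name"), Seq("id", "CONSTANT"))
// => Seq(col("id"), lit("CONSTANT"))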

To avoid null or empty values ending up in the HBase row key, add a line of defense to the buildKey function: before building the key, filter out rows where a key column is null or empty:

// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
def filterKeyEmptyNullValues(df: DataFrame, columns: Seq[String]): DataFrame =
    columns
      .filter(c => df.columns.contains(c))
      .foldLeft(df)((df, c) => df.filter(trim(col(c)) =!= lit("")))
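As a sketch with made-up values (assuming spark.implicits._ and HBaseWriter._ are in scope, as in the test), rows where a key column is null or empty are dropped because trim(col(c)) =!= "" evaluates to null or false for them:

// Only the first row survives; the empty and null ids are filtered out.
// "missing" isn't a column of the DataFrame, so the filter ignores it.
val input = Seq(("1", "foo"), ("", "bar"), (null, "baz")).toDF("id", "name")
filterKeyEmptyNullValues(input, Seq("id", "missing")).show()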

Put it all together and you get the following code:

// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
import ...

object HBaseWriter {
  implicit class HBaseWriterOps(df: DataFrame) {
    def buildKey(columns: String*): DataFrame =
      filterKeyEmptyNullValues(df, columns).withColumn(
        "key",
        concat_ws(
          "-",
          stringToColumn(df.columns, columns): _*
        )
      )

    def toHbase(cf: String = "d", options: Map[String, String]): Unit = ...
  }

  def stringToColumn(
    dfColumns: Seq[String],
    columns: Seq[String]
  ): Seq[Column] =
    columns.map(c => if (dfColumns.contains(c)) col(c) else lit(c))

  def filterKeyEmptyNullValues(df: DataFrame, columns: Seq[String]): DataFrame =
    columns
      .filter(c => df.columns.contains(c))
      .foldLeft(df)((df, c) => df.filter(trim(col(c)) =!= lit("")))

  def hbaseColumnsMapping(df: DataFrame, cf: String): String = ...
}

Back to the test, use the buildKey function to build the key column:

// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
...
import me.ayoublabiad.io.hbase.HBaseWriter._

val df = createDataFrame(
  Map(
    "integer" -> 1,
    "long" -> 2L,
    "boolean" -> true,
    "double" -> 5555.5555d,
    "float" -> 6666.7777f,
    "string" -> "string",
    "date" -> "2012-12-12",
    "timestamp" -> "2012-12-13 00:10:00",
    "decimal" -> "88888888888888888888888.88888888888"
  )
).withColumn("integer", col("integer").cast(IntegerType))
  .withColumn("date", col("date").cast(DateType))
  .withColumn("timestamp", col("timestamp").cast(TimestampType))
  .withColumn("decimal", col("decimal").cast(DecimalType(38, 18)))

val options = Map(
  ("hbase.table", "test")
)

df.buildKey("KEY", "integer").toHbase("cf", options)

...

And it works.

hbase(main):001:0> scan 'test'
ROW                             COLUMN+CELL
 KEY-1                          column=cf:boolean, timestamp=1668853442682, value=\xFF
 KEY-1                          column=cf:date, timestamp=1668853442682, value=\x00\x00\x01;\x8Ci(\x00
 KEY-1                          column=cf:decimal, timestamp=1668799664987, value=88888888888888888888888.88888
 KEY-1                          column=cf:double, timestamp=1668853442682, value=@\xB5\xB3\x8E5?|\xEE
 KEY-1                          column=cf:float, timestamp=1668853442682, value=@\xBA\x0A\xC7 \x00\x00\x00
 KEY-1                          column=cf:integer, timestamp=1668853442682, value=\x00\x00\x00\x01
 KEY-1                          column=cf:long, timestamp=1668853442682, value=\x00\x00\x00\x00\x00\x00\x00\x02
 KEY-1                          column=cf:string, timestamp=1668853442682, value=string
 KEY-1                          column=cf:timestamp, timestamp=1668853442682, value=\x00\x00\x01;\x91\x98\xAB\xC0
1 row(s)
Took 0.5324 seconds

You now have a clean API for writing data to HBase. All you need to do is add import me.ayoublabiad.io.hbase.HBaseWriter._ to your code, and you can use the buildKey and toHbase functions.

All that’s left is to deploy your code to your cluster and run it.

Deploying the job

To deploy this job to production, you need to create a jar file and run it using spark-submit. To build a fat jar, use the sbt-assembly plugin. Add the following line to your project/plugins.sbt file:

// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")

Then build the jar file:

sbt assembly

If you run into a deduplication error, add the following merge strategy to your build.sbt file:

assemblyMergeStrategy in assembly := {
  case PathList("META-INF", xs @ _*) => MergeStrategy.discard
  case x => MergeStrategy.first
}

Then, run the spark job:

spark-submit --class me.ayoublabiad.Main --master yarn --deploy-mode client target/scala-2.11/spark-boilerplate-assembly-0.1.jar

You also need to give the hbase-site.xml file to your Spark job. You can usually find it at /etc/hbase/conf/hbase-site.xml. Pass it to your Spark job using the --files option:

spark-submit --class me.ayoublabiad.Main \
--master yarn \
--deploy-mode client \
--files hbase-site.xml \
target/scala-2.11/spark-boilerplate-assembly-0.1.jar 

Server-side configuration

According to the documentation, you need to add the connector jars to the CLASSPATH of the HBase region servers.

Note

The CLASSPATH of the HBase region server is the lib folder of the HBase installation directory, for example /opt/hbase/lib/ or /usr/lib/hbase/lib/.

Add the jars by copying them to the lib directory of the HBase installation on each region server.

If you can’t perform the server-side configuration, consider using:

.option("hbase.spark.pushdown.columnfilter", false)

This turns off filter pushdown to the HBase region servers.
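For example, on the read side the option sits next to the other options (a sketch based on the earlier read snippet):

val actual = spark.read
  .format("org.apache.hadoop.hbase.spark")
  .option("hbase.spark.pushdown.columnfilter", value = false)
  .option(
    "hbase.columns.mapping",
    s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
  )
  .options(options)
  .load()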

And now, you have your Spark job writing data to HBase.