Writing Spark DataFrame to HBase
Have you ever wondered how you could write your DataFrame to HBase? You are in luck. This post shows you how to do it.
You can find the code in this repo if you don’t feel like following along. You can also find the breakdown of the unit testing part of that code in this post.
First, you should start by setting up your environment.
Getting HBase up & running
To prepare the HBase environment, use the HBase Docker image from Dajobe.
docker run -d --name hbase -p 2181:2181 -p 8080:8080 -p 9090:9090 -p 9095:9095 -p 16000:16000 -p 16010:16010 -p 16201:16201 -p 16301:16301 -p 16020:16020 --hostname hbase dajobe/hbase
The preceding command starts the HBase container. Check the status of the container by running docker ps.
To open the HBase shell inside the container, run the following command:
docker exec -it hbase hbase shell
List the tables in HBase:
hbase(main):006:0> list
TABLE
0 row(s)
Took 0.4777 seconds
=> []
To create a test table in HBase with column family cf:
hbase(main):006:0> create 'test', 'cf'
Created table test
Took 0.2019 seconds
=> Hbase::Table - test
With the HBase container running, it's time to build the HBase connector for Spark.
Building the HBase Connector
Building, you said? What about the Maven repository?
The latest version available in the Maven repository hasn't received an update in a while, so it doesn't contain the latest updates and fixes. Hence, building the jar yourself.
First, clone the hbase-connectors repo:
git clone https://github.com/apache/hbase-connectors.git
cd hbase-connectors/spark
Per the documentation, launch the Maven command with the following parameters:
# hbase-connectors/spark
mvn -D spark.version=2.4.8 -Dscala.version=2.11.12 -Dhadoop.profile=2.0 -Dscala.binary.version=2.11 -Dhbase.version=2.3.0 clean install -DskipTests
The -DskipTests
flag sneaked into the command. Remove it if you want to run the tests.
Even though you use HBase 2.1.2
in the container, compiling the connector with HBase 2.3.0
works.
If you run into the following error:
An API incompatibility was encountered while executing net.alchim31.maven:scala-maven-plugin:4.3.0:compile: java.lang.NoSuchMethodError: org.fusesource.jansi.AnsiConsole.wrapOutputStream(Ljava/io/OutputStream;)Ljava/io/OutputStream;
Bump the version of the scala-maven-plugin
to 4.7.2
in the pom.xml
file inside the hbase-connectors/spark/hbase-spark
folder.
<!--hbase-connectors/spark/hbase-spark/pom.xml-->
...
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<version>4.7.2</version>
...
If you run into the following error:
(checkstyle) on project hbase-spark: Failed during checkstyle configuration: cannot initialize module TreeWalker - TreeWalker is not allowed as a parent of LineLength Please review 'Parent Module' section for this Check in web documentation if Check is standard.
Remove the following lines from the file hbase-connectors/spark/hbase-spark/pom.xml:
<!--hbase-connectors/spark/hbase-spark/pom.xml-->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
Now, you should have the hbase-spark-1.0.1-SNAPSHOT.jar
in the hbase-connectors/spark/hbase-spark/target
folder.
Now, on to writing the code.
Spark job
Clone the following project and open it in your favorite IDE. Alternatively, create a project from scratch and get the necessary code and dependencies from the repo.
git clone https://github.com/ayblbd/spark-boilerplate.git
Add the connector to your project by pasting the compiled jar spark/hbase-spark/target/hbase-spark-1.0.1-SNAPSHOT.jar
in the lib
folder at the root of the project. sbt
automatically adds it to the CLASSPATH.
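That works because sbt treats everything in the lib folder as an unmanaged dependency. If you'd rather keep the jar somewhere else, the unmanagedBase setting controls which folder sbt scans; the jars directory below is only a hypothetical example, since lib is already the default.
// build.sbt -- optional: point sbt at a different folder for unmanaged jars.
unmanagedBase := baseDirectory.value / "jars"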
Then add the following dependency to your build.sbt
file.
libraryDependencies ++= Seq(
"org.apache.hbase" % "hbase-client" % "2.3.0",
"org.apache.hbase" % "hbase-server" % "2.3.0",
"org.apache.hbase" % "hbase-mapreduce" % "2.3.0"
)
The following unit test creates a DataFrame and writes it to HBase:
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
import org.apache.spark.sql.SparkSession
class HBaseWriterTest extends BaseTest {
"dataframe" should "be written to hbase" in {
import spark.implicits._
val df = Seq(
("KEY-1", "foo"),
("KEY-2", "bar"),
("KEY-3", "baz")
).toDF("key", "name")
val options = Map(
("hbase.table", "test")
)
df.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.use.hbasecontext", value = false)
.option(
"hbase.columns.mapping",
f"key STRING :key, name STRING cf:name"
)
.options(options)
.save()
}
}
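Before digging into the options, a word about BaseTest: it comes with the boilerplate repo and provides the spark session plus the assertDataFrameEquality helper used later. If you're wiring the project up from scratch, a minimal sketch could look like the following; the names and testing style are assumptions, not the repo's exact code.
// src/test/scala/me/ayoublabiad/BaseTest.scala (sketch)
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

trait BaseTest extends AnyFlatSpec with Matchers {
  // Local SparkSession shared by the tests.
  lazy val spark: SparkSession = SparkSession
    .builder()
    .master("local[*]")
    .appName("hbase-writer-test")
    .getOrCreate()

  // Naive equality check: good enough for the small fixtures in this post.
  def assertDataFrameEquality(actual: DataFrame, expected: DataFrame): Unit = {
    actual.columns.sorted shouldBe expected.columns.sorted
    val columns = expected.columns
    actual.select(columns.head, columns.tail: _*).collect().toSet shouldBe
      expected.collect().toSet
  }
}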
The hbase.columns.mapping option maps the columns of the DataFrame to the columns of the HBase table. The format is columnFamily:columnName, where columnFamily is the column family and columnName is the column name of the HBase table. If the columnName isn't specified, it defaults to the DataFrame column name. If the columnFamily isn't specified, it defaults to the hbase.table option.
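For instance, a hypothetical DataFrame with an extra age column written to the cf family of the test table could use a mapping like the one below; the :key entry maps the HBase rowkey.
// Hypothetical mapping: key becomes the rowkey, name and age land in family cf.
val mapping = "key STRING :key, name STRING cf:name, age INTEGER cf:age"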
Run the test, either from your IDE or with sbt test in the terminal.
Note
If you are running into the error Can not resolve hbase, please check your network, add the line '127.0.0.1 hbase' to your /etc/hosts file. hbase is the hostname of your HBase container, as specified in the docker run command.
After running the test, verify the content of the HBase table:
hbase(main):001:0> scan 'test'
ROW COLUMN+CELL
KEY-1 column=cf:name, timestamp=1636700000000, value=foo
KEY-2 column=cf:name, timestamp=1636700000000, value=bar
KEY-3 column=cf:name, timestamp=1636700000000, value=baz
3 row(s)
Took 0.0110 seconds
And you just did it. You have written your DataFrame
to HBase.
You can also read the data back from HBase. Change the test to read the data after writing it by adding the following lines:
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
...
df.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.use.hbasecontext", value = false)
.option(
"hbase.columns.mapping",
f"key STRING :key, name STRING cf:name"
)
.options(options)
.save()
val actual = spark.read
.format("org.apache.hadoop.hbase.spark")
.option(
"hbase.columns.mapping",
f"key STRING :key, name STRING cf:name"
)
.options(options)
.load()
assertDataFrameEquality(actual, df)
...
Run the test and it should pass.
The most important detail to note here is that the hbase.columns.mapping
option is the same for both the write
and the read
operations.
.option(
"hbase.columns.mapping",
f"key STRING :key, name STRING cf:name"
)
You have just written and read a DataFrame
from HBase. But you can make your code more generic to write any DataFrame
to HBase.
Optimizations
To make the code more generic, write the code to generate the hbase.columns.mapping
option dynamically from your DataFrame
schema:
// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
package me.ayoublabiad.io.hbase
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
object HBaseWriter {
def hbaseColumnsMapping(df: DataFrame, cf: String): String =
df.schema.fields
.filter(_.name != "key")
.map {
case StructField(c: String, _: DoubleType, _, _) =>
f"""$c DOUBLE $cf:$c"""
case StructField(c: String, _: IntegerType, _, _) =>
f"""$c INTEGER $cf:$c"""
case StructField(c: String, _: LongType, _, _) =>
f"""$c LONG $cf:$c"""
case StructField(c: String, _: BooleanType, _, _) =>
f"""$c BOOLEAN $cf:$c"""
case sf @ (StructField(_: String, _: ArrayType, _, _) | StructField(_: String, _: ByteType, _, _)) =>
f"""${sf.name} BINARY $cf:${sf.name}"""
case StructField(c: String, _: StringType, _, _) =>
f"""$c STRING $cf:$c"""
case StructField(c: String, _: FloatType, _, _) =>
f"""$c FLOAT $cf:$c"""
case StructField(c: String, _: ShortType, _, _) =>
f"""$c SHORT $cf:$c"""
case StructField(c: String, _: DateType, _, _) =>
f"""$c DATE $cf:$c"""
case StructField(c: String, _: TimestampType, _, _) =>
f"""$c TIMESTAMP $cf:$c"""
case StructField(c: String, _: DecimalType, _, _) =>
f"""$c STRING $cf:$c"""
}
.mkString(",")
}
Warning
DecimalType columns are mapped to STRING to avoid precision loss.
Now, you can use the hbaseColumnsMapping method to generate the hbase.columns.mapping option. The key column is the rowkey of the HBase table. The assumption is that the DataFrame has a key column of type StringType.
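As a quick illustration, inside one of the tests (where spark is in scope) the method would produce the following for a small, hypothetical DataFrame:
// Hypothetical example: key is skipped by the method, the rest is mapped to family cf.
import me.ayoublabiad.io.hbase.HBaseWriter
import spark.implicits._

val sample = Seq(("KEY-1", "foo", 42L)).toDF("key", "name", "count")
HBaseWriter.hbaseColumnsMapping(sample, "cf")
// => "name STRING cf:name,count LONG cf:count"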
Change the test to use the hbaseColumnsMapping
method:
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
...
df.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.use.hbasecontext", value = false)
.option(
"hbase.columns.mapping",
s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
)
.options(options)
.save()
val actual = spark.read
.format("org.apache.hadoop.hbase.spark")
.option(
"hbase.columns.mapping",
s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
)
.options(options)
.load()
...
You can sense the power of the hbaseColumnsMapping
method by adding new columns to the DataFrame
and writing it to HBase.
Notice that the column key is present in the DataFrame as a string.
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
val df = createDataFrame(
Map(
"key" -> "KEY-1",
"integer" -> 1,
"long" -> 2L,
"boolean" -> true,
"double" -> 5555.5555d,
"float" -> 6666.7777f,
"string" -> "string",
"date" -> "2012-12-12",
"timestamp" -> "2012-12-13 00:10:00",
"decimal" -> "88888888888888888888888.88888888888"
)
).withColumn("integer", col("integer").cast(IntegerType))
.withColumn("date", col("date").cast(DateType))
.withColumn("timestamp", col("timestamp").cast(TimestampType))
.withColumn("decimal", col("decimal").cast(DecimalType(38, 18)))
What if you want to do something like df.toHbase()
instead of duplicating the write code all over the place?
You can use the Pimp my Library pattern to achieve that.
// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
import ...
object HBaseWriter {
implicit class HBaseWriterOps(df: DataFrame) {
def toHbase(cf: String = "d", options: Map[String, String]): Unit =
df.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.use.hbasecontext", value = false)
.option(
"hbase.columns.mapping",
f"key STRING :key, ${hbaseColumnsMapping(df, cf)}"
)
.options(options)
.save()
}
def hbaseColumnsMapping(df: DataFrame, cf: String): String =
...
}
This way you can shave off some lines from your test:
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
df.write
.format("org.apache.hadoop.hbase.spark")
.option("hbase.spark.use.hbasecontext", value = false)
.option(
"hbase.columns.mapping",
s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}"
)
.options(options)
.save()
becomes:
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
import me.ayoublabiad.io.hbase.HBaseWriter._
df.toHbase("cf", options)
As explained in the earlier section, the assumption is that the column key exists and is of type StringType. You can make building the key column easier with something like df.buildKey("id", "CONSTANT").
The changes to the code should look like this:
// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
import ...
object HBaseWriter {
implicit class HBaseWriterOps(df: DataFrame) {
def buildKey(columns: String*): DataFrame =
df.withColumn(
"key",
concat_ws(
"-",
stringToColumn(df.columns, columns): _*
)
)
def toHbase(cf: String = "d", options: Map[String, String]): Unit = ...
}
...
}
This function adds a new column to the DataFrame with the name key and the value {id}-CONSTANT.
The stringToColumn function converts a Seq[String] to a Seq[Column]
. If the column exists in the DataFrame
, it uses the col
function to get the Column
object. Otherwise, it uses the lit
function to create a Column
object with that value. You can use this function to create a key
column using constant values or column values.
You can implement the stringToColumn
function as follows:
// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
def stringToColumn(
dfColumns: Seq[String],
columns: Seq[String]
): Seq[Column] =
columns.map(c => if (dfColumns.contains(c)) col(c) else lit(c))
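For instance, given a hypothetical DataFrame with columns id and name, mixing a column reference and a constant looks like this (after import me.ayoublabiad.io.hbase.HBaseWriter._):
// "id" exists in the DataFrame, so it becomes col("id");
// "CONSTANT" does not, so it becomes lit("CONSTANT").
stringToColumn(Seq("id", "name"), Seq("id", "CONSTANT"))
// => Seq(col("id"), lit("CONSTANT"))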
To avoid nulls in the HBase key, add a line of defense to the buildKey function. Before you create the key, filter out the null and empty values:
// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
def filterKeyEmptyNullValues(df: DataFrame, columns: Seq[String]): DataFrame =
columns
.filter(c => df.columns.contains(c))
.foldLeft(df)((df, c) => df.filter(trim(col(c)) =!= lit("")))
Put it all together and you get the following code:
// src/main/scala/me/ayoublabiad/io/hbase/HBaseWriter.scala
import ...
object HBaseWriter {
implicit class HBaseWriterOps(df: DataFrame) {
def buildKey(columns: String*): DataFrame =
filterKeyEmptyNullValues(df, columns).withColumn(
"key",
concat_ws(
"-",
stringToColumn(df.columns, columns): _*
)
)
def toHbase(cf: String = "d", options: Map[String, String]): Unit = ...
}
def stringToColumn(
dfColumns: Seq[String],
columns: Seq[String]
): Seq[Column] =
columns.map(c => if (dfColumns.contains(c)) col(c) else lit(c))
def filterKeyEmptyNullValues(df: DataFrame, columns: Seq[String]): DataFrame =
columns
.filter(c => df.columns.contains(c))
.foldLeft(df)((df, c) => df.filter(trim(col(c)) =!= lit("")))
def hbaseColumnsMapping(df: DataFrame, cf: String): String = ...
}
Back to the test, use the buildKey
function to build the key column:
// src/test/scala/me/ayoublabiad/io/hbase/HBaseWriterTest.scala
...
import me.ayoublabiad.io.hbase.HBaseWriter._
val df = createDataFrame(
Map(
"integer" -> 1,
"long" -> 2L,
"boolean" -> true,
"double" -> 5555.5555d,
"float" -> 6666.7777f,
"string" -> "string",
"date" -> "2012-12-12",
"timestamp" -> "2012-12-13 00:10:00",
"decimal" -> "88888888888888888888888.88888888888"
)
).withColumn("integer", col("integer").cast(IntegerType))
.withColumn("date", col("date").cast(DateType))
.withColumn("timestamp", col("timestamp").cast(TimestampType))
.withColumn("decimal", col("decimal").cast(DecimalType(38, 18)))
val options = Map(
("hbase.table", "test")
)
df.buildKey("KEY", "integer").toHbase("cf", options)
...
And it works.
hbase(main):001:0> scan 'test'
ROW COLUMN+CELL
KEY-1 column=cf:boolean, timestamp=1668853442682, value=\xFF
KEY-1 column=cf:date, timestamp=1668853442682, value=\x00\x00\x01;\x8Ci(\x00
KEY-1 column=cf:decimal, timestamp=1668799664987, value=88888888888888888888888.88888
KEY-1 column=cf:double, timestamp=1668853442682, value=@\xB5\xB3\x8E5?|\xEE
KEY-1 column=cf:float, timestamp=1668853442682, value=@\xBA\x0A\xC7 \x00\x00\x00
KEY-1 column=cf:integer, timestamp=1668853442682, value=\x00\x00\x00\x01
KEY-1 column=cf:long, timestamp=1668853442682, value=\x00\x00\x00\x00\x00\x00\x00\x02
KEY-1 column=cf:string, timestamp=1668853442682, value=string
KEY-1 column=cf:timestamp, timestamp=1668853442682, value=\x00\x00\x01;\x91\x98\xAB\xC0
1 row(s)
Took 0.5324 seconds
You now have a clean API to write data to HBase. All you need to do is add import me.ayoublabiad.io.hbase.HBaseWriter._
to your code and you can use the buildKey
and toHbase
functions.
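At that point the write itself fits in a couple of lines. For instance, a job entry point could look like the following sketch; the input path and column names are hypothetical, and the repo's actual Main may differ.
// src/main/scala/me/ayoublabiad/Main.scala (sketch)
package me.ayoublabiad

import org.apache.spark.sql.SparkSession
import me.ayoublabiad.io.hbase.HBaseWriter._

object Main {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("write-to-hbase").getOrCreate()

    // Hypothetical input: any DataFrame with the columns you want to persist.
    val flights = spark.read.parquet("hdfs:///data/flights")

    flights
      .buildKey("flight_id", "FLIGHTS") // rowkey: {flight_id}-FLIGHTS
      .toHbase("cf", Map("hbase.table" -> "test"))

    spark.stop()
  }
}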
All that’s left is to deploy your code to your cluster and run it.
Deploying the job
To deploy this job to production, you need to create a jar file and run it using spark-submit
. To build a fat jar, use the sbt-assembly
plugin. Add the following line to your project/plugins.sbt
file:
// project/plugins.sbt
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
Then build the jar file:
sbt assembly
If you run into a deduplication error, add the following line to your build.sbt
file:
assemblyMergeStrategy in assembly := {
case PathList("META-INF", xs @ _*) => MergeStrategy.discard
case x => MergeStrategy.first
}
Then, run the spark job:
spark-submit --class me.ayoublabiad.Main --master yarn --deploy-mode client target/scala-2.11/spark-boilerplate-assembly-0.1.jar
You also need to give the hbase-site.xml file to your Spark job. You can find it at /etc/hbase/conf/hbase-site.xml. Add it to your Spark job using the --files option:
spark-submit --class me.ayoublabiad.Main \
--master yarn \
--deploy-mode client \
--files hbase-site.xml \
target/scala-2.11/spark-boilerplate-assembly-0.1.jar
Server-side configuration
According to the documentation, you need to add the following jars to the CLASSPATH of the HBase region servers:
scala-library: the Scala library version must match the Scala version (2.11 or 2.12) used to compile the connector, which is scala-library-2.11.12.jar in this case.
hbase-spark: hbase-connectors/spark/hbase-spark/hbase-spark-1.0.1-SNAPSHOT.jar, the jar file you built in the earlier step.
hbase-spark-protocol-shaded: hbase-connectors/spark/hbase-spark-protocol-shaded/hbase-spark-protocol-shaded-1.0.1-SNAPSHOT.jar, also built in the earlier step.
Note
The CLASSPATH of the HBase region server is the lib folder of the HBase installation directory, for example /opt/hbase/lib/ or /usr/lib/hbase/lib/.
Add the jars to the CLASSPATH
of the HBase region servers by copying the jar
s to the lib
directory of the HBase installation.
If you can’t perform the server-side configuration, consider using:
.option("hbase.spark.pushdown.columnfilter", false)
This turns off pushing filters down to the HBase region servers.
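For example, applied to the read used in the test (only the extra option is new; the rest is unchanged):
val actual = spark.read
  .format("org.apache.hadoop.hbase.spark")
  // Skip filter pushdown when the region servers don't have the connector jars.
  .option("hbase.spark.pushdown.columnfilter", value = false)
  .option("hbase.columns.mapping", s"key STRING :key, ${hbaseColumnsMapping(df, "cf")}")
  .options(options)
  .load()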
And now, you have your Spark job writing data to HBase.