import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit
Instructions Not Clear; No Hoodies Found
The data landscape has moved along since the introduction of Hadoop. Sadly, I had been falling further and further behind in the discussion of where and how data should be stored due to other competing priorities. But at last, after all the talk of Warehouse this and Lake that, I decided to start learning these tools for myself.
Among these, I thought Apache Hudi looked approachable and usable out of the box. Initially, I misunderstood how it is set up, as I was expecting it to replace Hadoop as a technology. Hadoop serves as a distributed file system for storing large amounts of data: when setting it up, you initialize the clustered file system and it sits between the container or OS filesystem and something like Apache Spark, which accesses the data there. Hudi, however, manages the data itself rather than where it sits at rest, so it is not a replacement for Apache Hadoop.
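To make that concrete, here is a minimal sketch of the idea (the paths and bucket name are made up for illustration): the Hudi write itself looks the same regardless of storage; only the base path URI decides whether the table lands on the local filesystem, on HDFS, or in object storage.
# Hypothetical base paths: Hudi lays out and manages the table underneath
# whichever storage the URI points at; it does not replace that storage layer.
local_base = "file:///tmp/hudi/trips_table"           # plain OS filesystem
hdfs_base  = "hdfs://namenode:8020/hudi/trips_table"  # an HDFS cluster
s3_base    = "s3a://my-bucket/hudi/trips_table"       # object storage

# With a Hudi-enabled session and any DataFrame `df`, the write is identical:
# df.write.format("hudi").option("hoodie.table.name", "trips_table") \
#     .mode("overwrite").save(local_base)   # or hdfs_base / s3_base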
I started with the documentation page, and the example code worked while running in the console. Setting it up in a Notebook, however, did not work out of the box at all. Since my routine now revolves around Notebooks, I needed this working in a Notebook: and here we are.
Working Backwards
Looking at the command in the example:
pyspark \
--packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
… most of these are configuration calls which are easy to set up when starting a Spark cluster. But this --packages call was unfamiliar to me. While I have set up and configured Spark clusters in the past, I have never used any third-party tools along with them. Spark has grown to the point where there are lots of third-party packages you can install, and --packages is the interface that tells Apache Spark to download them and include them on the classpath at startup. However, when doing this from a Notebook you will need to tell Spark where to look for the JAR files. Thankfully, when you download the bundle via the console with --packages, it installs the files locally for you in a hidden directory. For me, this was a new directory called .ivy2.
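As an aside, Spark also exposes the same mechanism as a plain config key, spark.jars.packages, which takes Maven coordinates and resolves them through Ivy when the session starts. I did not go this route in this post, but a sketch of it (using the same bundle coordinate as the jar that later ended up in my .ivy2 directory) would look something like the following; treat it as untested.
from pyspark.sql import SparkSession

# Untested alternative sketch: hand Spark the Maven coordinate and let it
# resolve the bundle through Ivy at startup, instead of pointing at a jar file.
spark = (
    SparkSession.builder.appName("Testing_Hudi")
    .config("spark.jars.packages",
            "org.apache.hudi:hudi-spark3.3-bundle_2.12:0.14.0")
    # ... plus the same Hudi --conf settings shown above ...
    .getOrCreate()
)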
What is Ivy?
Naturally, this caused me to ask the question: what the heck is Ivy? It is an Agile Dependency Manager, which I also did not know existed. It looks like a general-purpose dependency manager that is supposed to act like Rust's cargo but for other languages. The configuration format is XML - which I am not a fan of - and it primarily serves Java projects - which I am also not a fan of. But that at least settles what this is: a general-purpose (but really Java) dependency and build manager.
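Out of curiosity, you can poke at that cache directly. A minimal sketch, assuming the same ~/.ivy2 location that --packages created for me:
from pathlib import Path

# Where --packages dropped its downloads on my machine; adjust to your setup.
ivy_jars = Path.home() / ".ivy2" / "jars"

# List whatever Ivy pulled down, e.g. the Hudi Spark bundle and its dependencies.
for jar in sorted(ivy_jars.glob("*.jar")):
    print(jar.name)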
Back To The Example
So, now that we've downloaded the JAR file, we can simply start a session and use it, right? After all, ~/.ivy2 should be the default directory when looking for JARs, so if we ask it to include the classes maybe it will work?
spark = (
    SparkSession.builder.appName("Testing_Hudi")  # Name the Application
    .config('spark.log.level', 'ERROR')  # Don't Drown me bro
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .getOrCreate()
)
23/11/15 02:07:04 WARN Utils: Your hostname, hermione resolves to a loopback address: 127.0.1.1; using 192.168.40.147 instead (on interface eno1)
23/11/15 02:07:04 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
23/11/15 02:07:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/11/15 02:07:05 WARN SparkSession: Cannot use org.apache.spark.sql.hudi.HoodieSparkSessionExtension to configure session extensions.
java.lang.ClassNotFoundException: org.apache.spark.sql.hudi.HoodieSparkSessionExtension
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:218)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1221)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1219)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1219)
at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:106)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
Looks like not: it does not know to look for the class in that directory by default.
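One quick sanity check I could have run at this point (my own addition, not part of the quickstart) is to ask the session which jars or packages it thinks it has; with nothing configured, both should come back empty, which lines up with the ClassNotFoundException above.
# Sanity check: what does the running session think is on its jar list?
conf = spark.sparkContext.getConf()
print(conf.get("spark.jars", "<not set>"))
print(conf.get("spark.jars.packages", "<not set>"))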
There Are Configuration Options!
If we check the Spark documentation again, there are config options for Ivy! So, let’s try to use those - with the other added configuration options:
# Stop the old session
spark.stop()
spark = (
    SparkSession.builder.appName("Testing_Hudi")  # Name the Application
    .config('spark.log.level', 'ERROR')  # Don't Drown me bro
    .config('spark.jars.ivy', '/home/ranuse/.ivy2/jars')  # This is where to find the .jar please
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .getOrCreate()
)
23/11/15 02:07:06 WARN SparkSession: Cannot use org.apache.spark.sql.hudi.HoodieSparkSessionExtension to configure session extensions.
java.lang.ClassNotFoundException: org.apache.spark.sql.hudi.HoodieSparkSessionExtension
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.spark.util.Utils$.classForName(Utils.scala:218)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1(SparkSession.scala:1221)
at org.apache.spark.sql.SparkSession$.$anonfun$applyExtensions$1$adapted(SparkSession.scala:1219)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.sql.SparkSession$.org$apache$spark$sql$SparkSession$$applyExtensions(SparkSession.scala:1219)
at org.apache.spark.sql.SparkSession.<init>(SparkSession.scala:106)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at java.base/jdk.internal.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.base/java.lang.reflect.Constructor.newInstance(Constructor.java:490)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:247)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
at py4j.Gateway.invoke(Gateway.java:238)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:80)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:69)
at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
at java.base/java.lang.Thread.run(Thread.java:829)
Still not working. In hindsight that makes some sense: spark.jars.ivy appears to control the Ivy cache directory Spark uses when resolving packages, not a directory it scans for JARs to put on the classpath. Back to the documentation again. Let us manually tell it which JAR file to load and hope it works this time. There is an option for this in the documentation called spark.jars, so let's pass that along.
# Stop the old session
spark.stop()
spark = (
    SparkSession.builder.appName("Testing_Hudi")
    .config('spark.log.level', 'ERROR')
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .config("spark.kryo.registrator", "org.apache.spark.HoodieSparkKryoRegistrar")
    .config("spark.jars",  # Please look here and include this file:
            "/home/ranuse/.ivy2/jars/org.apache.hudi_hudi-spark3.3-bundle_2.12-0.14.0.jar")
    .getOrCreate()
)
# Some Hudi options
tableName = "trips_table"
basePath = "file:///tmp/trips_table"

# Our pretend Data:
columns = ["ts","uuid","rider","driver","fare","city"]
data = [(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
        (1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-C","driver-M",27.70,"san_francisco"),
        (1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-D","driver-L",33.90,"san_francisco"),
        (1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-F","driver-P",34.15,"sao_paulo"),
        (1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa","rider-J","driver-T",17.85,"chennai")]

inserts = spark.createDataFrame(data).toDF(*columns)

hudi_options = {
    'hoodie.table.name': tableName,
    'hoodie.datasource.write.partitionpath.field': 'city'
}

# Write the table to disk
inserts.write.format("hudi"). \
    options(**hudi_options). \
    mode("overwrite"). \
    save(basePath)
23/11/15 02:30:38 WARN DFSPropertiesConfiguration: Cannot find HUDI_CONF_DIR, please set it as the dir of hudi-defaults.conf
23/11/15 02:30:38 WARN DFSPropertiesConfiguration: Properties file file:/etc/hudi/conf/hudi-defaults.conf not found. Ignoring to load props file
23/11/15 02:30:38 WARN HoodieSparkSqlWriter$: Choosing BULK_INSERT as the operation type since auto record key generation is applicable
23/11/15 02:30:38 WARN HoodieSparkSqlWriter$: hoodie table at file:/tmp/trips_table already exists. Deleting existing data & overwriting with new data.
23/11/15 02:30:38 WARN AutoRecordKeyGenerationUtils$: Precombine field ts will be ignored with auto record key generation enabled
23/11/15 02:30:39 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-hbase.properties,hadoop-metrics2.properties
# WARNING: Unable to get Instrumentation. Dynamic Attach failed. You may add this JAR as -javaagent manually, or supply -Djdk.attach.allowAttachSelf
# WARNING: Unable to attach Serviceability Agent. Unable to attach even with module exceptions: [org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed., org.apache.hudi.org.openjdk.jol.vm.sa.SASupportException: Sense failed.]
There we go. You might have to fight with the JVM still though; I certainly did. Now we can finally walk through the example on the Documentation page.
# Load the data:
tripsDF = spark.read.format("hudi").load(basePath)
tripsDF.createOrReplaceTempView("trips_table")
# query the data via SQL
spark.sql(("SELECT uuid, fare, ts, rider, driver, city "
"FROM trips_table WHERE fare > 20.0"))\
.show()
+--------------------+-----+-------------+-------+--------+-------------+
| uuid| fare| ts| rider| driver| city|
+--------------------+-----+-------------+-------+--------+-------------+
|9909a8b1-2d15-4d3...| 33.9|1695046462179|rider-D|driver-L|san_francisco|
|e96c4396-3fad-413...| 27.7|1695091554788|rider-C|driver-M|san_francisco|
|e3cf430c-889d-401...|34.15|1695516137016|rider-F|driver-P| sao_paulo|
+--------------------+-----+-------------+-------+--------+-------------+
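If you are curious what Hudi actually wrote under that base path, a quick listing (my own addition) should show a .hoodie metadata directory alongside one directory per city partition value:
from pathlib import Path

# basePath was "file:///tmp/trips_table"; drop the scheme for a local listing.
table_dir = Path("/tmp/trips_table")

# Expect .hoodie/ (table metadata) plus partition directories such as
# san_francisco, sao_paulo, and chennai.
for entry in sorted(table_dir.iterdir()):
    print(entry.name)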
spark.stop()
Conclusion
There we go. If you're running this in a container of some kind and you have a store of JAR files, you can modify this setup to include the options you want. In the future we will want to explore how this works with real data rather than pretend data. Looking over the documentation site, this tool has quite a lot to offer and to learn, so look forward to more posts about using Apache Hudi.