I currently work as a Big Data Engineer at the University of St. Gallen. Many researchers here use R to make their work easier, and they are familiar with R's limitations and the usual workarounds.
One of my tasks is introducing SparkR to the researchers. This post gives a short introduction to SparkR and R and clears up the confusion around the two kinds of data frames.
R is a popular tool for statistics and data analysis. It uses data frames (data.frame), has rich visualization capabilities and a large ecosystem of packages developed by the R community.
The challenge with R is making it work on big data: how to use R on huge datasets and on big data clusters.
Spark is a fast and general engine for data processing. It is growing rapidly and has been adopted by many organizations for running faster calculations on big datasets.
SparkR is an R package that provides an interface to use Spark from R. It enables R users to run jobs on big data clusters with Spark. The SparkR 1.6.0 API is available here.
A data.frame in R is a list of vectors of equal length.
A DataFrame in Spark is a distributed collection of data organized into named columns.
When working with SparkR and R, it is very important to understand that there are two different data frames in question – R data.frame and Spark DataFrame. Proper combination of both is what gets the job done on big data with R.
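To make the distinction concrete, here is a minimal sketch (assuming a SparkR 1.6 session with an initialized sqlContext; faithful is a small dataset that ships with R):
# R data.frame: lives in the memory of one machine
local_df <- faithful
class(local_df)    # "data.frame"
# SparkR DataFrame: distributed across the cluster, manipulated through the SparkR API
spark_df <- createDataFrame(sqlContext, faithful)
class(spark_df)    # "DataFrame" (S4 class from the SparkR package)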
In practice, the first step is to process the big data using SparkR and its DataFrames. As much as possible is done in this stage (cleaning, filtering, aggregation, various statistical operations).
When the dataset has been processed by SparkR, it can be collected into an R data.frame. This is done by calling collect(), which "transforms" a SparkR DataFrame into an R data.frame. When collect() is called, the elements of the SparkR DataFrame are gathered from all the workers and pushed into an R data.frame on the client, where R functionality can be used.
Figure 1: from the data source (blue box) to a SparkR DataFrame (orange box) to an R data.frame on the client.
A common question from R users is "Can I just collect() all my big data and then do the analysis in R?" The answer is no. If you do that, you are back where you were before looking into SparkR: doing all your processing, cleaning, wrangling and data science on your client machine.
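To make the division of labour concrete, here is a minimal sketch of the intended workflow (a hypothetical file path and made-up column names; assumes a SparkR 1.6 session with an initialized sqlContext and the spark-csv package):
# Heavy lifting stays on the cluster as long as the data is a SparkR DataFrame
big_df <- read.df(sqlContext, "/tmp/example/big_file.csv",
                  source = "com.databricks.spark.csv", inferSchema = "true", header = "true")
recent <- filter(big_df, big_df$year == 2015)                  # distributed filtering (year is a made-up column)
summary_df <- summarize(groupBy(recent, recent$country),       # distributed aggregation
                        avg_price = avg(recent$price))
small_rdf <- collect(summary_df)   # only the small aggregated result is pulled to the client
str(small_rdf)                     # from here on it is a plain R data.frame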
Useful installation posts
How to manually install Spark 1.6.0 on a multinode Hadoop cluster is described here: Installing Apache Spark 1.6.0 on a multinode cluster
How to install SparkR on a multinode Hadoop cluster is described here: Installing R on Hadoop cluster to run sparkR
I am testing SparkR and PySpark in Zeppelin, and the Zeppelin installation process is described here: Building Zeppelin-With-R on Spark and Zeppelin
Practical examples
Let us see how this works in practice:
I have a file in Hadoop (HDFS): a 1.9 GB CSV file with a little over 20 million rows. Looking at Figure 1, this file is in the blue box.
I run a read.df() command to load the data from the data source into a DataFrame (orange box in Figure 1).
crsp <- read.df(sqlContext, "/tmp/crsp/AllCRSP.csv", source = "com.databricks.spark.csv", inferSchema = "true", header = "true")
Object crsp is a SparkR object, not an R object, which means I can run SparkR commands on it.
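For illustration, here are a few SparkR commands that could be run on it (all part of the SparkR 1.6 API):
printSchema(crsp)   # column names and the types inferred from the CSV
count(crsp)         # number of rows, computed on the cluster
head(crsp)          # first rows, returned to the client as a small R data.frame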
If I run the str(crsp) command, which is an R command, I get the following:
Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0x3b44be0>
..@ sdf:Class 'jobj' <environment: 0x2aaf5f8>
This does not look familiar to an R user. Data frame crsp is a SparkR DataFrame.
SparkR DataFrame to R data.frame
Since the DataFrame crsp has over 20 million rows, I am going to take a small sample to create a new DataFrame for this example:
df_sample <- sample(crsp, withReplacement = FALSE, fraction = 0.0001)
I have created a new DataFrame called df_sample. It has a bit over 2,000 rows, and SparkR functionality can be used to manipulate it.
I am going to create an R data.frame out of the df_sample DataFrame:
rdf_sample <- collect(df_sample)
I now have two data frames: SparkR DataFrame called df_sample and R data.frame called rdf_sample.
Running str() on both objects gives me the following outputs:
str(df_sample)
Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0xdb19d40>
..@ sdf:Class 'jobj' <environment: 0xd9b4500>
str(rdf_sample)
'data.frame': 2086 obs. of 16 variables:
$ PERMNO : int 15580 59248 90562 15553 11499 61241 14219 14868 14761 11157 ...
$ Date : chr "20151120" "20111208" "20061213" "20110318" ...
$ SICCD : chr "6320" "2082" "2082" "6719" ...
$ TICKER : chr "AAME" "TAP" "TAP" "AGL" ...
$ NAICS : int 524113 312120 312120 551112 333411 334413 541511 452910 211111 334511 ...
$ PERMCO : int 5 33 33 116 176 211 283 332 352 376 ...
$ HSICMG : int 63 20 20 49 35 36 73 54 29 38 ...
$ BIDLO : num 4.51 40.74 74 38.75 4.03 ...
$ ASKHI : num 4.89 41.26 75 39.4 4.11 ...
$ PRC : num 4.69 41.01 -74.5 38.89 4.09 ...
$ VOL : int 1572 1228200 0 449100 20942 10046911 64258 100 119798 19900 ...
$ SHROUT : int 20547 175544 3073 78000 14289 778060 24925 2020 24165 3728 ...
$ CFACPR : num 1 1 2 1 1 1 0.2 1 1 1 ...
$ CFACSHR: num 1 1 2 1 1 1 0.2 1 1 1 ...
$ OPENPRC: num 4.88 41.23 NA 39.07 4.09 ...
$ date : chr "20151120" "20111208" "20061213" "20110318" ...
Datatype conversion example
Running the following SparkR command on the DataFrame converts the Date column's datatype from integer to string:
df_sample$Date <- cast(df_sample$Date, "string")
Running the equivalent R command on it does not work:
df_sample$Date <- as.character(df_sample$Date)
Output:
Error in as.character.default(df_sample$Date) :
no method for coercing this S4 class to a vector
Since I have converted the DataFrame df_sample into the data.frame rdf_sample, I can now put my R hat on and use R functionality:
rdf_sample$Date <- as.character(rdf_sample$Date)
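A quick way to sanity-check which world you are in (a sketch using the objects from above):
printSchema(df_sample)     # SparkR side: Date should now be listed as string
class(rdf_sample$Date)     # R side: "character" after as.character()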
R data.frame to SparkR DataFrame
In some cases, you have to go the other way and convert an R data.frame to a SparkR DataFrame. This is done with the createDataFrame() method:
new_df_sample <- createDataFrame(sqlContext, rdf_sample)
If I run str(new_df_sample) I get the following output:
Formal class 'DataFrame' [package "SparkR"] with 2 slots
..@ env:<environment: 0xdd9ddb0>
..@ sdf:Class 'jobj' <environment: 0xdd99f40>
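From this point on, SparkR functions apply to new_df_sample again, for example (a sketch; TICKER is one of the columns shown in the str() output above):
count(new_df_sample)                                         # row count, computed by Spark
head(filter(new_df_sample, new_df_sample$TICKER == "TAP"))   # SparkR filtering works again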
Conclusion
Knowing which data frame you are about to manipulate saves you a great deal of trouble. It is important to understand what SparkR can do for you so that you can take the best from both worlds.
SparkR is a newer addition to the Spark family. The community is still small, so patience is a key ingredient.
Very informative, and thanks for clearing up the confusion.
Nice article
Thanks for providing a detailed explanation on SparkR DataFrame and R DataFrame.
When I try to convert a SparkR DataFrame with 10 million records to an R data.frame, I get a Java OutOfMemory exception. Could you please provide me with a solution for this issue?
Converting from a SparkR DF to an R df is like calling collect() on a Spark DF: you take all the data from the DF and try to put it on your client machine. Meaning you take data out of the distributed system and put it on the client. The same goes for converting to an R df; you are pretty much asking to have all the data on your client. It seems the client's resources are not ready to handle that load. I hope I'm helping with this answer. The key is to understand that SparkR DFs work on the cluster, while R dfs work on one machine.
Thanks for the explanation. I could solve the issue by increasing the executor memory. I got another issue while converting.
The error states "Decimal precision 39 exceeds max precision 38". All the data in the Spark DataFrame comes from an Oracle database, where I believe the decimal precision is < 38. Is there any way I can achieve this without modifying the data?
R script used:
#Load required table into memory from Oracle database
df_ac <- loadDF(sqlContext, source="jdbc", url="jdbc:oracle:thin:usr/pass@url..com:1521" , dbtable="TBL_NM")
#Register the DataFrame as a temp table so that we can use it with SQL queries
registerTempTable(df_ac, "Table")
RawDataAC <- sql(sqlContext, "SELECT ... FROM Table WHERE ... > DATE('2015-01-01')")
RawData <- as.data.frame(RawDataAC)
#Gives error
Below is the stacktrace:
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, 10.42.22.167, executor 0): java.lang.IllegalArgumentException: requirement failed: Decimal precision 39 exceeds max precision 38
at scala.Predef$.require(Predef.scala:224)
at org.apache.spark.sql.types.Decimal.set(Decimal.scala:113)
at org.apache.spark.sql.types.Decimal$.apply(Decimal.scala:426)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3$$anonfun$9.apply(JdbcUtils.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3$$anonfun$9.apply(JdbcUtils.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$nullSafeConvert(JdbcUtils.scala:438)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3.apply(JdbcUtils.scala:337)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$makeGetter$3.apply(JdbcUtils.scala:335)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:286)
at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anon$1.getNext(JdbcUtils.scala:268)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.util.CompletionIterator.hasNext(CompletionIterator.scala:32)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I can't be of much help here, sorry. I checked the Spark 2.1 documentation, and it seems that the max precision is indeed 38.
I'm operating with >100 million records and have created a Spark DataFrame for this data. As per our algorithm, we have to find the count of records for different conditions. If I use count(SparkDataframe), it takes a lot of time, as the pipelined steps are executed for that DataFrame and the data is loaded into memory. Could you please let me know if there is a better option?
If I collect the full DataFrame initially and continue further execution on that data.frame, everything executes fast. But collecting the full DataFrame is not good practice, I suppose. It would be great if you could share some details on when to use a Spark DataFrame and when to use a data.frame. Please let me know if there is any place I can find these details.
Will increasing machine memory or the number of cores help improve performance?
Also let me know if there is any other platform where we can connect and discuss.
All help would be appreciated.
Thanks
Nice article. Many thanks
It's a good intro; it would have been great if you had shown a few more operations, like merging/joining two data frames or DataFrames.
Also, using sparklyr with dplyr instead of SparkR would be more practical.
Thanks for the feedback, Ali. At the moment, I'm doing data engineering with Scala and machine learning with Python, so R is not on the priority list.