I have a DataFrame called product_relationship_current and I’m doing a self-join to retrieve a new DataFrame, as shown below.
First I’m giving it an alias so I can treat the two references as two different dataframes:
val pr1 = product_relationship_current.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = product_relationship_current.alias("pr2")
And then I’m doing a self-join to get a new dataframe:
val stackoutput = pr1.join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
  .select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"), pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()
But I’m looking for another way to do this without a self-join, so that I don’t have to load the same dataframe twice, because the job takes very long to execute (my product_relationship_current dataframe is very large).
This is the SQL query I tried to perform using spark scala:
select distinct
  pr1.product_id as IO,
  pr1.product_version as IOV,
  pr1.related_product_id,
  pr1.related_product_version,
  pr1.type,
  pr1.product_version_id_related_fk
from product_relationship_current as pr1
left join product_relationship_current as pr2
  on pr1.product_version_id_related_fk = pr2.product_version_id_fk
where pr1.type = 'contains'
Answer
I’ll structure this answer in 2 parts: my answer, and a question.
My answer
If you want to avoid reading a dataframe twice, you can use df.cache to cache it in memory/disk. df.cache is basically df.persist using the default storage level (MEMORY_AND_DISK).
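As a minimal sketch of the relationship between the two calls, the snippet below spells out the storage level explicitly and releases the cache afterwards. The local SparkSession, the app name and the three sample rows are assumptions made purely for illustration; they are not the data from the question.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.storage.StorageLevel

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("cache-sketch").getOrCreate()
import spark.implicits._

// Hypothetical stand-in rows, not the real product_relationship_current table.
val df = Seq((1, "contains"), (2, "CONTAINS"), (3, "other")).toDF("PRODUCT_ID", "TYPE")

// Same storage level that df.cache uses, written out explicitly.
val cached = df.persist(StorageLevel.MEMORY_AND_DISK)
val level = cached.storageLevel

// Caching is lazy: the data is only materialised by the first action.
val n = cached.count()

// Release the cached blocks once the DataFrame is no longer needed.
cached.unpersist()
```

Calling unpersist when you are done matters on a shared cluster, since cached blocks otherwise keep occupying executor memory.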
I made a makeshift CSV file (data.csv) with your columns in there. Then I wrote something like this:
import org.apache.spark.sql.functions.col

// Read the source once and mark it for caching
val df = spark.read.option("delimiter", ";").option("header", "true").csv("data.csv").cache
val df1 = df.filter(df("TYPE").isin("contains", "CONTAINS"))
val output = df1.alias("df1")
  .join(df.alias("df"), col("df1.PRODUCT_VERSION_ID_RELATED_FK") === col("df.PRODUCT_VERSION_ID_FK"), "left")
  .select(df1("PRODUCT_ID"), df1("PRODUCT_VERSION"), df1("RELATED_PRODUCT_ID"), df1("RELATED_PRODUCT_VERSION"), df1("TYPE"), df1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()
So you see that I used the .cache method on the first line. Now, how can we verify that this worked? I had a look at the Spark UI (it runs on port 4040 by default, wherever your driver process is running; in my case I ran spark-shell locally, so I could access the UI on localhost:4040).
By looking at the query plan, or a visualisation of it, we can understand what the effect of .cache is. I ran the code above twice: once without calling .cache on my dataframe and once with it.
The first image that I’ll drop here is the one where I did not .cache my dataframe.
The second image is the one where I did .cache my dataframe.
So you see the difference at the top of the images: because the dataframe was cached, the CSV file (or whatever source you are using) won’t be read twice. You see that we have an InMemoryTableScan block in both branches instead of a Scan csv block.
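If you don’t have access to the Spark UI, the same check can probably be done from code. The sketch below is only an illustration under assumptions: the local session, the aliases "l" and "r" and the two sample rows are made up, and the exact plan text differs between Spark versions. The idea is to print the physical plan with explain() and look for InMemoryTableScan on both sides of the join.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("plan-sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows standing in for the real table.
val df = Seq(
  (10, 20, "contains"),
  (20, 30, "other")
).toDF("PRODUCT_VERSION_ID_FK", "PRODUCT_VERSION_ID_RELATED_FK", "TYPE").cache

val joined = df.filter(col("TYPE").isin("contains", "CONTAINS")).alias("l")
  .join(df.alias("r"), col("l.PRODUCT_VERSION_ID_RELATED_FK") === col("r.PRODUCT_VERSION_ID_FK"), "left")

// Prints the physical plan to stdout; look for InMemoryTableScan in both join branches.
joined.explain()

// The storage level is another quick way to confirm the DataFrame is marked as cached.
val level = df.storageLevel
```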
Important: If your data is so large that it does not fit into the combined memory of your executors, the data will spill over to disk. This will make this operation slower.
My question to you: It seems like you’re doing a left join with pr1 being the left table, and afterwards you’re only selecting columns from the pr1 part of the table. Is this join even needed? Seems like you could just select the wanted columns and then .distinct the dataframe.
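To make the question concrete, here is a small sketch comparing both variants on made-up rows. The local session, the aliases "l" and "r", the sample data and the reduced column list are all assumptions for illustration. The reasoning is that a left join keeps every left row at least once, so after selecting only left-side columns and applying .distinct, the join should not change the result.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("no-join-sketch").getOrCreate()
import spark.implicits._

// Hypothetical rows using a subset of the question's columns.
val df = Seq(
  ("A", 1, 100, 200, "contains"),
  ("A", 1, 100, 200, "contains"), // exact duplicate, removed by distinct
  ("B", 2, 200, 300, "CONTAINS"),
  ("C", 3, 300, 999, "other")     // filtered out by TYPE
).toDF("PRODUCT_ID", "PRODUCT_VERSION", "PRODUCT_VERSION_ID_FK", "PRODUCT_VERSION_ID_RELATED_FK", "TYPE")

val names = Seq("PRODUCT_ID", "PRODUCT_VERSION", "TYPE", "PRODUCT_VERSION_ID_RELATED_FK")
val filtered = df.filter(col("TYPE").isin("contains", "CONTAINS"))

// Variant 1: no join at all, just filter, select and distinct.
val withoutJoin = filtered.select(names.map(col): _*).distinct()

// Variant 2: the left self-join, selecting only left-side columns.
val withJoin = filtered.alias("l")
  .join(df.alias("r"), col("l.PRODUCT_VERSION_ID_RELATED_FK") === col("r.PRODUCT_VERSION_ID_FK"), "left")
  .select(names.map(n => col(s"l.$n")): _*)
  .distinct()

// Compare the two results as sets of rows.
val sameRows = withJoin.collect().toSet == withoutJoin.collect().toSet
```

If sameRows also holds on a sample of your real data, dropping the join removes the second scan and the join shuffle.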
Hope this helps!