How to avoid a self-join in Spark Scala

I have a DataFrame called product_relationship_current and I'm doing a self-join on it to build a new DataFrame, as shown below:

First I give each side an alias so I can treat them as two different DataFrames:

import org.apache.spark.sql.functions.col

val pr1 = product_relationship_current.alias("pr1").where(col("TYPE").isin("contains", "CONTAINS"))
val pr2 = product_relationship_current.alias("pr2")

Then I do a self-join to get the new DataFrame:

val stackoutput = pr1.join(pr2, pr1("PRODUCT_VERSION_ID_RELATED_FK") === pr2("PRODUCT_VERSION_ID_FK"), "left")
  .select(pr1("PRODUCT_ID"), pr1("PRODUCT_VERSION"), pr1("RELATED_PRODUCT_ID"), pr1("RELATED_PRODUCT_VERSION"), pr1("TYPE"), pr1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()

But I'm looking for a way to do this without a self-join, so that I don't have to load the same DataFrame twice; the query takes very long to execute because my product_relationship_current DataFrame is very large.

This is the SQL query I'm trying to reproduce in Spark Scala:

select 
  distinct pr1.product_id as IO, 
  pr1.product_version as IOV, 
  pr1.related_product_id, 
  pr1.related_product_version, 
  pr1.type, 
  pr1.product_version_id_related_fk 
from 
  product_relationship_current as pr1 
  left join product_relationship_current as pr2 on pr1.product_version_id_related_fk = pr2.product_version_id_fk 
where 
  pr1.type = 'contains' 
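
For reference, this is how I execute that SQL from Spark Scala (I register the DataFrame as a temporary view first; the view name here just mirrors the DataFrame name):

// Register the DataFrame as a temporary view so it can be queried with SQL.
product_relationship_current.createOrReplaceTempView("product_relationship_current")

val stackoutput = spark.sql("""
  select distinct pr1.product_id as IO, pr1.product_version as IOV,
    pr1.related_product_id, pr1.related_product_version,
    pr1.type, pr1.product_version_id_related_fk
  from product_relationship_current as pr1
  left join product_relationship_current as pr2
    on pr1.product_version_id_related_fk = pr2.product_version_id_fk
  where pr1.type = 'contains'
""")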


Answer

I'll structure this answer in two parts: my answer, and a question.

My answer

If you want to avoid reading a DataFrame twice, you can use df.cache to cache it in memory/on disk. df.cache is simply df.persist with the default storage level (MEMORY_AND_DISK).
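
To make that concrete, here is a minimal sketch of the equivalence (df is just a stand-in for your DataFrame):

import org.apache.spark.storage.StorageLevel

// The following two calls do the same thing; use one or the other:
df.cache()                                   // default storage level
// df.persist(StorageLevel.MEMORY_AND_DISK)  // explicit equivalent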

I made a makeshift CSV file (data.csv) with your columns in it, and then wrote something like this:

import org.apache.spark.sql.functions.col

// Read the CSV once and cache it, so both sides of the join reuse the same cached data.
val df = spark.read.option("delimiter", ";").option("header", "true").csv("data.csv").cache
val df1 = df.filter(df("TYPE").isin("contains", "CONTAINS"))
val output = df1.alias("df1")
  .join(df.alias("df"), col("df1.PRODUCT_VERSION_ID_RELATED_FK") === col("df.PRODUCT_VERSION_ID_FK"), "left")
  .select(df1("PRODUCT_ID"), df1("PRODUCT_VERSION"), df1("RELATED_PRODUCT_ID"), df1("RELATED_PRODUCT_VERSION"), df1("TYPE"), df1("PRODUCT_VERSION_ID_RELATED_FK"))
  .distinct()
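
One small addition: once you no longer need the cached data, you can free it up again; unpersist is part of the standard Dataset API:

// Release the cached data when you're done with it.
df.unpersist()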

So you see that I used the .cache method on the first line. Now, how can we verify that this worked? I had a look at the Spark UI, which runs on port 4040 by default wherever your driver process is running; in my case I ran spark-shell locally, so I could access the UI on localhost:4040.

By looking at the query plan (or a visualisation of it) we can see the effect of .cache. I ran the code above twice: once without caching the DataFrame and once with it.

The first image shows the query plan where I did not .cache my DataFrame.

[Image: query plan, DataFrame not cached; both branches start with a Scan csv block]

The second image shows the query plan where I did .cache my DataFrame.

[Image: query plan, DataFrame cached; both branches start with an InMemoryTableScan block]

So you can see the difference at the top of the images: because the DataFrame was cached, the CSV file (or whatever source you are using) is not read twice. Instead of a Scan csv block, there is an InMemoryTableScan block in both branches.
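
You can also check this without the UI; df.explain and Dataset.storageLevel are standard API, so a quick sketch like this shows the same information:

// Print the physical plan; after caching you should see InMemoryTableScan nodes.
output.explain()

// Inspect the storage level directly; it is StorageLevel.NONE when not cached.
println(df.storageLevel)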

Important: if your data is so large that it does not fit into the combined memory of your executors, the cached data will spill over to disk, which makes this operation slower.
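
If executor memory is really tight, you can also choose the storage level yourself with persist instead of relying on the default; whether this helps depends on your workload, but as a sketch:

import org.apache.spark.storage.StorageLevel

// Cache to disk only, avoiding executor memory entirely.
// Use this instead of .cache, not in addition to it.
df.persist(StorageLevel.DISK_ONLY)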

My question to you: it seems like you're doing a left join with pr1 as the left table, but afterwards you only select columns from the pr1 side. Is the join even needed? It looks like you could just select the wanted columns and then call .distinct on the DataFrame, as sketched below.
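
Something like this (untested, reusing the column names from your question):

import org.apache.spark.sql.functions.col

// No join: just filter, project the pr1 columns, and deduplicate.
val noJoin = product_relationship_current
  .where(col("TYPE").isin("contains", "CONTAINS"))
  .select("PRODUCT_ID", "PRODUCT_VERSION", "RELATED_PRODUCT_ID",
    "RELATED_PRODUCT_VERSION", "TYPE", "PRODUCT_VERSION_ID_RELATED_FK")
  .distinct()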

Hope this helps!
