I have two datasets as follows:

smoothieDs.show()

|smoothie_id | smoothie | price |
|1 | Tropical | 10 |
|2 | Green veggie | 20 |
and:
ingredientDs.show()

|smoothie | ingredient |
|Tropical | Mango |
|Tropical | Passion fruit |
|Green veggie | Cucumber |
|Green veggie | Kiwi |
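For reference, a minimal sketch that reproduces the two datasets (assuming a spark-shell session, where spark.implicits._ is already in scope):

import spark.implicits._

val smoothieDs = Seq(
  (1, "Tropical", 10),
  (2, "Green veggie", 20)
).toDF("smoothie_id", "smoothie", "price")

val ingredientDs = Seq(
  ("Tropical", "Mango"),
  ("Tropical", "Passion fruit"),
  ("Green veggie", "Cucumber"),
  ("Green veggie", "Kiwi")
).toDF("smoothie", "ingredient")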
I want to join the two datasets so that I get the ingredient information for each smoothie whose price is lower than $15, but also keep the smoothies whose price is higher, filling in the ingredient field with the string "To be communicated".
I tried smoothieDs.join(ingredientDs, "smoothie").filter(col("price").lt(15)) and it gives:
|smoothie_id | price | smoothie | ingredient |
|1 | 10 | Tropical | Mango |
|1 | 10 | Tropical | Passion fruit |
But my expected result should be:
|smoothie_id | price | smoothie | ingredient |
|1 | 10 | Tropical | Mango |
|1 | 10 | Tropical | Passion fruit |
|2 | 20 | Green veggie | To be communicated |
Is it possible to achieve this with a join directly? If not, what is the best way to achieve it?
Answer
You can replace the ingredient based on the price after the join:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed outside the shell for the 'price column syntax

smoothieDs.join(ingredientDs, "smoothie")
  // keep the real ingredient for cheap smoothies, mask the expensive ones
  .withColumn("ingredient", when('price.lt(15), 'ingredient).otherwise("To be communicated"))
  // the masked rows are now duplicates of each other, so collapse them
  .distinct()
  .show()
Output:
+------------+-----------+-----+------------------+
|    smoothie|smoothie_id|price|        ingredient|
+------------+-----------+-----+------------------+
|Green veggie|          2|   20|To be communicated|
|    Tropical|          1|   10|             Mango|
|    Tropical|          1|   10|     Passion fruit|
+------------+-----------+-----+------------------+
Edit: another option would be to filter the ingredient dataset first and then do the join. This avoids the distinct() but comes at the price of a second join. Depending on the data, this may or may not be faster.
smoothieDs.join(
    // left_semi keeps only the ingredients whose smoothie costs less than 15
    ingredientDs.join(smoothieDs.filter('price.lt(15)), Seq("smoothie"), "left_semi"),
    Seq("smoothie"), "left_outer")
  // smoothies whose ingredients were all filtered out come back with a null ingredient
  .na.fill("To be communicated", Seq("ingredient"))
  .show()
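Note that the left_semi join only checks for the existence of a matching row and adds no columns from the right side, so the outer join produces at most one row per ingredient and nothing needs to be deduplicated afterwards.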