I have two datasets, as follows:
smoothieDs.show()

|smoothie_id | smoothie     | price |
|1           | Tropical     | 10    |
|2           | Green veggie | 20    |
and:
ingredientDs.show()

|smoothie     | ingredient    |
|Tropical     | Mango         |
|Tropical     | Passion fruit |
|Green veggie | Cucumber      |
|Green veggie | Kiwi          |
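For reference, here is a minimal sketch that recreates the two datasets (the column types and the implicit spark session are my assumptions, not part of the original data):

import spark.implicits._

// Reconstruction of the example data as DataFrames (i.e. Dataset[Row]); types are assumed.
val smoothieDs = Seq(
  (1, "Tropical", 10),
  (2, "Green veggie", 20)
).toDF("smoothie_id", "smoothie", "price")

val ingredientDs = Seq(
  ("Tropical", "Mango"),
  ("Tropical", "Passion fruit"),
  ("Green veggie", "Cucumber"),
  ("Green veggie", "Kiwi")
).toDF("smoothie", "ingredient")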
I want to join the two datasets so that I get the ingredient information for each smoothie whose price is lower than $15, but also keep the more expensive smoothies, filling in the string To be communicated for their ingredient field.
I tried:

smoothieDs.join(ingredientDs, "smoothie").filter(col("price").lt(15))
and it gives:
|smoothie_id | price | smoothie | ingredient    |
|1           | 10    | Tropical | Mango         |
|1           | 10    | Tropical | Passion fruit |
But my expected result should be:
|smoothie_id | price | smoothie     | ingredient         |
|1           | 10    | Tropical     | Mango              |
|1           | 10    | Tropical     | Passion fruit      |
|2           | 20    | Green veggie | To be communicated |
Is it possible to achieve this using a join directly? If not, what is the best way to achieve it?
Answer
You can replace the ingredient based on the price after the join:
import org.apache.spark.sql.functions._
import spark.implicits._ // needed for the 'price symbol-to-column syntax

smoothieDs.join(ingredientDs, "smoothie")
  // replace the ingredient for smoothies at or above the price threshold
  .withColumn("ingredient",
    when('price.lt(15), 'ingredient).otherwise("To be communicated"))
  .distinct()
  .show()
Output:
+------------+-----------+-----+------------------+
|    smoothie|smoothie_id|price|        ingredient|
+------------+-----------+-----+------------------+
|Green veggie|          2|   20|To be communicated|
|    Tropical|          1|   10|             Mango|
|    Tropical|          1|   10|     Passion fruit|
+------------+-----------+-----+------------------+
Edit: another option would be to filter the ingredient dataset first and then do the join. This avoids the distinct (which the first approach needs because both Green veggie ingredient rows are replaced by the same To be communicated value, producing duplicate rows) but comes at the price of a second join. Depending on the data, this may or may not be faster.
smoothieDs.join(
    // keep only the ingredient rows whose smoothie is below the price threshold
    ingredientDs.join(smoothieDs.filter('price.lt(15)), Seq("smoothie"), "left_semi"),
    Seq("smoothie"), "left_outer")
  // the expensive smoothies get null ingredients from the outer join; fill them in
  .na.fill("To be communicated", Seq("ingredient"))
  .show()
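To sanity-check that the two variants agree, you can compare their results with except (a sketch of my own, not part of the original answer; Dataset.isEmpty requires Spark 2.4+):

// Both variants should produce the same set of rows.
val viaDistinct = smoothieDs.join(ingredientDs, "smoothie")
  .withColumn("ingredient",
    when('price.lt(15), 'ingredient).otherwise("To be communicated"))
  .distinct()

val viaSemiJoin = smoothieDs.join(
    ingredientDs.join(smoothieDs.filter('price.lt(15)), Seq("smoothie"), "left_semi"),
    Seq("smoothie"), "left_outer")
  .na.fill("To be communicated", Seq("ingredient"))

// Empty differences in both directions mean the results match.
assert(viaDistinct.except(viaSemiJoin).isEmpty && viaSemiJoin.except(viaDistinct).isEmpty)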