
Spark SQL: keep a non-key row after join

I have two datasets as follows:

smoothieDs.show()

|smoothie_id | smoothie     | price |
|1           | Tropical     | 10    |
|2           | Green veggie | 20    |

and:

ingredientDs.show()

|smoothie     | ingredient    | 
|Tropical     | Mango         | 
|Tropical     | Passion fruit | 
|Green veggie | Cucumber      |
|Green veggie | Kiwi          |
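To reproduce the examples, the two datasets can be built from the tables above. This is a minimal sketch, assuming spark-shell or a Scala script with Spark on the classpath; the app name and the `local[*]` master are assumptions for local testing only. The smoothie name is spelled "Green veggie" in both datasets, because the join key must match exactly.

```scala
import org.apache.spark.sql.SparkSession

// Local session for experimenting (app name and master are assumptions).
// In spark-shell, getOrCreate() simply returns the existing `spark` session.
val spark = SparkSession.builder()
  .appName("smoothie-join")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

// Sample data copied from the tables in the question.
val smoothieDs = Seq(
  (1, "Tropical", 10),
  (2, "Green veggie", 20)
).toDF("smoothie_id", "smoothie", "price")

val ingredientDs = Seq(
  ("Tropical", "Mango"),
  ("Tropical", "Passion fruit"),
  ("Green veggie", "Cucumber"),
  ("Green veggie", "Kiwi")
).toDF("smoothie", "ingredient")

smoothieDs.show()
ingredientDs.show()
```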

I want to join the two datasets to get the ingredients of each smoothie whose price is lower than $15, but also keep the smoothies whose price is higher and fill their ingredient field with the string "To be communicated".

I tried smoothieDs.join(ingredientDs, "smoothie").filter(col("price").lt(15)), which gives:

|smoothie_id  |price | smoothie     | ingredient    |
|1            |10    | Tropical     | Mango         | 
|1            |10    | Tropical     | Passion fruit | 

But my expected result should be:

|smoothie_id  |price | smoothie     | ingredient         |
|1            |10    | Tropical     | Mango              | 
|1            |10    | Tropical     | Passion fruit      | 
|2            |20    | Green veggie | To be communicated | 

Is it possible to achieve this with a join directly? If not, what is the best way to do it?


Answer

You can replace the ingredient based on the price after the join:

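import org.apache.spark.sql.functions._
import spark.implicits._ // enables the 'price / 'ingredient column syntax

// Inner join on the smoothie name, then replace the ingredient of expensive smoothies.
// All ingredient rows of an expensive smoothie become the same placeholder row,
// so distinct() collapses them into one.
smoothieDs.join(ingredientDs, "smoothie")
  .withColumn("ingredient", when('price.lt(15), 'ingredient).otherwise("To be communicated"))
  .distinct()
  .show()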

Output:

+------------+-----------+-----+------------------+
|    smoothie|smoothie_id|price|        ingredient|
+------------+-----------+-----+------------------+
|Green veggie|          2|   20|To be communicated|
|    Tropical|          1|   10|             Mango|
|    Tropical|          1|   10|     Passion fruit|
+------------+-----------+-----+------------------+

Edit: another option is to filter the ingredient dataset first and then do the join. This avoids distinct but costs a second join, so depending on the data it may or may not be faster.

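import spark.implicits._ // enables the 'price column syntax

// left_semi keeps only the ingredient rows whose smoothie costs less than 15
// (and returns only ingredientDs columns). The left outer join then keeps every
// smoothie; expensive ones get a null ingredient, which na.fill replaces.
smoothieDs.join(
  ingredientDs.join(smoothieDs.filter('price.lt(15)), Seq("smoothie"), "left_semi"),
  Seq("smoothie"), "left_outer")
  .na.fill("To be communicated", Seq("ingredient"))
  .show()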
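Regarding the question of doing this with a join directly: one variant not given in the answer is to move the price test into the join condition of a single left outer join. Smoothies with a price of 15 or more then find no matching ingredient row, are kept with a null ingredient, and the null is filled with the placeholder, so neither distinct nor a second join is needed. This is a sketch under the same assumptions as the sample data (local session, column names from the question); note that a cheap smoothie with no ingredient rows at all would also get "To be communicated".

```scala
import org.apache.spark.sql.SparkSession

// Local session and sample data (assumptions, mirroring the question's tables).
val spark = SparkSession.builder()
  .appName("smoothie-join")
  .master("local[*]")
  .getOrCreate()
import spark.implicits._

val smoothieDs = Seq((1, "Tropical", 10), (2, "Green veggie", 20))
  .toDF("smoothie_id", "smoothie", "price")
val ingredientDs = Seq(
  ("Tropical", "Mango"), ("Tropical", "Passion fruit"),
  ("Green veggie", "Cucumber"), ("Green veggie", "Kiwi")
).toDF("smoothie", "ingredient")

// The price filter is part of the join condition, not a filter after the join:
// a left outer join keeps every smoothie, and those failing the condition get
// a null ingredient instead of being dropped.
val result = smoothieDs
  .join(
    ingredientDs,
    smoothieDs("smoothie") === ingredientDs("smoothie") && smoothieDs("price") < 15,
    "left_outer")
  // Keep a single "smoothie" column (the join produced one from each side).
  .select(smoothieDs("smoothie_id"), smoothieDs("price"),
    smoothieDs("smoothie"), ingredientDs("ingredient"))
  .na.fill("To be communicated", Seq("ingredient"))

result.show()
```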
User contributions licensed under: CC BY-SA