I am trying to join two hive tables on databricks.
tab1:
consumer_id (string)    some_questions
"reghvsdvwe"            [{question_id: "rvsvbetvs-dvewdqwavd-363tr13r", ...}, ...]   (full contents shown below)
The schema of “some_questions”:
array<struct<question_id:string, answers:array<struct<answer_id:string, date_made:timestamp, updated:timestamp>>>>
“some_questions” example:
0: question_id: "rvsvbetvs-dvewdqwavd-363tr13r"
   answers:
     0: {"answer_id": "4363r23-46745y3-2er296", "date_made": "2006-11-02T00:00:00.000+0000", "updated": "2006-12-01T00:00:00.000+0000"}
1: question_id: "rthdcva45-3t342r34y-vdvsdvds"
   answers:
     0: {"answer_id": "eewgrg-2353t3-thetber", "date_made": "2006-05-12T00:00:00.000+0000", "updated": "2006-05-12T00:00:00.000+0000"}
tab2:
question_id (string)             answer_id (string)        question_contents                answer_contents
"rvsvbetvs-dvewdqwavd-363tr13r"  "4363r23-46745y3-2er296"  "what do you like the food?"     "smell is good"
"rthdcva45-3t342r34y-vdvsdvds"   "eewgrg-2353t3-thetber"   "how do you enjoy the travel ?"  "too much traffic in rush hour"
I need to join tab1 and tab2 by “question_id” such that I get a new table
consumer_id   question_id                      question_contents                answer_contents
"reghvsdvwe"  "rvsvbetvs-dvewdqwavd-363tr13r"  "what do you like the food?"     "smell is good"
"reghvsdvwe"  "rthdcva45-3t342r34y-vdvsdvds"   "how do you enjoy the travel ?"  "too much traffic in rush hour"
I tried to join them with PySpark, but I am not sure how to decompose the array with the embedded struct/array.
thanks
Answer
With Spark SQL, you can use either exists:
spark.sql("""
    SELECT t1.consumer_id, t2.answer_id, t2.question_contents, t2.answer_contents
    FROM tab1 AS t1
    JOIN tab2 AS t2
    ON exists(t1.some_questions, x -> x.question_id = t2.question_id)
""").show()
+-----------+--------------------+--------------------+--------------------+
|consumer_id|           answer_id|   question_contents|     answer_contents|
+-----------+--------------------+--------------------+--------------------+
| reghvsdvwe|4363r23-46745y3-2...|what do you like ...|       smell is good|
| reghvsdvwe|eewgrg-2353t3-the...|how do you enjoy ...|too much traffic ...|
+-----------+--------------------+--------------------+--------------------+
or array_contains:
spark.sql("""
    SELECT t1.consumer_id, t2.answer_id, t2.question_contents, t2.answer_contents
    FROM tab1 AS t1
    JOIN tab2 AS t2
    ON array_contains(t1.some_questions.question_id, t2.question_id)
""").show()
with PySpark syntax:
from pyspark.sql.functions import expr

df_new = tab1.alias('t1').join(
    tab2.alias('t2'),
    expr("array_contains(t1.some_questions.question_id, t2.question_id)")
).select('t1.consumer_id', 't2.question_id', 't2.question_contents', 't2.answer_contents')