I have two tables as below. What I’m trying to do is to join A and B based on date and id, to get the value from B. The problem is, I want to join using add_months(A.Date, -1) = B.Date
(find the data in table B from one month earlier). If that’s not available, I want to join using two months earlier: add_months(A.Date, -2) = B.Date.
How can I achieve this in one query? In the result all 3 rows should be joined. Spark SQL is preferred over the API. Many thanks.
Table A:

ID | Date
---+--------
A  | 2022-02
B  | 2022-02
C  | 2022-02

Table B:

ID | Date    | Value
---+---------+------
A  | 2022-01 | V1
B  | 2022-01 | V2
C  | 2021-12 | V3

Expected output:

ID | Date    | Value
---+---------+------
A  | 2022-02 | V1      -- result from join condition add_months(A.Date, -1) = B.Date
B  | 2022-02 | V2      -- result from join condition add_months(A.Date, -1) = B.Date
C  | 2022-02 | V3      -- result from join condition add_months(A.Date, -2) = B.Date
Answer
One way I can think of is: create the required lag-date columns on A and join them against B’s date, as below –
Data Preparation
import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

sql = SparkSession.builder.getOrCreate()

df1 = pd.DataFrame({
    'id': ['A', 'B', 'C'],
    'Date': ['2022-02'] * 3
})

sparkDF1 = sql.createDataFrame(df1)

sparkDF1 = sparkDF1.withColumn('date_lag_1', F.add_months(F.col('Date'), -1)) \
                   .withColumn('date_lag_2', F.add_months(F.col('Date'), -2))

df2 = pd.DataFrame({
    'id': ['A', 'B', 'C'],
    'Date': ['2022-01', '2022-01', '2021-12'],
    'Value': ['V1', 'V2', 'V3']
})

sparkDF2 = sql.createDataFrame(df2)

sparkDF1.show()

+---+-------+----------+----------+
| id|   Date|date_lag_1|date_lag_2|
+---+-------+----------+----------+
|  A|2022-02|2022-01-01|2021-12-01|
|  B|2022-02|2022-01-01|2021-12-01|
|  C|2022-02|2022-01-01|2021-12-01|
+---+-------+----------+----------+

sparkDF2.show()

+---+-------+-----+
| id|   Date|Value|
+---+-------+-----+
|  A|2022-01|   V1|
|  B|2022-01|   V2|
|  C|2021-12|   V3|
+---+-------+-----+
Join – Spark API
finalDF = sparkDF1.join(
    sparkDF2,
    (sparkDF1['id'] == sparkDF2['id'])
    & (
        (sparkDF1['date_lag_1'] == F.to_date(sparkDF2['Date'], 'yyyy-MM'))
        | (sparkDF1['date_lag_2'] == F.to_date(sparkDF2['Date'], 'yyyy-MM'))
    ),
    'inner'
).select(
    sparkDF1['id'],
    sparkDF1['Date'],
    sparkDF2['Value']
).orderBy(F.col('id'))

finalDF.show()

+---+-------+-----+
| id|   Date|Value|
+---+-------+-----+
|  A|2022-02|   V1|
|  B|2022-02|   V2|
|  C|2022-02|   V3|
+---+-------+-----+
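One caveat: the OR condition treats the two lags equally, so if B ever held rows for both the one-month and the two-month lag of the same id, the join would return both. The strict "use two months only when one month is absent" semantics the question asks for can be illustrated with a plain-pandas sketch (names like lag1/lag2 are mine, not Spark API):

```python
import pandas as pd

# Toy data mirroring the tables in the question
a = pd.DataFrame({'id': ['A', 'B', 'C'],
                  'Date': pd.to_datetime(['2022-02-01'] * 3)})
b = pd.DataFrame({'id': ['A', 'B', 'C'],
                  'Date': pd.to_datetime(['2022-01-01', '2022-01-01', '2021-12-01']),
                  'Value': ['V1', 'V2', 'V3']})

# Lag columns on A, analogous to date_lag_1 / date_lag_2 above
a['lag1'] = a['Date'] - pd.DateOffset(months=1)
a['lag2'] = a['Date'] - pd.DateOffset(months=2)

# First try the one-month lag ...
m1 = a.merge(b.rename(columns={'Date': 'lag1'}), on=['id', 'lag1'], how='left')
# ... then fall back to the two-month lag only where the first merge found nothing
m2 = a.merge(b.rename(columns={'Date': 'lag2'}), on=['id', 'lag2'], how='left')
m1['Value'] = m1['Value'].fillna(m2['Value'])

result = m1[['id', 'Date', 'Value']]
print(result)
```

The same two-step idea carries over to Spark: left-join on the one-month lag, left-join on the two-month lag, and coalesce the two value columns.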
Join – SparkSQL
sparkDF1.createOrReplaceTempView("TB1")
sparkDF2.createOrReplaceTempView("TB2")

sql.sql("""
    SELECT a.ID, a.Date, b.Value
    FROM TB1 a
    INNER JOIN TB2 b
        ON a.ID = b.ID
       AND (ADD_MONTHS(a.Date, -1) = b.Date
            OR ADD_MONTHS(a.Date, -2) = b.Date)
    ORDER BY a.ID
""").show()

+---+-------+-----+
| ID|   Date|Value|
+---+-------+-----+
|  A|2022-02|   V1|
|  B|2022-02|   V2|
|  C|2022-02|   V3|
+---+-------+-----+
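Since the question prefers Spark SQL: if B could contain rows for both lag months of the same ID, the OR join above would return both rows. One way to keep only the one-month match when it exists is to rank the matches with a window function. A sketch against the same temp views (untested against that edge case):

```sql
SELECT ID, Date, Value
FROM (
    SELECT a.ID, a.Date, b.Value,
           ROW_NUMBER() OVER (
               PARTITION BY a.ID
               ORDER BY CASE WHEN ADD_MONTHS(a.Date, -1) = b.Date THEN 1 ELSE 2 END
           ) AS rn
    FROM TB1 a
    INNER JOIN TB2 b
        ON a.ID = b.ID
       AND (ADD_MONTHS(a.Date, -1) = b.Date
            OR ADD_MONTHS(a.Date, -2) = b.Date)
) t
WHERE rn = 1
```

With the sample data each ID matches exactly once, so this returns the same three rows as the plain OR join.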