I have the following dataframe in PySpark:
DT_BORD_REF: date column for the month
REF_DATE: a date reference for the current day, separating past from future
PROD_ID: product ID
COMPANY_CODE: company ID
CUSTOMER_CODE: customer ID
COUNTRY_ALPHA: country code (shown in the table below)
MTD_WD: month-to-date count of working days (as of Date = DT_BORD_REF)
QUANTITY: number of items sold
QTE_MTD: number of items sold month to date
+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|        DT_BORD_REF|           REF_DATE|          PROD_ID|COMPANY_CODE|CUSTOMER_CODE|COUNTRY_ALPHA|MTD_WD|QUANTITY|QTE_MTD|
+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|2020-11-02 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     1|     4.0|    4.0|
|2020-11-05 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     3|    null|    4.0|
|2020-11-06 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     4|    null|    4.0|
|2020-11-09 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     5|    null|    4.0|
|2020-11-10 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     6|    null|    4.0|
|2020-11-11 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     7|    null|    4.0|
|2020-11-12 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     8|    null|    4.0|
|2020-11-13 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     9|    null|    4.0|
|2020-11-16 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    10|    null|    4.0|
|2020-11-17 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    11|    null|    4.0|
|2020-11-18 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    12|    null|    4.0|
|2020-11-19 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    13|    null|    4.0|
|2020-11-20 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    14|    null|    4.0|
|2020-11-23 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    15|    null|    4.0|
|2020-11-24 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    16|    null|    4.0|
|2020-11-25 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    17|    null|    4.0|
For DT_BORD_REF < REF_DATE, all rows are actual sales; they do not necessarily occur every working day, and sometimes occur on non-working days too.
For DT_BORD_REF >= REF_DATE, there are no sales (it's the future).
The objective is to forecast the sales for all future rows using the formula QTE_MTD / MTD_WD, where MTD_WD is the value calculated on the REF_DATE, for each product, customer and country.
QTE_MTD was already calculated from the QUANTITY column using a window function. I need to divide it by the MTD_WD value on REF_DATE, which in this example is 3.
How can I add a column with the MTD_WD value on REF_DATE, partitioning by product, customer and country?
In other words, I need to add a column with the first occurrence of MTD_WD where the condition DT_BORD_REF > REF_DATE is met (again, 3 in this example), for each product, customer and country.
This dataset has millions of rows for different products, customers and countries. The working days are provided per country.
Hope it was clear 🙂
Answer
You can use first with ignorenulls=True, and when with the appropriate condition, to get the first MTD_WD where DT_BORD_REF > REF_DATE:
from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'val',
    F.first(
        # null for past rows, MTD_WD for rows strictly after REF_DATE
        F.when(
            F.col('DT_BORD_REF') > F.col('REF_DATE'),
            F.col('MTD_WD')
        ),
        ignorenulls=True  # skip the nulls so first() picks the first future MTD_WD
    ).over(
        Window.partitionBy('PROD_ID', 'COMPANY_CODE', 'CUSTOMER_CODE', 'COUNTRY_ALPHA')
              .orderBy('DT_BORD_REF')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )
)
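To sanity-check the logic on the sample partition: REF_DATE is 2020-11-04, so the first row with DT_BORD_REF > REF_DATE is 2020-11-05, whose MTD_WD is 3; every row then gets val = 3, and the forecast QTE_MTD / val = 4.0 / 3 ≈ 1.33 for the future rows. A minimal plain-Python sketch of the same idea (no Spark needed; the row layout here is my own simplification of one partition):

```python
from datetime import date

# One partition's rows: (DT_BORD_REF, MTD_WD, QTE_MTD); REF_DATE is fixed per partition
REF_DATE = date(2020, 11, 4)
rows = [
    (date(2020, 11, 2), 1, 4.0),
    (date(2020, 11, 5), 3, 4.0),
    (date(2020, 11, 6), 4, 4.0),
]

# Equivalent of first(when(...), ignorenulls=True) over the whole partition:
# the first MTD_WD whose date is strictly after REF_DATE
val = next(mtd_wd for dt, mtd_wd, _ in rows if dt > REF_DATE)

# Forecast for future rows: QTE_MTD divided by the MTD_WD on REF_DATE
forecasts = [qte / val for dt, _, qte in rows if dt >= REF_DATE]
print(val, forecasts)  # val = 3, each forecast = 4.0 / 3 ≈ 1.33
```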