Daily forecast on a PySpark dataframe

I have the following dataframe in PySpark:

DT_BORD_REF: Date column for the month
REF_DATE: A date reference for current day separating past and future
PROD_ID: Product ID
COMPANY_CODE: Company ID
CUSTOMER_CODE: Customer ID
MTD_WD: Month to Date count of working days (Date = DT_BORD_REF)
QUANTITY: Number of items sold
QTE_MTD: Number of items month to date

+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|        DT_BORD_REF|           REF_DATE|          PROD_ID|COMPANY_CODE|CUSTOMER_CODE|COUNTRY_ALPHA|MTD_WD|QUANTITY|QTE_MTD|
+-------------------+-------------------+-----------------+------------+-------------+-------------+------+--------+-------+
|2020-11-02 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     1|     4.0|    4.0|
|2020-11-05 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     3|    null|    4.0|
|2020-11-06 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     4|    null|    4.0|
|2020-11-09 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     5|    null|    4.0|
|2020-11-10 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     6|    null|    4.0|
|2020-11-11 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     7|    null|    4.0|
|2020-11-12 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     8|    null|    4.0|
|2020-11-13 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|     9|    null|    4.0|
|2020-11-16 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    10|    null|    4.0|
|2020-11-17 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    11|    null|    4.0|
|2020-11-18 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    12|    null|    4.0|
|2020-11-19 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    13|    null|    4.0|
|2020-11-20 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    14|    null|    4.0|
|2020-11-23 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    15|    null|    4.0|
|2020-11-24 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    16|    null|    4.0|
|2020-11-25 00:00:00|2020-11-04 00:00:00|          0000043|         503|     KDAI3982|          RUS|    17|    null|    4.0|

For DT_BORD_REF < REF_DATE, all rows are actual sales. Sales do not necessarily occur every working day, and sometimes happen on non-working days too.

For DT_BORD_REF >= REF_DATE there are no sales (it's the future).

The objective is to forecast the sales for all future rows using the formula QTE_MTD / MTD_WD, calculated at REF_DATE, for each product, customer and country.

QTE_MTD was calculated from the QUANTITY column using a window function. I need to divide it by MTD_WD at REF_DATE, which in this example is 3.
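Plugging in the numbers from the sample partition above, the forecast rate works out as:

```python
# Worked example of the forecast formula, using the sample rows above:
# QTE_MTD = 4.0, and MTD_WD on the first row past REF_DATE (2020-11-05) is 3.
qte_mtd = 4.0
mtd_wd_at_ref = 3
forecast_per_working_day = qte_mtd / mtd_wd_at_ref
print(round(forecast_per_working_day, 4))  # 1.3333
```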

How can I add a column with MTD_WD at REF_DATE, partitioning by product, customer and country?

In other words, I need to add a column containing the first occurrence of MTD_WD where the condition DT_BORD_REF > REF_DATE is met (again, 3 in this example) for each product, customer and country.

This dataset has millions of rows for different products, customers and countries. The working days are provided per country.

Hope it was clear 🙂


Answer

You can use first with ignorenulls=True, and when with the appropriate condition, to get the first MTD_WD where DT_BORD_REF > REF_DATE:

from pyspark.sql import functions as F, Window

df2 = df.withColumn(
    'val',
    # MTD_WD on rows past REF_DATE, null everywhere else
    F.first(
        F.when(
            F.col('DT_BORD_REF') > F.col('REF_DATE'),
            F.col('MTD_WD')
        ),
        ignorenulls=True  # skip the nulls, keep the first real value
    ).over(
        # unbounded window, so every row in the partition receives that value
        Window.partitionBy('PROD_ID', 'COMPANY_CODE', 'CUSTOMER_CODE', 'COUNTRY_ALPHA')
              .orderBy('DT_BORD_REF')
              .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
    )
)
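To see what the window expression computes, here is a minimal pure-Python analogue (a sketch, not PySpark): the when produces MTD_WD only on rows where DT_BORD_REF > REF_DATE (null elsewhere), and first with ignorenulls=True over the unbounded window broadcasts the first non-null value to every row of the partition.

```python
# Pure-Python sketch of F.first(F.when(...), ignorenulls=True) over an
# unbounded window. Rows mimic one partition of the example:
# (DT_BORD_REF day, REF_DATE day, MTD_WD).
rows = [(2, 4, 1), (5, 4, 3), (6, 4, 4), (9, 4, 5)]

# when(DT_BORD_REF > REF_DATE, MTD_WD): value on future rows, None otherwise
conditional = [mtd if dt > ref else None for dt, ref, mtd in rows]

# first(..., ignorenulls=True) over the whole partition: first non-None value
val = next((v for v in conditional if v is not None), None)
print(val)  # 3
```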