I have a table with a lot of records (6+ million) but most of the rows per ID are all the same.
Example:
Row | Date | ID | Col1 | Col2 | Col3 | Col4 | Col5 |
---|---|---|---|---|---|---|---|
1 | 01-01-2021 | 1 | a | b | c | d | e |
2 | 02-01-2021 | 1 | a | b | c | d | x |
3 | 03-01-2021 | 1 | a | b | c | d | x |
4 | 04-01-2021 | 1 | a | b | c | d | x |
5 | 01-01-2021 | 2 | a | b | c | d | e |
6 | 02-01-2021 | 2 | a | b | x | d | e |
7 | 03-01-2021 | 2 | a | b | x | d | e |
8 | 01-01-2021 | 3 | a | b | c | d | e |
9 | 02-01-2021 | 3 | a | b | c | d | e |
10 | 03-01-2021 | 3 | a | b | c | d | e |
To save space but also make querying easier I want to create a table where only rows are shown if there was a change vs the previous row (except for the date).
For above table this means I would only like to see:
- Row 1 > First entry for ID 1
- Row 2 > Because something changed vs the previous row
- Row 5 > First entry for ID 2
- Row 6 > Because something changed vs the previous row
- Row 8 > First entry for ID 3 and no changes after this
So the table would look like this:
Row | Date | ID | Col1 | Col2 | Col3 | Col4 | Col5 |
---|---|---|---|---|---|---|---|
1 | 01-01-2021 | 1 | a | b | c | d | e |
2 | 02-01-2021 | 1 | a | b | c | d | x |
5 | 01-01-2021 | 2 | a | b | c | d | e |
6 | 02-01-2021 | 2 | a | b | x | d | e |
8 | 01-01-2021 | 3 | a | b | c | d | e |
I am trying to do this in PySpark and have had some success with 1 column only using LAG
but having some trouble when there are more columns (there are about 20 in my own table). I would prefer to do this in PySpark but a working version in SQL or Python could also work!
I was wondering if there are better ways to do this.
Advertisement
Answer
You can create a lagged array column of all columns of interest and compare it to the current row, then do a filter:
from pyspark.sql import functions as F, Window cols = df.columns[3:] w = Window.partitionBy('ID').orderBy('Date') df2 = df.withColumn( 'diff', F.coalesce( F.lag(F.array(*cols)).over(w) != F.array(*cols), F.lit(True) # take care of first row where the lag is null ) ).filter('diff').drop('diff') df2.show() +---+----------+---+----+----+----+----+----+ |Row| Date| ID|Col1|Col2|Col3|Col4|Col5| +---+----------+---+----+----+----+----+----+ | 1|01-01-2021| 1| a| b| c| d| e| | 2|02-01-2021| 1| a| b| c| d| x| | 5|01-01-2021| 2| a| b| c| d| e| | 6|02-01-2021| 2| a| b| x| d| e| | 8|01-01-2021| 3| a| b| c| d| e| +---+----------+---+----+----+----+----+----+