I am using Scala with Spark and am having a hard time working out how to calculate, for each hour, the maximum count of pickups from a location. Currently I have a DataFrame with three columns (Location, hour, Zone), where Location is an integer, hour is an integer from 0 to 23 giving the hour of the day, and Zone is a string. Something like this:
Location hour Zone
97 0 A
49 5 B
97 0 A
10 6 D
25 5 B
97 0 A
97 3 A
What I need to do is find out, for each hour of the day (0-23), which zone has the largest number of pickups from a particular location.
So the answer should look something like this:
hour Zone max_count
0 A 3
1 B 4
2 A 6
3 D 1
. . .
. . .
23 D 8
What I first tried was an intermediate step to work out the counts per zone and hour:
val df_temp = df.select("Location","hour","Zone")
.groupBy("hour","Zone").agg(count($"Location").alias("count"))
This gives me a dataframe that looks like this:
hour Zone count
3 A 5
8 B 9
3 B 2
23 F 8
23 A 1
23 C 4
3 D 12
. . .
. . .
I then tried doing the following:
val df_final = df_temp.select("hour","Zone","count")
.groupBy("hour","Zone").agg(max($"count").alias("max_count")).orderBy($"hour")
This only groups by hour and zone again, so I still have thousands of rows. I also tried:
val df_final = df_temp.select("hour","Zone","count")
.groupBy("hour").agg(max($"count").alias("max_count")).orderBy($"hour")
The above gives me the max count in 24 rows (hours 0-23), but there is no Zone column. So the answer looks like this:
hour max_count
0 12
1 15
. .
. .
23 8
I would like the Zone column included so I know which zone had the max count for each of those hours. I was also looking into the window function to do rank but I wasn’t sure how to use it.
Answer
You can use Spark window functions for this task.
First, group the data by hour and zone to count the pickups for each (hour, zone) pair.
import org.apache.spark.sql.functions.count
val df = read_df.groupBy("hour", "zone").agg(count("*").as("count_order"))
Then create a window that partitions the data by hour and orders each partition by count in descending order, and compute the rank over that window.
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.rank
val byZoneName = Window.partitionBy($"hour").orderBy($"count_order".desc)
val rankZone = rank().over(byZoneName)
Selecting this expression adds a rank column that gives each zone's rank within its hour.
val result_df = df.select($"*", rankZone as "rank")
The output will be something like this:
+----+----+-----------+----+
|hour|zone|count_order|rank|
+----+----+-----------+----+
|   0|   A|          3|   1|
|   0|   C|          2|   2|
|   0|   B|          1|   3|
|   3|   A|          1|   1|
|   5|   B|          2|   1|
|   6|   D|          1|   1|
+----+----+-----------+----+
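To make the meaning of the rank column concrete, here is a minimal sketch in plain Scala collections (no Spark) that mimics partitioning by hour, ordering by count descending, and ranking. The names `RankSketch`, `ZoneCount`, `Ranked` and `rankByHour` are hypothetical and only for illustration; the input rows are the aggregated counts from the table above.

```scala
// Plain Scala collections, no Spark: an illustration of what
// rank() over (partition by hour, order by count desc) computes.
object RankSketch {
  // Hypothetical row types standing in for the aggregated DataFrame rows.
  final case class ZoneCount(hour: Int, zone: String, count: Long)
  final case class Ranked(hour: Int, zone: String, count: Long, rank: Int)

  def rankByHour(rows: Seq[ZoneCount]): Seq[Ranked] =
    rows.groupBy(_.hour).toSeq.sortBy(_._1).flatMap { case (_, group) =>
      // Order each hour's rows by count, largest first.
      val sorted = group.sortBy(r => -r.count)
      sorted.map { r =>
        // rank = 1 + number of rows in the same hour with a strictly larger
        // count, so tied rows share a rank and the following rank is skipped.
        val rank = 1 + sorted.count(_.count > r.count)
        Ranked(r.hour, r.zone, r.count, rank)
      }
    }

  def main(args: Array[String]): Unit = {
    val rows = Seq(
      ZoneCount(0, "A", 3), ZoneCount(0, "C", 2), ZoneCount(0, "B", 1),
      ZoneCount(3, "A", 1), ZoneCount(5, "B", 2), ZoneCount(6, "D", 1)
    )
    // Keep only the top-ranked zone(s) per hour.
    rankByHour(rows).filter(_.rank == 1).foreach(println)
  }
}
```

Note that with this definition two zones with the same highest count in an hour both get rank 1, which matches how rank behaves in Spark.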
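You can then keep only the rows with rank 1, which are the zones with the highest count in each hour. If two zones tie for the highest count in an hour, rank gives both of them rank 1, so both rows are kept.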
result_df.filter($"rank" === 1).orderBy("hour").show()
You can check my code here: https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/5114666914683617/1792645088721850/4927717998130263/latest.html
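For reference, the steps above can be put together into one self-contained sketch that runs on the sample rows from the question. This is an illustration under some assumptions rather than the code behind the link: the object and method names (`TopZonePerHour`, `topZonePerHour`) are hypothetical, it runs against a local SparkSession, and it swaps rank for row_number with Zone as a tie-breaker so that exactly one row per hour is returned even when counts tie.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, count, row_number}

object TopZonePerHour {
  // Returns one row per hour: the zone with the most pickups and its count.
  def topZonePerHour(pickups: DataFrame): DataFrame = {
    // Step 1: count pickups for each (hour, Zone) pair.
    val counts = pickups.groupBy("hour", "Zone").agg(count("*").as("max_count"))

    // Step 2: within each hour, order zones by count (largest first).
    // Zone is an assumed tie-breaker so row_number picks a deterministic winner.
    val byHour = Window.partitionBy("hour").orderBy(col("max_count").desc, col("Zone"))

    // Step 3: keep only the first row of each hour.
    counts
      .withColumn("rn", row_number().over(byHour))
      .filter(col("rn") === 1)
      .select("hour", "Zone", "max_count")
      .orderBy("hour")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("top-zone-per-hour")
      .getOrCreate()
    import spark.implicits._

    // Sample rows from the question: (Location, hour, Zone).
    val pickups = Seq(
      (97, 0, "A"), (49, 5, "B"), (97, 0, "A"), (10, 6, "D"),
      (25, 5, "B"), (97, 0, "A"), (97, 3, "A")
    ).toDF("Location", "hour", "Zone")

    topZonePerHour(pickups).show()
    spark.stop()
  }
}
```

If you would rather see every zone that ties for first place in an hour, replace row_number with rank and drop the Zone tie-breaker from the ordering.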