I have 10,000 JSONs with different ids, and each has 10,000 name arrays. How can I flatten the nested arrays in PySpark by merging their values on an int or str key?
EDIT: I have added the column name_10000_xvz to better explain the data structure. I have also updated the Notes, the input df, the required output df, and the input JSON files.
Notes:
- The input dataframe has more than 10,000 columns (name_1_a, ..., name_1000_xx), so the column (array) names cannot be hardcoded; that would require writing out 10,000 names.
- id, date, val always follow the same naming convention across all columns and all JSONs.
- Array sizes can vary, but date and val are always present, so those field names can be hardcoded.
- date can differ in each array; for example, name_1_a starts with 2001, but name_10000_xvz for id == 1 starts with 2000 and finishes with 2004, whereas for id == 2 it starts with 1990 and finishes with 2004.
Input df:
root
|-- id: long (nullable = true)
|-- name_10000_xvz: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_1_a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_1_b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- name_2_a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|id |name_10000_xvz |name_1_a |name_1_b |name_2_a |
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
|2 |[{1990, 39}, {2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}, {2004, 34}]|[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
|1 |[{2000, 30}, {2001, 31}, {2002, 32}, {2003, 33}] |[{2001, 1}, {2002, 2}, {2003, 3}]|[{2001, 4}, {2002, 5}, {2003, 6}]|[{2001, 21}, {2002, 22}, {2003, 23}]|
+---+------------------------------------------------------------------------+---------------------------------+---------------------------------+------------------------------------+
Required output df:
+---+------+----------+----------+----------+----------------+
|id | date | name_1_a | name_1_b | name_2_a | name_10000_xvz |
+---+------+----------+----------+----------+----------------+
|1  | 2000 | 0        | 0        | 0        | 30             |
|1  | 2001 | 1        | 4        | 21       | 31             |
|1  | 2002 | 2        | 5        | 22       | 32             |
|1  | 2003 | 3        | 6        | 23       | 33             |
|2  | 1990 | 0        | 0        | 0        | 39             |
|2  | 2000 | 0        | 0        | 0        | 30             |
|2  | 2001 | 1        | 4        | 21       | 31             |
|2  | 2002 | 2        | 5        | 22       | 32             |
|2  | 2003 | 3        | 6        | 23       | 33             |
|2  | 2004 | 0        | 0        | 0        | 34             |
+---+------+----------+----------+----------+----------------+
To reproduce input df:
df = spark.read.json(sc.parallelize([
"""{"id":1,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
"""{"id":2,"name_1_a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"name_1_b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"name_2_a":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"name_10000_xvz":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}}"""
]))
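The snippet above only builds the two-row sample. For the real case of 10,000 JSON files sharing this layout, they could be read in one pass with a glob path (the path below is hypothetical); Spark infers the nested array-of-struct schema:

# hypothetical location of the 10000 input JSON files; produces one DataFrame with all ids
df = spark.read.json("/path/to/jsons/*.json")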
Useful links:
- How to flatten data frame with dynamic nested structs / arrays in PySpark
- https://docs.databricks.com/_static/notebooks/higher-order-functions.html
Answer
UPDATE
As @werner has mentioned, it's necessary to transform all structs to append the column name into them, e.g. turning an element {2001, 1} of name_1_a into {name_1_a, 2001, 1}, so the name survives the flattening and can later be pivoted back into a column.
import pyspark.sql.functions as f

# collect all dynamic array columns instead of hardcoding 10000 names
names = [column for column in df.columns if column.startswith('name_')]

# rewrite every array element as a struct that also carries its column name
expressions = []
for name in names:
    expressions.append(f.expr('TRANSFORM({name}, el -> STRUCT("{name}" AS name, el.date, el.val))'.format(name=name)))

# concatenate all arrays and explode them into (name, date, val) rows
flatten_df = (df
              .withColumn('flatten', f.flatten(f.array(*expressions)))
              .selectExpr('id', 'inline(flatten)'))

# pivot the column names back into columns, one value per (id, date)
output_df = (flatten_df
             .groupBy('id', 'date')
             .pivot('name', names)
             .agg(f.first('val')))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------------+--------+--------+--------+
|id |date|name_10000_xvz|name_1_a|name_1_b|name_2_a|
+---+----+--------------+--------+--------+--------+
|1 |2000|30 |null |null |null |
|1 |2001|31 |1 |4 |21 |
|1 |2002|32 |2 |5 |22 |
|1 |2003|33 |3 |6 |23 |
|2 |1990|39 |null |null |null |
|2 |2000|30 |null |null |null |
|2 |2001|31 |1 |4 |21 |
|2 |2002|32 |2 |5 |22 |
|2 |2003|33 |3 |6 |23 |
|2 |2004|34 |null |null |null |
+---+----+--------------+--------+--------+--------+
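The required output shows 0 where the pivot above produces null. Assuming zeros are still wanted, the gaps can be filled after the pivot, for example:

# replace the nulls introduced by the pivot with 0, as in the required output df
output_df = output_df.na.fill(0, subset=names)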
OLD
Assuming:
- the date values are always the same across all columns
- name_1_a, name_1_b, name_2_a have equal sizes

(name_10000_xvz does not satisfy these assumptions, which is why the UPDATE above is needed.)
import pyspark.sql.functions as f

# walk the arrays by index, build one struct per position, then explode
output_df = (df
             .withColumn('flatten', f.expr('TRANSFORM(SEQUENCE(0, size(name_1_a) - 1), i -> '
                                           'STRUCT(name_1_a[i].date AS date, '
                                           '       name_1_a[i].val AS name_1_a, '
                                           '       name_1_b[i].val AS name_1_b, '
                                           '       name_2_a[i].val AS name_2_a))'))
             .selectExpr('id', 'inline(flatten)'))

output_df.sort('id', 'date').show(truncate=False)
+---+----+--------+--------+--------+
|id |date|name_1_a|name_1_b|name_2_a|
+---+----+--------+--------+--------+
|1 |2001|1 |4 |21 |
|1 |2002|2 |5 |22 |
|1 |2003|3 |6 |23 |
|2 |2001|1 |4 |21 |
|2 |2002|2 |5 |22 |
|2 |2003|3 |6 |23 |
+---+----+--------+--------+--------+