Skip to content
Advertisement

Spark: How to transpose and explode columns with dynamic nested arrays

I applied an algorithm from the question Spark: How to transpose and explode columns with nested arrays to transpose and explode nested spark dataframe with dynamic arrays.

I have added to the dataframe """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}""" , with new column c, where array has new val_dynamic field which can appear on random basis.

I’m looking for required output 2 (Transpose and Explode ) but even example of required output 1 (Transpose) will be very useful.

Input df:

+------------------+--------+-----------+---+
|                 a|       b|          c| id|
+------------------+--------+-----------+---+
|[{1, 1}, {11, 11}]|    null|       null|  1|
|              null|[{2, 2}]|       null|  2|
|              null|    null|[{3, 3, 3}]|  3|   !!! NOTE: Added `val_dynamic`
+------------------+--------+-----------+---+


root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)  !!! NOTE: Added `val_dynamic`
 |-- id: long (nullable = true)

Required output 1 (transpose_df):

+---+------+-------------------+
| id| cols |       arrays      |
+---+------+-------------------+
|  1|  a   | [{1, 1}, {11, 11}]|
|  2|  b   | [{2, 2}]          |
|  3|  c   | [{3, 3, 3}]       | !!! NOTE: Added `val_dynamic`
+---+------+-------------------+

Required output 2 (explode_df):

+---+----+----+---+-----------+
| id|cols|date|val|val_dynamic|
+---+----+----+---+-----------+
|  1|   a|   1|  1|   null    |
|  1|   a|  11| 11|   null    |
|  2|   b|   2|  2|   null    |
|  3|   c|   3|  3|      3    |  !!! NOTE: Added `val_dynamic`
+---+----+----+---+-----------+

Current code:

import pyspark.sql.functions as f

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":1,"val":1},{"date":11,"val":11}]}""",
  """{"id":2,"b":[{"date":2,"val":2}]}}""",
  """{"id":3,"c":[{"date":3,"val":3, "val_dynamic":3}]}}"""
    ]))

df.show()

cols = [ 'a', 'b', 'c']

#expr = stack(2,'a',a,'b',b,'c',c )
expr = f"stack({len(cols)}," + 
    ",".join([f"'{c}',{c}" for c in cols]) + 
    ")"


transpose_df = df.selectExpr("id", expr) 
    .withColumnRenamed("col0", "cols") 
    .withColumnRenamed("col1", "arrays") 
    .filter("not arrays is null")

transpose_df.show()

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')
explode_df.show()

Current outcome

AnalysisException: cannot resolve 'stack(3, 'a', `a`, 'b', `b`, 'c', `c`)' due to data type mismatch: Argument 2 (array<struct<date:bigint,val:bigint>>) != Argument 6 (array<struct<date:bigint,val:bigint,val_dynamic:bigint>>); line 1 pos 0;
'Project [id#2304L, unresolvedalias(stack(3, a, a#2301, b, b#2302, c, c#2303), Some(org.apache.spark.sql.Column$$Lambda$2580/0x00000008411d3040@4d9eefd0))]
+- LogicalRDD [a#2301, b#2302, c#2303, id#2304L], false

ref : Transpose column to row with Spark

Answer

stack requires that all stacked columns have the same type. The problem here is that the structs inside of the arrays have different members. One approach would be to add the missing members to all structs so that the approach of my previous answer works again.

cols = ['a', 'b', 'c']

#create a map containing all struct fields per column
existing_fields = {c:list(map(lambda field: field.name, df.schema.fields[i].dataType.elementType.fields)) 
      for i,c in enumerate(df.columns) if c in cols}

#get a (unique) set of all fields that exist in all columns
all_fields = set(sum(existing_fields.values(),[]))

#create a list of transform expressions to fill up the structs will null fields
transform_exprs = [f"transform({c}, e -> named_struct(" + 
    ",".join([f"'{f}', {('e.'+f) if f in existing_fields[c] else 'cast(null as long)'}" for f in all_fields]) 
    + f")) as {c}" for c in cols]

#create a df where all columns contain arrays with the same struct
full_struct_df = df.selectExpr("id", *transform_exprs)

full_struct_df has now the schema

root
 |-- id: long (nullable = true)
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = false)
 |    |    |-- val: long (nullable = true)
 |    |    |-- val_dynamic: long (nullable = true)
 |    |    |-- date: long (nullable = true)

From here the logic works as before:

stack_expr = f"stack({len(cols)}," + 
    ",".join([f"'{c}',{c}" for c in cols]) + 
    ")"

transpose_df = full_struct_df.selectExpr("id", stack_expr) 
    .withColumnRenamed("col0", "cols") 
    .withColumnRenamed("col1", "arrays") 
    .filter("not arrays is null")

explode_df = transpose_df.selectExpr('id', 'cols', 'inline(arrays)')

The first part of this answer requires that

  • each column mentioned in cols is an array of structs
  • all members of all structs are longs. The reason for this restriction is the cast(null as long) when creating the transform expression.