it-swarm-eu.dev

Dividi la colonna della stringa Dataframe Spark in più colonne

Ho visto varie persone che suggeriscono che Dataframe.explode è un modo utile per farlo, ma risulta in più file rispetto al dataframe originale, che non è affatto quello che voglio. Voglio semplicemente fare l'equivalente di Dataframe del molto semplice:

rdd.map(lambda row: row + [row.my_str_col.split('-')])

che prende qualcosa come:

col1 | my_str_col
-----+-----------
  18 |  856-yygrm
 201 |  777-psgdg

e lo converte in questo:

col1 | my_str_col | _col3 | _col4
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Sono a conoscenza di pyspark.sql.functions.split(), ma risulta in una colonna di array nidificata invece di due colonne di livello superiore come desidero. 

Idealmente, voglio che anche queste nuove colonne siano nominate.

32
Peter Gaultney

pyspark.sql.functions.split() è l'approccio giusto qui - è sufficiente appiattire la colonna nidificata ArrayType in più colonne di livello superiore. In questo caso, dove ogni array contiene solo 2 elementi, è molto semplice. Basta usare Column.getItem() per recuperare ogni parte dell'array come una colonna stessa:

split_col = pyspark.sql.functions.split(df['my_str_col'], '-')
df = df.withColumn('NAME1', split_col.getItem(0))
df = df.withColumn('NAME2', split_col.getItem(1))

Il risultato sarà:

col1 | my_str_col | NAME1 | NAME2
-----+------------+-------+------
  18 |  856-yygrm |   856 | yygrm
 201 |  777-psgdg |   777 | psgdg

Non sono sicuro di come risolverei questo in un caso generale in cui gli array annidati non erano della stessa dimensione da riga a riga.

54
Peter Gaultney

Ecco una soluzione al caso generale che non comporta la necessità di conoscere la lunghezza dell'array in anticipo, utilizzando collect, o utilizzando udfs. Sfortunatamente questo funziona solo per spark versione 2.1 e successive, perché richiede la funzione posexplode .

Supponiamo di avere il seguente DataFrame:

df = spark.createDataFrame(
    [
        [1, 'A, B, C, D'], 
        [2, 'E, F, G'], 
        [3, 'H, I'], 
        [4, 'J']
    ]
    , ["num", "letters"]
)
df.show()
#+---+----------+
#|num|   letters|
#+---+----------+
#|  1|A, B, C, D|
#|  2|   E, F, G|
#|  3|      H, I|
#|  4|         J|
#+---+----------+

Dividere la colonna letters e quindi utilizzare posexplode per far esplodere l'array risultante insieme alla posizione nell'array. Quindi usa pyspark.sql.functions.expr per afferrare l'elemento all'indice pos in questo array.

import pyspark.sql.functions as f

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .show()
#+---+------------+---+---+
#|num|     letters|pos|val|
#+---+------------+---+---+
#|  1|[A, B, C, D]|  0|  A|
#|  1|[A, B, C, D]|  1|  B|
#|  1|[A, B, C, D]|  2|  C|
#|  1|[A, B, C, D]|  3|  D|
#|  2|   [E, F, G]|  0|  E|
#|  2|   [E, F, G]|  1|  F|
#|  2|   [E, F, G]|  2|  G|
#|  3|      [H, I]|  0|  H|
#|  3|      [H, I]|  1|  I|
#|  4|         [J]|  0|  J|
#+---+------------+---+---+

Ora creiamo due nuove colonne da questo risultato. Il primo è il nome della nostra nuova colonna, che sarà una concatenazione di letter e l'indice nella matrice. La seconda colonna sarà il valore nell'indice corrispondente nella matrice. Otteniamo quest'ultimo sfruttando la funzionalità di pyspark.sql.functions.expr che ci consente utilizzare i valori delle colonne come parametri .

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .show()
#+---+-------+---+
#|num|   name|val|
#+---+-------+---+
#|  1|letter0|  A|
#|  1|letter1|  B|
#|  1|letter2|  C|
#|  1|letter3|  D|
#|  2|letter0|  E|
#|  2|letter1|  F|
#|  2|letter2|  G|
#|  3|letter0|  H|
#|  3|letter1|  I|
#|  4|letter0|  J|
#+---+-------+---+

Ora possiamo solo groupBy il num e pivot il DataFrame. Mettendo tutto insieme, otteniamo:

df.select(
        "num",
        f.split("letters", ", ").alias("letters"),
        f.posexplode(f.split("letters", ", ")).alias("pos", "val")
    )\
    .drop("val")\
    .select(
        "num",
        f.concat(f.lit("letter"),f.col("pos").cast("string")).alias("name"),
        f.expr("letters[pos]").alias("val")
    )\
    .groupBy("num").pivot("name").agg(f.first("val"))\
    .show()
#+---+-------+-------+-------+-------+
#|num|letter0|letter1|letter2|letter3|
#+---+-------+-------+-------+-------+
#|  1|      A|      B|      C|      D|
#|  3|      H|      I|   null|   null|
#|  2|      E|      F|      G|   null|
#|  4|      J|   null|   null|   null|
#+---+-------+-------+-------+-------+
12
pault

Ho trovato una soluzione per il caso non uniforme generale (o quando ottieni le colonne nidificate, ottenute con la funzione .split ()):

import pyspark.sql.functions as f

@f.udf(StructType([StructField(col_3, StringType(), True),
                   StructField(col_4, StringType(), True)]))

 def splitCols(array):
    return array[0],  ''.join(array[1:len(array)])

 df = df.withColumn("name", splitCols(f.split(f.col("my_str_col"), '-')))\
        .select(df.columns+['name.*'])

Fondamentalmente, devi solo selezionare tutte le colonne precedenti + quelle nidificate 'nome_colonna. *' E in questo caso le otterrai come due colonne di livello superiore.

0
Jasminyas