it-swarm-eu.dev

Spark Dataframe distinguere le colonne con il nome duplicato

Così come so in Spark Dataframe, quello per più colonne può avere lo stesso nome mostrato nell'istantanea dataframe sottostante:

[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]

Sopra il risultato viene creato unendo un dataframe a se stesso, puoi vedere che ci sono 4 colonne con due a e f.

Il problema è che quando provo a fare più calcoli con la colonna a, non riesco a trovare un modo per selezionare a, ho provato df[0] e df.select('a'), entrambi mi hanno restituito sotto error mesaage:

AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.

Esiste comunque in Spark API che posso distinguere nuovamente le colonne dai nomi duplicati? o forse un modo per farmi cambiare i nomi delle colonne?

39
resec

Ti consiglio di cambiare i nomi delle colonne per join

df1.select('a as "df1_a", 'f as "df1_f")
   .join(df2.select('a as "df2_a", 'f as "df2_f"), 'df1_a === 'df2_a)

DataFrame risultante avrà schema 

(df1_a, df1_f, df2_a, df2_f)

Iniziamo con alcuni dati:

from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row

df1 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=125231, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])

df2 = sqlContext.createDataFrame([
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
    Row(a=107831, f=SparseVector(
        5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])

Ci sono alcuni modi in cui puoi affrontare questo problema. Prima di tutto è possibile fare riferimento in modo univoco alle colonne della tabella figlio utilizzando le colonne parent:

df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Puoi anche usare gli alias di tabella:

from pyspark.sql.functions import col

df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")

df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)

##  +--------------------+
##  |                   f|
##  +--------------------+
##  |(5,[0,1,2,3,4],[0...|
##  |(5,[0,1,2,3,4],[0...|
##  +--------------------+

Finalmente puoi rinominare le colonne a livello di codice:

df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df1.select(*(col(x).alias(x + '_df2') for x in df2.columns))

df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)

## +--------------------+
## |               f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
60
zero323

Esiste un modo più semplice rispetto alla scrittura di alias per tutte le colonne a cui si sta partecipando:

df1.join(df2,['a'])

Funziona se la chiave a cui stai partecipando è la stessa in entrambe le tabelle.

See https://docs.databricks.com/spark/latest/faq/join-two-dataframes-duplicated-column.html

6
Paul Bendevis

Dopo aver esplorato l'API Spark, ho scoperto che posso usare alias per creare un alias per il dataframe originale, quindi uso withColumnRenamed per rinominare manualmente ogni colonna dell'alias, questo farà la variabile join senza causare la duplicazione del nome della colonna.

Maggiori dettagli possono essere consultati qui sotto Spark Dataframe API :

pyspark.sql.DataFrame.alias

pyspark.sql.DataFrame.withColumnRenamed

Tuttavia, penso che questa sia solo una soluzione fastidiosa, e mi chiedo se c'è un modo migliore per la mia domanda.

5
resec

È possibile utilizzare il metodo def drop(col: Column) per eliminare la colonna duplicata, ad esempio:

DataFrame:df1

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

DataFrame:df2

+-------+-----+
| a     | f   |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+

quando mi unisco a df1 con df2, il DataFrame sarà come sotto:

val newDf = df1.join(df2,df1("a")===df2("a"))

DataFrame:newDf

+-------+-----+-------+-----+
| a     | f   | a     | f   |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+

Ora, possiamo usare il metodo def drop(col: Column) per eliminare la colonna duplicata 'a' o 'f', proprio come segue:

val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
3
StrongYoung

In questo modo possiamo unire due Dataframes sugli stessi nomi di colonna in PySpark.

df = df1.join(df2, ['col1','col2','col3'])

Se fai printSchema() dopo questo, puoi vedere che le colonne duplicate sono state rimosse.

1
Nikhil Redij

Supponiamo che i DataFrames che vuoi unire siano df1 e df2, e li stai unendo alla colonna 'a', allora hai 2 metodi

Metodo 1

df1.join (DF2, 'a', 'left_outer')

Questo è un metodo stupendo ed è altamente raccomandato.

Metodo 2

df1.join (df2, df1.a == df2.a, 'left_outer'). drop (df2.a)

1
typhoonbxq