Da un dataframe PySpark come SQL
name age city
abc 20 A
def 30 B
Come ottenere l'ultima riga. (Come da df.limit (1) posso ottenere la prima riga di dataframe in un nuovo dataframe).
E come posso accedere alle righe del dataframe per index.like riga no. 12 o 200.
In panda che posso fare
df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]
Sono solo curioso di sapere come accedere al dataframe di pyspark in questi modi o in modi alternativi.
Grazie
Come ottenere l'ultima riga.
Lungo e brutto modo che presuppone che tutte le colonne sono oderable:
from pyspark.sql.functions import (
col, max as max_, struct, monotonically_increasing_id
)
last_row = (df
.withColumn("_id", monotonically_increasing_id())
.select(max(struct("_id", *df.columns))
.alias("tmp")).select(col("tmp.*"))
.drop("_id"))
Se non tutte le colonne possono essere ordinate puoi provare:
with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]
with_id.where(col("_id") == i).drop("_id")
Nota. Esiste la funzione last
in pyspark.sql.functions
`o.a.s.sql.functions ma considerando la descrizione delle espressioni corrispondenti non è una buona scelta qui.
come posso accedere alle righe del dataframe per index.like
Non puoi. Spark DataFrame
e accessibile per indice. Puoi aggiungere indici usando zipWithIndex
e filtrare successivamente. Tieni a mente questa operazione _/O(N).
Come ottenere l'ultima riga.
Se si dispone di una colonna che è possibile utilizzare per ordinare dataframe, ad esempio "indice", quindi un modo semplice per ottenere l'ultimo record utilizza SQL: 1) ordinare la tabella in ordine decrescente e 2) prendere 1o valore da questo ordine
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()
E come posso accedere alle righe del dataframe per index.like riga no. 12 o 200.
Modo simile puoi ottenere record in qualsiasi linea
row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()
Se non hai la colonna "indice" puoi crearla usando
from pyspark.sql.functions import monotonically_increasing_id
df = df.withColumn("index", monotonically_increasing_id())
from pyspark.sql import functions as F
expr = [F.last(col).alias(col) for col in df.columns]
df.groupBy().agg(*expr)
Solo un consiglio: Sembra che tu abbia ancora la mentalità di qualcuno che sta lavorando con i panda o R. Spark è un paradigma diverso nel modo in cui lavoriamo con i dati. Non si accede più ai dati all'interno delle singole celle, ora si lavora con blocchi interi. Se continui a collezionare roba e a fare azioni, come hai appena fatto, perdi l'intero concetto di parallelismo che la scintilla fornisce. Dai un'occhiata al concetto di trasformazioni e azioni in Spark.
Usare il comando seguente per ottenere una colonna indice che contiene numeri interi monotonicamente crescenti, univocieconsecutivi, che è non come funziona monotonically_increasing_id()
. Gli indici ascenderanno nello stesso ordine di colName
del tuo DataFrame.
import pyspark.sql.functions as F
from pyspark.sql.window import Window as W
window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)
df = df\
.withColumn('int', F.lit(1))\
.withColumn('index', F.sum('int').over(window))\
.drop('int')\
Usa il seguente codice per guardare la coda, o l'ultimo rownums
di DataFrame.
rownums = 10
df.where(F.col('index')>df.count()-rownums).show()
Utilizzare il codice seguente per esaminare le righe da start_row
a end_row
DataFrame.
start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()
zipWithIndex()
è un metodo RDD che restituisce numeri interi monotonicamente crescenti, univoci e consecutivi, ma sembra essere molto più lento da implementare in modo da poter tornare al tuo DataFrame originale modificato con una colonna id.