it-swarm-eu.dev

Come selezionare l'ultima riga e anche come accedere al dataframe PySpark per indice?

Da un dataframe PySpark come SQL 

name age city
abc   20  A
def   30  B

Come ottenere l'ultima riga. (Come da df.limit (1) posso ottenere la prima riga di dataframe in un nuovo dataframe).

E come posso accedere alle righe del dataframe per index.like riga no. 12 o 200.

In panda che posso fare

df.tail(1) # for last row
df.ix[rowno or index] # by index
df.loc[] or by df.iloc[]

Sono solo curioso di sapere come accedere al dataframe di pyspark in questi modi o in modi alternativi.

Grazie

7
Satya

Come ottenere l'ultima riga.

Lungo e brutto modo che presuppone che tutte le colonne sono oderable:

from pyspark.sql.functions import (
    col, max as max_, struct, monotonically_increasing_id
)

last_row = (df
    .withColumn("_id", monotonically_increasing_id())
    .select(max(struct("_id", *df.columns))
    .alias("tmp")).select(col("tmp.*"))
    .drop("_id"))

Se non tutte le colonne possono essere ordinate puoi provare:

with_id = df.withColumn("_id", monotonically_increasing_id())
i = with_id.select(max_("_id")).first()[0]

with_id.where(col("_id") == i).drop("_id")

Nota. Esiste la funzione last in pyspark.sql.functions`o.a.s.sql.functions ma considerando la descrizione delle espressioni corrispondenti non è una buona scelta qui.

come posso accedere alle righe del dataframe per index.like

Non puoi. Spark DataFrame e accessibile per indice. Puoi aggiungere indici usando zipWithIndex e filtrare successivamente. Tieni a mente questa operazione _/O(N)

5
zero323

Come ottenere l'ultima riga.

Se si dispone di una colonna che è possibile utilizzare per ordinare dataframe, ad esempio "indice", quindi un modo semplice per ottenere l'ultimo record utilizza SQL: 1) ordinare la tabella in ordine decrescente e 2) prendere 1o valore da questo ordine

df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM table_df ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec)
latest_rec.show()

E come posso accedere alle righe del dataframe per index.like riga no. 12 o 200.

Modo simile puoi ottenere record in qualsiasi linea

row_number = 12
df.createOrReplaceTempView("table_df")
query_latest_rec = """SELECT * FROM (select * from table_df ORDER BY index ASC limit {0}) ord_lim ORDER BY index DESC limit 1"""
latest_rec = self.sqlContext.sql(query_latest_rec.format(row_number))
latest_rec.show()

Se non hai la colonna "indice" puoi crearla usando

from pyspark.sql.functions import monotonically_increasing_id

df = df.withColumn("index", monotonically_increasing_id())
3
from pyspark.sql import functions as F

expr = [F.last(col).alias(col) for col in df.columns]

df.groupBy().agg(*expr)

Solo un consiglio: Sembra che tu abbia ancora la mentalità di qualcuno che sta lavorando con i panda o R. Spark è un paradigma diverso nel modo in cui lavoriamo con i dati. Non si accede più ai dati all'interno delle singole celle, ora si lavora con blocchi interi. Se continui a collezionare roba e a fare azioni, come hai appena fatto, perdi l'intero concetto di parallelismo che la scintilla fornisce. Dai un'occhiata al concetto di trasformazioni e azioni in Spark.

2

Usare il comando seguente per ottenere una colonna indice che contiene numeri interi monotonicamente crescenti, univocieconsecutivi, che è non come funziona monotonically_increasing_id(). Gli indici ascenderanno nello stesso ordine di colName del tuo DataFrame.

import pyspark.sql.functions as F
from pyspark.sql.window import Window as W

window = W.orderBy('colName').rowsBetween(W.unboundedPreceding, W.currentRow)

df = df\
 .withColumn('int', F.lit(1))\
 .withColumn('index', F.sum('int').over(window))\
 .drop('int')\

Usa il seguente codice per guardare la coda, o l'ultimo rownums di DataFrame.

rownums = 10
df.where(F.col('index')>df.count()-rownums).show()

Utilizzare il codice seguente per esaminare le righe da start_row a end_row DataFrame. 

start_row = 20
end_row = start_row + 10
df.where((F.col('index')>start_row) & (F.col('index')<end_row)).show()

zipWithIndex() è un metodo RDD che restituisce numeri interi monotonicamente crescenti, univoci e consecutivi, ma sembra essere molto più lento da implementare in modo da poter tornare al tuo DataFrame originale modificato con una colonna id.

0
Clay