Vorrei aggiungere una nuova colonna su dataframe "df" dalla funzione get_distance
:
def get_distance(x, y):
dfDistPerc = hiveContext.sql("select column3 as column3, \
from tab \
where column1 = '" + x + "' \
and column2 = " + y + " \
limit 1")
result = dfDistPerc.select("column3").take(1)
return result
df = df.withColumn(
"distance",
lit(get_distance(df["column1"], df["column2"]))
)
Ma, ho capito:
TypeError: 'Column' object is not callable
Penso che succeda perché x e y sono oggetti Column
e ho bisogno di essere convertito in String
da usare nella mia query. Ho ragione? Se è così, come posso fare questo?
Non è possibile utilizzare direttamente la funzione Python su oggetti Column
, a meno che non sia intesa per operare su oggetti/espressioni Column
. Hai bisogno di udf
per questo:
@udf
def get_distance(x, y):
...
Ma non è possibile utilizzare SQLContext
in udf (o mapper in generale).
Solo join
:
tab = hiveContext.table("tab").groupBy("column1", "column2").agg(first("column3"))
df.join(tab, ["column1", "column2"])
Spark dovrebbe sapere che la funzione che stai usando non è una funzione ordinaria, ma l'UDF.
Quindi, ci sono 2 modi in cui possiamo usare l'UDF sui dataframes.
Metodo 1: con annotazione @udf
@udf
def get_distance(x, y):
dfDistPerc = hiveContext.sql("select column3 as column3, \
from tab \
where column1 = '" + x + "' \
and column2 = " + y + " \
limit 1")
result = dfDistPerc.select("column3").take(1)
return result
df = df.withColumn(
"distance",
lit(get_distance(df["column1"], df["column2"]))
)
Metodo 2: riavviare udf con pyspark.sql.functions.udf
def get_distance(x, y):
dfDistPerc = hiveContext.sql("select column3 as column3, \
from tab \
where column1 = '" + x + "' \
and column2 = " + y + " \
limit 1")
result = dfDistPerc.select("column3").take(1)
return result
calculate_distance_udf = udf(get_distance, IntegerType())
df = df.withColumn(
"distance",
lit(calculate_distance_udf(df["column1"], df["column2"]))
)