PySpark Pandas_Udf()

Pyspark Pandas Udf



La transformació del PySpark DataFrame és possible mitjançant la funció pandas_udf(). És una funció definida per l'usuari que s'aplica al PySpark DataFrame amb fletxa. Podem realitzar les operacions vectoritzades utilitzant pandas_udf(). Es pot implementar passant aquesta funció com a decorador. Submergem-nos en aquesta guia per conèixer la sintaxi, els paràmetres i diferents exemples.

Tema de continguts:

Si voleu conèixer la instal·lació del mòdul i el DataFrame de PySpark, feu-ho article .







Pyspark.sql.functions.pandas_udf()

El pandas_udf () està disponible al mòdul sql.functions de PySpark que es pot importar mitjançant la paraula clau 'from'. S'utilitza per realitzar les operacions vectoritzades al nostre PySpark DataFrame. Aquesta funció s'implementa com un decorador passant tres paràmetres. Després d'això, podem crear una funció definida per l'usuari que retorni les dades en format vectorial (com si fem servir series/NumPy per a això) mitjançant una fletxa. Dins d'aquesta funció, podem retornar el resultat.



Estructura i sintaxi:



Primer, mirem l'estructura i la sintaxi d'aquesta funció:

@pandas_udf (tipus de dades)
def nom_funció (operació) -> format_convertir:
declaració de retorn

Aquí, el nom_funció és el nom de la nostra funció definida. El tipus de dades especifica el tipus de dades que retorna aquesta funció. Podem tornar el resultat amb la paraula clau 'retorn'. Totes les operacions es realitzen dins de la funció amb l'assignació de fletxa.





Pandas_udf (Funció i Tipus de retorn)

  1. El primer paràmetre és la funció definida per l'usuari que se li passa.
  2. El segon paràmetre s'utilitza per especificar el tipus de dades de retorn de la funció.

Dades:

En tota aquesta guia, només fem servir un PySpark DataFrame per a la demostració. Totes les funcions definides per l'usuari que definim s'apliquen en aquest PySpark DataFrame. Assegureu-vos de crear aquest DataFrame al vostre entorn primer després de la instal·lació de PySpark.



importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

des de pyspark.sql.functions importar pandas_udf

des de la importació de pyspark.sql.types *

importar pandes com a panda

# Detalls vegetals

vegetal =[{ 'tipus' : 'vegetal' , 'nom' : 'tomàquet' , 'localitzar_país' : 'EUA' , 'quantitat' : 800 },

{ 'tipus' : 'fruita' , 'nom' : 'plàtan' , 'localitzar_país' : 'XINA' , 'quantitat' : 20 },

{ 'tipus' : 'vegetal' , 'nom' : 'tomàquet' , 'localitzar_país' : 'EUA' , 'quantitat' : 800 },

{ 'tipus' : 'vegetal' , 'nom' : 'Mango' , 'localitzar_país' : 'JAPÓ' , 'quantitat' : 0 },

{ 'tipus' : 'fruita' , 'nom' : 'llimona' , 'localitzar_país' : 'ÍNDIA' , 'quantitat' : 1700 },

{ 'tipus' : 'vegetal' , 'nom' : 'tomàquet' , 'localitzar_país' : 'EUA' , 'quantitat' : 1200 },

{ 'tipus' : 'vegetal' , 'nom' : 'Mango' , 'localitzar_país' : 'JAPÓ' , 'quantitat' : 0 },

{ 'tipus' : 'fruita' , 'nom' : 'llimona' , 'localitzar_país' : 'ÍNDIA' , 'quantitat' : 0 }

]

# creeu el marc de dades del mercat a partir de les dades anteriors

market_df = linuxhint_spark_app.createDataFrame (vegetal)

market_df.show()

Sortida:

Aquí, creem aquest DataFrame amb 4 columnes i 8 files. Ara, utilitzem pandas_udf() per crear les funcions definides per l'usuari i aplicar-les a aquestes columnes.

Pandas_udf() amb diferents tipus de dades

En aquest escenari, creem algunes funcions definides per l'usuari amb pandas_udf() i les apliquem a les columnes i mostrem els resultats mitjançant el mètode select(). En cada cas, fem servir els pandas.Series mentre realitzem les operacions vectoritzades. Això considera els valors de la columna com una matriu unidimensional i l'operació s'aplica a la columna. Al decorador mateix, especifiquem el tipus de retorn de la funció.

Exemple 1: Pandas_udf() amb tipus de cadena

Aquí, creem dues funcions definides per l'usuari amb el tipus de retorn de cadena per convertir els valors de la columna del tipus de cadena en majúscules i minúscules. Finalment, apliquem aquestes funcions a les columnes 'type' i 'locate_country'.

# Converteix la columna tipus en majúscules amb pandas_udf

@pandas_udf(StringType())

def type_upper_case(i: panda.Series) -> panda.Series:

retornar i.str.superior()

# Converteix la columna locate_country en minúscules amb pandas_udf

@pandas_udf(StringType())

def country_lower_case(i: panda.Series) -> panda.Series:

retorna i.str.lower()

# Mostra les columnes amb select()

market_df.select( 'tipus' ,tipus_majúscules( 'tipus' ), 'localitzar_país' ,
país_minúscula( 'localitzar_país' )).espectacle()

Sortida:

Explicació:

La funció StringType() està disponible al mòdul pyspark.sql.types. Ja hem importat aquest mòdul mentre creàvem el PySpark DataFrame.

  1. Primer, UDF (funció definida per l'usuari) retorna les cadenes en majúscules mitjançant la funció str.upper(). El str.upper() està disponible a l'estructura de dades de la sèrie (ja que estem convertint a sèrie amb una fletxa dins de la funció) que converteix la cadena donada en majúscules. Finalment, aquesta funció s'aplica a la columna 'tipus' que s'especifica dins del mètode select(). Anteriorment, totes les cadenes de la columna de tipus estan en minúscula. Ara, es canvien a majúscules.
  2. En segon lloc, l'UDF torna les cadenes en majúscules mitjançant la funció str.lower(). El str.lower() està disponible a l'estructura de dades de la sèrie que converteix la cadena donada a minúscules. Finalment, aquesta funció s'aplica a la columna 'tipus' que s'especifica dins del mètode select(). Anteriorment, totes les cadenes de la columna de tipus estan en majúscules. Ara, es canvien a minúscules.

Exemple 2: Pandas_udf() amb tipus d'enter

Creem una UDF que converteixi la columna entera de PySpark DataFrame a la sèrie Pandas i afegim 100 a cada valor. Passeu la columna 'quantitat' a aquesta funció dins del mètode select().

#Afegiu 100

@pandas_udf(IntegerType())

def add_100(i: panda.Series) -> panda.Series:

retorna i+ 100

# Passeu la columna de quantitat a la funció anterior i visualitzeu-la.

market_df.select( 'quantitat' ,afegiu_100( 'quantitat' )).espectacle()

Sortida:

Explicació:

Dins de l'UDF, iterem tots els valors i els convertim a Sèries. Després d'això, afegim 100 a cada valor de la sèrie. Finalment, passem la columna 'quantitat' a aquesta funció i podem veure que s'afegeix 100 a tots els valors.

Pandas_udf() amb diferents tipus de dades utilitzant Groupby() i Agg()

Vegem els exemples per passar l'UDF a les columnes agregades. Aquí, els valors de les columnes s'agrupen primer mitjançant la funció groupby() i l'agregació es fa amb la funció agg(). Passem la nostra UDF dins d'aquesta funció agregada.

Sintaxi:

pyspark_dataframe_object.groupby( 'columna_agrupació' ).agg(UDF
(pyspark_dataframe_object[ 'columna' ]))

Aquí, els valors de la columna d'agrupació s'agrupen primer. A continuació, es fa l'agregació de cada dada agrupada respecte a la nostra UDF.

Exemple 1: Pandas_udf() amb Mitjana agregada()

Aquí, creem una funció definida per l'usuari amb un tipus de retorn flotant. Dins de la funció, calculem la mitjana mitjançant la funció mean(). Aquesta UDF es passa a la columna 'quantitat' per obtenir la quantitat mitjana de cada tipus.

# retorna la mitjana/mitjana

@pandas_udf( 'flotar' )

def average_function(i: panda.Series) -> float:

retorn i.mean()

# Passeu la columna de quantitat a la funció agrupant la columna de tipus.

market_df.groupby( 'tipus' ).agg(funció_mitjana(market_df[ 'quantitat' ])).espectacle()

Sortida:

Estem agrupant en funció dels elements de la columna 'tipus'. Es formen dos grups: 'fruita' i 'verdura'. Per a cada grup, es calcula i es retorna la mitjana.

Exemple 2: Pandas_udf() amb Agregate Max() i Min()

Aquí, creem dues funcions definides per l'usuari amb el tipus de retorn enter (int). La primera UDF retorna el valor mínim i la segona UDF retorna el valor màxim.

# pandas_udf que retornen el valor mínim

@pandas_udf( 'int' )

def min_(i: panda.Series) -> int:

retorna i.min()

# pandas_udf que retornen el valor màxim

@pandas_udf( 'int' )

def max_(i: panda.Series) -> int:

retorna i.max()

# Passeu la columna de quantitat al min_ pandas_udf agrupant locate_country.

market_df.groupby( 'localitzar_país' ).agg(min_(market_df[ 'quantitat' ])).espectacle()

# Passeu la columna de quantitat al max_ pandas_udf agrupant locate_country.

market_df.groupby( 'localitzar_país' ).agg(max_(market_df[ 'quantitat' ])).espectacle()

Sortida:

Per retornar els valors mínims i màxims, utilitzem les funcions min() i max() en el tipus de retorn de les UDF. Ara, agrupem les dades a la columna 'localitzar_país'. Es formen quatre grups (“XINA”, “ÍNDIA”, “JAPÓ”, “EUA”). Per a cada grup, retornem la quantitat màxima. De la mateixa manera, retornem la quantitat mínima.

Conclusió

Bàsicament, pandas_udf () s'utilitza per realitzar les operacions vectoritzades al nostre PySpark DataFrame. Hem vist com crear el pandas_udf() i aplicar-lo al PySpark DataFrame. Per a una millor comprensió, vam discutir els diferents exemples tenint en compte tots els tipus de dades (cadena, flotant i enter). Pot ser possible utilitzar pandas_udf() amb groupby() mitjançant la funció agg().