PySpark Read.Parquet()

Pyspark Read Parquet



A PySpark, la funció write.parquet() escriu el DataFrame al fitxer parquet i read.parquet() llegeix el fitxer parquet al PySpark DataFrame o qualsevol altra font de dades. Per processar les columnes a Apache Spark de manera ràpida i eficient, hem de comprimir les dades. La compressió de dades estalvia la nostra memòria i totes les columnes es converteixen a nivell pla. Això vol dir que existeix l'emmagatzematge a nivell de columna plana. El fitxer que emmagatzema aquests es coneix com a fitxer PARQUET.

En aquesta guia, ens centrarem principalment a llegir/carregar el fitxer de parquet al PySpark DataFrame/SQL mitjançant la funció read.parquet() que està disponible a la classe pyspark.sql.DataFrameReader.

Tema de continguts:







Obteniu l'arxiu Parquet



Llegiu el fitxer Parquet al PySpark DataFrame



Llegiu el fitxer Parquet al PySpark SQL





Pyspark.sql.DataFrameReader.parquet()

Aquesta funció s'utilitza per llegir el fitxer parquet i carregar-lo al PySpark DataFrame. Pren el camí/nom del fitxer del fitxer parquet. Simplement podem utilitzar la funció read.parquet() ja que aquesta és la funció genèrica.

Sintaxi:



Vegem la sintaxi de read.parquet():

spark_app.read.parquet(nom_fitxer.parquet/camí)

Primer, instal·leu el mòdul PySpark mitjançant l'ordre pip:

pip instal·lar pyspark

Obteniu l'arxiu Parquet

Per llegir un fitxer de parquet, necessiteu les dades en què es genera el fitxer de parquet a partir d'aquestes dades. En aquesta part, veurem com generar un fitxer de parquet des del PySpark DataFrame.

Creem un PySpark DataFrame amb 5 registres i escrivim-ho al fitxer de parquet 'industry_parquet'.

importar pyspark

des de pyspark.sql importació SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# crear el marc de dades que emmagatzema els detalls de la indústria

industry_df = linuxhint_spark_app.createDataFrame([Row(Tipus= 'Agricultura' ,Àrea= 'EUA' ,
Valoració= 'calent' ,Total_empleats= 100 ),

Fila (Tipus= 'Agricultura' ,Àrea= 'Índia' ,Valoració= 'calent' ,Total_empleats= 200 ),

Fila (Tipus= 'Desenvolupament' ,Àrea= 'EUA' ,Valoració= 'Càlid' ,Total_empleats= 100 ),

Fila (Tipus= 'Educació' ,Àrea= 'EUA' ,Valoració= 'Guai' ,Total_empleats= 400 ),

Fila (Tipus= 'Educació' ,Àrea= 'EUA' ,Valoració= 'Càlid' ,Total_empleats= 20 )

])

# DataFrame real

industry_df.show()

# Escriviu l'industria_df al fitxer parquet

industria_df.coalesce( 1 ).write.parquet( 'parquet_indústria' )

Sortida:

Aquest és el DataFrame que conté 5 registres.

Es crea un fitxer de parquet per al DataFrame anterior. Aquí, el nostre nom de fitxer amb una extensió és 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet'. Utilitzem aquest fitxer a tot el tutorial.

Llegiu el fitxer Parquet al PySpark DataFrame

Tenim la fitxa de parquet. Llegim aquest fitxer amb la funció read.parquet() i carreguem-lo al PySpark DataFrame.

importar pyspark

des de pyspark.sql importació SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu el fitxer parquet a l'objecte dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Mostra el dataframe_from_parquet-DataFrame

dataframe_from_parquet.show()

Sortida:

Mostrem el DataFrame mitjançant el mètode show() que es va crear a partir del fitxer parquet.

Consultes SQL amb fitxer Parquet

Després de carregar al DataFrame, pot ser possible crear les taules SQL i mostrar les dades que hi ha presents al DataFrame. Hem de crear una VISTA TEMPORAL i utilitzar les ordres SQL per retornar els registres del DataFrame que es crea a partir del fitxer parquet.

Exemple 1:

Creeu una vista temporal anomenada 'Sectors' i utilitzeu l'ordre SELECT per mostrar els registres al DataFrame. Podeu fer referència a això tutorial que explica com crear una VISTA a Spark - SQL.

importar pyspark

des de pyspark.sql importació SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu el fitxer parquet a l'objecte dataframe_from_parquet.

dataframe_from_parquet=linuxhint_spark_app.read.parquet( 'part-00000-ff70f69d-f1fb-4450-b4b4-dfd5a8d6c7ad-c000.snappy.parquet' )

# Crea una vista des del fitxer de parquet anterior anomenat - 'Sectors'

dataframe_from_parquet.createOrReplaceTempView( 'Sectors' )

# Consulta per mostrar tots els registres dels sectors

linuxhint_spark_app.sql( 'seleccioneu * dels sectors' ).espectacle()

Sortida:

Exemple 2:

Utilitzant la VISTA anterior, escriviu la consulta SQL:

  1. Per mostrar tots els registres dels sectors que pertanyen a 'Índia'.
  2. Per mostrar tots els registres dels sectors amb un empleat superior a 100.
# Consulta per mostrar tots els registres dels sectors pertanyents a 'Índia'.

linuxhint_spark_app.sql( 'seleccioneu * dels sectors on Area='Índia'' ).espectacle()

# Consulta per mostrar tots els registres dels sectors amb empleats superiors a 100

linuxhint_spark_app.sql( 'seleccioneu * dels sectors on Total_empleats>100' ).espectacle()

Sortida:

Només hi ha un registre amb àrea que és 'Índia' i dos registres amb empleats superiors a 100.

Llegiu el fitxer Parquet al PySpark SQL

Primer, hem de crear una VISTA amb l'ordre CREATE. Utilitzant la paraula clau 'path' dins de la consulta SQL, podem llegir el fitxer parquet a l'Spark SQL. Després del camí, hem d'especificar el nom/ubicació del fitxer.

Sintaxi:

spark_app.sql( 'CREA UNA VISUALITZACIÓ TEMPORAL view_name AMB LES OPCIONS DE parquet (camí ' nom_fitxer.parquet ')' )

Exemple 1:

Creeu una vista temporal anomenada 'Sector2' i llegiu-hi el fitxer de parquet. Amb la funció sql(), escriviu la consulta de selecció per mostrar tots els registres que hi ha a la vista.

importar pyspark

des de pyspark.sql importació SparkSession,Row

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu el fitxer de parquet a Spark-SQL

linuxhint_spark_app.sql( 'CREAR VISUALITZACIÓ TEMPORAL Sector2 AMB LES OPCIONS DE parquet (camí ' part-00000-ff70f69d-f1fb- 4450 -b4b4-dfd5a8d6c7ad-c000.snappy.parquet ')' )

# Consulta per mostrar tots els registres del Sector2

linuxhint_spark_app.sql( 'selecciona * del Sector2' ).espectacle()

Sortida:

Exemple 2:

Utilitzeu la VISTA anterior i escriviu la consulta per mostrar tots els registres amb la qualificació de 'Calent' o 'Cool'.

# Consulta per mostrar tots els registres del Sector2 amb qualificació: calent o fresc.

linuxhint_spark_app.sql( 'seleccioneu * del Sector2 on Rating='Hot' O Rating='Cool'' ).espectacle()

Sortida:

Hi ha tres registres amb la qualificació de 'Calent' o 'Cool'.

Conclusió

A PySpark, la funció write.parquet() escriu el DataFrame al fitxer parquet. La funció read.parquet() llegeix el fitxer de parquet al PySpark DataFrame o a qualsevol altra font de dades. Hem après a llegir el fitxer de parquet al PySpark DataFrame i a la taula PySpark. Com a part d'aquest tutorial, també vam parlar de com crear les taules des del PySpark DataFrame i filtrar les dades mitjançant la clàusula WHERE.