Com llegir i escriure dades d'una taula a PySpark

Com Llegir I Escriure Dades D Una Taula A Pyspark



El processament de dades a PySpark és més ràpid si les dades es carreguen en forma de taula. Amb això, utilitzant les Expressions SQL, el processament serà ràpid. Per tant, convertir el PySpark DataFrame/RDD en una taula abans d'enviar-lo per processar-lo és el millor enfocament. Avui veurem com llegir les dades de la taula al PySpark DataFrame, escriure el PySpark DataFrame a la taula i inserir un nou DataFrame a la taula existent mitjançant les funcions integrades. Som-hi!

Pyspark.sql.DataFrameWriter.saveAsTable()

Primer, veurem com escriure el PySpark DataFrame existent a la taula mitjançant la funció write.saveAsTable(). Es necessita el nom de la taula i altres paràmetres opcionals com modes, partionBy, etc., per escriure el DataFrame a la taula. S'emmagatzema com a fitxer de parquet.

Sintaxi:







dataframe_obj.write.saveAsTable (camí/nom_taula, mode, partitionBy,...)
  1. El nom_taula és el nom de la taula que es crea a partir de dataframe_obj.
  2. Podem afegir/sobreescriure les dades de la taula utilitzant el paràmetre mode.
  3. El partitionBy pren les columnes simples o múltiples per crear particions basades en els valors d'aquestes columnes proporcionades.

Exemple 1:

Creeu un PySpark DataFrame amb 5 files i 4 columnes. Escriu aquest Dataframe en una taula anomenada 'Agri_Table1'.



importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# dades agrícoles amb 5 files i 5 columnes

agri =[{ 'Tipus_sòl' : 'Negre' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 2500 , 'Estat_sòl' : 'Sec' ,
'País' : 'EUA' },

{ 'Tipus_sòl' : 'Negre' , 'Disponibilitat_de_reg' : 'Sí' , 'Acres' : 3500 , 'Estat_sòl' : 'Mullat' ,
'País' : 'Índia' },

{ 'Tipus_sòl' : 'vermell' , 'Disponibilitat_de_reg' : 'Sí' , 'Acres' : 210 , 'Estat_sòl' : 'Sec' ,
'País' : 'UK' },

{ 'Tipus_sòl' : 'Altre' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 1000 , 'Estat_sòl' : 'Mullat' ,
'País' : 'EUA' },

{ 'Tipus_sòl' : 'Sorra' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 500 , 'Estat_sòl' : 'Sec' ,
'País' : 'Índia' }]



# creeu el marc de dades a partir de les dades anteriors

agri_df = linuxhint_spark_app.createDataFrame(agri)

agri_df.show()

# Escriviu el DataFrame anterior a la taula.

agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Taula1' )

Sortida:







Podem veure que es crea un fitxer de parquet amb les dades anteriors de PySpark.



Exemple 2:

Considereu el DataFrame anterior i escriviu 'Agri_Table2' a la taula particionant els registres segons els valors de la columna 'País'.

# Escriviu el DataFrame anterior a la taula amb el paràmetre partitionBy

agri_df.write.saveAsTable( 'Agri_Taula2' ,partitionBy=[ 'País' ])

Sortida:

Hi ha tres valors únics a la columna 'País': 'Índia', 'Regne Unit' i 'EUA'. Per tant, es creen tres particions. Cada mampara conté els fitxers de parquet.

Pyspark.sql.DataFrameReader.table()

Carreguem la taula al PySpark DataFrame mitjançant la funció spark.read.table(). Només pren un paràmetre que és el nom del camí/taula. Carrega directament la taula al PySpark DataFrame i totes les funcions SQL que s'apliquen al PySpark DataFrame també es poden aplicar en aquest DataFrame carregat.

Sintaxi:

spark_app.read.table (camí/'Nom_taula')

En aquest escenari, utilitzem la taula anterior que es va crear a partir del PySpark DataFrame. Assegureu-vos que necessiteu implementar els fragments de codi de l'escenari anterior al vostre entorn.

Exemple:

Carregueu la taula 'Agri_Table1' al DataFrame anomenada 'loaded_data'.

loaded_data = linuxhint_spark_app.read.table ( 'Agri_Taula1' )

loaded_data.show()

Sortida:

Podem veure que la taula es carrega al PySpark DataFrame.

Execució de consultes SQL

Ara, executem algunes consultes SQL al DataFrame carregat mitjançant la funció spark.sql().

# Utilitzeu l'ordre SELECT per mostrar totes les columnes de la taula anterior.

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).espectacle()

# Clàusula ON

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1 WHERE Soil_status='Dry'' ).espectacle()

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1 WHERE Acres > 2000' ).espectacle()

Sortida:

  1. La primera consulta mostra totes les columnes i registres del DataFrame.
  2. La segona consulta mostra els registres basats en la columna 'Estat_sòl'. Només hi ha tres registres amb l'element 'Sec'.
  3. L'última consulta retorna dos registres amb 'Acres' que són superiors a 2000.

Pyspark.sql.DataFrameWriter.insertInto()

Mitjançant la funció insertInto(), podem afegir el DataFrame a la taula existent. Podem utilitzar aquesta funció juntament amb el selectExpr() per definir els noms de les columnes i després inserir-los a la taula. Aquesta funció també pren el tableName com a paràmetre.

Sintaxi:

DataFrame_obj.write.insertInto('Nom_taula')

En aquest escenari, utilitzem la taula anterior que es va crear a partir del PySpark DataFrame. Assegureu-vos que necessiteu implementar els fragments de codi de l'escenari anterior al vostre entorn.

Exemple:

Creeu un nou DataFrame amb dos registres i inseriu-los a la taula 'Agri_Table1'.

importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# dades agrícoles amb 2 files

agri =[{ 'Tipus_sòl' : 'Sorra' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 2500 , 'Estat_sòl' : 'Sec' ,
'País' : 'EUA' },

{ 'Tipus_sòl' : 'Sorra' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 1200 , 'Estat_sòl' : 'Mullat' ,
'País' : 'Japó' }]

# creeu el marc de dades a partir de les dades anteriors

agri_df2 = linuxhint_spark_app.createDataFrame(agri)

agri_df2.show()

# write.insertInto()

agri_df2.selectExpr( 'Acres' , 'País' , 'Disponibilitat_de_reg' , 'Tipus_sòl' ,
'Estat_sòl' ).write.insertInto( 'Agri_Taula1' )

# Mostra l'última Agri_Table1

linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).espectacle()

Sortida:

Ara, el nombre total de files presents al DataFrame és 7.

Conclusió

Ara enteneu com escriure el PySpark DataFrame a la taula mitjançant la funció write.saveAsTable(). Pren el nom de la taula i altres paràmetres opcionals. A continuació, vam carregar aquesta taula al PySpark DataFrame mitjançant la funció spark.read.table(). Només pren un paràmetre que és el nom del camí/taula. Si voleu afegir el nou DataFrame a la taula existent, utilitzeu la funció insertInto().