Pyspark.sql.DataFrameWriter.saveAsTable()
Primer, veurem com escriure el PySpark DataFrame existent a la taula mitjançant la funció write.saveAsTable(). Es necessita el nom de la taula i altres paràmetres opcionals com modes, partionBy, etc., per escriure el DataFrame a la taula. S'emmagatzema com a fitxer de parquet.
Sintaxi:
dataframe_obj.write.saveAsTable (camí/nom_taula, mode, partitionBy,...)
- El nom_taula és el nom de la taula que es crea a partir de dataframe_obj.
- Podem afegir/sobreescriure les dades de la taula utilitzant el paràmetre mode.
- El partitionBy pren les columnes simples o múltiples per crear particions basades en els valors d'aquestes columnes proporcionades.
Exemple 1:
Creeu un PySpark DataFrame amb 5 files i 4 columnes. Escriu aquest Dataframe en una taula anomenada 'Agri_Table1'.
importar pyspark
des de pyspark.sql importació SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()
# dades agrícoles amb 5 files i 5 columnes
agri =[{ 'Tipus_sòl' : 'Negre' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 2500 , 'Estat_sòl' : 'Sec' ,
'País' : 'EUA' },
{ 'Tipus_sòl' : 'Negre' , 'Disponibilitat_de_reg' : 'Sí' , 'Acres' : 3500 , 'Estat_sòl' : 'Mullat' ,
'País' : 'Índia' },
{ 'Tipus_sòl' : 'vermell' , 'Disponibilitat_de_reg' : 'Sí' , 'Acres' : 210 , 'Estat_sòl' : 'Sec' ,
'País' : 'UK' },
{ 'Tipus_sòl' : 'Altre' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 1000 , 'Estat_sòl' : 'Mullat' ,
'País' : 'EUA' },
{ 'Tipus_sòl' : 'Sorra' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 500 , 'Estat_sòl' : 'Sec' ,
'País' : 'Índia' }]
# creeu el marc de dades a partir de les dades anteriors
agri_df = linuxhint_spark_app.createDataFrame(agri)
agri_df.show()
# Escriviu el DataFrame anterior a la taula.
agri_df.coalesce( 1 ).write.saveAsTable( 'Agri_Taula1' )
Sortida:
Podem veure que es crea un fitxer de parquet amb les dades anteriors de PySpark.
Exemple 2:
Considereu el DataFrame anterior i escriviu 'Agri_Table2' a la taula particionant els registres segons els valors de la columna 'País'.
# Escriviu el DataFrame anterior a la taula amb el paràmetre partitionByagri_df.write.saveAsTable( 'Agri_Taula2' ,partitionBy=[ 'País' ])
Sortida:
Hi ha tres valors únics a la columna 'País': 'Índia', 'Regne Unit' i 'EUA'. Per tant, es creen tres particions. Cada mampara conté els fitxers de parquet.
Pyspark.sql.DataFrameReader.table()
Carreguem la taula al PySpark DataFrame mitjançant la funció spark.read.table(). Només pren un paràmetre que és el nom del camí/taula. Carrega directament la taula al PySpark DataFrame i totes les funcions SQL que s'apliquen al PySpark DataFrame també es poden aplicar en aquest DataFrame carregat.
Sintaxi:
spark_app.read.table (camí/'Nom_taula')En aquest escenari, utilitzem la taula anterior que es va crear a partir del PySpark DataFrame. Assegureu-vos que necessiteu implementar els fragments de codi de l'escenari anterior al vostre entorn.
Exemple:
Carregueu la taula 'Agri_Table1' al DataFrame anomenada 'loaded_data'.
loaded_data = linuxhint_spark_app.read.table ( 'Agri_Taula1' )loaded_data.show()
Sortida:
Podem veure que la taula es carrega al PySpark DataFrame.
Execució de consultes SQL
Ara, executem algunes consultes SQL al DataFrame carregat mitjançant la funció spark.sql().
# Utilitzeu l'ordre SELECT per mostrar totes les columnes de la taula anterior.linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).espectacle()
# Clàusula ON
linuxhint_spark_app.sql( 'SELECT * de Agri_Table1 WHERE Soil_status='Dry'' ).espectacle()
linuxhint_spark_app.sql( 'SELECT * de Agri_Table1 WHERE Acres > 2000' ).espectacle()
Sortida:
- La primera consulta mostra totes les columnes i registres del DataFrame.
- La segona consulta mostra els registres basats en la columna 'Estat_sòl'. Només hi ha tres registres amb l'element 'Sec'.
- L'última consulta retorna dos registres amb 'Acres' que són superiors a 2000.
Pyspark.sql.DataFrameWriter.insertInto()
Mitjançant la funció insertInto(), podem afegir el DataFrame a la taula existent. Podem utilitzar aquesta funció juntament amb el selectExpr() per definir els noms de les columnes i després inserir-los a la taula. Aquesta funció també pren el tableName com a paràmetre.
Sintaxi:
DataFrame_obj.write.insertInto('Nom_taula')En aquest escenari, utilitzem la taula anterior que es va crear a partir del PySpark DataFrame. Assegureu-vos que necessiteu implementar els fragments de codi de l'escenari anterior al vostre entorn.
Exemple:
Creeu un nou DataFrame amb dos registres i inseriu-los a la taula 'Agri_Table1'.
importar pysparkdes de pyspark.sql importació SparkSession
linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()
# dades agrícoles amb 2 files
agri =[{ 'Tipus_sòl' : 'Sorra' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 2500 , 'Estat_sòl' : 'Sec' ,
'País' : 'EUA' },
{ 'Tipus_sòl' : 'Sorra' , 'Disponibilitat_de_reg' : 'No' , 'Acres' : 1200 , 'Estat_sòl' : 'Mullat' ,
'País' : 'Japó' }]
# creeu el marc de dades a partir de les dades anteriors
agri_df2 = linuxhint_spark_app.createDataFrame(agri)
agri_df2.show()
# write.insertInto()
agri_df2.selectExpr( 'Acres' , 'País' , 'Disponibilitat_de_reg' , 'Tipus_sòl' ,
'Estat_sòl' ).write.insertInto( 'Agri_Taula1' )
# Mostra l'última Agri_Table1
linuxhint_spark_app.sql( 'SELECT * de Agri_Table1' ).espectacle()
Sortida:
Ara, el nombre total de files presents al DataFrame és 7.
Conclusió
Ara enteneu com escriure el PySpark DataFrame a la taula mitjançant la funció write.saveAsTable(). Pren el nom de la taula i altres paràmetres opcionals. A continuació, vam carregar aquesta taula al PySpark DataFrame mitjançant la funció spark.read.table(). Només pren un paràmetre que és el nom del camí/taula. Si voleu afegir el nou DataFrame a la taula existent, utilitzeu la funció insertInto().