PySpark Llegir JSON()

Pyspark Llegir Json



Quan es treballa amb PySpark DataFrames, s'ha d'emmagatzemar al PySpark DataFrame si voleu processar les dades JSON. Després d'emmagatzemar al DataFrame, podem aplicar les diferents operacions i mètodes a les dades. A més, hi ha molts avantatges si convertim JSON a PySpark DataFrame, ja que és senzill i podem transformar/particionar les dades d'una manera més senzilla.

Tema de continguts:

Llegint JSON al PySpark DataFrame mitjançant Pandas.read_json()







Llegint JSON a PySpark DataFrame mitjançant Spark.read.json()



Llegint JSON a PySpark DataFrame mitjançant el PySpark SQL



En aquest tutorial, veurem com llegir JSON al PySpark DataFrame mitjançant pandas.read_json(), spark.read.json() i spark.sql. En tots els escenaris, veurem els diferents exemples tenint en compte els diferents formats JSON.





Instal·leu la biblioteca PySpark abans d'implementar els exemples següents.

pip instal·lar pyspark

Després de la instal·lació correcta, podeu veure la sortida de la següent manera:



Llegint JSON al PySpark DataFrame mitjançant Pandas.read_json()

A PySpark, el mètode createDataFrame() s'utilitza per crear el DataFrame directament. Aquí, només hem de passar el fitxer/ruta JSON al fitxer JSON mitjançant el mètode pandas.read_json(). Aquest mètode read_json() pren el nom del fitxer/ruta que està disponible al mòdul Pandas. Per això és necessari importar i utilitzar el mòdul Pandas.

Sintaxi:

spark_app.createDataFrame(pandas.read_json( 'nom_fitxer.json' ))

Exemple:

Creem un fitxer JSON anomenat 'student_skill.json' que conté 2 registres. Aquí, les claus/columnes són 'Estudiant 1' i 'Estudiant 2'. Les files són nom, edat, habilitat1 i habilitat2.

importar pyspark

importar pandes

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Utilitzant pandas.read_json()

candidate_skills = linuxhint_spark_app.createDataFrame(pandas.read_json( 'student_skill.json' ))

candidate_skills.show()

Sortida:

Podem veure que les dades JSON es converteixen a PySpark DataFrame amb columnes i files especificades.

2. Llegir JSON a PySpark DataFrame mitjançant Spark.read.json()

El read.json() és un mètode similar a read_json() a Pandas. Aquí, read.json() pren un camí a JSON o directament al fitxer JSON i el carrega directament al PySpark DataFrame. No cal utilitzar el mètode createDataFrame() en aquest escenari. Si voleu llegir diversos fitxers JSON alhora, hem de passar una llista de noms de fitxers JSON a través d'una llista separada per comes. Tots els registres JSON s'emmagatzemen en un únic DataFrame.

Sintaxi:

Fitxer únic - spark_app.read.json( 'nom_fitxer.json' )

Diversos fitxers - spark_app.read.json([ 'fitxer1.json' , 'fitxer2.json' ,...])

Escenari 1: llegiu JSON que té una línia única

Si el vostre fitxer JSON està en els formats record1, record2, record3... (línia única), podem anomenar-lo com a JSON amb línies simples. Spark processa aquests registres i els emmagatzema al PySpark DataFrame com a files. Cada registre és una fila del PySpark DataFrame.

Creem un fitxer JSON anomenat 'candidate_skills.json' que conté 3 registres. Llegiu aquest JSON al PySpark DataFrame.

importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu candidate_skills.json al PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

candidate_skills.show()

Sortida:

Podem veure que les dades JSON es converteixen a PySpark DataFrame amb registres i noms de columnes especificats.

Escenari 2: llegiu JSON amb diverses línies

Si el vostre fitxer JSON té diverses línies, heu d'utilitzar el mètode read.option().json() per passar el paràmetre multilínia que s'ha d'establir com a true. Això ens permet carregar JSON amb diverses línies al PySpark DataFrame.

read.option( 'multilínia' , 'veritat' ).json( 'nom_fitxer.json' )

Creem un fitxer JSON anomenat 'multi.json' que conté 3 registres. Llegiu aquest JSON al PySpark DataFrame.

importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu multi.json (que té diverses línies) al PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.option( 'multilínia' , 'veritat' ).json( 'multi.json' )

candidate_skills.show()

Sortida:

Escenari 3: llegiu múltiples JSON

Ja vam parlar a la fase inicial d'aquest tutorial sobre diversos fitxers JSON. Si voleu llegir diversos fitxers JSON alhora i emmagatzemar-los en un únic PySpark DataFrame, hem de passar una llista de noms de fitxers al mètode read.json().

Creem dos fitxers JSON anomenats 'candidate_skills.json' i 'candidate_skills2.json' i els carreguem al PySpark DataFrame.

El fitxer 'candidate_skills.json' conté tres registres.

El fitxer 'candidate_skill2.json' només conté un únic registre.

importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu els fitxers candidate_skills i candidate_skills2 alhora al PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json([ 'candidate_skills.json' , 'candidate_skills2.json' ])

candidate_skills.show()

Sortida:

Finalment, el DataFrame té quatre registres. Els tres primers registres pertanyen al primer JSON i els últims registres pertanyen al segon JSON.

Llegint JSON a PySpark DataFrame mitjançant Spark.read.json()

El read.json() és un mètode similar a read_json() a Pandas. Aquí, read.json() pren un camí a JSON o directament al fitxer JSON i el carrega directament al PySpark DataFrame. No cal utilitzar el mètode createDataFrame() en aquest escenari. Si voleu llegir diversos fitxers JSON alhora, hem de passar una llista de noms de fitxers JSON a través d'una llista separada per comes. Tots els registres JSON s'emmagatzemen en un únic DataFrame.

Sintaxi:

Fitxer únic - spark_app.read.json( 'nom_fitxer.json' )

Diversos fitxers - spark_app.read.json([ 'fitxer1.json' , 'fitxer2.json' ,...])

Escenari 1: llegiu JSON que té una línia única

Si el vostre fitxer JSON està en format record1, record2, record3... (línia única), podem anomenar-lo com a JSON amb línies simples. Spark processa aquests registres i els emmagatzema al PySpark DataFrame com a files. Cada registre és una fila del PySpark DataFrame.

Creem un fitxer JSON anomenat 'candidate_skills.json' que conté 3 registres. Llegiu aquest JSON al PySpark DataFrame.

importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Llegiu candidate_skills.json al PySpark DataFrame

candidate_skills = linuxhint_spark_app.read.json( 'candidate_skills.json' )

candidate_skills.show()

Sortida:

Podem veure que les dades JSON es converteixen a PySpark DataFrame amb registres i noms de columnes especificats.

Llegint JSON a PySpark DataFrame mitjançant el PySpark SQL

Pot ser possible crear una vista temporal de les nostres dades JSON mitjançant el PySpark SQL. Directament, podem proporcionar el JSON en el moment de crear la vista temporal. Mireu la sintaxi següent. Després d'això, podem utilitzar l'ordre SELECT per mostrar el PySpark DataFrame.

Sintaxi:

spark_app.sql( 'CREA LA VISUALITZACIÓ TEMPORAL DE LA VIEW_NAME AMB LES OPCIONS JSON (camí 'nom_fitxer.json')' )

Aquí, 'VIEW_NAME' és la vista de les dades JSON i 'file_name' és el nom del fitxer JSON.

Exemple 1:

Considereu el fitxer JSON que s'utilitza als exemples anteriors: 'candidate_skills.json'. Seleccioneu totes les files del DataFrame mitjançant SELECT amb l'operador '*'. Aquí, * selecciona totes les columnes del PySpark DataFrame.

importar pyspark

importar pandes

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# Utilitzant spark.sql per crear VIEW des del JSON

candidate_skills = linuxhint_spark_app.sql( 'CREA UNA VISUALITZACIÓ TEMPORAL Candidate_data AMB LES OPCIONS JSON (camí 'candidate_skills.json')' )

# Utilitzeu la consulta SELECT per seleccionar tots els registres de Candidate_data.

linuxhint_spark_app.sql( 'SELECT * de Candidate_data' ).espectacle()

Sortida:

El nombre total de registres al PySpark DataFrame (llegit des de JSON) és 3.

Exemple 2:

Ara, filtreu els registres del PySpark DataFrame en funció de la columna d'edat. Utilitzeu l'operador 'més gran que' a l'edat per obtenir les files amb una edat superior a 22 anys.

# Utilitzeu la consulta SELECT per seleccionar registres amb edat > 22.

linuxhint_spark_app.sql( 'SELECT * de Candidate_data on edat> 22' ).espectacle()

Sortida:

Només hi ha un registre al PySpark DataFrame amb una edat superior als 22 anys.

Conclusió

Hem après les tres maneres diferents de llegir el JSON al PySpark DataFrame. Primer, vam aprendre a utilitzar el mètode read_json() disponible al mòdul Pandas per llegir JSON a PySpark DataFrame. A continuació, vam aprendre a llegir els fitxers JSON d'una o diverses línies mitjançant el mètode spark.read.json() amb option(). Per llegir diversos fitxers JSON alhora, hem de passar una llista de noms de fitxers a aquest mètode. Amb PySpark SQL, el fitxer JSON es llegeix a la vista temporal i el DataFrame es mostra mitjançant la consulta SELECT.