Conversió de PySpark DataFrame a JSON

Conversio De Pyspark Dataframe A Json



La transmissió de dades estructurades mitjançant JSON és possible i també consumeix poca memòria. En comparació amb PySpark RDD o PySpark DataFrame, JSON consumeix poca memòria i serialització que és possible amb JSON. Podem convertir el PySpark DataFrame a JSON mitjançant el mètode pyspark.sql.DataFrameWriter.json(). A part d'això, hi ha altres dues maneres de convertir el DataFrame a JSON.

Tema de continguts:

Considerem un PySpark DataFrame senzill en tots els exemples i el convertim a JSON mitjançant les funcions esmentades.







Mòdul requerit:

Instal·leu la biblioteca PySpark al vostre entorn si encara no està instal·lada. Podeu consultar l'ordre següent per instal·lar-lo:



pip instal·lar pyspark

PySpark DataFrame a JSON utilitzant To_json() amb ToPandas()

El mètode to_json() està disponible al mòdul Pandas que converteix el Pandas DataFrame a JSON. Podem utilitzar aquest mètode si convertim el nostre PySpark DataFrame a Pandas DataFrame. Per convertir el PySpark DataFrame a Pandas DataFrame, s'utilitza el mètode toPandas(). Vegem la sintaxi de to_json() juntament amb els seus paràmetres.



Sintaxi:





dataframe_object.toPandas().to_json(orient,index,...)
  1. Orient s'utilitza per mostrar el JSON convertit com el format desitjat. Pren 'registres', 'taula', 'valors', 'columnes', 'índex', 'divisió'.
  2. L'índex s'utilitza per incloure/eliminar l'índex de la cadena JSON convertida. Si s'estableix a 'Verdader', es mostren els índexs. En cas contrari, els índexs no es mostraran si l'orient és 'dividit' o 'taula'.

Exemple 1: Orienta com a 'Registres'

Creeu un PySpark DataFrame 'skills_df' amb 3 files i 4 columnes. Converteix aquest DataFrame a JSON especificant el paràmetre orient com a 'registres'.

importar pyspark

importar pandes

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# dades d'habilitats amb 3 files i 4 columnes

habilitats =[{ 'identificador' : 123 , 'persona' : 'mel' , 'habilitat' : 'pintura' , 'premi' : 25000 },

{ 'identificador' : 112 , 'persona' : 'Mouni' , 'habilitat' : 'dansa' , 'premi' : 2000 },

{ 'identificador' : 153 , 'persona' : 'Tulasi' , 'habilitat' : 'lectura' , 'premi' : 1200 }

]

# creeu el marc de dades d'habilitats a partir de les dades anteriors

skills_df = linuxhint_spark_app.createDataFrame(habilitats)

# Dades d'habilitats reals

skills_df.show()

# Converteix a JSON utilitzant to_json() amb orient com a 'registres'

json_skills_data = skills_df.toPandas().to_json(orient= 'registres' )

imprimir(json_skills_data)

Sortida:



+---+------+-----+--------+

| id|persona|premi| habilitat|

+---+------+-----+--------+

| 123 | mel| 25000 |pintura|

| 112 | Mouni| 2000 | dansa|

| 153 |Tulasi| 1200 | lectura|

+---+------+-----+--------+

[{ 'id' : 123 , 'persona' : 'mel' , 'premi' : 25000 , 'habilitat' : 'pintura' },{ 'id' : 112 , 'persona' : 'Mouni' , 'premi' : 2000 , 'habilitat' : 'dansa' },{ 'id' : 153 , 'persona' : 'Tulasi' , 'premi' : 1200 , 'habilitat' : 'lectura' }]

Podem veure que el PySpark DataFrame es converteix a la matriu JSON amb un diccionari de valors. Aquí, les claus representen el nom de la columna i el valor representa el valor de fila/cel·la al PySpark DataFrame.

Exemple 2: Orienta com a 'Divisió'

El format JSON que retorna l'orientació 'divisió' inclou els noms de columnes que tenen una llista de columnes, una llista d'índexs i una llista de dades. El següent és el format de l'orient 'dividit'.

# Converteix a JSON utilitzant to_json() amb orient com a 'split'

json_skills_data = skills_df.toPandas().to_json(orient= 'dividir' )

imprimir(json_skills_data)

Sortida:

{ 'columnes' :[ 'id' , 'persona' , 'premi' , 'habilitat' ], 'índex' :[ 0 , 1 , 2 ], 'dades' :[[ 123 , 'mel' , 25000 , 'pintura' ],[ 112 , 'Mouni' , 2000 , 'dansa' ],[ 153 , 'Tulasi' , 1200 , 'lectura' ]]}

Exemple 3: Orienta com a 'índex'

Aquí, cada fila del PySpark DataFrame es retira en forma de diccionari amb la clau com a nom de columna. Per a cada diccionari, la posició de l'índex s'especifica com a clau.

# Converteix a JSON utilitzant to_json() amb orient com a 'índex'

json_skills_data = skills_df.toPandas().to_json(orient= 'índex' )

imprimir(json_skills_data)

Sortida:

{ '0' :{ 'id' : 123 , 'persona' : 'mel' , 'premi' : 25000 , 'habilitat' : 'pintura' }, '1' :{ 'id' : 112 , 'persona' : 'Mouni' , 'premi' : 2000 , 'habilitat' : 'dansa' }, '2' :{ 'id' : 153 , 'persona' : 'Tulasi' , 'premi' : 1200 , 'habilitat' : 'lectura' }}

Exemple 4: Orienta com a 'Columnes'

Les columnes són la clau de cada registre. Cada columna conté un diccionari que pren els valors de la columna amb números d'índex.

# Converteix a JSON utilitzant to_json() amb orient com a 'columnes'

json_skills_data = skills_df.toPandas().to_json(orient= 'columnes' )

imprimir(json_skills_data)

Sortida:

{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persona' :{ '0' : 'mel' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'premi' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'habilitat' :{ '0' : 'pintura' , '1' : 'dansa' , '2' : 'lectura' }}

Exemple 5: Orientar com a 'Valors'

Si només necessiteu els valors en JSON, podeu optar per l'orientació 'valors'. Mostra cada fila en una llista. Finalment, totes les llistes s'emmagatzemen en una llista. Aquest JSON és del tipus de llista imbricada.

# Converteix a JSON utilitzant to_json() amb orient com a 'valors'

json_skills_data = skills_df.toPandas().to_json(orient= 'valors' )

imprimir(json_skills_data)

Sortida:

[[ 123 , 'mel' , 25000 , 'pintura' ],[ 112 , 'Mouni' , 2000 , 'dansa' ],[ 153 , 'Tulasi' , 1200 , 'lectura' ]]

Exemple 6: Orientar com a 'Taula'

L'orientació 'taula' retorna el JSON que inclou l'esquema amb noms de camps juntament amb els tipus de dades de columna, l'índex com a clau primària i la versió Pandas. Els noms de columnes amb valors es mostren com a 'dades'.

# Converteix a JSON utilitzant to_json() amb orient com a 'taula'

json_skills_data = skills_df.toPandas().to_json(orient= 'taula' )

imprimir(json_skills_data)

Sortida:

{ 'esquema' :{ 'camps' :[{ 'nom' : 'índex' , 'tipus' : 'enter' },{ 'nom' : 'id' , 'tipus' : 'enter' },{ 'nom' : 'persona' , 'tipus' : 'cadena' },{ 'nom' : 'premi' , 'tipus' : 'enter' },{ 'nom' : 'habilitat' , 'tipus' : 'cadena' }], 'clau primària' :[ 'índex' ], 'versió_pandas' : '1.4.0' }, 'dades' :[{ 'índex' : 0 , 'id' : 123 , 'persona' : 'mel' , 'premi' : 25000 , 'habilitat' : 'pintura' },{ 'índex' : 1 , 'id' : 112 , 'persona' : 'Mouni' , 'premi' : 2000 , 'habilitat' : 'dansa' },{ 'índex' : 2 , 'id' : 153 , 'persona' : 'Tulasi' , 'premi' : 1200 , 'habilitat' : 'lectura' }]}

Exemple 7: amb el paràmetre d'índex

Primer, passem el paràmetre d'índex configurant-lo a 'True'. Veureu per a cada valor de columna que la posició de l'índex es retorna com a clau en un diccionari.

A la segona sortida, només es retornen els noms de les columnes ('columnes') i els registres ('dades') sense les posicions de l'índex, ja que l'índex està definit com a 'Fals'.

# Converteix a JSON utilitzant to_json() amb index=True

json_skills_data = skills_df.toPandas().to_json(index=True)

print(json_skills_data, ' \n ' )

# Converteix a JSON utilitzant to_json() amb index=False

json_skills_data= skills_df.toPandas().to_json(index=False,orient= 'dividir' )

imprimir(json_skills_data)

Sortida:

{ 'id' :{ '0' : 123 , '1' : 112 , '2' : 153 }, 'persona' :{ '0' : 'mel' , '1' : 'Mouni' , '2' : 'Tulasi' }, 'premi' :{ '0' : 25000 , '1' : 2000 , '2' : 1200 }, 'habilitat' :{ '0' : 'pintura' , '1' : 'dansa' , '2' : 'lectura' }}

{ 'columnes' :[ 'id' , 'persona' , 'premi' , 'habilitat' ], 'dades' :[[ 123 , 'mel' , 25000 , 'pintura' ],[ 112 , 'Mouni' , 2000 , 'dansa' ],[ 153 , 'Tulasi' , 1200 , 'lectura' ]]

PySpark DataFrame a JSON mitjançant ToJSON()

El mètode toJSON() s'utilitza per convertir el PySpark DataFrame en un objecte JSON. Bàsicament, retorna una cadena JSON que està envoltada per una llista. El [‘{columna:valor,…}’,…. ] és el format que retorna aquesta funció. Aquí, cada fila del PySpark DataFrame es retorna com a diccionari amb el nom de la columna com a clau.

Sintaxi:

dataframe_object.toJSON()

Pot ser possible passar paràmetres com l'índex, les etiquetes de les columnes i el tipus de dades.

Exemple:

Creeu un PySpark DataFrame 'skills_df' amb 5 files i 4 columnes. Converteix aquest DataFrame a JSON mitjançant el mètode toJSON().

importar pyspark

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# dades d'habilitats amb 5 files i 4 columnes

habilitats =[{ 'identificador' : 123 , 'persona' : 'mel' , 'habilitat' : 'pintura' , 'premi' : 25000 },

{ 'identificador' : 112 , 'persona' : 'Mouni' , 'habilitat' : 'música/dansa' , 'premi' : 2000 },

{ 'identificador' : 153 , 'persona' : 'Tulasi' , 'habilitat' : 'lectura' , 'premi' : 1200 },

{ 'identificador' : 173 , 'persona' : 'Ran' , 'habilitat' : 'música' , 'premi' : 2000 },

{ 'identificador' : 43 , 'persona' : 'Kamala' , 'habilitat' : 'lectura' , 'premi' : 10000 }

]

# creeu el marc de dades d'habilitats a partir de les dades anteriors

skills_df = linuxhint_spark_app.createDataFrame(habilitats)

# Dades d'habilitats reals

skills_df.show()

# Converteix a matriu JSON

json_skills_data = skills_df.toJSON().collect()

imprimir(json_skills_data)

Sortida:

+---+------+-----+-----------+

| id|persona|premi| habilitat|

+---+------+-----+-----------+

| 123 | mel| 25000 | pintura|

| 112 | Mouni| 2000 |música/dansa|

| 153 |Tulasi| 1200 | lectura|

| 173 | Ran| 2000 | música|

| 43 |Kamala| 10000 | lectura|

+---+------+-----+-----------+

[ '{'id':123,'person':'Honey','premi':25000,'skill':'pintura'}' , '{'id':112,'person':'Mouni','premi':2000,'skill':'música/dansa'}' , '{'id':153,'person':'Tulasi','premi':1200,'skill':'lectura'}' , '{'id':173,'person':'Ran','premi':2000,'skill':'music'}' , '{'id':43,'person':'Kamala','premi':10000,'skill':'lectura'}' ]

Hi ha 5 files al PySpark DataFrame. Totes aquestes 5 files es tornen com un diccionari de cadenes separades per comes.

PySpark DataFrame a JSON mitjançant Write.json()

El mètode write.json() està disponible a PySpark que escriu/desa el PySpark DataFrame en un fitxer JSON. Pren el nom/ruta del fitxer com a paràmetre. Bàsicament, retorna el JSON en diversos fitxers (fitxers particionats). Per combinar-los tots en un sol fitxer, podem utilitzar el mètode coalesce().

Sintaxi:

dataframe_object.coalesce( 1 ).write.json(‘nom_fitxer’)
  1. Mode d'adjuntar - dataframe_object.write.mode('append').json('nom_fitxer')
  2. Mode de sobreescritura - dataframe_object.write.mode('overwrite').json('file_name')

Pot ser possible afegir/sobreescriure el JSON existent. Utilitzant write.mode(), podem afegir les dades passant 'append' o sobreescriure les dades JSON existents passant 'overwrite' a aquesta funció.

Exemple 1:

Creeu un PySpark DataFrame 'skills_df' amb 3 files i 4 columnes. Escriu aquest DataFrame a JSON.

importar pyspark

importar pandes

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

# dades d'habilitats amb 3 files i 4 columnes

habilitats =[{ 'identificador' : 123 , 'persona' : 'mel' , 'habilitat' : 'pintura' , 'premi' : 25000 },

{ 'identificador' : 112 , 'persona' : 'Mouni' , 'habilitat' : 'dansa' , 'premi' : 2000 },

{ 'identificador' : 153 , 'persona' : 'Tulasi' , 'habilitat' : 'lectura' , 'premi' : 1200 }

]

# creeu el marc de dades d'habilitats a partir de les dades anteriors

skills_df = linuxhint_spark_app.createDataFrame(habilitats)

# write.json()

skills_df.coalesce( 1 ).write.json( 'dades_de_habilitats' )

Fitxer JSON:

Podem veure que la carpeta skills_data inclou les dades JSON particionades.

Obrim el fitxer JSON. Podem veure que totes les files del PySpark DataFrame es converteixen a JSON.

Hi ha 5 files al PySpark DataFrame. Totes aquestes 5 files es tornen com un diccionari de cadenes separades per comes.

Exemple 2:

Creeu un PySpark DataFrame 'skills2_df' amb una fila. Afegeix una fila al fitxer JSON anterior especificant el mode com a 'afegir'.

importar pyspark

importar pandes

des de pyspark.sql importació SparkSession

linuxhint_spark_app = SparkSession.builder.appName( 'Suggeriment de Linux' ).getOrCreate()

habilitats2 =[{ 'identificador' : 78 , 'persona' : 'Maria' , 'habilitat' : 'cavalcar' , 'premi' : 8960 }

]

# creeu el marc de dades d'habilitats a partir de les dades anteriors

skills2_df = linuxhint_spark_app.createDataFrame(skills2)

# write.json() amb mode d'afegir.

habilitats2_df.write.mode( 'afegir' ).json( 'dades_de_habilitats' )

Fitxer JSON:

Podem veure els fitxers JSON particionats. El primer fitxer conté els primers registres de DataFrame i el segon fitxer conté el segon registre de DataFrame.

Conclusió

Hi ha tres maneres diferents de convertir el PySpark DataFrame a JSON. Primer, vam parlar del mètode to_json() que es converteix a JSON convertint el PySpark DataFrame al Pandas DataFrame amb diferents exemples tenint en compte diferents paràmetres. A continuació, hem utilitzat el mètode toJSON(). Finalment, vam aprendre a utilitzar la funció write.json() per escriure el PySpark DataFrame a JSON. Amb aquesta funció es poden afegir i sobreescriure.