Com implementar la transmissió de dades en temps real a Python

Com Implementar La Transmissio De Dades En Temps Real A Python



Dominar la implementació de la transmissió de dades en temps real a Python actua com una habilitat essencial en el món actual de dades. Aquesta guia explora els passos bàsics i les eines essencials per utilitzar la transmissió de dades en temps real amb autenticitat a Python. Des de seleccionar un marc adequat com Apache Kafka o Apache Pulsar fins a escriure un codi Python per a un consum de dades sense esforç, processament i visualització eficaç, adquirirem les habilitats necessàries per construir els canals de dades àgils i eficients en temps real.

Exemple 1: Implementació de la transmissió de dades en temps real a Python

La implementació d'una transmissió de dades en temps real a Python és crucial en l'era i el món actuals basats en dades. En aquest exemple detallat, passarem pel procés de creació d'un sistema de transmissió de dades en temps real mitjançant Apache Kafka i Python a Google Colab.







Per inicialitzar l'exemple abans de començar a codificar, és essencial crear un entorn específic a Google Colab. El primer que hem de fer és instal·lar les biblioteques necessàries. Utilitzem la biblioteca 'kafka-python' per a la integració de Kafka.



! pip instal·lar kafka-python


Aquesta ordre instal·la la biblioteca 'kafka-python' que proporciona les funcions de Python i els enllaços per a Apache Kafka. A continuació, importem les biblioteques necessàries per al nostre projecte. Importar les biblioteques necessàries, incloses 'KafkaProducer' i 'KafkaConsumer', són les classes de la biblioteca 'kafka-python' que ens permeten interactuar amb els intermediaris de Kafka. JSON és la biblioteca de Python per treballar amb les dades JSON que fem servir per serialitzar i deserialitzar els missatges.



de Kafka Import KafkaProducer, KafkaConsumer
importar json


Creació d'un productor Kafka





Això és important perquè un productor de Kafka envia les dades a un tema de Kafka. Al nostre exemple, creem un productor per enviar dades simulades en temps real a un tema anomenat 'tema en temps real'.

Creem una instància 'KafkaProducer' que especifica l'adreça del corredor de Kafka com a 'localhost:9092'. A continuació, fem servir el 'value_serializer', una funció que serialitza les dades abans d'enviar-les a Kafka. En el nostre cas, una funció lambda codifica les dades com a JSON codificat en UTF-8. Ara, simulem algunes dades en temps real i les enviem al tema Kafka.



productor = KafkaProducer ( servidors_bootstrap = 'localhost:9092' ,
valor_serialitzador =lambda v: json.dumps ( en ) .codificar ( 'utf-8' ) )
# Dades simulades en temps real
dades = { 'id_sensor' : 1 , 'temperatura' : 25.5 , 'humitat' : 60.2 }
# Enviament de dades al tema
productor.enviament ( 'tema en temps real' , dades )


En aquestes línies, definim un diccionari de 'dades' que representa les dades d'un sensor simulat. A continuació, utilitzem el mètode 'enviament' per publicar aquestes dades al 'tema en temps real'.

Aleshores, volem crear un consumidor de Kafka i un consumidor de Kafka llegeix les dades d'un tema de Kafka. Creem un consumidor per consumir i processar els missatges en el 'tema en temps real'. Creem una instància 'KafkaConsumer', especificant el tema que volem consumir, per exemple, (tema en temps real) i l'adreça del corredor de Kafka. Aleshores, el 'value_deserializer' és una funció que deserialitza les dades que es reben de Kafka. En el nostre cas, una funció lambda descodifica les dades com a JSON codificat en UTF-8.

consumidor = KafkaConsumer ( 'tema en temps real' ,
servidors_bootstrap = 'localhost:9092' ,
valor_deserializer =lambda x: json.loads ( x.decodificar ( 'utf-8' ) ) )


Utilitzem un bucle iteratiu per consumir i processar contínuament els missatges del tema.

# Llegir i processar dades en temps real
per missatge en consumidor:
dades = missatge.valor
imprimir ( f 'Dades rebudes: {dades}' )


Recuperem el valor de cada missatge i les nostres dades simulades del sensor dins del bucle i els imprimim a la consola. L'execució del productor i el consumidor de Kafka implica executar aquest codi a Google Colab i executar les cel·les de codi individualment. El productor envia les dades simulades al tema Kafka i el consumidor llegeix i imprimeix les dades rebudes.


Anàlisi de la sortida a mesura que s'executa el codi

Observarem una dada en temps real que s'està produint i consumint. El format de les dades pot variar segons la nostra simulació o la font de dades real. En aquest exemple detallat, cobrim tot el procés de configuració d'un sistema de transmissió de dades en temps real mitjançant Apache Kafka i Python a Google Colab. Explicarem cada línia de codi i la seva importància en la construcció d'aquest sistema. La transmissió de dades en temps real és una capacitat potent, i aquest exemple serveix de base per a aplicacions més complexes del món real.

Exemple 2: implementació d'una transmissió de dades en temps real a Python utilitzant dades del mercat de valors

Fem un altre exemple únic d'implementació d'una transmissió de dades en temps real a Python utilitzant un escenari diferent; aquesta vegada, ens centrarem en les dades de borsa. Creem un sistema de transmissió de dades en temps real que captura els canvis de preu de les accions i els processa mitjançant Apache Kafka i Python a Google Colab. Com es va demostrar a l'exemple anterior, comencem configurant el nostre entorn a Google Colab. Primer, instal·lem les biblioteques necessàries:

! pip instal·lar kafka-python yfinance


Aquí, afegim la biblioteca 'yfinance' que ens permet obtenir dades de borsa en temps real. A continuació, importem les biblioteques necessàries. Continuem utilitzant les classes 'KafkaProducer' i 'KafkaConsumer' de la biblioteca 'kafka-python' per a la interacció amb Kafka. Importem JSON per treballar amb les dades JSON. També fem servir 'yfinance' per obtenir dades del mercat de valors en temps real. També importem la biblioteca 'temps' per afegir un retard per simular les actualitzacions en temps real.

de Kafka Import KafkaProducer, KafkaConsumer
importar json
importació i finances com yf
importar temps


Ara, creem un productor de Kafka per a les dades d'estoc. El nostre productor de Kafka obté dades d'estoc en temps real i les envia a un tema de Kafka anomenat 'preu d'existències'.

productor = KafkaProducer ( servidors_bootstrap = 'localhost:9092' ,
valor_serialitzador =lambda v: json.dumps ( en ) .codificar ( 'utf-8' ) )

mentre Veritat:
estoc = yf.Ticker ( 'AAPL' ) # Exemple: accions d'Apple Inc
dades_estoc = estoc.historial ( període = '1d' )
últim_preu = dades_estoc [ 'Tanca' ] .iloc [ - 1 ]
dades = { 'símbol' : 'AAPL' , 'preu' : darrer preu }
productor.enviament ( 'preu de les accions' , dades )
temps.dormir ( 10 ) # Simula actualitzacions en temps real cada 10 segons


Creem una instància 'KafkaProducer' amb l'adreça del corredor de Kafka en aquest codi. Dins del bucle, utilitzem 'yfinance' per obtenir l'últim preu de les accions d'Apple Inc. ('AAPL'). Aleshores, extreim l'últim preu de tancament i l'enviem al tema 'stock-price'. Finalment, introduïm un retard per simular les actualitzacions en temps real cada 10 segons.

Creem un consumidor de Kafka per llegir i processar les dades del preu de les accions del tema 'preu d'accions'.

consumidor = KafkaConsumer ( 'preu de les accions' ,
servidors_bootstrap = 'localhost:9092' ,
valor_deserializer =lambda x: json.loads ( x.decodificar ( 'utf-8' ) ) )

per missatge en consumidor:
dades_estoc = missatge.valor
imprimir ( f 'Dades d'estoc rebudes: {stock_data['symbol']} - Preu: {stock_data['price']}' )


Aquest codi és similar a la configuració del consumidor de l'exemple anterior. Llegeix i processa contínuament els missatges del tema 'preu d'accions' i imprimeix el símbol de les accions i el preu a la consola. Executem les cel·les de codi seqüencialment, per exemple, una per una a Google Colab per executar el productor i el consumidor. El productor rep i envia les actualitzacions del preu de les accions en temps real mentre el consumidor llegeix i mostra aquestes dades.

! pip instal·lar kafka-python yfinance
de Kafka Import KafkaProducer, KafkaConsumer
importar json
importació i finances com yf
importar temps
productor = KafkaProducer ( servidors_bootstrap = 'localhost:9092' ,
valor_serialitzador =lambda v: json.dumps ( en ) .codificar ( 'utf-8' ) )

mentre Veritat:
estoc = yf.Ticker ( 'AAPL' ) # accions d'Apple Inc
dades_estoc = estoc.historial ( període = '1d' )
últim_preu = dades_estoc [ 'Tanca' ] .iloc [ - 1 ]

dades = { 'símbol' : 'AAPL' , 'preu' : darrer preu }

productor.enviament ( 'preu de les accions' , dades )

temps.dormir ( 10 ) # Simula actualitzacions en temps real cada 10 segons
consumidor = KafkaConsumer ( 'preu de les accions' ,
servidors_bootstrap = 'localhost:9092' ,
valor_deserializer =lambda x: json.loads ( x.decodificar ( 'utf-8' ) ) )

per missatge en consumidor:
dades_estoc = missatge.valor
imprimir ( f 'Dades d'estoc rebudes: {stock_data['symbol']} - Preu: {stock_data['price']}' )


En l'anàlisi de la sortida després de l'execució del codi, observarem les actualitzacions en temps real dels preus de les accions d'Apple Inc. que es produeixen i es consumeixen.

Conclusió

En aquest exemple únic, vam demostrar la implementació de la transmissió de dades en temps real a Python mitjançant Apache Kafka i la biblioteca 'yfinance' per capturar i processar les dades del mercat de valors. Hem explicat a fons cada línia del codi. La transmissió de dades en temps real es pot aplicar a diversos camps per crear aplicacions del món real en finances, IoT i molt més.