Del sensor a la nube – Procesamiento de datos en tiempo real – Parte 1/2

A continuación te dejo un post creado por el crack de Guido Franco, Ingeniero y Arquitecto de Datos.

El procesamiento de datos en tiempo real es un tópico interesante en el mundo de la ingeniería de datos.

Consiste en procesar los datos a medida que ocurren y generar inmediatamente resultados accionables para una rápida y efectiva toma de decisiones.

La nube contribuye significativamente al procesamiento de datos en tiempo real ya que ofrece una infraestructura escalable y flexible, junto con una amplia gama de herramientas y servicios para la ingesta, el procesamiento, el almacenamiento y el análisis de estos datos. También permite a las empresas reducir los costos de infraestructura y aumentar la eficiencia al proporcionar una mayor escalabilidad y elasticidad en sus operaciones de procesamiento de datos.

En este post, vamos a ver un proyecto end-to-end orientado al procesamiento de datos en tiempo real utlizando Python y componentes de Azure.


Flujo a alto nivel

En primer lugar, vamos a ver en que consiste el proyecto, a un alto nivel.

Primero, necesitamos una fuente de datos. Para ello, contamos con una estación meteorológica que provee los datos en tiempo real, con una actualización por minuto, por medio de un socket TCP. Un script en Python se encargará de colectar, pre-procesar y enviar los datos hacia la nube.

Para la recepción de los eventos en la nube, usaremos Azure Event Hubs, una plataforma para el streaming de datos y la ingesta de eventos.

Una vez definido el flujo de datos a la nube, necesitaremos una herramienta para procesarlos, para ellos trabajaremos con Azure Stream Analytics, un motor de procesamiento de datos en streaming.

Por último, para un mayor impacto, es necesario plasmar la información en una visualización para el monitoreo de los resultados y un análisis detallado. Por eso, utilizaremos Power BI para la construcción de un tablero, dado su compatibilidad con el ecosistema de Azure y su popularidad en el mercado.

Todo el flujo está basado en arquitecturas de referencias brindadas por Microsoft en su documentación.

No hay texto alternativo para esta imagen

Fuente de datos

Las fuentes de datos en streaming se refieren a aquellas que se transmiten de forma continua, que se generan en tiempo real a través de diferentes fuentes, como sensores, dispositivos IoT, redes sociales, plataformas en línea, etc. Un ejemplo puede ser las mediciones de temperatura, presión, vibración, consumo de energía de una maquinaria industrial, brindada por sus sensores integrados, y se transmiten de manera frecuente por minuto o segundo. Estos datos se pueden utilizar para monitorear el rendimiento de la maquinaria, detectar problemas en tiempo real y tomar medidas preventivas o correctivas.

Para este proyecto, trabajaremos con los datos que brinda una estación meteorológica de la Universidad de California San Diego, cuyo acceso es libre, por medio de un socket TCP. Esta estación brinda mediciones sobre:

  • Velocidad y dirección del viento.
  • Presión atmosférica.
  • Temperatura del aire
  • Humedad relativa
  • Intensidad, duración e intensidad de granizo y lluvia
  • entre otras.

Las mediciones se actualizan entre los 10 y 60 segundos aproxidamente, de acuerdo a la variable.

Es posible acceder al histórico de estos eventos aquí.

El formato de los datos de entrada es el siguiente:

XXX.XXX.XXX.XXX  HPWREN:LP-WXT536:0R2:4:0    1679970455   0R2,Ta=9.9C,Ua=19.8P,Pa=887.4H

Cada mensaje es una línea de texto separada por tabs, que contiene: la IP de origen de la estación meteorológica, id. del sensor, estampa de tiempo del evento en milisegundos y un conjunto de mediciones (separadas entre comas).

Cada línea contiene un conjunto de mediciones, el objetivo es enviar un mensaje por cada medición. A partir del ejemplo anterior, los mensajes a enviar a Event Hubs serían los siguientes, en formato JSON:

{"origin_ip": "XXX.XXX.XXX.XXX", "sensor_id": "HPWREN:LP-WXT536:0R2:4:0", "tstamp": "2023-03-28T02:27:35.000000Z", "variable": "Ta", "value": "9.9", "unit": "C"
{"origin_ip": "XXX.XXX.XXX.XXX", "sensor_id": "HPWREN:LP-WXT536:0R2:4:0", "tstamp": "2023-03-28T02:27:35.000000Z", "variable": "Ua", "value": "19.8", "unit": "P"}
{"origin_ip": "XXX.XXX.XXX.XXX", "sensor_id": "HPWREN:LP-WXT536:0R2:4:0", "tstamp": "2023-03-28T02:27:35.000000Z", "variable": "Pa", "value": "887.4", "unit": "H"}}

Ingesta de datos

Azure Event Hubs, la plataforma de streaming de datos de Azure, será el componente utilizado para la recepción de los datos. Podemos considerarla como la versión cloud en Azure de Apache Kafka, ya que maneja conceptos como tópicos, particiones, patrón publicador-suscriptor, entre otros. Event Hubs funciona como una cola de mensajería que encapsula la lógica para garantizar que los eventos se procesen en orden y solo una vez.

Un programa en Python se encargará de consumir los mensajes del socket TCP y enviarlos hacia un tópico de Event Hubs, es decir cumplirá el rol de “publisher”.

Primero, es necesario crear y configurar Event Hubs. Para ello, usaremos la CLI de Azure.

# Crear un grupo de recurso
az group create --name <resource group name> --location eastus


# Crear una instancia, o cluster, de Event Hubs
az eventhubs namespace create --name <event hubs name> --resource-group <resource group name> -l eastus


# Crear un tópico para recibir los mensajes
az eventhubs eventhub create --resource-group <resource group name> --namespace-name <event hubs name> --name <topic> --retention-time-in-hours <hours> --partition-count 3


# Crear una clave para la conexión al tópico
az eventhubs eventhub authorization-rule create --resource-group <resource group name> --namespace-name <event hubs name> --eventhub-name <topic> --name <key name> --rights Send Listen


# Obtener el connection string
az eventhubs eventhub authorization-rule keys list --resource-group <resource group name> --namespace-name <event hubs name> --eventhub-name <topic> --name <key name>s

A continuación, veremos el código en Python encargado de recolectar los eventos, pre-procesarlos para darles un formato JSON y luego enviarlos a un tópico de Event Hubs.

pip install azure-eventhub

import socket
import json
import logging
from datetime import datetime
from azure.eventhub import EventHubProducerClient, EventData

logger = logging.getLogger("azure.eventhub")
logging.basicConfig(level=logging.INFO)

# Datos de conexión a Event Hubs
connection_string = "<Event Hubs connection string>"
event_hub_name = "<Event Hub topic>"

# Instanciar un producer
client = EventHubProducerClient.from_connection_string(connection_string, event_hub_name=event_hub_name)

# Establecer conexion con el socket TCP
# de la estación de la Universidad California San Diego
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.connect(('rtd.hpwren.ucsd.edu', 12020))

# Bucle para leer los mensajes del socket
while True:
    # Dato crudo
    d = s.recv(1024)
    d = d.decode()
    logging.info("Received message: " + d)
    
    # Cada evento es una linea de texto con 4 campos separados por tabuladores
    origin_ip, sensor_id, tstamp, values = d.split('\t', 3)
    
    # Las mediciones de cada sensor vienen separadas por comas
    # Se omite el primer valor, que está vinculado al id del sensor
    values = values.strip().split(",")
    values = values[1:]
    
    # Omitimos un id de sensor en particular
    if sensor_id.endswith('0R1:4:0'):
        pass
    else:
        event_data_batch = client.create_batch()
        # Cada medicion, contiene la variable,
        # el valor y la unidad al final del string
        
        for val in values:
            var, m = val.split("=")
            msg = {
                "origin_ip": origin_ip,
                "sensor_id": sensor_id,
                "tstamp": datetime.utcfromtimestamp(int(tstamp)).strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
                "variable": var,
                "value": m[:-1],
                "unit": m[-1]
            }
            event_data_batch.add(EventData(json.dumps(msg)))


        client.send_batch(event_data_batch)
        logging.info("Sent message: " + json.dumps(msg))t

Ejecutando este script, tendremos un pipeline envia eventos en tiempo real hacia la nube.

Una vez allí, podemos aprovechar del potencial de la nube para el procesamiento de datos en streaming.

En un próximo artículo, nos enfocaremos en el procesamiento y la visualización de datos en tiempo real usando Stream Analytics y Power BI, respectivamente.

 

Por Guido Franco

https://www.linkedin.com/in/gdofranco/

Ir arriba