Del sensor a la nube – Procesamiento de datos en tiempo real – Parte 2/2

Hola! Hoy te dejo la parte 2/2 de un posteo que armó el crack de Guido Franco sobre cómo comunicar datos de sensores en tiempo real, subirlos a la nube y explotarlos.

Introducción

Te doy la bienvenida a una nueva edición de una serie de posts sobre el procesamiento de datos en tiempo real. El primer artículo estuvo enfocado en la fuente de datos y en su ingesta a la nube. Particularmente, se mostró como enviar datos, emitidos por una estación meteorológica, desde un Socket TCP hacia un broker de mensajería como Azure Event Hubs

Con los eventos arribando a la nube en tiempo real, estamos en condiciones de procesarlos en vivo y plasmar los resultados en algún tablero.

Anteriormente, vimos aspectos sobre la fuente de datos y su ingesta, el siguiente paso es el procesamiento, para aplicar toda la lógica necesaria con el fin de generar resultados accionables. Para ello, existen diferentes motores como Spark Streaming, Apache Flink, Kafka Streams, entre otros. En este caso, en el contexto de Azure, utilizaremos Azure Stream Analytics, un motor que permite manipular los eventos en tiempo real por medio de lenguaje SQL.

Stream Analytics es capaz de:

  • recibir “stream” de eventos desde Azure Event Hubs, IoT Hub y Azure Data Lake Storage. A su vez, es capaz de recibir datos estáticos o “slow-changing”, también llamadados de referencia, para cruzarlos con los eventos en tiempo real y enriquecerlos.
  • procesar los datos por medio de una query SQL.
  • escribir los resultados del procesamiento en algún destino como Azure Data Lake Gen 2, Azure SQL Database, Azure Synapse Analytics, Azure Functions, Azure Event Hubs, Microsoft Power BI, etc.

Las consultas en Stream Analytics se ejecutan de forma perpetua procesando los nuevos datos a medida que lleguen a la entrada y re-dirigiendo los resultados en un destino.

Puedes leer mas sobre Stream Analytics en los siguientes recursos de la documentación de Microsoft:


Entorno de trabajo

Si bien, es posible trabajar con Stream Analytics desde la web en el portal de Azure. En este post, veremos el uso desde Visual Studio Code por medio de una extensión.

Suponiendo que tenemos instalado VS Code y dicha extensión, vamos a crear un nuevo proyecto de Stream Analytics, para ello presionamos la combinación «Ctrl + Shift + P», escribimos y selccionamos «ASA: Create New Project»

No hay texto alternativo para esta imagen

Le asignaremos al proyecto que nombre deesemos y seleccionaremos la ruta donde crear el proyecto.

Automáticamente se creará un nuevo directorio con la siguiente estructura:

No hay texto alternativo para esta imagen
Estructura de un proyecto de Stream Analytics en VS Code

Configuración

Ahora es necesario configurar un nuevo Input, en este caso es el tópico de Event Hubs que se vió en el artículo anterior. Entonces, presionamos la combinación «Ctrl + Shift + P», seleccionamos y escribimos «ASA: Add input. Luego», del listado de Inputs disponibles seleccionamos el de Event Hubs.

No hay texto alternativo para esta imagen
Inputs de Stream Analytics

Se creará un archivo JSON para indicar todas las configuraciones y datos de conexión necesarios para Event Hubs. Es importante aclarar que el archivo ofusca cualquier valor confidencial.

No hay texto alternativo para esta imagen

Finalizaremos la configuración, especificando el destino o salida de este Job, que será Power BI. Entonces, presionamos la combinación «Ctrl + Shift + P», escribimos y seleccionamos «ASA: Add Output». Del listado de Outputs disponibles, seleccionamos Power BI

Se creará un nuevo archivo de configuración en formato JSON para la conexión a Power BI.

No hay texto alternativo para esta imagen

Procesamiento

Una vez lista la configuración de Inputs y Outputs, podemos continuar con la lógica de procesamiento. A continuación veremos un ejemplo de lo que podemos hacer a nivel de procesamiento con el motor SQL de Stream Analytics.

Recordando el artículo anterior, Event Hubs recibe mensajes con el siguiente formato:

{"origin_ip": "XXX.XXX.XXX.XXX", "sensor_id": "HPWREN:LP-WXT536:0R2:4:0", "tstamp": "2023-03-28T02:27:35.000000Z", "variable": "Ta", "value": "9.9", "unit": "C"
{"origin_ip": "XXX.XXX.XXX.XXX", "sensor_id": "HPWREN:LP-WXT536:0R2:4:0", "tstamp": "2023-03-28T02:27:35.000000Z", "variable": "Ua", "value": "19.8", "unit": "P"}
{"origin_ip": "XXX.XXX.XXX.XXX", "sensor_id": "HPWREN:LP-WXT536:0R2:4:0", "tstamp": "2023-03-28T02:27:35.000000Z", "variable": "Pa", "value": "887.4", "unit": "H"}}

Cada objeto JSON representa la medición una variable específica. En este ejemplo tenemos mediciones de Ta: Temperatura del aire, Ua: Humedad relativa y Pa: Presión del aire.

Una tarea de procesamiento, por ejemplo, consistirá crear un solo registro que contenga todas las mediciones que coincidan en el tiempo, para realizar cálculos u operaciones aritméticas entre ellas. El registro final debería tener el siguiente formato:

{"origin_ip":"XXX.XXX.XXX.XXX","tstamp":"2023-04-25T02:34:23.0000000Z","air_pressure":881.3,"air_pressure_unit":"H","air_temp":14.4,"air_temp_unit":"C","heating_temp":14.2,"heating_temp_unit":"C","rel_hum":59.9,"rel_hum_unit":"P"}

Primero, es necesario crear una vista temporal por cada variable requerida. Cada vista se encarga de filtrar la variable deseada.

WITH air_pressure AS (
	SELECT
		origin_ip, CAST(tstamp AS DateTime) AS tstamp,
		CAST(value AS Float) AS air_pressure, unit
	FROM
		[weather-data] PARTITION BY sensor_id
	WHERE
		variable = 'Pa'
),
air_temp AS (
	SELECT
		origin_ip, CAST(tstamp AS DateTime) AS tstamp,
		CAST(value AS Float) AS air_temp, unit
	FROM
		[weather-data] PARTITION BY sensor_id
	WHERE
		variable = 'Ta'
),
heating_temp AS (
	SELECT
		origin_ip, CAST(tstamp AS DateTime) AS tstamp,
		CAST(value AS Float) AS heating_temp, unit
	FROM
		[weather-data] PARTITION BY sensor_id
	WHERE
		variable = 'Th'
),
rel_hum AS (
	SELECT
		origin_ip, CAST(tstamp AS DateTime) AS tstamp,
		CAST(value AS Float) AS rel_hum, unit
	FROM
		[weather-data] PARTITION BY sensor_id
	WHERE
		variable = 'Ua'
)

Con las vistas temporales definidas, podemos realizar JOINs entre las mismas a partir de algún campo en común y de una ventana de tiempo. Con la siguiente query podemos unir todos los registros que:

  • coincidan con el campo origin_ip

y hayan sucedido en el mismo minuto, sin importar los segundos, por eso se

SELECT
    air_pressure.origin_ip,
    CAST(SUBSTRING(air_pressure.tstamp, 1, 19) AS DATETIME) AS tstamp,
    air_pressure.air_pressure,
    air_pressure.unit AS air_pressure_unit,
    air_temp.air_temp,
    air_temp.unit AS air_temp_unit,
    heating_temp.heating_temp,
    heating_temp.unit AS heating_temp_unit,
    rel_hum.rel_hum,
    rel_hum.unit AS rel_hum_unit

INTO [pbi-output]

FROM air_pressure

JOIN air_temp ON air_pressure.origin_ip = air_temp.origin_ip
AND DATEDIFF(second, air_pressure, air_temp) BETWEEN 0 AND 60

JOIN heating_temp ON air_pressure.origin_ip = heating_temp.origin_ip
AND DATEDIFF(second, air_pressure, heating_temp) BETWEEN 0 AND 60

JOIN rel_hum ON air_pressure.origin_ip = rel_hum.origin_ip
AND DATEDIFF(second, air_pressure, rel_hum) BETWEEN 0 AND 60

Podemos ejecutar la query usando el poder de procesamiento de nuestro local haciendo lo siguiente:

  • Presionamos la combinación «Ctrl + Shift + P»,
  • escribimos y seleccionamos la opción «ASA: Start local run»,
  • y luego «Use live input and local output»

De esta forma, el Job se ejecutará en nuestro equipo consumiendo los datos de Event Hubs en tiempo real. Así podemos ejecutar y probar la funcionalidad del Job sin consumir los recursos en la nube de Stream Analytics.

La extensión de VS Code nos permite visualizar el Job como un grafo para monitorear cada parte del proceso.

No hay texto alternativo para esta imagen

Por último, si bien el Ouput del Job será en formato JSON, podemos verlo como una tabla.

No hay texto alternativo para esta imagen

Por último, una vez que tengamos una versión estable del Job, podemos desplegarlo en Azure desde VS Code

Visualización

La parte final de este trabajo es mostrar los resultados del valioso procesamiento en un tablero para compartir la información generada.

Stream Analytics deposita los resultados del procesamiento en un dataset, en un workspace determinado de Power BI.

En ese mismo workspace, se debe crear un Dashboard. En el Dashboard, hay que crear “Tiles”, de un tipo específico. que nos permitirán realizar gráficas que se actualizan en tiempo real, a medida que van ingresando nuevos eventos

No hay texto alternativo para esta imagen

Los tipos de gráficos que se pueden realizar para este escenarios son los que se ven en la siguiente imagen.

No hay texto alternativo para esta imagen

Luego de unos ajustes, podemos ver un simple tablero con tarjetas y un gráfico de lineas actualizándose en tiempo real a medida que el Job de Stream Analytics va generando nuevos datos.

No hay texto alternativo para esta imagen

Final

Este ha sido el último post sobre un pipeline end-to-end orientado al procesamiento de datos en tiempo real, donde vimos como recolectar y capturar eventos que reportan la medición de sensores, como manipularlos por medio de un motor de procesamiento y finalmente como disponibilizarlos en un tablero.

Gracias por el tiempo en leer este post, espero haya sido de interés.

Por Guido Franco Expert Data Engineer // Data Architecture

https://www.linkedin.com/in/gdofranco/

Ir arriba