In the following post I’m going to show a quick example of how to analyse data within JSON messages sent to an Event Hub and automatically captured into blob storage. This includes how to automatically derive the schema of the JSON messages, a technique that can be used on any dataframe where one or more of the fields contains JSON data.
This example uses Python, and I’ve used the Service Bus Explorer tool available on GitHub to send data to an Event Hub using the built-in ThresholdDeviceEventDataGenerator.
I’ve sent 10 messages to the Event Hub and have enabled capture to automatically write messages to blob storage.
Event Hub capture is something we use a lot: we have our real-time operational data services ingesting and enriching data and passing it on to output Event Hubs for downstream real-time systems to use. We capture all the events into an Azure Data Lake for any batch processes to make use of, including analytics into a data warehouse via Databricks.
For this demo I’m just using the default time and size window settings, which means a file will get written to blob storage every 5 minutes or when the file size reaches 300 MB. There are currently two options for the structure of the folders within the blob storage and I’m just going to use the default, meaning the Event Hub details make up the first few levels of folders and then the date and time of the capture make up the lower levels.
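For reference, the default naming convention is {Namespace}/{EventHub}/{PartitionId}/{Year}/{Month}/{Day}/{Hour}/{Minute}/{Second}, so a captured file in this demo should end up at a path along these lines (the partition, date and time values here are illustrative)…

simonp/demo/0/2019/10/01/12/05/30.avro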
We can see the captured files using Azure Storage Explorer
We’re seeing a new file every 5 minutes even though there is no new data on the Event Hub. This is because I didn’t check the “Do not emit empty files when no events occur during the Capture time window” option on the screen above.
The captured files are always in AVRO format and contain some fields relating to the Event Hub and a Body field that contains the message.
We can now use Databricks to connect to the blob storage and read the AVRO files by running the following in a Databricks notebook…
spark.conf.set( "fs.azure.account.key.<storage_account_name>.blob.core.windows.net", "<storage_account_access_key>")
where <storage_account_name> is the name of the storage account and <storage_account_access_key> is one of the keys from the Access Keys section of the Storage Account in the Azure Portal.
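As an aside, rather than pasting the access key into the notebook you could read it from a Databricks secret scope; a minimal sketch, assuming a scope and secret have already been created with these made-up names…

spark.conf.set(
  "fs.azure.account.key.<storage_account_name>.blob.core.windows.net",
  dbutils.secrets.get(scope = "storage-secrets", key = "storage-account-key")
)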
To test we can connect, we can run the following which will list the files and folders in the specified container…
dbutils.fs.ls("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net")
We’ve only got the simonp folder in our container, which is the namespace of the Event Hub we’re using; this is because we chose the default folder format on the Event Hub Capture page.
Now we can read in either a single AVRO file or multiple files using wildcard characters as follows…
avro_df = spark.read.format("avro").load("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/simonp/demo/{*}/{*}/{*}/{*}/{*}/{*}")
This will read all AVRO files captured from the demo Event Hub in the simonp namespace, and we can run the display() function to see the contents of the files…
display(avro_df)
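We can also print the schema rather than the data; this should show the Event Hub metadata fields captured alongside the message, i.e. SequenceNumber, Offset, EnqueuedTimeUtc, SystemProperties and Properties, plus the Body field that holds the message bytes…

avro_df.printSchema()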
We’re only interested in the message details so we’ll ignore the Event Hub fields and just focus on the Body field, which needs to be cast to a string to make it readable.
body_df = avro_df.withColumn("Body", avro_df.Body.cast("string")).select("Body")
display(body_df)
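Each Body value is a JSON document with a handful of fields; an individual message looks something like this (the values here are made up for illustration)…

{"deviceId": 42, "eventId": 1, "timestamp": "2019-10-01T12:04:58", "value": 73}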
Now we can see there are four fields within the JSON messages. In order to analyse individual fields within the JSON messages we can create a StructType object and specify each of the four fields and their data types as follows…
from pyspark.sql.types import *

json_schema = StructType([
    StructField("deviceId", LongType(), True),
    StructField("eventId", LongType(), True),
    StructField("timestamp", StringType(), True),
    StructField("value", LongType(), True)
])
We can view the structure by running the following…
json_schema
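Depending on the Spark version, the output will look something like this…

StructType(List(StructField(deviceId,LongType,true),StructField(eventId,LongType,true),StructField(timestamp,StringType,true),StructField(value,LongType,true)))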
This is fine when there are only a few fields, but if there are many then it can take a long time and is likely to result in syntax errors somewhere along the way.
Another option is to use the map function as follows…
json_schema_auto = spark.read.json(body_df.rdd.map(lambda row: row.Body)).schema
We can now check the structure of this new object and see that this single line of code gives us the same StructType object…
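json_schema_auto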
We can now use either schema object, along with the from_json function, to read the messages into a data frame containing JSON rather than string objects…
from pyspark.sql.functions import from_json, col

json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema_auto))
display(json_df)
Now we can select out whatever fields we need from the JSON as follows (I don’t know a quicker way to do this!)…
df = json_df.select(
    col("Body.deviceId"),
    col("Body.eventId"),
    col("Body.timestamp"),
    col("Body.value")
)
display(df)
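As an aside, if you simply want every field in the struct as a top-level column, selecting Body.* (with the same json_df as above) should give an equivalent, shorter version…

df = json_df.select("Body.*")
display(df)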
Now we’ve got every field in the message as a field in a data frame and we can easily start running analysis against this data.
Obviously, this is a very simple example, but you can use functions like explode to access more complex fields that contain arrays or JSON objects.
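As a rough sketch of what that could look like, assuming a hypothetical readings array field in the message (not part of the demo messages above)…

from pyspark.sql.functions import col, explode

# Hypothetical: if each message body also contained a "readings" array,
# explode returns one output row per element of the array
readings_df = json_df.select(
    col("Body.deviceId"),
    explode(col("Body.readings")).alias("reading")
)
display(readings_df)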