Reading Event Hub Capture AVRO JSON messages using Azure Databricks

In the following post I’m going to show a quick example of how to analyse data within JSON messages sent to an Event Hub and automatically captured into blob storage. This includes how to automatically derive the schema of the JSON messages, a technique that can be used on any dataframe where one or more of the fields contains JSON data.

This example uses Python, and I’ve used the Service Bus Explorer tool available on GitHub to send data to an Event Hub using the built-in ThresholdDeviceEventDataGenerator.

I’ve sent 10 messages to the Event Hub and have enabled capture to automatically write messages to blob storage.

Event Hub capture is something we use a lot: we have our real-time operational data services ingesting and enriching data and passing that onto output Event Hubs for downstream real-time systems to use.  We capture all the events into an Azure Data Lake for any batch processes to make use of, including analytics into a data warehouse via Databricks.

For this demo I’m just using the default time and size window settings, which means a file will get written to blob storage every 5 mins or when the file size reaches 300 MB. There are currently two options to choose the structure of the folders within the blob storage, and I’m just going to use the default, meaning the Event Hub details make up the first few levels of folders and the date and time of the capture make up the lower levels.

We can see the captured files using Azure Storage Explorer

We’re seeing a new file every 5 mins even though there is no new data on the Event Hub. This is because I didn’t check the Do not emit empty files when no events occur during the Capture time window option on the screen above.

The captured files are always in AVRO format and contain some fields relating to the Event Hub and a Body field that contains the message.

We can now use Databricks to connect to the blob storage and read the AVRO files by running the following in a Databricks notebook…

spark.conf.set(
"fs.azure.account.key.<storage_account_name>.blob.core.windows.net",
"<storage_account_access_key>")

where <storage_account_name> is the name of the storage account and <storage_account_access_key> is one of the keys from the Access Keys section of the Storage Account in the Azure Portal.

To test we can connect, we can run the following which will list the files and folder in the specified container…

dbutils.fs.ls("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net")

We’ve only got the folder simonp in our container, which is the namespace of the Event Hub we’re using; this is because we chose the default folder format on the Event Hub Capture page.

Now we can read in either a single AVRO file or multiple files using wildcard characters as follows. Note the avro format is built into Spark from version 2.4; on older Databricks runtimes the format name is com.databricks.spark.avro.

avro_df = spark.read.format("avro").load("wasbs://<container_name>@<storage_account_name>.blob.core.windows.net/simonp/demo/{*}/{*}/{*}/{*}/{*}/{*}")

This will read all AVRO files captured from the demo Event Hub in the simonp namespace and we can run the display() function to see the contents of the AVRO file…

display(avro_df)

We’re only interested in the message details so we’ll ignore the Event Hub fields and just focus on the Body field which needs to be cast to a string to make it readable.

body_df = avro_df.withColumn("Body", avro_df.Body.cast("string")).select("Body")

display(body_df)

Now we can see there are four fields within the JSON messages.  In order to analyse individual fields within the JSON messages we can create a StructType object and specify each of the four fields and their data types as follows…

from pyspark.sql.types import *

json_schema = StructType(
  [
    StructField("deviceId",LongType(),True),
    StructField("eventId",LongType(),True),
    StructField("timestamp",StringType(),True),
    StructField("value",LongType(),True)
  ]
)

We can view the structure by running the following…

json_schema

This is fine when there are only a few fields, but if there are many then it can take a long time and is likely to result in syntax errors somewhere along the way.

Another option is to use the map function and let Spark infer the schema for us, as follows…

json_schema_auto = spark.read.json(body_df.rdd.map(lambda row: row.Body)).schema

We can now check the structure of this new object and see this single line of code gives us the same StructType object…

json_schema_auto

We can now use either schema object, along with the from_json function, to read the messages into a data frame containing JSON rather than string objects…

from pyspark.sql.functions import from_json, col
json_df = body_df.withColumn("Body", from_json(col("Body"), json_schema_auto))
display(json_df)

Now we can select out whatever fields we need from the JSON as follows (I don’t know a quicker way to do this!)…

df = json_df.select(
  col("Body.deviceId"),
  col("Body.eventId"),
  col("Body.timestamp"),
  col("Body.value")
)

display(df)

Now we’ve got every field in the message as a field in a data frame and we can easily start running analysis against this data.

Obviously, this is a very simple example, but you can use functions like explode to access more complex fields that contain arrays or nested JSON objects.

Read from Azure Data Lake using Azure Databricks

I’ve been using Azure Data Lake for a little while now and have been looking at some of the tools used to read, write and analyse the data including Data Lake Analytics using U-SQL and more recently Azure Databricks.

As an ad-hoc analysis tool I think the Databricks notebooks are great, and I’ve been able to create a few basic reports using some of our streaming data via an Event Hub. There’s plenty you can do with Databricks, including ETL, and we can now execute Python scripts against Databricks clusters using Data Factory.

Some aspects of using Azure Databricks are very easy to get started with, especially using the notebooks, but there were a few things that took a lot longer to get up and running than I first expected.

Some of the documentation is good but there were a few things I had to piece together from various sources and tinker about myself to get it working.

Connecting to the Data Lake is simple but you don’t want connection details hard-coded in your notebooks and storing the sensitive information in the Azure Key Vault is a little more involved.

The following step-by-step guide should allow any complete beginner to get this up and running fairly quickly – although there are quite a few steps and a few places where things can go wrong.

Please note, the Azure Portal environment changes frequently and these instructions are only accurate at the time of writing. If things move or are renamed, hopefully there is enough info below to work out what is required.

In order to connect to the Azure Data Lake we can create a credential in Azure Active Directory (AAD) with access to the relevant files and folders. We need a ClientID and a key for this credential and also need a reference to our AAD. We can store these values in Azure Key Vault and use Databricks secrets to access them.

Firstly, let’s look at the data we want to access in the Azure Data Lake.

Log in to portal.azure.com and navigate to the Data Lake Storage and then Data Explorer.

In this example I’ve created a new Data Lake Store named simon and will now upload some speed camera data I’ve mocked up.  This is the data we want to access using Databricks.

If we click on Folder Properties on the root folder in the Data Lake we can see the URL we need to connect to the Data Lake from Databricks.  This is the value in the PATH field, in this case, adl://simon.azuredatalakestore.net

Now we’ve got the files in place let’s set up everything we need to securely connect to it with Databricks.

To do this we need to create an App registration in AAD via the Azure Portal.

Navigate to the Azure Active Directory resource and click on App registration in the menu on the left.

Click on New application registration and enter a Name and Sign-on URL (we don’t use the Sign-on URL so this can be anything you like).

Now we need to create a key for this App registration, which Databricks can use in its connection to the Data Lake.

Once the App registration is created click on Settings.

Click on the Keys option, enter a new key description and set the expiry date of the key.

Then click on Save and the key value will be displayed. You need to make a note of this value somewhere as you cannot view it again.

You’ll also need to make a note of the Application ID of the App Registration as this is also used in the connection (although this one can be obtained again later on if need be).

As I mentioned above we don’t want to hard code these values into our Databricks notebooks or script files so a better option is to store this in the Azure Key Vault.

Navigate to the Azure Key Vault resource and either use an existing appropriate Key Vault or create a new one.

Click on Properties in the menu on the left and make a note of the DNS NAME and RESOURCE ID values of the Key Vault.  These are needed when setting up the Databricks Secret Scope later on.

Click on Secrets in the menu on the left and create a new secret for each of the bits of sensitive data needed in the Databricks connection which are as follows…

ClientID comes from the Application ID of the new App registration
Credential comes from the key created on the new App registration
RefreshURL comes from the Directory ID in the Properties of Azure Active Directory and should be in the format https://login.microsoftonline.com/<DirectoryID>/oauth2/token
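To sanity-check that format, the refresh URL is just the Directory ID dropped into the standard OAuth2 token endpoint. A tiny sketch (the directory_id below is a placeholder GUID, not a real tenant):

```python
# directory_id is a placeholder; use the Directory ID from the
# Properties blade of Azure Active Directory
directory_id = "00000000-0000-0000-0000-000000000000"
refresh_url = "https://login.microsoftonline.com/{}/oauth2/token".format(directory_id)
```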

I’ve also included the URL of the Data Lake in the Simon-ADLS-URL key.

Next we need to give the App Registration permissions to read the data from the Data Lake.

We need to make sure that all folders from the root down to the files have Execute permissions as an access entry.

In a Production environment it’s likely that there will be some process that loads new files into the Data Lake. For the folder(s) where the files are loaded we need to set the permissions as Read and Execute, applied as both an access and a default permission entry on the folders and their children. This means that any new files added to these folders will be given these same permissions by default. If this isn’t set up then the new files will cause the Databricks code to fail.

To add these permissions click on Access on the relevant folder.  Then click Add.

Enter the App Registration name in the text box and select it when it is displayed below.

Then select the appropriate permissions as described above.  Once you’ve got far enough down the chain of folders where all sub-folders and files need to be accessed by Databricks then you can choose This folder and all children.

Now we’ve got all our sensitive data stored in Azure Key Vault Secrets and permissions on the Data Lake set up we need to create an Azure Databricks Secret Scope and link it to our Key Vault.

I’m assuming you’ve already managed to set up Azure Databricks, if not, you can do this quite easily via the Azure Portal.

Once you’ve got Databricks set up you’ll connect to it via a URL something like…

https://northeurope.azuredatabricks.net/?o=1234567890

As you can see from this URL, my Databricks is running in North Europe so to create a new Secret Scope we need to navigate to…

https://northeurope.azuredatabricks.net#secrets/createScope

Once we’ve logged in, the following page is displayed. Give the scope a name and choose an option for the Manage Principal. For this demo I’m just using the Standard pricing tier for Databricks so I have to choose All Users; the Creator option is only available in the Premium tier.

Enter the DNS NAME and RESOURCE ID values from the Properties of the Key Vault and click Create.

We’re finally ready!

Now we can start a Databricks cluster and enter the following in a new Python notebook (the syntax for Scala is very similar)…

client_id = dbutils.secrets.get(scope = "SimonTemp", key = "Simon-ADLS-ClientID")
credential = dbutils.secrets.get(scope = "SimonTemp", key = "Simon-ADLS-Credential")
refresh_url = dbutils.secrets.get(scope = "SimonTemp", key = "Simon-ADLS-Refresh-URL")
adls_url = dbutils.secrets.get(scope = "SimonTemp", key = "Simon-ADLS-URL")

spark.conf.set("dfs.adls.oauth2.access.token.provider.type", "ClientCredential")
spark.conf.set("dfs.adls.oauth2.client.id", client_id)
spark.conf.set("dfs.adls.oauth2.credential", credential)
spark.conf.set("dfs.adls.oauth2.refresh.url", refresh_url)

This code is using the Databricks Secret Scope named SimonTemp created above to access the four secrets we put in the Azure Key Vault and store in variables.

We can then use the SparkSession object spark, which is available automatically in an Azure Databricks cluster, and use the client_id, credential and refresh_url variables to set the authentication values required to connect to the Data Lake.

Now we can use the final variable adls_url along with the rest of the path to read the CSV files and show the data.

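The read itself is just a spark.read.csv against the adls_url variable set from the Key Vault secret. A sketch, assuming the mocked-up speed camera data sits in a folder I’ve called SpeedCameraData (the folder name and the CSV options are assumptions, not from the original setup):

```python
# adls_url comes from the Databricks secret retrieved earlier;
# "SpeedCameraData" is an assumed folder name for the mocked-up files
csv_df = spark.read.csv(
    adls_url + "/SpeedCameraData/*.csv",
    header=True,       # assumes the mock files have a header row
    inferSchema=True,
)
display(csv_df)
```

This runs inside a Databricks notebook, where spark and display() are provided automatically.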
If you’ve managed to follow these fairly long instructions, hopefully you’re able to read your data. However, if your read statement just hangs, it’s likely you’ve not set the correct values in the Key Vault. This happened to me the first time I tried this and I didn’t get any errors in Databricks; I had to re-enter the values in my Key Vault secrets to fix it.

If you’ve not set up permissions correctly in the Data Lake then you will receive an error something like…

org.apache.hadoop.security.AccessControlException: LISTSTATUS failed with error 0x83090aa2 (Forbidden. ACL verification failed. Either the resource does not exist or the user is not authorized to perform the requested operation.).

In this case make sure the App Registration has access from the root folder all the way down to the files you are trying to read.