1. Log in to the Azure portal at https://portal.azure.com➢ navigate to the Azure Stream Analytics job you created in Exercise 3.17➢ select Outputs from the navigation menu ➢ select the + Add drop‐down menu ➢ select Cosmos DB ➢ enter an output alias (I used cosmosDB) ➢ click the Select Cosmos DB from your subscription radio button ➢ choose the subscription ➢ select the Account ID ➢ select the database (consider using the Cosmos DB you created in Exercise 2.2, but it is not required) ➢ select the container name ➢ select Connection String from the Authentication Mode drop‐down list box (if the Account Key field is greyed out, manually provide the settings) ➢ enter ReadingDate into the Document Id text box ➢ and then click Save. The configuration should resemble Figure 7.34.

FIGURE 7.34 Upserting streamed data on Azure Cosmos DB—configuring output

  2. Select Query from the navigation menu ➢ enter the following query into the query window ➢ and then click Save Query. The query is available in the StreamAnalyticsQuery.txt file located in the Chapter07/Ch07Ex08 directory on GitHub.
  3. Start the Azure Stream Analytics job ➢ download and extract the files located in the brainjammer.zip file in the Chapter07/Ch07Ex08 directory on GitHub ➢ and then download the following JSON files from the GitHub location:
    csharpguitar-brainjammer-pow-upsert-10.json
    csharpguitar-brainjammer-pow-upsert-5.json
  4. Open a command window and provide the required parameters to stream the first of the two brain wave files (csharpguitar-brainjammer-pow-upsert-10.json) to Event Hubs ➢ Azure Stream Analytics ➢ Azure Cosmos DB, as shown in Figure 7.35.

FIGURE 7.35 Streaming data into Azure Cosmos DB using the command console

  5. Navigate to the Azure Cosmos DB that you provisioned in Exercise 2.2 and configured for the output alias in step 1 ➢ click the Data Explorer navigation item ➢ drill down to the Items collection ➢ and then select one of the streamed documents. Note that you might need to click the Refresh icon to the right of the /pk column. Figure 7.36 shows the result.

FIGURE 7.36 Inserting streamed data on Azure Cosmos DB, initial load

  6. Click each streamed document and take special note of the THETA values for documents with the following IDs:
  7. Repeat step 4 using the csharpguitar-brainjammer-pow-upsert-5.json file ➢ view the documents again ➢ observe the differences ➢ and then stop the Azure Stream Analytics job.

The first action taken in Exercise 7.8 was to configure an output alias for an Azure Cosmos DB. You chose the connection string authentication mode, which uses the accessible Azure Cosmos DB endpoint; it resembles the following: https://csharpguitar.documents.azure.com. Combining that endpoint with an access key, found on the Keys blade of the Azure Cosmos DB account, satisfies the requirements for connecting to the database and container. You also entered ReadingDate into the Document Id text box, which results in that value being stored in the id column of the Items collection in the sessions container in Azure Cosmos DB, as shown in Figure 7.36.
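
For reference, the endpoint and key combine into a Cosmos DB connection string; a typical value follows this pattern (the account name shown mirrors the example endpoint, and the key is a placeholder):

    AccountEndpoint=https://csharpguitar.documents.azure.com:443/;AccountKey=<primary-key>;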

The format of the JSON files did not change from the ones used in previous examples. The formatting of the data stream did need to change, however, so that the data matched the values selected by the Azure Stream Analytics query you added in step 2. First, the ReadingDate needed to be added so that something in the stream could be used to uniquely identify a document. There also needed to be a partition key named pk so that the processing of data on the Azure Cosmos DB platform could utilize parallelism across numerous nodes. Remember that you set this partition key name while provisioning the database and container in Exercise 2.2. The streamed data therefore needed an attribute named pk for the partitioning to be applied, which was achieved by updating the brainjammer.exe application. The code is in the Program.cs file in the Chapter07/Ch07Ex08 directory on GitHub.
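
The actual implementation lives in that Program.cs file; what follows is only a minimal sketch of the idea, assuming Newtonsoft.Json handles the JSON and using an illustrative method name and parameters rather than the book's exact code:

    using System;
    using Newtonsoft.Json.Linq;   // assumes the Newtonsoft.Json NuGet package

    public static class UpsertHelper
    {
        // Hypothetical sketch, not the actual Program.cs from GitHub: adds the two
        // attributes the Azure Cosmos DB output relies on, ReadingDate (used as the
        // document id) and pk (the partition key).
        public static string AddUpsertAttributes(
            string readingJson, string readingDate, string partitionKeyValue)
        {
            JObject reading = JObject.Parse(readingJson);

            // ReadingDate becomes the document id in Azure Cosmos DB (the Document Id
            // setting from step 1); streaming the same value again updates the existing
            // document instead of inserting a new one, which is the upsert behavior.
            reading["ReadingDate"] = readingDate;

            // pk must be present in the streamed JSON so the partition key configured
            // on the container in Exercise 2.2 can be applied.
            reading["pk"] = partitionKeyValue;

            return reading.ToString();
        }
    }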

Notice that the Azure Stream Analytics query was updated to use the output alias of cosmosDB next to the INTO command. This is how the query is instructed to send the data stream to the Azure Cosmos DB. The input alias of brainwaves next to the FROM command remained the same, because the data is still streamed through the Event Hubs endpoint, as it has been for previous exercises.
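
The complete query ships in the StreamAnalyticsQuery.txt file referenced in step 2; as a rough sketch only, with an illustrative column list rather than the book's exact projection, its INTO/FROM shape looks something like this:

    SELECT
        ReadingDate,
        pk,
        THETA
    INTO
        cosmosDB
    FROM
        brainwaves
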
The first file that was streamed, csharpguitar-brainjammer-pow-upsert-10.json, contains 10 brain wave readings. Because none of these readings currently exist in the container, all of them are inserted; notice that 10 records are added to the container shown in Figure 7.36. The second file, csharpguitar-brainjammer-pow-upsert-5.json, contains three records whose ReadingDate matches documents that already exist in the container and two that have no match. The result is two new documents being added to the container, bringing the total to 12, and the other three being updated. This is an upsert at its finest. The first three upserted documents, with their new THETA values, and the two new documents are listed here.

Lastly, you stopped the Azure Stream Analytics job to save costs, as it is no longer required.
