There are numerous methods for performing upserts from an Azure Stream Analytics data stream in Azure Synapse Analytics. One approach is to replicate the pattern shown in Figure 7.25. Instead of placing the data directly into the final result table, you would place it first into an input table (i.e., a landing table or temporary table). As of this writing, the MERGE command in Azure Synapse Analytics is in preview, but it will be supported in the near term. Use the MERGE command between the two tables placed into a scheduled pipeline stored procedure activity. The following code snippet is an example of the command to create such a stored procedure: CREATE PROC [brainwaves].[uspMergeBrainwaveMedians] AS
MERGE INTO [brainwaves].[BrainwaveMedians] target
USING [brainwaves].[BrainwaveMediansInput] AS i
ON target.[ReadingDate] = i.[ReadingDate]
When the uspMergeBrainwaveMedians stored procedure is executed, it compares data on the target BrainwavesMedians table with the data on the input BrainwaveMediansInput table. If a match is found using a column named ReadingDate that exists on both tables, it means the row has been inserted once before. In this case the values on the target table (i.e., the results table) are updated with the values from the input table. If no match is found, it is concluded that the record is new and it is inserted into the target table. The text used to create the stored procedure is in the uspMergeBrainwaveMedians.sql file in the Chapter07 directory on GitHub at https://github.com/benperk/ADE.
In Exercise 5.1 you created a stored procedure named uspCreateAndPopulateFactReading and added it to the IngestTransformBrainwaveReadings Azure Synapse Analytics pipeline (refer to Figure 5.3). You performed that exercise before learning about triggers and scheduling, which were discussed in Chapter 6, so you were instructed to trigger the pipeline manually. However, now you can use what you learned in Chapter 6 about the different types of triggers to determine which kind to use. The Azure Stream Analytics query used in Exercise 7.5 created a tumbling window in 5‐second intervals. Perhaps in this scenario either a scheduled or tumbling window trigger would be most appropriate. Considering that the data stream will flow through the serving layer, which is not real time, it might be prudent to trigger the pipeline using a schedule trigger. How frequently to run the pipeline depends on how urgently the data needs to be delivered along this path.
The other approach for performing upserts covered here is to use an Azure function. As shown in Figure 7.5, an Azure function can be used as an output with an Azure Stream Analytics job. The data is sent in JSON format to the Azure function from Azure Stream Analytics in the body of an HTTPS POST and resembles the following, depending on the Azure Stream Analytics query: [{“Scenario”: “Meditation”,
“ReadingDate”: 2021-07-29T12:41:00,
“medianAPLHA”: 5.801,
“medianBETA_H”: 0.9892,
“medianBETA_L”: 2.6132,
“medianGAMMA”: 0.7368,
“medianTHETA”: 31.6376}]
It is possible to make a connection to your Azure Synapse Analytics SQL pool using a connection string from a remote source. In this case the remote source is an Azure function. You can find the connection string for your SQL pool on the Dedicated SQL Pool blade in the Azure portal, as shown in Figure 7.33.
FIGURE 7.33 Upsert on streamed data using an Azure function, connection string
There are many approaches for executing an upsert on streaming data. They are limited only by your imagination and their support by the product you need to perform the action on.
Azure Cosmos DB
As you learned in the previous section, the output of the Azure Stream Analytics job is in JSON format. This means it is natively suited to flow directly into an Azure Cosmos DB. Figure 3.13 shows an Azure Cosmos DB as an example of a datastore in the serving layer of the lambda architecture. The upsert of data stored on an Azure Cosmos DB is built into the product itself when a compatibility level of 1.2 is set in your Azure Stream Analytics job. Azure Cosmos DB uses the document ID of the uploaded record to determine whether an insert or an update is required. As you would expect, when an existing document is found in the container that matches the document ID of the one received by Azure Cosmos DB, an update is performed to that document. Otherwise, when no document with the provided document ID exists in the container, the document is inserted. To get some experience and learn firsthand how this works, complete Exercise 7.8, where you will perform an upsert from Azure Stream Analytics on a document stored in an Azure Cosmos DB database container.