Azure Stream Analytics
Azure Stream Analytics is an event-processing engine that is designed to examine high volumes of streaming
data. Patterns and relationships can be identified in information extracted from a number of input sources
including devices, sensors, websites, social media feeds, and applications. These patterns can be used to trigger
other actions downstream like creating alerts, feeding information to a reporting tool, or storing transformed
data for later use.
The following scenarios are examples of when you can use Azure Stream Analytics:
Internet of Things (IoT) sensor fusion and real-time analytics on device telemetry
Web logs/clickstream analytics
Geospatial analytics for fleet management and driverless vehicles
Remote monitoring and predictive maintenance of high-value assets
Real-time analytics on Point of Sale data for inventory control and anomaly detection
Programmer productivity
Azure Stream Analytics uses a simple SQL-based query language that has been augmented with powerful
temporal constraints to analyze data in motion. To define job transformations, you use a simple, declarative
Stream Analytics query language that lets you author complex temporal queries and analytics using simple
SQL constructs. Because the Stream Analytics query language is consistent with the SQL language, familiarity
with SQL is sufficient to start creating jobs. You can also create jobs by using developer tools like Azure
PowerShell, the Stream Analytics Visual Studio tools, the Stream Analytics Visual Studio Code extension, or
Azure Resource Manager templates. Using developer tools allows you to develop transformation queries offline
and use a CI/CD pipeline to submit jobs to Azure.
The Stream Analytics query language offers a wide array of functions for analyzing and processing streaming
data. This query language supports simple data manipulation, aggregation functions, and complex geospatial
functions. You can edit queries in the portal and test them using sample data that is extracted from a live
stream.
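For example, the following is a minimal sketch of a temporal query that averages readings over consecutive
30-second windows. The input and output aliases and the field names (Input, Output, DeviceId, Temperature,
EventTime) are illustrative assumptions rather than names from a specific sample.

SELECT
    DeviceId,
    AVG(Temperature) AS AverageTemperature,
    System.Timestamp AS WindowEnd
INTO Output
FROM Input TIMESTAMP BY EventTime
GROUP BY DeviceId, TumblingWindow(second, 30)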
You can extend the capabilities of the query language by defining and invoking additional functions. You can
define function calls in the Azure Machine Learning service to take advantage of Azure Machine Learning
solutions, and integrate JavaScript or C# user-defined functions (UDFs) or user-defined aggregates to perform
complex calculations as part of a Stream Analytics query.
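As a sketch of how such a function is invoked, the query below calls a hypothetical JavaScript UDF named
toFahrenheit that has been added to the job; the aliases and field names are assumptions.

SELECT
    DeviceId,
    UDF.toFahrenheit(Temperature) AS TemperatureF
INTO Output
FROM Input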
Fully managed
Azure Stream Analytics is a fully managed serverless (PaaS) offering on Azure. You don’t have to provision any
hardware or manage clusters to run your jobs. Azure Stream Analytics fully manages your job by setting up
complex compute clusters in the cloud and taking care of the performance tuning necessary to run the job.
Integration with Azure Event Hubs and Azure IoT Hub allows your job to ingest millions of events per second
coming from a number of sources, including connected devices, clickstreams, and log files. Using the
partitioning feature of Event Hubs, you can partition computations into logical steps, each with the ability to be
further partitioned to increase scalability.
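For illustration, a partitioned query might look like the following sketch, where each partition is processed
independently; the aliases and field names are assumptions.

SELECT
    PartitionId,
    COUNT(*) AS EventCount,
    System.Timestamp AS WindowEnd
INTO Output
FROM Input PARTITION BY PartitionId
GROUP BY PartitionId, TumblingWindow(minute, 1)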
Mission-critical ready
Azure Stream Analytics is available across multiple regions worldwide and is designed to run mission-critical
workloads by supporting reliability, security and compliance requirements.
Reliability
Azure Stream Analytics guarantees exactly-once event processing and at-least-once delivery of events, so
events are never lost. Exactly-once processing is guaranteed with selected output as described in Event Delivery
Guarantees.
Azure Stream Analytics has built-in recovery capabilities in case the delivery of an event fails. Stream Analytics
also provides built-in checkpoints to maintain the state of your job and provides repeatable results.
As a managed service, Stream Analytics guarantees event processing with a 99.9% availability at the minute
level. For more information, see the Stream Analytics SLA page.
Security
In terms of security, Azure Stream Analytics encrypts all incoming and outgoing communications and supports
TLS 1.2. Built-in checkpoints are also encrypted. Stream Analytics doesn't store the incoming data since all
processing is done in-memory.
Compliance
Azure Stream Analytics follows multiple compliance certifications as described in the overview of Azure
compliance.
Performance
Stream Analytics can process millions of events every second and it can deliver results with low latency. It
allows you to scale-up and scale-out to handle large real-time and complex event processing applications.
Stream Analytics supports performance by partitioning, allowing complex queries to be parallelized and
executed on multiple streaming nodes. Azure Stream Analytics is built on Trill, a high-performance in-memory
streaming analytics engine developed in collaboration with Microsoft Research.
Next steps
You now have an overview of Azure Stream Analytics. Next, you can dive deep and create your first Stream
Analytics job:
Create a Stream Analytics job by using the Azure portal.
Create a Stream Analytics job by using Azure PowerShell.
Create a Stream Analytics job by using Visual Studio.
Create a Stream Analytics job by using Visual Studio Code.
Azure Stream Analytics solution patterns
Like many other services in Azure, Stream Analytics is best used with other services to create a larger end-to-end
solution. This article discusses simple Azure Stream Analytics solutions and various architectural patterns. You can
build on these patterns to develop more complex solutions. The patterns described in this article can be used in a
wide variety of scenarios. Examples of scenario-specific patterns are available on Azure solution architectures.
This solution can be built in just a few minutes from the Azure portal. There is no extensive coding involved,
and the SQL language is used to express the business logic.
This real-time dashboard solution pattern offers the lowest latency from the event source to the Power BI
dashboard in a browser. Azure Stream Analytics is the only Azure service with this built-in capability.
Using SQL database gives you more flexibility at the expense of increased latency. This solution is optimal for jobs
with latency requirements greater than one second. With this method, you can maximize Power BI's utility to
further slice and dice the data for reports. You also gain the flexibility of using other dashboard solutions, such as
Tableau.
SQL is not a high throughput data store, and the maximum throughput to a SQL database from Azure Stream
Analytics is 24 MB/s. If the event sources in your solution produce data at a higher rate, you need to use processing
logic in Stream Analytics to reduce the output rate to SQL. Techniques such as filtering, windowed aggregates,
pattern matching with temporal joins, and analytic functions can be used. The output rate to SQL can be further
optimized using techniques described in Azure Stream Analytics output to Azure SQL Database.
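For example, a windowed aggregate such as the following sketch reduces a high-rate stream to one row per
sensor per minute before the results reach SQL. The aliases and field names (EventHubInput, SqlOutput,
SensorId, Reading, EventTime) are assumptions for illustration.

SELECT
    SensorId,
    AVG(Reading) AS AverageReading,
    System.Timestamp AS WindowEnd
INTO SqlOutput
FROM EventHubInput TIMESTAMP BY EventTime
GROUP BY SensorId, TumblingWindow(minute, 1)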
Event Hubs, on the other hand, offers the most flexible integration point. Many other services, like Azure Data
Explorer and Time Series Insights, can consume events from Event Hubs. Services can be connected directly to the
Event Hubs sink from Azure Stream Analytics to complete the solution. Event Hubs is also the highest throughput
messaging broker available on Azure for such integration scenarios.
In this solution pattern, events are processed and aggregated into data stores by Azure Stream Analytics. The
application layer interacts with data stores using the traditional request/response pattern. Because of Stream
Analytics' ability to process a large number of events in real-time, the application is highly scalable without the
need to bulk up the data store layer. The data store layer is essentially a materialized view in the system. Azure
Stream Analytics output to Azure Cosmos DB describes how Cosmos DB is used as a Stream Analytics output.
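As a sketch of how such a materialized view can be maintained, the following query keeps per-device
aggregates up to date in a data store output; the aliases and field names are assumptions, with CosmosDbOutput
standing in for a Cosmos DB output.

SELECT
    DeviceId,
    COUNT(*) AS EventCount,
    MAX(Temperature) AS MaxTemperature,
    System.Timestamp AS WindowEnd
INTO CosmosDbOutput
FROM Input TIMESTAMP BY EventTime
GROUP BY DeviceId, TumblingWindow(minute, 5)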
In real applications where processing logic is complex and there is the need to upgrade certain parts of the logic
independently, multiple Stream Analytics jobs can be composed together with Event Hubs as the intermediary
event broker.
This pattern improves the resiliency and manageability of the system. However, even though Stream Analytics
guarantees exactly-once processing, there is a small chance that duplicate events may land in the intermediary
Event Hubs. It's important for the downstream Stream Analytics job to deduplicate events using logical keys
over a lookback window. For more information on event delivery, see the Event Delivery Guarantees reference.
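One possible deduplication sketch groups events by a logical key over a short window and emits a single row
per key. The key, field names, and window length below are illustrative assumptions.

SELECT
    EventId,
    MAX(Temperature) AS Temperature,
    System.Timestamp AS WindowEnd
INTO Output
FROM Input TIMESTAMP BY EventTime
GROUP BY EventId, TumblingWindow(minute, 5)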
Dashboarding Creates gap OK for short outage Use for long outage
Event sourcing app Acceptable OK for short outage Use for long outage
Next steps
You now have seen a variety of solution patterns using Azure Stream Analytics. Next, you can dive deep and create
your first Stream Analytics job:
Create a Stream Analytics job by using the Azure portal.
Create a Stream Analytics job by using Azure PowerShell.
Create a Stream Analytics job by using Visual Studio.
Quickstart: Create a Stream Analytics job by using
the Azure portal
This quickstart shows you how to get started with creating a Stream Analytics job. In this quickstart, you define a
Stream Analytics job that reads real-time streaming data and filters messages with a temperature greater than
27. Your Stream Analytics job will read data from an IoT Hub device, transform the data, and write the data back
to a container in blob storage. The input data used in this quickstart is generated by a Raspberry Pi online
simulator.
Region (suggested value: <Select the region that is closest to your users>): Select a geographic location where
you can host your IoT Hub. Use the location that's closest to your users.
IoT Hub Name (suggested value: MyASAIoTHub): Select a name for your IoT Hub.
4. Select Next: Set size and scale.
5. Choose your Pricing and scale tier. For this quickstart, select the F1 - Free tier if it's still available on
your subscription. For more information, see IoT Hub pricing.
6. Select Review + create. Review your IoT Hub information and click Create. Your IoT Hub might take a
few minutes to create. You can monitor the progress in the Notifications pane.
7. In your IoT Hub navigation menu, click Add under IoT devices. Add a Device ID and click Save.
8. Once the device is created, open the device from the IoT devices list. Copy the Connection string --
primary key and save it to a notepad to use later.
4. From the Blob Service page, select Container and provide a name for your container, such as
container1. Leave the Public access level as Private (no anonymous access) and select OK.
Location (suggested value: <Select the region that is closest to your users>): Select a geographic location where
you can host your Stream Analytics job. Use the location that's closest to your users for better performance
and to reduce the data transfer cost.
IoT Hub (suggested value: MyASAIoTHub): Enter the name of the IoT Hub you created in the previous section.
4. Leave other options to default values and select Save to save the settings.
SELECT *
INTO BlobOutput
FROM IoTHubInput
HAVING Temperature > 27
3. In this example, the query reads the data from IoT Hub and copies it to a new file in the blob. Select Save.
Run the IoT simulator
1. Open the Raspberry Pi Azure IoT Online Simulator.
2. Replace the placeholder in Line 15 with the Azure IoT Hub device connection string you saved in a
previous section.
3. Click Run. The output should show the sensor data and messages that are being sent to your IoT Hub.
Next steps
In this quickstart, you deployed a simple Stream Analytics job using Azure portal. You can also deploy Stream
Analytics jobs using PowerShell, Visual Studio, and Visual Studio Code.
To learn about configuring other input sources and performing real-time detection, continue to the following
article:
Real-time fraud detection using Azure Stream Analytics
Quickstart: Create a Stream Analytics job using
Azure PowerShell
The Azure PowerShell module is used to create and manage Azure resources using PowerShell cmdlets or scripts.
This quickstart details using the Azure PowerShell module to deploy and run an Azure Stream Analytics job.
The example job reads streaming data from an IoT Hub device. The input data is generated by a Raspberry Pi
online simulator. Next, the Stream Analytics job transforms the data using the Stream Analytics query language to
filter messages with a temperature greater than 27°. Finally, it writes the resulting output events into a file in blob
storage.
Sign in to Azure
Sign in to your Azure subscription with the Connect-AzAccount command, and enter your Azure credentials in the
pop-up browser:
If you have more than one subscription, select the subscription you would like to use for this quickstart by
running the following cmdlets. Make sure to replace <your subscription name> with the name of your
subscription:
# Select the Azure subscription you want to use to create the resource group and resources.
Get-AzSubscription -SubscriptionName "<your subscription name>" | Select-AzSubscription
Create a resource group
Create an Azure resource group with New-AzResourceGroup. A resource group is a logical container into which
Azure resources are deployed and managed.
$resourceGroup = "StreamAnalyticsRG"
$location = "WestUS2"
New-AzResourceGroup `
-Name $resourceGroup `
-Location $location
az login
2. Create an IoT Hub using the az iot hub create command. This example creates an IoT Hub called
MyASAIoTHub. Because IoT Hub names are unique, you need to come up with your own IoT Hub name.
Set the SKU to F1 to use the free tier if it is available with your subscription. If not, choose the next lowest
tier.
az iot hub create --name "<your IoT Hub name>" --resource-group $resourceGroup --sku S1
Once the IoT hub has been created, get the IoT Hub connection string using the
az iot hub show-connection-string command. Copy the entire connection string and save it for when you add
the IoT Hub as input to your Stream Analytics job.
3. Add a device to IoT Hub using the az iot hub device-identity create command. This example creates a device
called MyASAIoTDevice.
4. Get the device connection string using the az iot hub device-identity show-connection-string command.
Copy the entire connection string and save it for when you create the Raspberry Pi simulator.
HostName=MyASAIoTHub.azure-
devices.net;DeviceId=MyASAIoTDevice;SharedAccessKey=a2mnUsg52+NIgYudxYYUNXI67r0JmNubmfVafojG8=
$storageAccountName = "myasaquickstartstorage"
$storageAccount = New-AzStorageAccount `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName `
-Location $location `
-SkuName Standard_LRS `
-Kind Storage
$ctx = $storageAccount.Context
$containerName = "container1"
New-AzStorageContainer `
-Name $containerName `
-Context $ctx
$storageAccountKey = (Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName).Value[0]
Write-Host "The <storage account key> placeholder needs to be replaced in your output json files with
this key value:"
Write-Host $storageAccountKey -ForegroundColor Cyan
Next, run the New-AzStreamAnalyticsJob cmdlet. Replace the value of jobDefinitionFile variable with the path
where you've stored the job definition JSON file.
$jobName = "MyStreamingJob"
$jobDefinitionFile = "C:\JobDefinition.json"
New-AzStreamAnalyticsJob `
-ResourceGroupName $resourceGroup `
-File $jobDefinitionFile `
-Name $jobName `
-Force
{
"properties": {
"type": "Stream",
"datasource": {
"type": "Microsoft.Devices/IotHubs",
"properties": {
"iotHubNamespace": "MyASAIoTHub",
"sharedAccessPolicyName": "iothubowner",
"sharedAccessPolicyKey": "accesspolicykey",
"endpoint": "messages/events",
"consumerGroupName": "$Default"
}
},
"compression": {
"type": "None"
},
"serialization": {
"type": "Json",
"properties": {
"encoding": "UTF8"
}
}
},
"name": "IoTHubInput",
"type": "Microsoft.StreamAnalytics/streamingjobs/inputs"
}
Next, run the New-AzStreamAnalyticsInput cmdlet. Make sure to replace the value of the jobInputDefinitionFile
variable with the path where you've stored the job input definition JSON file.
$jobInputName = "IoTHubInput"
$jobInputDefinitionFile = "C:\JobInputDefinition.json"
New-AzStreamAnalyticsInput `
-ResourceGroupName $resourceGroup `
-JobName $jobName `
-File $jobInputDefinitionFile `
-Name $jobInputName
{
"properties": {
"datasource": {
"type": "Microsoft.Storage/Blob",
"properties": {
"storageAccounts": [
{
"accountName": "asaquickstartstorage",
"accountKey": "<storage account key>"
}
],
"container": "container1",
"pathPattern": "output/",
"dateFormat": "yyyy/MM/dd",
"timeFormat": "HH"
}
},
"serialization": {
"type": "Json",
"properties": {
"encoding": "UTF8",
"format": "LineSeparated"
}
}
},
"name": "BlobOutput",
"type": "Microsoft.StreamAnalytics/streamingjobs/outputs"
}
Next, run the New-AzStreamAnalyticsOutput cmdlet. Make sure to replace the value of jobOutputDefinitionFile
variable with the path where you have stored the job output definition JSON file.
$jobOutputName = "BlobOutput"
$jobOutputDefinitionFile = "C:\JobOutputDefinition.json"
New-AzStreamAnalyticsOutput `
-ResourceGroupName $resourceGroup `
-JobName $jobName `
-File $jobOutputDefinitionFile `
-Name $jobOutputName -Force
{
"name":"MyTransformation",
"type":"Microsoft.StreamAnalytics/streamingjobs/transformations",
"properties":{
"streamingUnits":1,
"script":null,
"query":" SELECT * INTO BlobOutput FROM IoTHubInput HAVING Temperature > 27"
}
}
Next, run the New-AzStreamAnalyticsTransformation cmdlet. Make sure to replace the value of the
jobTransformationDefinitionFile variable with the path where you've stored the job transformation definition
JSON file.
$jobTransformationName = "MyJobTransformation"
$jobTransformationDefinitionFile = "C:\JobTransformationDefinition.json"
New-AzStreamAnalyticsTransformation `
-ResourceGroupName $resourceGroup `
-JobName $jobName `
-File $jobTransformationDefinitionFile `
-Name $jobTransformationName -Force
Start-AzStreamAnalyticsJob `
-ResourceGroupName $resourceGroup `
-Name $jobName `
-OutputStartMode 'JobStartTime'
Clean up resources
When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job
avoids billing the streaming units consumed by the job. If you're planning to use the job in future, you can skip
deleting it, and stop the job for now. If you aren't going to continue to use this job, delete all resources created by
this quickstart by running the following cmdlet:
Remove-AzResourceGroup `
-Name $resourceGroup
Next steps
In this quickstart, you deployed a simple Stream Analytics job using PowerShell. You can also deploy Stream
Analytics jobs using the Azure portal and Visual Studio.
To learn about configuring other input sources and performing real-time detection, continue to the following
article:
Real-time fraud detection using Azure Stream Analytics
Quickstart: Create a Stream Analytics job by using
the Azure Stream Analytics tools for Visual Studio
This quickstart shows you how to create and run a Stream Analytics job using Azure Stream Analytics tools for
Visual Studio. The example job reads streaming data from an IoT Hub device. You define a job that filters
messages with a temperature greater than 27° and writes the resulting output events to a new file in blob storage.
Region (suggested value: <Select the region that is closest to your users>): Select a geographic location where
you can host your IoT Hub. Use the location that's closest to your users.
IoT Hub Name (suggested value: MyASAIoTHub): Select a name for your IoT Hub.
4. Select Next: Set size and scale.
5. Choose your Pricing and scale tier. For this quickstart, select the F1 - Free tier if it's still available on
your subscription. If the free tier is unavailable, choose the lowest tier available. For more information,
see IoT Hub pricing.
6. Select Review + create. Review your IoT Hub information and click Create. Your IoT Hub might take a
few minutes to create. You can monitor the progress in the Notifications pane.
7. In your IoT Hub navigation menu, click Add under IoT devices. Add a Device ID and click Save.
8. Once the device is created, open the device from the IoT devices list. Copy the Connection string --
primary key and save it to a notepad to use later.
3. Once your storage account is created, select the Blobs tile on the Overview pane.
4. From the Blob Service page, select Container and provide a name for your container, such as
container1. Leave the Public access level as Private (no anonymous access) and select OK.
Notice the elements that are included in an Azure Stream Analytics project.
Define input
1. In Solution Explorer, expand the Inputs node and double-click Input.json.
2. Fill out the Stream Analytics Input Configuration with the following values:
Resource (suggested value: Choose data source from current account): Choose to enter data manually or
select an existing account.
3. Leave other options to default values and select Save to save the settings.
Define output
1. In Solution Explorer, expand the Outputs node and double-click Output.json.
2. Fill out the Stream Analytics Output Configuration with the following values:
3. Leave other options to default values and select Save to save the settings.
Define the transformation query
1. Open Script.asaql from Solution Explorer in Visual Studio.
2. Add the following query:
SELECT *
INTO BlobOutput
FROM IoTHubInput
HAVING Temperature > 27
3. Note the job status has changed to Running, and there are input/output events. This may take a few
minutes.
4. To view results, on the View menu, select Cloud Explorer, and navigate to the storage account in your
resource group. Under Blob Containers, double-click container1, and then the output file path.
Clean up resources
When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the
job avoids billing the streaming units consumed by the job. If you're planning to use the job in future, you can
stop it and restart it later when you need. If you are not going to continue to use this job, delete all resources
created by this quickstart by using the following steps:
1. From the left-hand menu in the Azure portal, select Resource groups and then select the name of the
resource you created.
2. On your resource group page, select Delete, type the name of the resource to delete in the text box, and
then select Delete.
Next steps
In this quickstart, you deployed a simple Stream Analytics job using Visual Studio. You can also deploy Stream
Analytics jobs using the Azure portal and PowerShell.
To learn about Azure Stream Analytics tools for Visual Studio, continue to the following article:
Use Visual Studio to view Azure Stream Analytics jobs
Quickstart: Create an Azure Stream Analytics cloud
job in Visual Studio Code (Preview)
This quickstart shows you how to create and run a Stream Analytics job using the Azure Stream Analytics
extension for Visual Studio Code. The example job reads streaming data from an IoT Hub device. You define a
job that filters messages with a temperature greater than 27° and writes the resulting output events to a new file
in blob storage.
NOTE
If you don't sign out, Azure Stream Analytics tools will sign you in automatically the next time. If your account has
two-factor authentication, it is recommended that you use phone authentication rather than a PIN. If you have issues
listing resources, signing out and signing in again usually helps. To sign out, enter the command Azure: Sign Out.
Region (suggested value: <Select the region that is closest to your users>): Select a geographic location where
you can host your IoT Hub. Use the location that's closest to your users.
IoT Hub Name (suggested value: MyASAIoTHub): Select a name for your IoT Hub.
3. Once your storage account is created, select the Blobs tile on the Overview pane.
4. From the Blob Service page, select Container and provide a name for your container, such as
container1. Leave the Public access level as Private (no anonymous access) and select OK.
2. Input your project name, like myASAproj and select a folder for your project.
3. The new project will be added to your workspace. An ASA project consists of the query script (*.asaql), a
JobConfig.json file, and an asaproj.json configuration file.
4. The asaproj.json configuration file contains the inputs, outputs, and job configuration file information
needed for submitting the Stream Analytics job to Azure.
NOTE
When adding inputs and outputs from the command palette, the corresponding paths will be added into asaproj.json
automatically. If you add or remove inputs or outputs on disk directly, you need to manually add or remove them from
asaproj.json. You can choose to put the inputs and outputs in one place then reference them in different jobs by
specifying the paths in each asaproj.json.
Define an input
1. Select Ctrl+Shift+P to open the command palette and enter ASA: Add Input.
3. Choose the ASA query script that will use the input. It should automatically populate with the file path to
myASAproj.asaql.
EndPoint Messaging
SharedAccessPolicyName iothubowner
Define an output
1. Select Ctrl+Shift+P to open the command palette. Then, enter ASA: Add Output.
SELECT *
INTO Output
FROM Input
HAVING Temperature > 27
Clean up resources
When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job
avoids billing the streaming units consumed by the job. If you're planning to use the job in future, you can stop it
and restart it later when you need. If you are not going to continue to use this job, delete all resources created by
this quickstart by using the following steps:
1. From the left-hand menu in the Azure portal, select Resource groups and then select the name of the
resource you created.
2. On your resource group page, select Delete, type the name of the resource to delete in the text box, and
then select Delete.
Next steps
In this quickstart, you deployed a simple Stream Analytics job using Visual Studio Code. You can also deploy
Stream Analytics jobs using the Azure portal, PowerShell, and Visual Studio.
To learn about Azure Stream Analytics tools for Visual Studio, continue to the following article:
Use Visual Studio to view Azure Stream Analytics jobs
Analyze phone call data with Stream Analytics and
visualize results in Power BI dashboard
This tutorial teaches how to analyze phone call data using Azure Stream Analytics. The phone call data, generated
by a client application, contains some fraudulent calls, which will be filtered by the Stream Analytics job.
In this tutorial, you learn how to:
Generate sample phone call data and send it to Azure Event Hubs
Create a Stream Analytics job
Configure job input and output
Define a query to filter fraudulent calls
Test and start the job
Visualize results in Power BI
Prerequisites
Before you start, do the following actions:
If you don't have an Azure subscription, create a free account.
Sign in to the Azure portal.
Download the phone call event generator app TelcoGenerator.zip from the Microsoft Download Center or get
the source code from GitHub.
You will need a Power BI account.
4. Paste the connection string into a text editor. You need this connection string in the next section.
The connection string looks as follows:
Endpoint=sb://<Your event hub namespace>.servicebus.windows.net/;SharedAccessKeyName=<Your shared access policy name>;SharedAccessKey=<generated key>;EntityPath=<Your event hub name>
Notice that the connection string contains multiple key-value pairs separated with semicolons: Endpoint,
SharedAccessKeyName, SharedAccessKey, and EntityPath.
Start the event generator application
Before you start the TelcoGenerator app, you should configure it to send data to the Azure Event Hubs you created
earlier.
1. Extract the contents of TelcoGenerator.zip file.
2. Open the TelcoGenerator\TelcoGenerator\telcodatagen.exe.config file in a text editor of your choice (There is
more than one .config file, so be sure that you open the right one.)
3. Update the <appSettings> element in the config file with the following details:
Set the value of the EventHubName key to the value of the EntityPath in the connection string.
Set the value of the Microsoft.ServiceBus.ConnectionString key to the connection string without the
EntityPath value (don't forget to remove the semicolon that precedes it).
4. Save the file.
5. Next open a command window and change to the folder where you unzipped the TelcoGenerator
application. Then enter the following command:
RECORD DEFINITION
SwitchNum The telephone switch used to connect the call. For this
example, the switches are strings that represent the
country/region of origin (US, China, UK, Germany, or
Australia).
4. Use default options on the remaining settings, select Create, and wait for the deployment to succeed.
Configure job input
The next step is to define an input source for the job to read data using the event hub you created in the previous
section.
1. From the Azure portal, open the All resources pane, and find the ASATutorial Stream Analytics job.
2. In the Job Topology section of the Stream Analytics job pane, select the Inputs option.
3. Select + Add stream input and Event hub. Fill out the pane with the following values:
Event hub namespace myEventHubsNS Select the event hub namespace you
created in the previous section. All
the event hub namespaces available
in your current subscription are listed
in the dropdown.
Event Hub name MyEventHub Select the event hub you created in
the previous section. All the event
hubs available in your current
subscription are listed in the
dropdown.
Event Hub policy name MyPolicy Select the event hub shared access
policy you created in the previous
section. All the event hubs policies
available in your current subscription
are listed in the dropdown.
4. When you select Authorize, a pop-up window opens and you are asked to provide credentials to
authenticate to your Power BI account. Once the authorization is successful, Save the settings.
To check for fraudulent calls, you can self-join the streaming data based on the CallRecTime value. You can
then look for call records where the CallingIMSI value (the originating number) is the same, but the
SwitchNum value (country/region of origin) is different. When you use a JOIN operation with streaming
data, the join must provide some limits on how far the matching rows can be separated in time. Because the
streaming data is endless, the time bounds for the relationship are specified within the ON clause of the join
using the DATEDIFF function.
This query is just like a normal SQL join except for the DATEDIFF function. The DATEDIFF function used
in this query is specific to Stream Analytics, and it must appear within the ON...BETWEEN clause. A sketch of
the complete query appears after this procedure.
4. Save the query.
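The self-join described above resembles the following sketch. The input alias (CallStream), output alias
(PowerBIOutput), and field names are assumptions that mirror the phone call data produced by the
TelcoGenerator app; a very similar query appears in the Azure Functions tutorial later in this document.

SELECT
    System.Timestamp AS WindowEnd,
    CS1.CallingIMSI,
    CS1.CallingNum AS CallingNum1,
    CS2.CallingNum AS CallingNum2,
    CS1.SwitchNum AS Switch1,
    CS2.SwitchNum AS Switch2
INTO PowerBIOutput
FROM CallStream CS1 TIMESTAMP BY CallRecTime
JOIN CallStream CS2 TIMESTAMP BY CallRecTime
    ON CS1.CallingIMSI = CS2.CallingIMSI
    AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
WHERE CS1.SwitchNum != CS2.SwitchNum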
Next steps
In this tutorial, you created a simple Stream Analytics job, analyzed the incoming data, and presented results in a
Power BI dashboard. To learn more about Stream Analytics jobs, continue to the next tutorial:
Run Azure Functions within Stream Analytics jobs
Run Azure Functions from Azure Stream Analytics
jobs
You can run Azure Functions from Azure Stream Analytics by configuring Functions as one of the output sinks to
the Stream Analytics job. Functions are an event-driven, compute-on-demand experience that lets you implement
code that is triggered by events occurring in Azure or third-party services. This ability of Functions to respond to
triggers makes it a natural output to Stream Analytics jobs.
Stream Analytics invokes Functions through HTTP triggers. The Functions output adapter allows users to connect
Functions to Stream Analytics, such that the events can be triggered based on Stream Analytics queries.
In this tutorial, you learn how to:
Create a Stream Analytics job
Create an Azure function
Configure Azure function as output to your job
If you don’t have an Azure subscription, create a free account before you begin.
// Throw an HTTP Request Entity Too Large exception when the incoming batch (dataArray) is greater
// than 256 KB. Make sure that the size value is consistent with the value entered in the Stream
// Analytics portal.

// Perform cache operations using the cache object. For example, the following code block adds each
// incoming call record to the cache (db is the Redis database connection).
for (var i = 0; i < dataArray.Count; i++)
{
    string time = dataArray[i].time;
    string callingnum1 = dataArray[i].callingnum1;
    string key = time + " - " + callingnum1;
    db.StringSet(key, dataArray[i].ToString());
    log.Info($"Object put in database. Key is {key} and value is {dataArray[i].ToString()}");
}
When Stream Analytics receives the "HTTP Request Entity Too Large" exception from the function, it
reduces the size of the batches it sends to Functions. In your function, use the following code to check that
Stream Analytics doesn’t send oversized batches. Make sure that the maximum batch count and size values
used in the function are consistent with the values entered in the Stream Analytics portal.
3. In a text editor of your choice, create a JSON file named project.json. Use the following code, and save it
on your local computer. This file contains the NuGet package dependencies required by the C# function.
{
"frameworks": {
"net46": {
"dependencies": {
"StackExchange.Redis":"1.1.603",
"Newtonsoft.Json": "9.0.1"
}
}
}
}
4. Go back to the Azure portal. From the Platform features tab, browse to your function. Under
Development Tools, select App Service Editor.
5. In the App Service Editor, right-click your root directory, and upload the project.json file. After the upload
is successful, refresh the page. You should now see an autogenerated file named project.lock.json. The
autogenerated file contains references to the .dll files that are specified in the project.json file.
Update the Stream Analytics job with the function as output
1. Open your Stream Analytics job on the Azure portal.
2. Browse to your function, and select Overview > Outputs > Add. To add a new output, select Azure
Function for the sink option. The Functions output adapter has the following properties:
Output alias A user-friendly name that you use in the job's query to
reference the output.
Import option You can use the function from the current subscription, or
provide the settings manually if the function is located in
another subscription.
Max Batch Size Sets the maximum size for each output batch which is sent
to your function in bytes. By default, this value is set to
262,144 bytes (256 KB).
Max Batch Count Specifies the maximum number of events in each batch
that is sent to the function. The default value is 100. This
property is optional.
SELECT
System.Timestamp as Time, CS1.CallingIMSI, CS1.CallingNum as CallingNum1,
CS2.CallingNum as CallingNum2, CS1.SwitchNum as Switch1, CS2.SwitchNum as Switch2
INTO saop1
FROM CallStream CS1 TIMESTAMP BY CallRecTime
JOIN CallStream CS2 TIMESTAMP BY CallRecTime
ON CS1.CallingIMSI = CS2.CallingIMSI AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
WHERE CS1.SwitchNum != CS2.SwitchNum
5. Start the telcodatagen.exe application by running the following command in command line (use the format
telcodatagen.exe [#NumCDRsPerHour] [SIM Card Fraud Probability] [#DurationHours] ):
telcodatagen.exe 1000 .2 2
6. Start the Stream Analytics job.
Known issues
In the Azure portal, when you try to reset the Max Batch Size/ Max Batch Count value to empty (default), the value
changes back to the previously entered value upon save. Manually enter the default values for these fields in this
case.
The use of HTTP routing on your Azure Functions is currently not supported by Stream Analytics.
Clean up resources
When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job
avoids billing the streaming units consumed by the job. If you're planning to use the job in future, you can stop it
and re-start it later when you need. If you are not going to continue to use this job, delete all resources created by
this quickstart by using the following steps:
1. From the left-hand menu in the Azure portal, click Resource groups and then click the name of the resource
you created.
2. On your resource group page, click Delete, type the name of the resource to delete in the text box, and then
click Delete.
Next steps
In this tutorial, you created a simple Stream Analytics job that runs an Azure Function. To learn more about
Stream Analytics jobs, continue to the next tutorial:
Run JavaScript user-defined functions within Stream Analytics jobs
Tutorial: Azure Stream Analytics JavaScript user-
defined functions
Azure Stream Analytics supports user-defined functions written in JavaScript. With the rich set of String, RegExp,
Math, Array, and Date methods that JavaScript provides, complex data transformations with Stream Analytics
jobs become easier to create.
In this tutorial, you learn how to:
Define a JavaScript user-defined function
Add the function to the portal
Define a query that runs the function
If you don’t have an Azure subscription, create a free account before you begin.
SELECT
time,
UDF.hex2Int(offset) AS IntOffset
INTO
output
FROM
InputStream
Stream Analytics type to JavaScript type:
double: Number
nvarchar(MAX): String
Record: Object
Array: Array
NULL: Null

JavaScript type to Stream Analytics type:
Date: DateTime
String: nvarchar(MAX)
Object: Record
Array: Array
Any other type (for example, a function or error): Not supported (results in runtime error)
Troubleshooting
JavaScript runtime errors are considered fatal, and are surfaced through the Activity log. To retrieve the log, in the
Azure portal, go to your job and select Activity log.
Sample query:
SELECT
DataString,
DataValue,
HexValue,
UDF.json_stringify(input) As InputEvent
INTO
output
FROM
input PARTITION BY PARTITIONID
Clean up resources
When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job
avoids billing the streaming units consumed by the job. If you're planning to use the job in future, you can stop it
and re-start it later when you need. If you are not going to continue to use this job, delete all resources created by
this quickstart by using the following steps:
1. From the left-hand menu in the Azure portal, click Resource groups and then click the name of the resource
you created.
2. On your resource group page, click Delete, type the name of the resource to delete in the text box, and then
click Delete.
Get help
For additional help, try our Azure Stream Analytics forum.
Next steps
In this tutorial, you have created a Stream Analytics job that runs a simple JavaScript user-defined function. To
learn more about Stream Analytics, continue to the real-time scenario articles:
Real-time Twitter sentiment analysis in Azure Stream Analytics
Tutorial: Deploy an Azure Stream Analytics job with
CI/CD using Azure Pipelines
This tutorial describes how to set up continuous integration and deployment for an Azure Stream Analytics job
using Azure Pipelines.
In this tutorial, you learn how to:
Add source control to your project
Create a build pipeline in Azure Pipelines
Create a release pipeline in Azure Pipelines
Automatically deploy and upgrade an application
Prerequisites
Before you start, make sure you have the following:
If you don't have an Azure subscription, create a free account.
Install Visual Studio and the Azure development or Data Storage and Processing workloads.
Create a Stream Analytics project in Visual Studio.
Create an Azure DevOps organization.
Share your Visual Studio solution to a new Azure Repos Git repo
Share your application source files to a project in Azure DevOps so you can generate builds.
1. Create a new local Git repo for your project by selecting Add to Source Control, then Git on the status bar
in the lower right-hand corner of Visual Studio.
2. In the Synchronization view in Team Explorer, select the Publish Git Repo button under Push to Azure
DevOps Services.
3. Verify your email and select your organization in the Azure DevOps Services Domain drop-down. Enter
your repository name and select Publish repository.
Publishing the repo creates a new project in your organization with the same name as the local repo. To
create the repo in an existing project, click Advanced next to Repository name, and select a project. You
can view your code in the browser by selecting See it on the web.
3. Under Triggers, enable continuous integration by checking Enable continuous integration trigger status.
Select Save and queue to manually start a build.
4. Builds are also triggered upon push or check-in. To check your build progress, switch to the Builds tab.
Once you verify that the build executes successfully, you must define a release pipeline that deploys your
application to a cluster. Right click on the ellipses next to your build pipeline and select Edit.
5. In Tasks, enter "Hosted" as the Agent queue.
/p:CompilerTaskAssemblyFile="Microsoft.WindowsAzure.StreamAnalytics.Common.CompileService.dll"
/p:ASATargetsFilePath="$(Build.SourcesDirectory)\packages\Microsoft.Azure.StreamAnalytics.CICD.1.0.0\build\StreamAnalytics.targets"
10. In Phase 1, click + and add an Azure Resource Group Deployment task.
11. Expand Azure Details and fill out the configuration with the following:
Override template parameters: Type the template parameters to override in the textbox. Example:
-storageName fabrikam -adminUsername $(vmusername) -adminPassword $(password) -azureKeyVaultName $(fabrikamFibre).
This property is optional, but your build will result in errors if key parameters are not overridden.
Pushing the changes to Azure DevOps Services automatically triggers a build. When the build pipeline successfully
completes, a release is automatically created and starts updating the job on the cluster.
Clean up resources
When no longer needed, delete the resource group, the streaming job, and all related resources. Deleting the job
avoids billing the streaming units consumed by the job. If you're planning to use the job in future, you can stop it
and re-start it later when you need. If you are not going to continue to use this job, delete all resources created by
this tutorial by using the following steps:
1. From the left-hand menu in the Azure portal, click Resource groups and then click the name of the resource
you created.
2. On your resource group page, click Delete, type the name of the resource to delete in the text box, and then
click Delete.
Next steps
To learn more about using Azure Stream Analytics tools for Visual Studio to set up a continuous integration and
deployment process, continue to the set up CI/CD pipeline article:
Continuously integrate and develop with Stream Analytics tools
Tutorial: Write a C# user-defined function for Azure
Stream Analytics Edge job (Preview)
C# user-defined functions (UDFs) created in Visual Studio allow you to extend the Azure Stream Analytics query
language with your own functions. You can reuse existing code (including DLLs) and use mathematical or complex
logic with C#. There are three ways to implement UDFs: CodeBehind files in a Stream Analytics project, UDFs
from a local C# project, or UDFs from an existing package from a storage account. This tutorial uses the
CodeBehind method to implement a basic C# function. The UDF feature for Stream Analytics Edge jobs is
currently in preview and shouldn't be used in production workloads.
In this tutorial, you learn how to:
Create a C# user defined function using CodeBehind.
Test your Stream Analytics Edge job locally.
Publish your Edge job to Azure.
Prerequisites
Before you start, make sure you've completed the following prerequisites:
If you don't have an Azure subscription, create a free account.
Install Stream Analytics tools for Visual Studio and the Azure development or Data Storage and
Processing workloads.
Take a look at the existing Stream Analytics Edge development guide.
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Text;
namespace ASAEdgeUDFDemo
{
public class Class1
{
// Public static function
public static Int64 SquareFunction(Int64 a)
{
return a * a;
}
}
}
Local testing
1. Download the Edge temperature simulator sample data file.
2. In Solution Explorer, expand Inputs, right-click Input.json, and select Add Local Input.
3. Specify the local input file path for the sample data you downloaded and Save.
4. Click Run Locally in the script editor. Once the local run has successfully saved the output results, press
any key to see the results in table format.
5. You can also select Open Results Folder to see the raw files in JSON and CSV format.
Debug a UDF
You can debug your C# UDF locally the same way you debug standard C# code.
1. Add breakpoints in your C# function.
2. Press F5 to start debugging. The program will stop at your breakpoints as expected.
Publish your job to Azure
Once you've tested your query locally, select Submit to Azure in the script editor to publish the job to Azure.
Next steps
In this tutorial, you created a simple C# user-defined function using CodeBehind, published your job to Azure, and
deployed the job to IoT Edge devices using the IoT Hub portal.
To learn more about the different ways to use C# user-defined functions for Stream Analytics Edge jobs, continue
to this article:
Write C# functions for Azure Stream Analytics
Choose a real-time analytics and streaming
processing technology on Azure
There are several services available for real-time analytics and streaming processing on Azure. This article provides
the information you need to decide which technology is the best fit for your application.
Next steps
Create a Stream Analytics job by using the Azure portal
Create a Stream Analytics job by using Azure PowerShell
Create a Stream Analytics job by using Visual Studio
Create a Stream Analytics job by using Visual Studio Code
Understand inputs for Azure Stream Analytics
Azure Stream Analytics jobs connect to one or more data inputs. Each input defines a connection to an existing
data source. Stream Analytics accepts data incoming from several kinds of event sources including Event Hubs, IoT
Hub, and Blob storage. The inputs are referenced by name in the streaming SQL query that you write for each job.
In the query, you can join multiple inputs to blend data or compare streaming data with a lookup to reference data,
and pass the results to outputs.
Stream Analytics has first-class integration with three kinds of resources as inputs:
Azure Event Hubs
Azure IoT Hub
Azure Blob storage
These input resources can live in the same Azure subscription as your Stream Analytics job or in a different
subscription.
You can use the Azure portal, Azure PowerShell, .NET API, REST API, and Visual Studio to create, edit, and test
Stream Analytics job inputs.
Next steps
Quickstart: Create a Stream Analytics job by using the Azure portal
Stream data as input into Stream Analytics
Stream Analytics has first-class integration with Azure data streams as inputs from three kinds of resources:
Azure Event Hubs
Azure IoT Hub
Azure Blob storage
These input resources can live in the same Azure subscription as your Stream Analytics job or a different
subscription.
Compression
Stream Analytics supports compression across all data stream input sources. Supported compression types are:
None, GZip, and Deflate compression. Support for compression is not available for reference data. If the input
format is Avro data that is compressed, it's handled transparently. You don't need to specify compression type with
Avro serialization.
PROPERTY DESCRIPTION
Input alias A friendly name that you use in the job's query to reference
this input.
Event Hub namespace The Event Hub namespace is a container for a set of
messaging entities. When you create a new event hub, you
also create the namespace.
Event Hub name The name of the event hub to use as input.
Event Hub policy name The shared access policy that provides access to the Event
Hub. Each shared access policy has a name, permissions that
you set, and access keys. This option is automatically
populated, unless you select the option to provide the Event
Hub settings manually.
Event Hub consumer group (recommended) It is highly recommended to use a distinct consumer group for
each Stream Analytics job. This string identifies the consumer
group to use to ingest data from the event hub. If no
consumer group is specified, the Stream Analytics job uses the
$Default consumer group.
Event serialization format The serialization format (JSON, CSV, or Avro) of the incoming
data stream. Ensure the JSON format aligns with the
specification and doesn’t include leading 0 for decimal
numbers.
Event compression type The compression type used to read the incoming data stream,
such as None (default), GZip, or Deflate.
When your data comes from an Event Hub stream input, you have access to the following metadata fields in your
Stream Analytics query:
PROPERTY DESCRIPTION
EventProcessedUtcTime The date and time that the event was processed by Stream
Analytics.
EventEnqueuedUtcTime The date and time that the event was received by Event Hubs.
For example, using these fields, you can write a query like the following example:
SELECT
EventProcessedUtcTime,
EventEnqueuedUtcTime,
PartitionId
FROM Input
NOTE
When using Event Hubs as an endpoint for IoT Hub routes, you can access the IoT Hub metadata using the
GetMetadataPropertyValue function.
PROPERTY DESCRIPTION
Input alias A friendly name that you use in the job's query to reference
this input.
Subscription Choose the subscription in which the IoT Hub resource exists.
Shared access policy name The shared access policy that provides access to the IoT Hub.
Each shared access policy has a name, permissions that you
set, and access keys.
Shared access policy key The shared access key used to authorize access to the IoT
Hub. This option is automatically populated unless you
select the option to provide the IoT Hub settings manually.
Event serialization format The serialization format (JSON, CSV, or Avro) of the incoming
data stream. Ensure the JSON format aligns with the
specification and doesn’t include leading 0 for decimal
numbers.
Event compression type The compression type used to read the incoming data stream,
such as None (default), GZip, or Deflate.
When you use stream data from an IoT Hub, you have access to the following metadata fields in your Stream
Analytics query:
PROPERTY DESCRIPTION
EventProcessedUtcTime The date and time that the event was processed.
EventEnqueuedUtcTime The date and time that the event was received by the IoT Hub.
IoTHub.EnqueuedTime The time when the message was received by the IoT Hub.
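For example, as with the Event Hubs input shown earlier, you can reference these fields directly in a query;
the input alias Input is an assumption.

SELECT
    EventProcessedUtcTime,
    EventEnqueuedUtcTime,
    IoTHub.EnqueuedTime AS IoTHubEnqueuedTime
FROM Input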
NOTE
Stream Analytics does not support adding content to an existing blob file. Stream Analytics will view each file only once, and
any changes that occur in the file after the job has read the data are not processed. Best practice is to upload all the data for
a blob file at once and then add additional newer events to a different, new blob file.
Uploading a very large number of blobs at once might cause Stream Analytics to skip reading a few blobs in rare
cases. It is recommended to upload blobs at least 2 seconds apart to Blob storage. If this option is not feasible, you
can use Event Hubs to stream large volumes of events.
Configure Blob storage as a stream input
The following table explains each property in the New input page in the Azure portal when you configure Blob
storage as a stream input.
PROPERTY DESCRIPTION
Input alias A friendly name that you use in the job's query to reference
this input.
Subscription Choose the subscription in which the storage account exists.
Storage account The name of the storage account where the blob files are
located.
Storage account key The secret key associated with the storage account. This
option is automatically populated unless you select the
option to provide the Blob storage settings manually.
Container The container for the blob input. Containers provide a logical
grouping for blobs stored in the Microsoft Azure Blob service.
When you upload a blob to the Azure Blob storage service,
you must specify a container for that blob. You can choose
either Use existing container or Create new to have a new
container created.
Path pattern (optional) The file path used to locate the blobs within the specified
container. Within the path, you can specify one or more
instances of the following three variables: {date} , {time} ,
or {partition}
Example 1: cluster1/logs/{date}/{time}/{partition}
Example 2: cluster1/logs/{date}
Date format (optional) If you use the date variable in the path, the date format in
which the files are organized. Example: YYYY/MM/DD
Time format (optional) If you use the time variable in the path, the time format in
which the files are organized. Currently the only supported
value is HH for hours.
PROPERTY DESCRIPTION
Event serialization format The serialization format (JSON, CSV, or Avro) of the incoming
data stream. Ensure the JSON format aligns with the
specification and doesn’t include leading 0 for decimal
numbers.
Encoding For CSV and JSON, UTF-8 is currently the only supported
encoding format.
Compression The compression type used to read the incoming data stream,
such as None (default), GZip, or Deflate.
When your data comes from a Blob storage source, you have access to the following metadata fields in your
Stream Analytics query:
PROPERTY DESCRIPTION
BlobName The name of the input blob that the event came from.
EventProcessedUtcTime The date and time that the event was processed by Stream
Analytics.
BlobLastModifiedUtcTime The date and time that the blob was last modified.
For example, using these fields, you can write a query like the following example:
SELECT
BlobName,
EventProcessedUtcTime,
BlobLastModifiedUtcTime
FROM Input
Next steps
Quickstart: Create a Stream Analytics job by using the Azure portal
Using reference data for lookups in Stream Analytics
Reference data (also known as a lookup table) is a finite data set that is static or slowly changing in nature, used to
perform a lookup or to correlate with your data stream. For example, in an IoT scenario, you could store metadata
about sensors (which don’t change often) in reference data and join it with real time IoT data streams. Azure
Stream Analytics loads reference data in memory to achieve low latency stream processing. To make use of
reference data in your Azure Stream Analytics job, you will generally use a Reference Data Join in your query.
Stream Analytics supports Azure Blob storage and Azure SQL Database as the storage layer for Reference Data.
You can also transform and/or copy reference data to Blob storage from Azure Data Factory to use any number of
cloud-based and on-premises data stores.
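For example, a reference data join typically looks like the following sketch; the aliases and column names
(Input, SensorReference, Output, DeviceId, Temperature, SensorLocation, EventTime) are assumptions for
illustration.

SELECT
    I.DeviceId,
    I.Temperature,
    R.SensorLocation
INTO Output
FROM Input I TIMESTAMP BY EventTime
JOIN SensorReference R
    ON I.DeviceId = R.DeviceId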
Input Alias A friendly name that will be used in the job query to reference
this input.
Storage Account The name of the storage account where your blobs are
located. If it’s in the same subscription as your Stream
Analytics Job, you can select it from the drop-down.
Storage Account Key The secret key associated with the storage account. This gets
automatically populated if the storage account is in the same
subscription as your Stream Analytics job.
Storage Container Containers provide a logical grouping for blobs stored in the
Microsoft Azure Blob service. When you upload a blob to the
Blob service, you must specify a container for that blob.
Path Pattern The path used to locate your blobs within the specified
container. Within the path, you may choose to specify one or
more instances of the following 2 variables:
{date}, {time}
Example 1: products/{date}/{time}/product-list.csv
Example 2: products/{date}/product-list.csv
Example 3: product-list.csv
Date Format [optional] If you have used {date} within the Path Pattern that you
specified, then you can select the date format in which your
blobs are organized from the drop-down of supported
formats.
Example: YYYY/MM/DD, MM/DD/YYYY, etc.
Time Format [optional] If you have used {time} within the Path Pattern that you
specified, then you can select the time format in which your
blobs are organized from the drop-down of supported
formats.
Example: HH, HH/mm, or HH-mm.
Event Serialization Format To make sure your queries work the way you expect, Stream
Analytics needs to know which serialization format you're
using for incoming data streams. For Reference Data, the
supported formats are CSV and JSON.
Azure Data Factory can be used to orchestrate the task of creating the updated blobs required by Stream Analytics
to update reference data definitions. Data Factory is a cloud-based data integration service that orchestrates and
automates the movement and transformation of data. Data Factory supports connecting to a large number of
cloud based and on-premises data stores and moving data easily on a regular schedule that you specify. For more
information and step by step guidance on how to set up a Data Factory pipeline to generate reference data for
Stream Analytics which refreshes on a pre-defined schedule, check out this GitHub sample.
Tips on refreshing blob reference data
1. Do not overwrite reference data blobs as they are immutable.
2. The recommended way to refresh reference data is to:
Use {date}/{time} in the path pattern
Add a new blob using the same container and path pattern defined in the job input
Use a date/time greater than the one specified by the last blob in the sequence.
3. Reference data blobs are not ordered by the blob’s "Last Modified" time but only by the time and date
specified in the blob name using the {date} and {time} substitutions.
4. To avoid having to list a large number of blobs, consider deleting very old blobs for which processing will no
longer be done. Note that Azure Stream Analytics might have to reprocess a small amount of data in some
scenarios, such as a restart.
Azure SQL Database
Input alias A friendly name that will be used in the job query to reference this input.
Database The Azure SQL Database that contains your reference data.
Refresh periodically This option allows you to choose a refresh rate. Choosing
"On" will allow you to specify the refresh rate in DD:HH:MM.
Snapshot query This is the default query option that retrieves the reference
data from your SQL Database.
Delta query For advanced scenarios with large data sets and a short
refresh rate, choose to add a delta query.
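For illustration, a snapshot query can be a plain SELECT over the reference table (the table and column names below are hypothetical):
SELECT DeviceId, SensorName, Threshold
FROM dbo.DeviceThresholds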
Size limitation
Stream Analytics supports reference data with a maximum size of 300 MB. The 300 MB limit is achievable only
with simple queries. As the complexity of the query increases to include stateful processing, such as windowed
aggregates, temporal joins, and temporal analytic functions, the maximum supported size of reference data
decreases. If Azure Stream Analytics can't load the reference data and perform complex operations, the job runs
out of memory and fails. In such cases, the SU % Utilization metric will reach 100%.
STREAMING UNITS MAXIMUM SUPPORTED SIZE OF REFERENCE DATA (MB)
1 50
3 150
Increasing the number of Streaming Units of a job beyond 6 does not increase the maximum supported size of
reference data.
Support for compression is not available for reference data.
Next steps
Quickstart: Create a Stream Analytics job by using the Azure portal
Understand outputs from Azure Stream Analytics
5/31/2019 • 26 minutes to read • Edit Online
This article describes the types of outputs available for an Azure Stream Analytics job. Outputs let you store and
save the results of the Stream Analytics job. By using the output data, you can do further business analytics and
data warehousing of your data.
When you design your Stream Analytics query, refer to the name of the output by using the INTO clause. You can
use a single output per job, or multiple outputs per streaming job (if you need them) by providing multiple INTO
clauses in the query.
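For illustration, a minimal sketch with two INTO clauses (the output aliases ArchiveOutput and AlertOutput and the field names are hypothetical):
SELECT *
INTO ArchiveOutput
FROM Input

SELECT DeviceId, COUNT(*) AS EventCount
INTO AlertOutput
FROM Input TIMESTAMP BY EventTime
GROUP BY DeviceId, TumblingWindow(minute, 5)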
To create, edit, and test Stream Analytics job outputs, you can use the Azure portal, Azure PowerShell, .NET API,
REST API, and Visual Studio.
Some output types support partitioning. Output batch sizes vary to optimize throughput.
Azure Data Lake Store
PROPERTY NAME DESCRIPTION
Output alias A friendly name used in queries to direct the query output to Data Lake Store.
Subscription The subscription that contains your Azure Data Lake Storage account.
Account name The name of the Data Lake Store account where you're sending your output. You're presented with a
drop-down list of Data Lake Store accounts that are available in your subscription.
Path prefix pattern The file path that's used to write your files within the specified Data Lake Store account. You can
specify one or more instances of the {date} and {time} variables:
Example 1: folder1/logs/{date}/{time}
Example 2: folder1/logs/{date}
If the file path pattern doesn't contain a trailing slash (/), the
last pattern in the file path is treated as a file name prefix.
Date format Optional. If the date token is used in the prefix path, you can
select the date format in which your files are organized.
Example: YYYY/MM/DD
Time format Optional. If the time token is used in the prefix path, specify
the time format in which your files are organized. Currently
the only supported value is HH.
Event serialization format The serialization format for output data. JSON, CSV, and Avro
are supported.
Authentication mode You can authorize access to your Data Lake Storage account
using Managed Identity or User token. Once you grant
access, you can revoke access by changing the user account
password, deleting the Data Lake Storage output for this job,
or deleting the Stream Analytics job.
SQL Database
You can use Azure SQL Database as an output for data that's relational in nature or for applications that depend
on content being hosted in a relational database. Stream Analytics jobs write to an existing table in SQL Database.
The table schema must exactly match the fields and their types in your job's output. You can also specify Azure
SQL Data Warehouse as an output via the SQL Database output option. To learn about ways to improve write
throughput, see the Stream Analytics with Azure SQL Database as output article.
The following table lists the property names and their description for creating a SQL Database output.
Output alias A friendly name used in queries to direct the query output to
this database.
Database The name of the database where you're sending your output.
Username The username that has write access to the database. Stream
Analytics supports only SQL authentication.
Table The table name where the output is written. The table name is
case-sensitive. The schema of this table should exactly match
the number of fields and their types that your job output
generates.
Inherit partition scheme An option for inheriting the partitioning scheme of your
previous query step, to enable fully parallel topology with
multiple writers to the table. For more information, see Azure
Stream Analytics output to Azure SQL Database.
Max batch count The recommended limit on the number of records sent with every bulk insert transaction.
NOTE
The Azure SQL Database offering is supported for a job output in Stream Analytics, but an Azure virtual machine running
SQL Server with a database attached is not supported.
Blob storage
Azure Blob storage offers a cost-effective and scalable solution for storing large amounts of unstructured data in
the cloud. For an introduction on Blob storage and its usage, see Upload, download, and list blobs with the Azure
portal.
The following table lists the property names and their descriptions for creating a blob output.
PROPERTY NAME DESCRIPTION
Output alias A friendly name used in queries to direct the query output to this blob storage.
Storage account The name of the storage account where you're sending your output.
Storage account key The secret key associated with the storage account.
Storage container A logical grouping for blobs stored in the Azure Blob service. When you upload a blob to the Blob
service, you must specify a container for that blob.
Path pattern Optional. The file path pattern that's used to write your blobs
within the specified container.
Examples:
Example 1: cluster1/logs/{date}/{time}
Example 2: cluster1/logs/{date}
Example 3: cluster1/{client_id}/{date}/{time}
Example 4: cluster1/{datetime:ss}/{myField} where the
query is: SELECT data.myField AS myField FROM Input;
Example 5: cluster1/year={datetime:yyyy}/month=
{datetime:MM}/day={datetime:dd}
{Path Prefix
Pattern}/schemaHashcode_Guid_Number.extension
Date format Optional. If the date token is used in the prefix path, you can
select the date format in which your files are organized.
Example: YYYY/MM/DD
Time format Optional. If the time token is used in the prefix path, specify
the time format in which your files are organized. Currently
the only supported value is HH.
Event serialization format Serialization format for output data. JSON, CSV, and Avro are
supported.
When you're using Blob storage as output, a new file is created in the blob in the following cases:
If the file exceeds the maximum number of allowed blocks (currently 50,000). You might reach the maximum
allowed number of blocks without reaching the maximum allowed blob size. For example, if the output rate is
high, you can see more bytes per block, and the file size is larger. If the output rate is low, each block has less
data, and the file size is smaller.
If there's a schema change in the output, and the output format requires fixed schema (CSV and Avro).
If a job is restarted, either externally by a user stopping it and starting it, or internally for system maintenance
or error recovery.
If the query is fully partitioned, and a new file is created for each output partition.
If the user deletes a file or a container of the storage account.
If the output is time partitioned by using the path prefix pattern, and a new blob is used when the query moves
to the next hour.
If the output is partitioned by a custom field, and a new blob is created per partition key if it does not exist.
If the output is partitioned by a custom field where the partition key cardinality exceeds 8,000, and a new blob
is created per partition key.
Event Hubs
The Azure Event Hubs service is a highly scalable publish-subscribe event ingestor. It can collect millions of events
per second. One use of an event hub as output is when the output of a Stream Analytics job becomes the input of
another streaming job.
You need a few parameters to configure data streams from event hubs as an output.
PROPERTY NAME DESCRIPTION
Output alias A friendly name used in queries to direct the query output to
this event hub.
Event hub namespace A container for a set of messaging entities. When you created
a new event hub, you also created an event hub namespace.
Event hub policy name The shared access policy, which you can create on the event
hub's Configure tab. Each shared access policy has a name,
permissions that you set, and access keys.
Event hub policy key The shared access key that's used to authenticate access to
the event hub namespace.
Partition key column Optional. A column that contains the partition key for event
hub output.
Event serialization format The serialization format for output data. JSON, CSV, and Avro
are supported.
Encoding For CSV and JSON, UTF-8 is the only supported encoding
format at this time.
Power BI
You can use Power BI as an output for a Stream Analytics job to provide for a rich visualization experience of
analysis results. You can use this capability for operational dashboards, report generation, and metric-driven
reporting.
Power BI output from Stream Analytics is currently not available in the Azure China (21Vianet) and Azure
Germany (T-Systems International) regions.
The following table lists property names and their descriptions to configure your Power BI output.
PROPERTY NAME DESCRIPTION
Output alias Provide a friendly name that's used in queries to direct the
query output to this Power BI output.
Group workspace To enable sharing data with other Power BI users, you can
select groups inside your Power BI account or choose My
Workspace if you don't want to write to a group. Updating
an existing group requires renewing the Power BI
authentication.
Dataset name Provide a dataset name that you want the Power BI output to
use.
Table name Provide a table name under the dataset of the Power BI
output. Currently, Power BI output from Stream Analytics jobs
can have only one table in a dataset.
Authorize connection You need to authorize with Power BI to configure your output
settings. Once you grant this output access to your Power BI
dashboard, you can revoke access by changing the user
account password, deleting the job output, or deleting the
Stream Analytics job.
For a walkthrough of configuring a Power BI output and dashboard, see the Azure Stream Analytics and Power BI
tutorial.
NOTE
Don't explicitly create the dataset and table in the Power BI dashboard. The dataset and table are automatically populated
when the job is started and the job starts pumping output into Power BI. If the job query doesn't generate any results, the
dataset and table aren't created. If Power BI already has a dataset and table with the same names as the ones provided in this
Stream Analytics job, the existing data is overwritten.
Create a schema
Azure Stream Analytics creates a Power BI dataset and table schema for the user if they don't already exist. In all
other cases, the table is updated with new values. Currently, only one table can exist within a dataset.
Power BI uses the first-in, first-out (FIFO) retention policy. Data collects in a table until it hits 200,000 rows.
Convert a data type from Stream Analytics to Power BI
Azure Stream Analytics updates the data model dynamically at runtime if the output schema changes. Column
name changes, column type changes, and the addition or removal of columns are all tracked.
This table covers the data type conversions from Stream Analytics data types to Power BI Entity Data Model
(EDM) types, if a Power BI dataset and table don't exist.
FROM STREAM ANALYTICS TO POWER BI
bigint Int64
nvarchar(max) String
datetime Datetime
float Double
Table storage
Azure Table storage offers highly available, massively scalable storage, so that an application can automatically
scale to meet user demand. Table storage is Microsoft’s NoSQL key/attribute store, which you can use for
structured data with fewer constraints on the schema. Azure Table storage can be used to store data for
persistence and efficient retrieval.
The following table lists the property names and their descriptions for creating a table output.
PROPERTY NAME DESCRIPTION
Output alias A friendly name used in queries to direct the query output to this table storage.
Storage account The name of the storage account where you're sending your
output.
Storage account key The access key associated with the storage account.
Table name The name of the table. The table gets created if it doesn't
exist.
Partition key The name of the output column that contains the partition
key. The partition key is a unique identifier for the partition
within a table that forms the first part of an entity's primary
key. It's a string value that can be up to 1 KB in size.
Row key The name of the output column that contains the row key.
The row key is a unique identifier for an entity within a
partition. It forms the second part of an entity’s primary key.
The row key is a string value that can be up to 1 KB in size.
Batch size The number of records for a batch operation. The default
(100) is sufficient for most jobs. See the Table Batch Operation
spec for more details on modifying this setting.
Service Bus queues
Output alias A friendly name used in queries to direct the query output to this Service Bus queue.
Queue policy name When you create a queue, you can also create shared access
policies on the queue's Configure tab. Each shared access
policy has a name, permissions that you set, and access keys.
Queue policy key The shared access key that's used to authenticate access to
the Service Bus namespace.
Event serialization format The serialization format for output data. JSON, CSV, and Avro
are supported.
Encoding For CSV and JSON, UTF-8 is the only supported encoding
format at this time.
Format Applicable only for JSON type. Line separated specifies that
the output is formatted by having each JSON object
separated by a new line. Array specifies that the output is
formatted as an array of JSON objects.
Service Bus topics
Output alias A friendly name used in queries to direct the query output to this Service Bus topic.
Service Bus namespace A container for a set of messaging entities. When you created
a new event hub, you also created a Service Bus namespace.
Topic name Topics are messaging entities, similar to event hubs and
queues. They're designed to collect event streams from
devices and services. When a topic is created, it's also given a
specific name. The messages sent to a topic aren't available
unless a subscription is created, so ensure there's one or more
subscriptions under the topic.
Topic policy name When you create a topic, you can also create shared access
policies on the topic's Configure tab. Each shared access
policy has a name, permissions that you set, and access keys.
Topic policy key The shared access key that's used to authenticate access to
the Service Bus namespace.
Event serialization format The serialization format for output data. JSON, CSV, and Avro
are supported.
The number of partitions is based on the Service Bus SKU and size. The partition key is a unique integer value for
each partition.
Azure Cosmos DB
Azure Cosmos DB is a globally distributed database service that offers limitless elastic scale around the globe, rich
query, and automatic indexing over schema-agnostic data models. To learn about Azure Cosmos DB collection
options for Stream Analytics, see the Stream Analytics with Azure Cosmos DB as output article.
Azure Cosmos DB output from Stream Analytics is currently not available in the Azure China (21Vianet) and
Azure Germany (T-Systems International) regions.
NOTE
At this time, Azure Stream Analytics only supports connection to Azure Cosmos DB by using the SQL API. Other Azure
Cosmos DB APIs are not yet supported. If you point Azure Stream Analytics to the Azure Cosmos DB accounts created with
other APIs, the data might not be properly stored.
The following table describes the properties for creating an Azure Cosmos DB output.
Output alias An alias to refer to this output in your Stream Analytics query.
Account key The shared access key for the Azure Cosmos DB account.
Collection name pattern The collection name or the pattern for the collections to be
used.
You can construct the collection name format by using the
optional {partition} token, where partitions start from 0. Two
examples:
MyCollection: One collection named "MyCollection"
must exist.
MyCollection{partition}: Based on the partitioning
column.
The partitioning column collections must exist:
"MyCollection0," "MyCollection1," "MyCollection2," and so on.
Partition key Optional. You need this only if you're using a {partition} token
in your collection name pattern.
The partition key is the name of the field in output events
that's used to specify the key for partitioning output across
collections.
For single collection output, you can use any arbitrary output
column. An example is PartitionId.
Document ID Optional. The name of the field in output events that's used
to specify the primary key on which insert or update
operations are based.
Azure Functions
Azure Functions is a serverless compute service that you can use to run code on-demand without having to
explicitly provision or manage infrastructure. It lets you implement code that's triggered by events occurring in
Azure or partner services. This ability of Azure Functions to respond to triggers makes it a natural output for
Azure Stream Analytics. This output adapter enables users to connect Stream Analytics to Azure Functions, and
run a script or piece of code in response to a variety of events.
Azure Functions output from Stream Analytics is currently not available in the Azure China (21Vianet) and Azure
Germany (T-Systems International) regions.
Azure Stream Analytics invokes Azure Functions via HTTP triggers. The Azure Functions output adapter is
available with the following configurable properties:
Max batch size A property that lets you set the maximum size for each
output batch that's sent to your Azure function. The input
unit is in bytes. By default, this value is 262,144 bytes (256
KB).
Max batch count A property that lets you specify the maximum number of
events in each batch that's sent to Azure Functions. The
default value is 100.
When Azure Stream Analytics receives an HTTP 413 ("Request Entity Too Large") exception from an Azure function,
it reduces the size of the batches that it sends to Azure Functions. In your Azure function code, use this exception
to make sure that Azure Stream Analytics doesn’t send oversized batches. Also, make sure that the maximum
batch count and size values used in the function are consistent with the values entered in the Stream Analytics
portal.
Also, in a situation where there's no event landing in a time window, no output is generated. As a result, the
computeResult function isn't called. This behavior is consistent with the built-in windowed aggregate functions.
Partitioning
The following table summarizes the partition support and the number of output writers for each output type:
Azure Data Lake Store
Partitioning support: Yes.
Partition key: Use {date} and {time} tokens in the path prefix pattern. Choose the date format, such as YYYY/MM/DD, DD/MM/YYYY, or MM-DD-YYYY. HH is used for the time format.
Number of output writers: Follows the input partitioning for fully parallelizable queries.

Azure SQL Database
Partitioning support: Yes.
Partition key: Based on the PARTITION BY clause in the query.
Number of output writers: Follows the input partitioning for fully parallelizable queries. To learn more about achieving better write throughput performance when you're loading data into Azure SQL Database, see Azure Stream Analytics output to Azure SQL Database.

Azure Blob storage
Partitioning support: Yes.
Partition key: Use {date} and {time} tokens from your event fields in the path pattern. Choose the date format, such as YYYY/MM/DD, DD/MM/YYYY, or MM-DD-YYYY. HH is used for the time format. Blob output can be partitioned by a single custom event attribute {fieldname} or {datetime:<specifier>}.
Number of output writers: Follows the input partitioning for fully parallelizable queries.

Azure Table storage
Partitioning support: Yes.
Partition key: Any output column.
Number of output writers: Follows the input partitioning for fully parallelized queries.

Azure Service Bus topic
Partitioning support: Yes.
Partition key: Automatically chosen. The number of partitions is based on the Service Bus SKU and size. The partition key is a unique integer value for each partition.
Number of output writers: Same as the number of partitions in the output topic.

Azure Service Bus queue
Partitioning support: Yes.
Partition key: Automatically chosen. The number of partitions is based on the Service Bus SKU and size. The partition key is a unique integer value for each partition.
Number of output writers: Same as the number of partitions in the output queue.

Azure Cosmos DB
Partitioning support: Yes.
Partition key: Use the {partition} token in the collection name pattern. The {partition} value is based on the PARTITION BY clause in the query.
Number of output writers: Follows the input partitioning for fully parallelized queries.
If your output adapter is not partitioned, lack of data in one input partition will cause a delay up to the late arrival
amount of time. In such cases, the output is merged to a single writer, which might cause bottlenecks in your
pipeline. To learn more about late arrival policy, see Azure Stream Analytics event order considerations.
Output batch size
Azure Stream Analytics uses variable-size batches to process events and write to outputs. Typically the Stream
Analytics engine doesn't write one message at a time, and uses batches for efficiency. When the rate of both the
incoming and outgoing events is high, Stream Analytics uses larger batches. When the egress rate is low, it uses
smaller batches to keep latency low.
The following table explains some of the considerations for output batching:
Azure Data Lake Store
Max message size: See Data Lake Storage limits.
Batch size optimization: Use up to 4 MB per write operation.

Azure SQL Database
Max message size: 10,000 maximum rows per single bulk insert; 100 minimum rows per single bulk insert. See Azure SQL limits.
Batch size optimization: Every batch is initially bulk inserted with the maximum batch size. You can split the batch in half (until you reach the minimum batch size) based on retryable errors from SQL.

Azure Blob storage
Max message size: See Azure Storage limits.
Batch size optimization: The maximum blob block size is 4 MB. The maximum blob block count is 50,000.

Azure Event Hubs
Max message size: 256 KB per message. See Event Hubs limits.
Batch size optimization: When input/output partitioning isn't aligned, each event is packed individually in EventData and sent in a batch of up to the maximum message size (1 MB for the Premium SKU).

Azure Table storage
Max message size: See Azure Storage limits.
Batch size optimization: The default is 100 entities per single transaction. You can configure it to a smaller value as needed.

Azure Service Bus queue
Max message size: 256 KB per message. See Service Bus limits.
Batch size optimization: Use a single event per message.

Azure Service Bus topic
Max message size: 256 KB per message. See Service Bus limits.
Batch size optimization: Use a single event per message.

Azure Cosmos DB
Max message size: See Azure Cosmos DB limits.
Batch size optimization: Batch size and write frequency are adjusted dynamically based on Azure Cosmos DB responses. There are no predetermined limitations from Stream Analytics.
Next steps
Quickstart: Create a Stream Analytics job by using the Azure portal
Azure Stream Analytics output to Azure Cosmos DB
4/9/2019 • 6 minutes to read • Edit Online
Stream Analytics can target Azure Cosmos DB for JSON output, enabling data archiving and low-latency queries
on unstructured JSON data. This document covers some best practices for implementing this configuration.
For those who are unfamiliar with Cosmos DB, take a look at Azure Cosmos DB’s learning path to get started.
NOTE
At this time, Azure Stream Analytics only supports connection to Azure Cosmos DB using SQL API. Other Azure Cosmos DB
APIs are not yet supported. If you point Azure Stream Analytics to the Azure Cosmos DB accounts created with other APIs,
the data might not be properly stored.
NOTE
You must add 0.0.0.0 to the list of allowed IPs from your Azure Cosmos DB firewall.
NOTE
At this time, Azure Stream Analytics only supports unlimited collections with partition keys at the top level. For example,
/region is supported. Nested partition keys (e.g. /region/name ) are not supported.
For fixed Azure Cosmos DB collections, Stream Analytics allows no way to scale up or out once they're full. They
have an upper limit of 10 GB and 10,000 RU/s throughput. To migrate the data from a fixed container to an
unlimited container (for example, one with at least 1,000 RU/s and a partition key), you need to use the data
migration tool or the change feed library.
Writing to multiple fixed containers is being deprecated and is not the recommended approach for scaling out
your Stream Analytics job. The article Partitioning and scaling in Cosmos DB provides further details.
With compatibility level 1.2, Stream Analytics is more intelligent in utilizing 100% of the available throughput in
Cosmos DB with very few resubmissions from throttling or rate limiting. This provides a better experience for other
workloads, like queries running on the collection at the same time. If you need to see how Stream Analytics scales
out with Cosmos DB as a sink for 1,000 to 10,000 messages per second, there is an Azure Samples project that lets
you do that. Cosmos DB output throughput is identical with compatibility levels 1.0 and 1.1. Because 1.2 is currently
not the default, you can set the compatibility level for a Stream Analytics job by using the portal or the create job
REST API call. It's strongly recommended to use compatibility level 1.2 in Stream Analytics with Cosmos DB.
FIELD DESCRIPTION
Account key The shared access key for the Azure Cosmos DB account.
Collection name pattern The collection name for the collection to be used.
MyCollection is a sample valid input - one collection named
MyCollection must exist.
Azure Stream Analytics output to Azure SQL Database
This article discusses tips to achieve better write throughput performance when you're loading data into Azure
SQL Database using Azure Stream Analytics.
SQL output in Azure Stream Analytics supports writing in parallel as an option. This option allows for fully
parallel job topologies, where multiple output partitions are writing to the destination table in parallel. Enabling
this option in Azure Stream Analytics however may not be sufficient to achieve higher throughputs, as it depends
significantly on your SQL Azure database configuration and table schema. The choice of indexes, clustering key,
index fill factor, and compression have an impact on the time to load tables. For more information about how to
optimize your SQL Azure database to improve query and load performance based on internal benchmarks, see
SQL database performance guidance. Ordering of writes is not guaranteed when writing in parallel to SQL Azure
Database.
Here are some configurations within each service that can help improve overall throughput of your solution.
NOTE
When there are more than 8 input partitions, inheriting the input partitioning scheme might not be an appropriate choice.
This upper limit was observed on a table with a single identity column and a clustered index. In this case, consider using
INTO 8 in your query, to explicitly specify the number of output writers. Based on your schema and choice of indexes, your
observations may vary.
Batch Size - SQL output configuration allows you to specify the maximum batch size in an Azure Stream
Analytics SQL output based on the nature of your destination table or workload. Batch size is the maximum
number of records that are sent with every bulk insert transaction. In clustered columnstore indexes, batch
sizes around 100K allow for more parallelization, minimal logging, and locking optimizations. In disk-based
tables, 10K (default) or lower may be optimal for your solution, as higher batch sizes may trigger lock
escalation during bulk inserts.
Input Message Tuning – If you've optimized using inherit partitioning and batch size, increasing the
number of input events per message per partition helps further push up your write throughput. Input
message tuning allows batch sizes within Azure Stream Analytics to be up to the specified Batch Size,
thereby improving throughput. This can be achieved by using compression or by increasing input message
sizes in Event Hubs or Blob storage.
SQL Azure
Partitioned Table and Indexes – Using a partitioned SQL table and partitioned indexes on the table with
the same column as your partition key (for example, PartitionId) can significantly reduce contentions
among partitions during writes. For a partitioned table, you'll need to create a partition function and a
partition scheme on the PRIMARY filegroup. This will also increase availability of existing data while new
data is being loaded. Log IO limit may be hit based on number of partitions, which can be increased by
upgrading the SKU.
Avoid unique key violations – If you get multiple key violation warning messages in the Azure Stream
Analytics Activity Log, ensure your job isn't impacted by unique constraint violations which are likely to
happen during recovery cases. This can be avoided by setting the IGNORE_DUP_KEY option on your
indexes.
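As a rough illustration of the two suggestions above, a minimal T-SQL sketch (all object, table, and column names are hypothetical):
-- Partition function and scheme on the PRIMARY filegroup
-- (eight partitions, matching eight output writers in this example).
CREATE PARTITION FUNCTION pfPartitionId (int)
    AS RANGE LEFT FOR VALUES (0, 1, 2, 3, 4, 5, 6);
CREATE PARTITION SCHEME psPartitionId
    AS PARTITION pfPartitionId ALL TO ([PRIMARY]);

-- Destination table partitioned on the same column as the job's partition key,
-- with IGNORE_DUP_KEY to avoid unique key violations during recovery.
CREATE TABLE dbo.Readings
(
    PartitionId int NOT NULL,
    DeviceId nvarchar(100) NOT NULL,
    EventTime datetime2 NOT NULL,
    Reading float NULL,
    CONSTRAINT PK_Readings PRIMARY KEY CLUSTERED (PartitionId, DeviceId, EventTime)
        WITH (IGNORE_DUP_KEY = ON)
) ON psPartitionId (PartitionId);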
Summary
In summary, with the partitioned output feature in Azure Stream Analytics for SQL output, aligned parallelization
of your job with a partitioned table in SQL Azure should give you significant throughput improvements.
Leveraging Azure Data Factory for orchestrating data movement from an In-Memory table into Disk-based tables
can give order of magnitude throughput gains. If feasible, improving message density can also be a major factor
in improving overall throughput.
Azure Stream Analytics custom blob output
partitioning
5/15/2019 • 4 minutes to read • Edit Online
Azure Stream Analytics supports custom blob output partitioning with custom fields or attributes and custom
DateTime path patterns.
For example, if the job input was sensor data from millions of sensors where each sensor had a sensor_id, the Path
Pattern would be {sensor_id} to partition each sensor's data into different folders.
As another example, the output can be partitioned by a client_id field. Using the REST API, the output section of
the JSON job definition for such a request specifies {client_id} in the path pattern. Once the job starts running, the
clients container contains one folder per client_id value. Each folder may contain multiple blobs, where each blob
contains one or more records. In a folder labelled "06000000", for instance, each record in the blob has a client_id
column matching the folder name, because the column used to partition the output in the output path was client_id.
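For illustration, a minimal query sketch for the client_id scenario (the output alias BlobOutput and the field names are hypothetical); the field referenced in the Path Pattern must be present in the query output:
SELECT
    client_id,
    sensor_id,
    reading
INTO BlobOutput
FROM Input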
Limitations
1. Only one custom partition key is permitted in the Path Pattern blob output property. All of the following
Path Patterns are valid:
cluster1/{date}/{aFieldInMyData}
cluster1/{time}/{aFieldInMyData}
cluster1/{aFieldInMyData}
cluster1/{date}/{time}/{aFieldInMyData}
2. Partition keys are case insensitive, so partition keys like "John" and "john" are equivalent. Also, expressions
cannot be used as partition keys. For example, {columnA + columnB } does not work.
3. When an input stream consists of records with a partition key cardinality under 8000, the records will be
appended to existing blobs and only create new blobs when necessary. If the cardinality is over 8000 there
is no guarantee existing blobs will be written to and new blobs won't be created for an arbitrary number of
records with the same partition key.
If you do not wish to use custom DateTime patterns, you can add the {date} and/or {time} token to the Path Prefix
to generate a dropdown with built-in DateTime formats.
Example: logs/{datetime:MM}/{datetime:dd} or logs/{datetime:MM/dd}
You may use the same format specifier multiple times in the Path Prefix. The token must be repeated each time.
Hive Streaming conventions
Custom path patterns for blob storage can be used with the Hive Streaming convention, which expects folders to
be labeled with column=value in the folder name.
For example, year={datetime:yyyy}/month={datetime:MM}/day={datetime:dd}/hour={datetime:HH}.
Custom output eliminates the hassle of altering tables and manually adding partitions to port data between Azure
Stream Analytics and Hive. Instead, many folders can be added automatically.
Example
Create a storage account, a resource group, a Stream Analytics job, and an input source according to the Azure
Stream Analytics Azure Portal quickstart guide. Use the same sample data used in the quickstart guide, also
available on GitHub.
Create a blob output sink with the following configuration:
When you start the job, a folder structure based on the path pattern is created in your blob container. You can drill
down to the day level.
Next steps
Understand outputs from Azure Stream Analytics
Azure Stream Analytics job states
2/8/2019 • 2 minutes to read • Edit Online
A Stream Analytics job could be in one of four states at any given time. You can find the state of your job on your
Stream Analytics job's Overview page in the Azure portal.
Running: Your job is running on Azure, reading events coming from the defined input sources, processing them,
and writing the results to the configured output sinks. It is a best practice to track your job's performance by
monitoring key metrics.

Degraded: Transient errors are likely impacting your job. Stream Analytics will immediately try to recover from
such errors and return to a Running state (within a few minutes). These errors could happen due to network issues,
availability of other Azure resources, deserialization errors, and so on. Your job's performance may be impacted
while the job is in a degraded state. You can look at the diagnostic or activity logs to learn more about the cause of
these transient errors. In cases such as deserialization errors, it is recommended to take corrective action to ensure
events aren't malformed. If the job keeps reaching the resource utilization limit, try to increase the SU number or
parallelize your job. In other cases where you cannot take any action, Stream Analytics will try to recover to a
Running state.

Failed: Your job encountered a critical error resulting in a failed state. Events aren't read and processed. Runtime
errors are a common cause for jobs ending up in a failed state. You can configure alerts so that you get notified
when the job goes to the Failed state. You can debug using activity and diagnostic logs to identify the root cause
and address the issue.
Next steps
Setup alerts for Azure Stream Analytics jobs
Metrics available in Stream Analytics
Troubleshoot using activity and diagnostic logs
Introduction to Stream Analytics windowing functions
1/11/2019 • 2 minutes to read • Edit Online
In time-streaming scenarios, performing operations on the data contained in temporal windows is a common
pattern. Stream Analytics has native support for windowing functions, enabling developers to author complex
stream processing jobs with minimal effort.
There are four kinds of temporal windows to choose from: Tumbling, Hopping, Sliding, and Session windows.
You use the window functions in the GROUP BY clause of the query syntax in your Stream Analytics jobs.
All the windowing operations output results at the end of the window. The output of the window will be a single
event based on the aggregate function used. The output event will have the time stamp of the end of the window,
and all window functions are defined with a fixed length.
Tumbling window
Tumbling window functions are used to segment a data stream into distinct time segments and perform a function
against them, such as the example below. The key differentiators of a Tumbling window are that they repeat, do
not overlap, and an event cannot belong to more than one tumbling window.
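A minimal sketch (the input alias and field names are hypothetical) that counts events per time zone in every 10-second tumbling window:
SELECT
    TimeZone,
    COUNT(*) AS [Count]
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY TimeZone, TumblingWindow(second, 10)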
Hopping window
Hopping window functions hop forward in time by a fixed period. It may be easy to think of them as Tumbling
windows that can overlap, so events can belong to more than one Hopping window result set. To make a Hopping
window the same as a Tumbling window, specify the hop size to be the same as the window size.
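A minimal sketch (the input alias and field names are hypothetical) that counts events per topic over the last 10 seconds, with results hopping forward every 5 seconds:
SELECT
    Topic,
    COUNT(*) AS [Count]
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, HoppingWindow(second, 10, 5)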
Sliding window
Sliding window functions, unlike Tumbling or Hopping windows, produce an output only when an event occurs.
Every window will have at least one event, and the window continuously moves forward by an ε (epsilon). Like
hopping windows, events can belong to more than one sliding window.
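A minimal sketch (the input alias, field names, and threshold are hypothetical) that outputs a row whenever a topic is mentioned more than 10 times within the last 10 seconds:
SELECT
    Topic,
    COUNT(*) AS [Count]
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, SlidingWindow(second, 10)
HAVING COUNT(*) > 10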
Session window
Session window functions group events that arrive at similar times, filtering out periods of time where there is no
data. It has three main parameters: timeout, maximum duration, and partitioning key (optional).
A session window begins when the first event occurs. If another event occurs within the specified timeout from the
last ingested event, then the window extends to include the new event. Otherwise if no events occur within the
timeout, then the window is closed at the timeout.
If events keep occurring within the specified timeout, the session window will keep extending until the maximum
duration is reached. The maximum duration checking intervals are set to be the same size as the specified max
duration. For example, if the max duration is 10, then the checks on whether the window exceeds the maximum
duration will happen at t = 0, 10, 20, 30, and so on.
When a partition key is provided, the events are grouped together by the key and session window is applied to
each group independently. This partitioning is useful for cases where you need different session windows for
different users or devices.
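A minimal sketch (the input alias, field names, and window sizes are hypothetical) that counts events per topic within sessions that have a 5-second timeout and a 10-second maximum duration:
SELECT
    Topic,
    COUNT(*) AS [Count]
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY Topic, SessionWindow(second, 5, 10)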
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Introduction to Stream Analytics geospatial functions
12/7/2018 • 3 minutes to read • Edit Online
Geospatial functions in Azure Stream Analytics enable real-time analytics on streaming geospatial data. With just a
few lines of code, you can develop a production grade solution for complex scenarios.
Examples of scenarios that can benefit from geospatial functions include:
Ride-sharing
Fleet management
Asset tracking
Geo-fencing
Phone tracking across cell sites
Stream Analytics Query Language has seven built-in geospatial functions: CreateLineString, CreatePoint,
CreatePolygon, ST_DISTANCE, ST_OVERLAPS, ST_INTERSECTS, and ST_WITHIN.
CreateLineString
The CreateLineString function accepts points and returns a GeoJSON LineString, which can be plotted as a line
on a map. You must have at least two points to create a LineString. The LineString points will be connected in
order.
The following query uses CreateLineString to create a LineString using three points. The first point is created
from streaming input data, while the other two are created manually.
SELECT
CreateLineString(CreatePoint(input.latitude, input.longitude), CreatePoint(10.0, 10.0), CreatePoint(10.5, 10.5))
FROM input
Input example
LATITUDE LONGITUDE
3.0 -10.2
-87.33 20.2321
Output example
{"type" : "LineString", "coordinates" : [ [-10.2, 3.0], [10.0, 10.0], [10.5, 10.5] ]}
{"type" : "LineString", "coordinates" : [ [20.2321, -87.33], [10.0, 10.0], [10.5, 10.5] ]}
To learn more, visit the CreateLineString reference.
CreatePoint
The CreatePoint function accepts a latitude and longitude and returns a GeoJSON point, which can be plotted on
a map. Your latitudes and longitudes must be a float datatype.
The following example query uses CreatePoint to create a point using latitudes and longitudes from streaming
input data.
SELECT
CreatePoint(input.latitude, input.longitude)
FROM input
Input example
LATITUDE LONGITUDE
3.0 -10.2
-87.33 20.2321
Output example
{"type" : "Point", "coordinates" : [-10.2, 3.0]}
{"type" : "Point", "coordinates" : [20.2321, -87.33]}
To learn more, visit the CreatePoint reference.
CreatePolygon
The CreatePolygon function accepts points and returns a GeoJSON polygon record. The order of points must
follow right-hand ring orientation, or counter-clockwise. Imagine walking from one point to another in the order
they were declared. The center of the polygon would be to your left the entire time.
The following example query uses CreatePolygon to create a polygon from three points. The first two points are
created manually, and the last point is created from input data.
SELECT
CreatePolygon(CreatePoint(input.latitude, input.longitude), CreatePoint(10.0, 10.0), CreatePoint(10.5, 10.5), CreatePoint(input.latitude, input.longitude))
FROM input
Input example
LATITUDE LONGITUDE
3.0 -10.2
-87.33 20.2321
Output example
{"type" : "Polygon", "coordinates" : [[ [-10.2, 3.0], [10.0, 10.0], [10.5, 10.5], [-10.2, 3.0] ]]}
{"type" : "Polygon", "coordinates" : [[ [20.2321, -87.33], [10.0, 10.0], [10.5, 10.5], [20.2321, -87.33] ]]}
To learn more, visit the CreatePolygon reference.
ST_DISTANCE
The ST_DISTANCE function returns the distance between two points in meters.
The following query uses ST_DISTANCE to generate an event when a gas station is less than 10 km from the car.
SELECT c.Location, s.Location
FROM Cars c
JOIN Station s ON ST_DISTANCE(c.Location, s.Location) < 10 * 1000
ST_OVERLAPS
The ST_OVERLAPS function compares two polygons. If the polygons overlap, the function returns a 1. The function
returns 0 if the polygons don't overlap.
The following query uses ST_OVERLAPS to generate an event when a building is within a possible flooding zone.
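(A minimal sketch; the field names buildingPolygon and floodZonePolygon are hypothetical.)
SELECT input.buildingPolygon
FROM input
WHERE ST_OVERLAPS(input.buildingPolygon, input.floodZonePolygon) = 1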
The following example query generates an event when a storm is heading towards a car.
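(Another minimal sketch; the field names stormArea and carArea are hypothetical.)
SELECT input.stormArea
FROM input
WHERE ST_OVERLAPS(input.stormArea, input.carArea) = 1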
ST_INTERSECTS
The ST_INTERSECTS function compares two LineStrings. If the LineStrings intersect, the function returns 1. The
function returns 0 if the LineStrings don't intersect.
The following example query uses ST_INTERSECTS to determine if a paved road intersects a dirt road.
SELECT
ST_INTERSECTS(input.pavedRoad, input.dirtRoad)
FROM input
Input example
DATACENTERAREA: {"type":"LineString", "coordinates": [ [-10.0, 0.0], [0.0, 0.0], [10.0, 0.0] ]}
STORMAREA: {"type":"LineString", "coordinates": [ [0.0, 10.0], [0.0, 0.0], [0.0, -10.0] ]}

DATACENTERAREA: {"type":"LineString", "coordinates": [ [-10.0, 0.0], [0.0, 0.0], [10.0, 0.0] ]}
STORMAREA: {"type":"LineString", "coordinates": [ [-10.0, 10.0], [0.0, 10.0], [10.0, 10.0] ]}
Output example
1
0
To learn more, visit the ST_INTERSECTS reference.
ST_WITHIN
The ST_WITHIN function determines whether a point or polygon is within a polygon. If the polygon contains the
point or polygon, the function will return 1. The function will return 0 if the point or polygon isn't located within the
declared polygon.
The following example query uses ST_WITHIN to determine whether the delivery destination point is within the
given warehouse polygon.
SELECT
ST_WITHIN(input.deliveryDestination, input.warehouse)
FROM input
Input example
DELIVERYDESTINATION: {"type":"Point", "coordinates": [76.6, 10.1]}
WAREHOUSE: {"type":"Polygon", "coordinates": [ [0.0, 0.0], [10.0, 0.0], [10.0, 10.0], [0.0, 10.0], [0.0, 0.0] ]}

DELIVERYDESTINATION: {"type":"Point", "coordinates": [15.0, 15.0]}
WAREHOUSE: {"type":"Polygon", "coordinates": [ [10.0, 10.0], [20.0, 10.0], [20.0, 20.0], [10.0, 20.0], [10.0, 10.0] ]}
Output example
0
1
To learn more, visit the ST_WITHIN reference.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Compatibility level for Azure Stream Analytics jobs
5/6/2019 • 4 minutes to read • Edit Online
This article describes the compatibility level option in Azure Stream Analytics. Stream Analytics is a managed
service, with regular feature updates and performance improvements. Most of the service's runtime updates are
automatically made available to end users.
However, some new functionality in the service may introduce a major change, such as a change in the behavior of
an existing job, or a change in the way data is consumed in running jobs. You can keep your existing Stream
Analytics jobs running without major changes by leaving the compatibility level setting lowered. When you are
ready for the latest runtime behaviors, you can opt-in by raising the compatibility level.
1.1 level: The message content contains the stream directly with no additional tags. For example:
{ "SensorId":"1", "Temperature":64}
NOTE
Persisting case-sensitivity isn't yet available for Stream Analytics jobs hosted by using the Edge environment. As a result, all
field names are converted to lowercase if your job is hosted on Edge.
FloatNaNDeserializationDisabled
1.0 level: The CREATE TABLE command did not filter events with NaN (Not-a-Number) values, such as Infinity and
-Infinity, in a FLOAT column type, even though they are out of the documented range for these numbers.
1.1 level: CREATE TABLE allows you to specify a strong schema. The Stream Analytics engine validates that the
data conforms to this schema. With this model, the command can filter events with NaN values.
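For example, a minimal sketch of such a schema definition (the input name and columns are hypothetical):
CREATE TABLE input (
    SensorId NVARCHAR(MAX),
    Temperature FLOAT
)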
Disable automatic upcast for datetime strings in JSON
1.0 level: The JSON parser would automatically upcast string values with date/time/zone information to DateTime
type and then convert it to UTC. This behavior resulted in losing the timezone information.
1.1 level: String values with date/time/zone information are no longer automatically upcast to DateTime type. As a
result, timezone information is kept.
Next steps
Troubleshoot Azure Stream Analytics inputs
Stream Analytics Resource health
Query examples for common Stream Analytics usage
patterns
5/16/2019 • 12 minutes to read • Edit Online
Queries in Azure Stream Analytics are expressed in a SQL-like query language. The language constructs are
documented in the Stream Analytics query language reference guide.
The query design can express simple pass-through logic to move event data from one input stream into an output
data store, or it can do rich pattern matching and temporal analysis to calculate aggregates over various time
windows as in the Build an IoT solution by using Stream Analytics guide. You can join data from multiple inputs to
combine streaming events, and you can do lookups against static reference data to enrich the event values. You can
also write data to multiple outputs.
This article outlines solutions to several common query patterns based on real-world scenarios.
Output:
MAKE WEIGHT
Honda 3000
Solution:
SELECT
Make,
SUM(CAST(Weight AS BIGINT)) AS Weight
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
Explanation: Use a CAST statement in the Weight field to specify its data type. See the list of supported data
types in Data types (Azure Stream Analytics).
Output:
Solution:
SELECT
*
FROM
Input TIMESTAMP BY Time
WHERE
LicensePlate LIKE 'A%9'
Explanation: Use the LIKE statement to check the LicensePlate field value. It should start with the letter A, then
have any string of zero or more characters, and then end with the number 9.
MAKE TIME
Honda 2015-01-01T00:00:01.0000000Z
Toyota 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:03.0000000Z
Output:
CARSPASSED TIME
1 Honda 2015-01-01T00:00:10.0000000Z
2 Toyotas 2015-01-01T00:00:10.0000000Z
Solution:
SELECT
CASE
WHEN COUNT(*) = 1 THEN CONCAT('1 ', Make)
ELSE CONCAT(CAST(COUNT(*) AS NVARCHAR(MAX)), ' ', Make, 's')
END AS CarsPassed,
System.TimeStamp() AS Time
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
Explanation: The CASE expression compares an expression to a set of simple expressions to determine the result.
In this example, vehicle makes with a count of 1 returned a different string description than vehicle makes with a
count other than 1.
MAKE TIME
Honda 2015-01-01T00:00:01.0000000Z
Honda 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:01.0000000Z
Toyota 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:03.0000000Z
Output1:
MAKE TIME
Honda 2015-01-01T00:00:01.0000000Z
Honda 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:01.0000000Z
Toyota 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:03.0000000Z
Output2:
Toyota 2015-01-01T00:00:10.0000000Z 3
Solution:
SELECT
*
INTO
ArchiveOutput
FROM
Input TIMESTAMP BY Time
SELECT
Make,
System.TimeStamp() AS Time,
COUNT(*) AS [Count]
INTO
AlertOutput
FROM
Input TIMESTAMP BY Time
GROUP BY
Make,
TumblingWindow(second, 10)
HAVING
[Count] >= 3
Explanation: The INTO clause tells Stream Analytics which of the outputs to write the data to from this
statement. The first query is a pass-through of the data received to an output named ArchiveOutput. The second
query does some simple aggregation and filtering, and it sends the results to a downstream alerting system,
AlertOutput.
Note that you can also reuse the results of the common table expressions (CTEs) (such as WITH statements) in
multiple output statements. This option has the added benefit of opening fewer readers to the input source.
For example:
WITH AllRedCars AS (
SELECT
*
FROM
Input TIMESTAMP BY Time
WHERE
Color = 'red'
)
SELECT * INTO HondaOutput FROM AllRedCars WHERE Make = 'Honda'
SELECT * INTO ToyotaOutput FROM AllRedCars WHERE Make = 'Toyota'
MAKE TIME
Honda 2015-01-01T00:00:01.0000000Z
Honda 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:01.0000000Z
Toyota 2015-01-01T00:00:02.0000000Z
Toyota 2015-01-01T00:00:03.0000000Z
Output:
COUNTMAKE TIME
2 2015-01-01T00:00:02.000Z
1 2015-01-01T00:00:04.000Z
Solution:
SELECT
COUNT(DISTINCT Make) AS CountMake,
System.TIMESTAMP() AS TIME
FROM Input TIMESTAMP BY TIME
GROUP BY
TumblingWindow(second, 2)
Explanation: COUNT(DISTINCT Make) returns the number of distinct values in the Make column within a
time window.
MAKE TIME
Honda 2015-01-01T00:00:01.0000000Z
Toyota 2015-01-01T00:00:02.0000000Z
Output:
MAKE TIME
Toyota 2015-01-01T00:00:02.0000000Z
Solution:
SELECT
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(minute, 1)) <> Make
Explanation: Use LAG to peek into the input stream one event back and get the Make value. Then compare it to
the Make value on the current event and output the event if they are different.
Output:
Solution:
SELECT
LicensePlate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) = 1
Now let's change the problem and find the first car of a particular make in every 10-minute interval.
LICENSEPLATE MAKE TIME
Solution:
SELECT
LicensePlate,
Make,
Time
FROM
Input TIMESTAMP BY Time
WHERE
IsFirst(minute, 10) OVER (PARTITION BY Make) = 1
Output:
Solution:
WITH LastInWindow AS
(
SELECT
MAX(Time) AS LastEventTime
FROM
Input TIMESTAMP BY Time
GROUP BY
TumblingWindow(minute, 10)
)
SELECT
Input.LicensePlate,
Input.Make,
Input.Time
FROM
Input TIMESTAMP BY Time
INNER JOIN LastInWindow
ON DATEDIFF(minute, Input, LastInWindow) BETWEEN 0 AND 10
AND Input.Time = LastInWindow.LastEventTime
Explanation: There are two steps in the query. The first one finds the latest time stamp in 10-minute windows.
The second step joins the results of the first query with the original stream to find the events that match the last
time stamps in each window.
Output:
MAKE TIME CURRENTCARLICENSEPLATE FIRSTCARLICENSEPLATE FIRSTCARTIME
Solution:
SELECT
Make,
Time,
LicensePlate AS CurrentCarLicensePlate,
LAG(LicensePlate, 1) OVER (LIMIT DURATION(second, 90)) AS FirstCarLicensePlate,
LAG(Time, 1) OVER (LIMIT DURATION(second, 90)) AS FirstCarTime
FROM
Input TIMESTAMP BY Time
WHERE
LAG(Make, 1) OVER (LIMIT DURATION(second, 90)) = Make
Explanation: Use LAG to peek into the input stream one event back and get the Make value. Compare it to the
MAKE value in the current event, and then output the event if they are the same. You can also use LAG to get data
about the previous car.
Output:
[email protected] RightMenu 7
Solution:
SELECT
[user], feature, DATEDIFF(second, LAST(Time) OVER (PARTITION BY [user], feature LIMIT DURATION(hour,
1) WHEN Event = 'start'), Time) as duration
FROM input TIMESTAMP BY Time
WHERE
Event = 'end'
Explanation: Use the LAST function to retrieve the last TIME value when the event type was Start. The LAST
function uses PARTITION BY [user], feature to indicate that the result is computed per unique user and feature. The
query has a 1-hour maximum threshold for the time difference between Start and End events, but it is configurable
as needed (LIMIT DURATION(hour, 1)).
Output:
STARTFAULT ENDFAULT
2015-01-01T00:00:02.000Z 2015-01-01T00:00:07.000Z
Solution:
WITH SelectPreviousEvent AS
(
SELECT
*,
LAG([time]) OVER (LIMIT DURATION(hour, 24)) as previousTime,
LAG([weight]) OVER (LIMIT DURATION(hour, 24)) as previousWeight
FROM input TIMESTAMP BY [time]
)
SELECT
LAG(time) OVER (LIMIT DURATION(hour, 24) WHEN previousWeight < 20000 ) [StartFault],
previousTime [EndFault]
FROM SelectPreviousEvent
WHERE
[weight] < 20000
AND previousWeight > 20000
Explanation: Use LAG to look back over the input stream for 24 hours and find the instances where the interval
between StartFault and EndFault spans weight readings below 20,000.
T VALUE
"2014-01-01T06:01:00" 1
"2014-01-01T06:01:05" 2
"2014-01-01T06:01:10" 3
"2014-01-01T06:01:15" 4
"2014-01-01T06:01:30" 5
"2014-01-01T06:01:35" 6
Output:
2014-01-01T14:01:00.000Z 2014-01-01T14:01:00.000Z 1
2014-01-01T14:01:05.000Z 2014-01-01T14:01:05.000Z 2
2014-01-01T14:01:10.000Z 2014-01-01T14:01:10.000Z 3
2014-01-01T14:01:15.000Z 2014-01-01T14:01:15.000Z 4
2014-01-01T14:01:20.000Z 2014-01-01T14:01:15.000Z 4
2014-01-01T14:01:25.000Z 2014-01-01T14:01:15.000Z 4
2014-01-01T14:01:30.000Z 2014-01-01T14:01:30.000Z 5
2014-01-01T14:01:35.000Z 2014-01-01T14:01:35.000Z 6
2014-01-01T14:01:40.000Z 2014-01-01T14:01:35.000Z 6
2014-01-01T14:01:45.000Z 2014-01-01T14:01:35.000Z 6
Solution:
SELECT
System.Timestamp() AS windowEnd,
TopOne() OVER (ORDER BY t DESC) AS lastEvent
FROM
input TIMESTAMP BY t
GROUP BY HOPPINGWINDOW(second, 300, 5)
Explanation: This query generates events every 5 seconds and outputs the last event that was received
previously. The Hopping window duration determines how far back the query looks to find the latest event (300
seconds in this example).
Query example: Correlate two event types within the same stream
Description: Sometimes alerts need to be generated based on multiple event types that occurred in a certain time
range. For example, in an IoT scenario for home ovens, an alert must be generated when the fan temperature is
less than 40 and the maximum power during the last 3 minutes is less than 10.
Input:
Output:
EVENTTIME DEVICEID TEMP ALERTMESSAGE MAXPOWERDURINGLAST3MINS
Solution:
WITH max_power_during_last_3_mins AS (
SELECT
System.TimeStamp() AS windowTime,
deviceId,
max(value) as maxPower
FROM
input TIMESTAMP BY t
WHERE
sensorName = 'power'
GROUP BY
deviceId,
SlidingWindow(minute, 3)
)
SELECT
t1.t AS eventTime,
t1.deviceId,
t1.value AS temp,
'Short circuit heating elements' as alertMessage,
t2.maxPower AS maxPowerDuringLast3mins
INTO results
FROM input t1 TIMESTAMP BY t
JOIN max_power_during_last_3_mins t2
ON DATEDIFF(minute, t1, t2) BETWEEN 0 AND 3
AND t1.deviceId = t2.deviceId
AND t1.t = t2.windowTime
WHERE
t1.sensorName = 'temp'
AND t1.value <= 40
AND t2.maxPower > 10
Explanation: The first query, max_power_during_last_3_mins, uses a Sliding window to find the max value of the
power sensor for every device, during the last 3 minutes. The second query is joined to the first query to find the
power value in the most recent window relevant for the current event. And then, provided the conditions are met,
an alert is generated for the device.
Output:
TOLLID COUNT
1 2
2 2
1 1
3 1
2 1
3 1
Solution:
SELECT
TollId,
COUNT(*) AS Count
FROM input
TIMESTAMP BY Time OVER TollId
GROUP BY TUMBLINGWINDOW(second, 5), TollId
Explanation: The TIMESTAMP BY OVER clause looks at each device timeline separately using substreams. The
output events for each TollID are generated as they are computed, meaning that the events are in order with
respect to each TollID instead of being reordered as if all devices were on the same clock.
Input:
1    2018-07-27T00:00:01.0000000Z    Temperature    50
1    2018-07-27T00:00:01.0000000Z    Temperature    50
2    2018-07-27T00:00:01.0000000Z    Temperature    40
1    2018-07-27T00:00:05.0000000Z    Temperature    60
2    2018-07-27T00:00:05.0000000Z    Temperature    50
Output:
AVERAGEVALUE DEVICEID
70 1
45 2
Solution:
With Temp AS (
SELECT
COUNT(DISTINCT Time) AS CountTime,
Value,
DeviceId
FROM
Input TIMESTAMP BY Time
GROUP BY
Value,
DeviceId,
SYSTEM.TIMESTAMP()
)
SELECT
AVG(Value) AS AverageValue, DeviceId
INTO Output
FROM Temp
GROUP BY DeviceId,TumblingWindow(minute, 5)
Explanation: COUNT(DISTINCT Time) returns the number of distinct values in the Time column within a time
window. You can then use the output of this step to compute the average per device by discarding duplicates.
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Parse JSON and Avro data in Azure Stream Analytics
Azure Stream Analytics supports processing events in CSV, JSON, and Avro data formats. Both JSON and Avro
data can contain complex types such as nested objects (records) and arrays.
Examples
Select array element at a specified index (selecting the first array element):
SELECT
GetArrayElement(arrayField, 0) AS firstElement
FROM input
Select the array length:
SELECT
GetArrayLength(arrayField) AS arrayLength
FROM input
Select all array elements as individual events. The APPLY operator together with the GetArrayElements built-in
function extracts all array elements as individual events:
SELECT
arrayElement.ArrayIndex,
arrayElement.ArrayValue
FROM input as event
CROSS APPLY GetArrayElements(event.arrayField) AS arrayElement
Examples
Use dot notation (.) to access nested fields. For example, this query selects the Latitude and Longitude coordinates
under the Location property in the preceding JSON data:
SELECT
DeviceID,
Location.Lat,
Location.Long
FROM input
Use the GetRecordPropertyValue function if the property name is unknown. For example, imagine a sample data
stream needs to be joined with reference data containing thresholds for each device sensor:
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 75
}
SELECT
input.DeviceID,
thresholds.SensorName
FROM input
JOIN thresholds
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
To convert record fields into separate events, use the APPLY operator together with the GetRecordProperties
function. For example, to convert a sample stream into a stream of events with individual sensor readings, this
query could be used:
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
You can select all the properties of a nested record by using the '*' wildcard. Consider the following example:
SELECT input.SensorReadings.*
FROM input
The result is:
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor022" : 99
}
See Also
Data Types in Azure Stream Analytics
Understand time handling in Azure Stream Analytics
In this article, we discuss how you can make design choices to solve practical time handling problems in the Azure
Stream Analytics service. Time handling design decisions are closely related to event ordering factors.
Metrics to observe
You can observe a number of the Event ordering time tolerance effects through Stream Analytics job metrics. The
following metrics are relevant:
METRIC DESCRIPTION
Out-of-Order Events: Indicates the number of events received out of order that were either dropped or given an
adjusted timestamp. This metric is directly impacted by the configuration of the Out of order events setting on
the Event ordering page of the job in the Azure portal.
Late Input Events: Indicates the number of events arriving late from the source. This metric includes events that
have been dropped or have had their timestamps adjusted. This metric is directly impacted by the configuration
of the Events that arrive late setting on the Event ordering page of the job in the Azure portal.
Early Input Events: Indicates the number of events arriving early from the source that have either been dropped,
or have had their timestamps adjusted if they were more than 5 minutes early.
Watermark Delay: Indicates the delay of the streaming data processing job. See more information in the following
section.
In this example, no early arrival policy is applied. Outlier events that arrive early raise the watermark
significantly. Notice the third event (deviceId1 at time 12:11) is not dropped in this scenario, and the
watermark is raised to 12:15. The fourth event time is adjusted forward 7 minutes (12:08 to 12:15) as a
result.
3. In the final illustration, substreams are used (OVER the DeviceId). Multiple watermarks are tracked, one per
stream. There are fewer events with their times adjusted as a result.
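As a rough illustration of the substream pattern described above, a watermark can be tracked per device by adding OVER to the TIMESTAMP BY clause. This is only a sketch; the input alias and field names (input, Time, DeviceId, Reading) are placeholders rather than names from this article:
```SQL
SELECT DeviceId, AVG(Reading) AS AvgReading
FROM input TIMESTAMP BY Time OVER DeviceId
GROUP BY DeviceId, TumblingWindow(minute, 1)
```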
Next steps
Azure Stream Analytics event order considerations
Stream Analytics job metrics
Checkpoint and replay concepts in Azure Stream
Analytics jobs
This article describes the internal checkpoint and replay concepts in Azure Stream Analytics, and the impact those
have on job recovery. Each time a Stream Analytics job runs, state information is maintained internally. That state
information is saved in a checkpoint periodically. In some scenarios, the checkpoint information is used for job
recovery if a job failure or upgrade occurs. In other circumstances, the checkpoint cannot be used for recovery, and
a replay is necessary.
Next steps
For more information on reliability and scalability, see these articles:
Tutorial: Set up alerts for Azure Stream Analytics jobs
Scale an Azure Stream Analytics job to increase throughput
Guarantee Stream Analytics job reliability during service updates
Azure Stream Analytics output error policy
This article describes the output data error handling policies that can be configured in Azure Stream Analytics.
Output data error handling policies apply only to data conversion errors that occur when the output event
produced by a Stream Analytics job does not conform to the schema of the target sink. You can configure this
policy by choosing either Retry or Drop. In the Azure portal, while in a Stream Analytics job, under Configure,
select Error Policy to make your selection.
Retry
When an error occurs, Azure Stream Analytics retries writing the event indefinitely until the write succeeds. There
is no timeout for retries. Eventually all subsequent events are blocked from processing by the event that is retrying.
This option is the default output error handling policy.
Drop
Azure Stream Analytics will drop any output event that results in a data conversion error. The dropped events
cannot be recovered for reprocessing later.
All transient errors (for example, network errors) are retried regardless of the output error handling policy
configuration.
Next steps
Troubleshooting guide for Azure Stream Analytics
Rotate login credentials for inputs and outputs of a
Stream Analytics Job
Whenever you regenerate credentials for an input or output of a Stream Analytics job, you should update the job
with new credentials. You must stop the job before updating the credentials; you can't replace the credentials while
the job is running. To reduce the lag between stopping and restarting the job, Stream Analytics supports resuming
a job from its last output. This topic describes the process of rotating the login credentials and restarting the job
with new credentials.
Regenerate new credentials and update your job with the new
credentials
In this section, we will walk you through regenerating credentials for Blob Storage, Event Hubs, SQL Database, and
Table Storage.
Blob storage/Table storage
1. Sign in to the Azure portal > browse the storage account that you used as input/output for the Stream Analytics
job.
2. From the settings section, open Access keys. Between the two default keys (key1, key2), pick the one that is not
used by your job and regenerate it:
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Configuring event ordering policies for Azure Stream
Analytics
This article describes how to set up and use late arrival and out-of-order event policies in Azure Stream Analytics.
These policies are applied only when you use the TIMESTAMP BY clause in your query.
This message informs you that at least one partition in your input is empty, which will delay your output by the late
arrival threshold. To overcome this, it is recommended that you either:
1. Ensure all partitions of your Event Hub/IoT Hub receive input.
2. Use the PARTITION BY PartitionId clause in your query, as in the sketch below.
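A minimal sketch of such a partitioned query; the input alias and the EventCount name are placeholders, not names taken from this article:
```SQL
SELECT PartitionId, COUNT(*) AS EventCount
FROM input PARTITION BY PartitionId
GROUP BY PartitionId, TumblingWindow(minute, 1)
```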
Next steps
Time handling considerations
Metrics available in Stream Analytics
Set up alerts for Azure Stream Analytics jobs
It's important to monitor your Azure Stream Analytics job to ensure the job is running continuously without any
problems. This article describes how to set up alerts for common scenarios that should be monitored.
Rules can be set up on metrics through the portal and can be configured programmatically over Operation Logs
data.
4. Your Stream Analytics job name should automatically appear under RESOURCE. Click Add condition,
and select All Administrative operations under Configure signal logic.
5. Under Configure signal logic, change Event Level to All and change Status to Failed. Leave Event
initiated by blank and click Done.
6. Select an existing action group or create a new group. In this example, a new action group called
TIDashboardGroupActions was created with an Emails action that sends an email to users with the
Owner Azure Resource Manager Role.
7. The RESOURCE, CONDITION, and ACTION GROUPS should each have an entry. Note that in order for
the alerts to fire, the conditions defined need to be met. For example, you can measure a metric's average
value over the last 15 minutes, every 5 minutes.
Add an Alert rule name, Description, and your Resource Group to the ALERT DETAILS and click
Create alert rule to create the rule for your Stream Analytics job.
Scenarios to monitor
The following alerts are recommended for monitoring the performance of your Stream Analytics job. These
metrics should be evaluated every minute over the last 5-minute period.
METRIC: Watermark delay
CONDITION: Greater than
TIME AGGREGATION: Maximum
THRESHOLD: When the average value of this metric over the last 15 minutes is greater than the late arrival
tolerance (in seconds). If you have not modified the late arrival tolerance, the default is set to 5 seconds.
CORRECTIVE ACTIONS: Try increasing the number of SUs or parallelizing your query. For more information on
SUs, see Understand and adjust Streaming Units. For more information on parallelizing your query, see Leverage
query parallelization in Azure Stream Analytics.
Get help
For more detail on configuring alerts in the Azure portal, see Receive alert notifications.
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
How to start an Azure Stream Analytics job
You can start your Azure Stream Analytics job using the Azure portal, Visual Studio, and PowerShell. When you
start a job, you select a time for the job to start creating output. Azure portal, Visual Studio, and PowerShell each
have different methods for setting the start time. Those methods are described below.
Start options
The three following options are available to start a job. Note that all the times mentioned below are the ones
specified in TIMESTAMP BY. If TIMESTAMP BY is not specified, arrival time will be used.
Now: Makes the starting point of the output event stream the same as when the job is started. If a temporal
operator is used (for example, a time window, LAG, or JOIN), Azure Stream Analytics will automatically look back at
the data in the input source. For instance, if you start a job "Now" and your query uses a 5-minute tumbling
window, Azure Stream Analytics will seek data from 5 minutes ago in the input. The first possible output
event would have a timestamp equal to or greater than the current time, and ASA guarantees that all input
events that may logically contribute to the output have been accounted for. For example, no partial windowed
aggregates are generated; it's always the complete aggregated value.
Custom: You can choose the starting point of the output. Similar to the Now option, Azure Stream
Analytics will automatically read the data prior to this time if a temporal operator is used.
When last stopped: This option is available when the job was previously started but was stopped
manually or failed. When you choose this option, Azure Stream Analytics uses the last output time to restart
the job so no data is lost. Similar to the previous options, Azure Stream Analytics will automatically read the
data prior to this time if a temporal operator is used. Because several input partitions may have different times,
the earliest stop time of all partitions is used; as a result, some duplicates may be seen in the output. More
information about exactly-once processing is available on the Event Delivery Guarantees page.
Azure portal
Navigate to your job in the Azure portal and select Start on the overview page. Select a Job output start time
and then select Start.
Choose one of the options for Job output start time. The options are Now, Custom, and, if the job was previously
run, When last stopped. See above for more information about these options.
Visual Studio
In the job view, select the green arrow button to start the job. Set the Job Output Start Mode and select Start.
The job status will change to Running.
There are three options for Job Output Start Mode: JobStartTime, CustomTime, and LastOutputEventTime. If
this property is absent, the default is JobStartTime. See above for more information about these options.
PowerShell
Use the following cmdlet to start your job using PowerShell:
Start-AzStreamAnalyticsJob `
-ResourceGroupName $resourceGroup `
-Name $jobName `
-OutputStartMode 'JobStartTime'
There are three options for OutputStartMode: JobStartTime, CustomTime, and LastOutputEventTime. If this
property is absent, the default is JobStartTime. See above for more information about these options.
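If you choose CustomTime, you also supply the start time. A minimal sketch, assuming -OutputStartTime accepts a date-time value (verify against the cmdlet reference linked below):
```powershell
Start-AzStreamAnalyticsJob `
  -ResourceGroupName $resourceGroup `
  -Name $jobName `
  -OutputStartMode 'CustomTime' `
  -OutputStartTime '2019-01-01T00:00:00Z'
```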
For more information on the Start-AzStreamAnalyticsJob cmdlet, view the Start-AzStreamAnalyticsJob reference.
Next steps
Quickstart: Create a Stream Analytics job by using the Azure portal
Quickstart: Create a Stream Analytics job using Azure PowerShell
Quickstart: Create a Stream Analytics job by using the Azure Stream Analytics tools for Visual Studio
Test a Stream Analytics query with sample data
By using Azure Stream Analytics, you can upload sample data and test queries in the Azure portal without starting
or stopping a job.
5. After the upload is complete, select Test to test this query against the sample data you have provided.
6. If you need the test output for later use, the output of your query is displayed in the browser with a link to
the download results.
7. Iteratively modify your query and test it again to see how the output changes.
When you use multiple outputs in a query, the results are shown on separate tabs, and you can easily toggle
between them.
8. After you verify the results shown in the browser, Save your query. Then Start the job, and let it process the
incoming events.
Next steps
Azure Stream Analytics Query Language Reference
Tutorial: Stream Analytics and Power BI: A real-time
analytics dashboard for streaming data
Azure Stream Analytics enables you to take advantage of one of the leading business intelligence tools, Microsoft
Power BI. In this article, you learn how to create business intelligence tools by using Power BI as an output for your
Azure Stream Analytics jobs. You also learn how to create and use a real-time dashboard.
This article continues from the Stream Analytics real-time fraud detection tutorial. It builds on the workflow
created in that tutorial and adds a Power BI output so that you can visualize fraudulent phone calls that are
detected by a Streaming Analytics job.
You can watch a video that illustrates this scenario.
Prerequisites
Before you start, make sure you have the following:
An Azure account.
An account for Power BI. You can use a work account or a school account.
A completed version of the real-time fraud detection tutorial. The tutorial includes an app that generates
fictitious telephone-call metadata. In the tutorial, you create an event hub and send the streaming phone call
data to the event hub. You write a query that detects fraudulent calls (calls from the same number at the same
time in different locations).
6. Enter your credentials. Be aware that when you enter your credentials, you're also giving permission to the
Streaming Analytics job to access your Power BI area.
7. When you're returned to the New output blade, enter the following information:
Group Workspace: Select a workspace in your Power BI tenant where you want to create the
dataset.
Dataset Name: Enter sa-dataset . You can use a different name. If you do, make a note of it for later.
Table Name: Enter fraudulent-calls . Currently, Power BI output from Stream Analytics jobs can
have only one table in a dataset.
WARNING
If Power BI has a dataset and table that have the same names as the ones that you specify in the Stream
Analytics job, the existing ones are overwritten. We recommend that you do not explicitly create this dataset
and table in your Power BI account. They are automatically created when you start your Stream Analytics job
and the job starts pumping output into Power BI. If your job query doesn't return any results, the dataset
and table are not created.
8. Click Create.
The dataset is created with the following settings:
defaultRetentionPolicy: BasicFIFO: Data is FIFO, with a maximum of 200,000 rows.
defaultMode: pushStreaming: The dataset supports both streaming tiles and traditional report-based visuals
(a.k.a. push).
NOTE
If you did not name the input CallStream in the fraud-detection tutorial, substitute your name for CallStream in
the FROM and JOIN clauses in the query.
```SQL
/* Our criteria for fraud:
Calls made from the same caller to two phone switches in different locations (for example, Australia
and Europe) within five seconds */
SELECT System.Timestamp AS windowend, COUNT(*) AS fraudulentcalls
INTO "CallStream-PowerBI"
FROM "CallStream" CS1 TIMESTAMP BY CallRecTime
JOIN "CallStream" CS2 TIMESTAMP BY CallRecTime
/* Where the caller is the same, as indicated by IMSI (International Mobile Subscriber Identity) */
ON CS1.CallingIMSI = CS2.CallingIMSI
/* ...and date between CS1 and CS2 is between one and five seconds */
AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
/* Where the switch location is different */
WHERE CS1.SwitchNum != CS2.SwitchNum
GROUP BY TumblingWindow(Duration(second, 1))
```
4. Click Save.
2. In the Query blade, click the dots next to the CallStream input and then select Sample data from input.
3. Specify that you want three minutes' worth of data and click OK. Wait until you're notified that the data has
been sampled.
4. Click Test and make sure you're getting results.
Your Streaming Analytics job starts looking for fraudulent calls in the incoming stream. The job also creates the
dataset and table in Power BI and starts sending data about the fraudulent calls to them.
Create the dashboard in Power BI
1. Go to Powerbi.com and sign in with your work or school account. If the Stream Analytics job query outputs
results, you see that your dataset is already created:
4. At the top of the window, click Add tile, select CUSTOM STREAMING DATA, and then click Next.
5. Under YOUR DATASETS, select your dataset and then click Next.
6. Under Visualization Type, select Card, and then in the Fields list, select fraudulentcalls.
7. Click Next.
8. Fill in tile details like a title and subtitle.
9. Click Apply.
Now you have a fraud counter!
10. Follow the steps again to add a tile (starting with step 4). This time, do the following:
When you get to Visualization Type, select Line chart.
Add an axis and select windowend.
Add a value and select fraudulentcalls.
For Time window to display, select the last 10 minutes.
11. Click Next, add a title and subtitle, and click Apply.
The Power BI dashboard now gives you two views of data about fraudulent calls as detected in the
streaming data.
Learn more about Power BI
This tutorial demonstrates how to create only a few kinds of visualizations for a dataset. Power BI can help you
create other custom business intelligence tools for your organization. For more ideas, see the following
resources:
For another example of a Power BI dashboard, watch the Getting Started with Power BI video.
For more information about configuring Streaming Analytics job output to Power BI and using Power BI
groups, review the Power BI section of the Stream Analytics outputs article.
For information about using Power BI generally, see Dashboards in Power BI.
For example:
You have 1,000 devices sending data at one-second intervals.
You are using the Power BI Pro SKU that supports 1,000,000 rows per hour.
You want to publish the amount of average data per device to Power BI.
As a result, the equation becomes: 1,000 devices × 3,600 messages per hour = 3,600,000 rows per hour, which
exceeds the 1,000,000-row limit. Emitting one aggregated row per device every 4 seconds instead yields
1,000 devices × (3,600 ÷ 4) = 900,000 rows per hour, which stays within the limit.
Given this configuration, you can change the original query to the following:
SELECT
MAX(hmdt) AS hmdt,
MAX(temp) AS temp,
System.TimeStamp AS time,
dspl
INTO "CallStream-PowerBI"
FROM
Input TIMESTAMP BY time
GROUP BY
TUMBLINGWINDOW(ss,4),
dspl
Renew authorization
If the password has changed since your job was created or last authenticated, you need to reauthenticate your
Power BI account. If Azure Multi-Factor Authentication is configured on your Azure Active Directory (Azure AD)
tenant, you also need to renew Power BI authorization every two weeks. If you don't renew, you could see
symptoms such as a lack of job output or an Authenticate user error in the operation logs.
Similarly, if a job starts after the token has expired, an error occurs and the job fails. To resolve this issue, stop the
job that's running and go to your Power BI output. To avoid data loss, select the Renew authorization link, and
then restart your job from the Last Stopped Time.
After the authorization has been refreshed with Power BI, a green alert appears in the authorization area to reflect
that the issue has been resolved.
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics query language reference
Azure Stream Analytics Management REST API reference
Clean up your Azure Stream Analytics job
Azure Stream Analytics jobs can be easily deleted through the Azure portal, Azure PowerShell, Azure SDK for .Net,
or REST API. A Stream Analytics job cannot be recovered once it has been deleted.
NOTE
When you stop your Stream Analytics job, the data persists only in the input and output storage, such as Event Hubs or
Azure SQL Database. If you are required to remove data from Azure, be sure to follow the removal process for the input and
output resources of your Stream Analytics job.
To stop a job using PowerShell, use the Stop-AzStreamAnalyticsJob cmdlet. To delete a job using PowerShell, use
the Remove-AzStreamAnalyticsJob cmdlet.
Part of being a fully managed service is the capability to introduce new service functionality and improvements at
a rapid pace. As a result, Stream Analytics can have a service update deploy on a weekly (or more frequent) basis.
No matter how much testing is done, there is still a risk that an existing, running job may break due to the
introduction of a bug. For customers who run critical stream processing jobs, these risks need to be avoided. One
mechanism customers can use to reduce this risk is Azure's paired region model.
Next steps
Introduction to Stream Analytics
Get started with Stream Analytics
Scale Stream Analytics jobs
Stream Analytics query language reference
Stream Analytics management REST API reference
Authenticate Stream Analytics to Azure Data Lake
Storage Gen1 using managed identities
Azure Stream Analytics supports managed identity authentication with Azure Data Lake Storage (ADLS ) Gen1
output. The identity is a managed application registered in Azure Active Directory that represents a given Stream
Analytics job, and can be used to authenticate to a targeted resource. Managed identities eliminate the limitations
of user-based authentication methods, like needing to reauthenticate due to password changes or user token
expirations that occur every 90 days. Additionally, managed identities help with the automation of Stream
Analytics job deployments that output to Azure Data Lake Storage Gen1.
This article shows you three ways to enable managed identity for an Azure Stream Analytics job that outputs to
Azure Data Lake Storage Gen1: through the Azure portal, Azure Resource Manager template deployment, and
Azure Stream Analytics tools for Visual Studio.
NOTE
This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will
continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM
compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure
PowerShell.
Azure portal
1. Start by creating a new Stream Analytics job or by opening an existing job in Azure portal. From the menu
bar located on the left side of the screen, select Managed Identity located under Configure.
2. Select Use System-assigned Managed Identity from the window that appears on the right. Click Save to
create a service principal for the identity of the Stream Analytics job in Azure Active Directory. The life cycle of the
newly created identity will be managed by Azure. When the Stream Analytics job is deleted, the associated
identity (that is, the service principal) is automatically deleted by Azure.
When the configuration is saved, the Object ID (OID) of the service principal is listed as the Principal ID as
shown below:
The service principal has the same name as the Stream Analytics job. For example, if the name of your job is
MyASAJob, the name of the service principal created is also MyASAJob.
3. In the output properties window of the ADLS Gen1 output sink, click the Authentication mode drop-down
and select Managed Identity.
4. Fill out the rest of the properties. To learn more about creating an ADLS output, see Create a Data lake
Store output with stream analytics. When you are finished, click Save.
5. Navigate to the Overview page of your ADLS Gen1 and click on Data explorer.
6. In the Data explorer pane, select Access and click Add in the Access pane.
7. In the text box on the Select user or group pane, type the name of the service principal. Remember that
the name of the service principal is also the name of the corresponding Stream Analytics job. As you begin
typing the principal name, it will appear below the text box. Choose the desired service principal name and
click Select.
8. In the Permissions pane, check the Write and Execute permissions and assign it to This Folder and all
children. Then click Ok.
9. The service principal is listed under Assigned Permissions on the Access pane as shown below. You can
now go back and start your Stream Analytics job.
To learn more about Data Lake Storage Gen1 file system permissions, see Access Control in Azure Data
Lake Storage Gen1.
This property tells Azure Resource Manager to create and manage the identity for your Azure Stream
Analytics job.
Sample job
{
"Name": "AsaJobWithIdentity",
"Type": "Microsoft.StreamAnalytics/streamingjobs",
"Location": "West US",
"Identity": {
"Type": "SystemAssigned",
},
"properties": {
"sku": {
"name": "standard"
},
"outputs": [
{
"name": "string",
"properties":{
"datasource": {
"type": "Microsoft.DataLake/Accounts",
"properties": {
"accountName": "myDataLakeAccountName",
"filePathPrefix": "cluster1/logs/{date}/{time}",
"dateFormat": "YYYY/MM/DD",
"timeFormat": "HH",
"authenticationMode": "Msi"
}
}
}
}
]
}
}
{
"Name": "mySAJob",
"Type": "Microsoft.StreamAnalytics/streamingjobs",
"Location": "West US",
"Identity": {
"Type": "SystemAssigned",
"principalId": "GUID",
"tenantId": "GUID",
},
"properties": {
"sku": {
"name": "standard"
},
}
}
Take note of the Principal ID from the job response to grant access to the required ADLS resource.
The Tenant ID is the ID of the Azure Active Directory tenant where the service principal is created. The
service principal is created in the Azure tenant that is trusted by the subscription.
The Type indicates the type of managed identity as explained in types of managed identities. Only the
System Assigned type is supported.
2. Provide Access to the service principal using PowerShell. To give access to the service principal via
PowerShell, execute the following command:
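A minimal sketch using the Set-AzDataLakeStoreItemAclEntry cmdlet referenced below; the account name, path, and principal ID shown are placeholders to replace with your own values, and the exact parameters should be verified against the cmdlet documentation:
```powershell
Set-AzDataLakeStoreItemAclEntry `
  -Account "<adls-gen1-account-name>" `
  -Path "/" `
  -AceType User `
  -Id "<service-principal-object-id>" `
  -Permissions WriteExecute
```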
The PrincipalId is the Object ID of the service principal and is listed on the portal screen once the service
principal is created. If you created the job using a Resource Manager template deployment, the Object ID is
listed in the Identity property of the job response.
Example
To learn more about the above PowerShell command, refer to the Set-AzDataLakeStoreItemAclEntry
documentation.
Limitations
This feature doesn’t support the following:
1. Multi-tenant access: The Service principal created for a given Stream Analytics job will reside on the
Azure Active Directory tenant on which the job was created, and cannot be used against a resource that
resides on a different Azure Active Directory tenant. Therefore, you can only use MSI on ADLS Gen 1
resources that are within the same Azure Active Directory tenant as your Azure Stream Analytics job.
2. User Assigned Identity: is not supported. This means the user is not able to enter their own service
principal to be used by their Stream Analytics job. The service principal is generated by Azure Stream
Analytics.
Next steps
Create a Data lake Store output with stream analytics
Test Stream Analytics queries locally with Visual Studio
Test live data locally using Azure Stream Analytics tools for Visual Studio
Use reference data from a SQL Database for an
Azure Stream Analytics job (Preview)
Azure Stream Analytics supports Azure SQL Database as a source of input for reference data. You can use SQL
Database as reference data for your Stream Analytics job in the Azure portal and in Visual Studio with Stream
Analytics tools. This article demonstrates both methods.
Azure portal
Use the following steps to add Azure SQL Database as a reference input source using the Azure portal:
Portal prerequisites
1. Create a Stream Analytics job.
2. Create a storage account to be used by the Stream Analytics job.
3. Create your Azure SQL Database with a data set to be used as reference data by the Stream Analytics job.
Define SQL Database reference data input
1. In your Stream Analytics job, select Inputs under Job topology. Click Add reference input and choose
SQL Database.
2. Fill out the Stream Analytics Input Configurations. Choose the database name, server name, username and
password. If you want your reference data input to refresh periodically, choose “On” to specify the refresh
rate in DD:HH:MM. If you have large data sets with a short refresh rate, you can use a delta query.
3. Test the snapshot query in the SQL query editor. For more information, see Use the Azure portal's SQL
query editor to connect and query data
Specify storage account in Job config
Navigate to Storage account settings under Configure and select Add storage account.
If you choose "Refresh Periodically with Delta", two SQL CodeBehind files will be generated: [Input
Alias].snapshot.sql and [Input Alias].delta.sql.
4. Open the SQL file in the editor and write the SQL query.
5. If you are using Visual Studio 2019, and you have installed SQL Server Data tools, you can test the query
by clicking Execute. A wizard window will pop up to help you connect to the SQL database and the query
result will appear in the window at the bottom.
Specify storage account
Open JobConfig.json to specify the storage account for storing SQL reference snapshots.
Delta query
When using the delta query, temporal tables in Azure SQL Database are recommended.
1. Create a temporal table in Azure SQL Database.
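For illustration, a minimal sketch of a system-versioned (temporal) table; the table and column names are placeholders, not names from this article:
```SQL
CREATE TABLE dbo.DeviceThreshold
(
    DeviceId   NVARCHAR(50) NOT NULL PRIMARY KEY,
    SensorName NVARCHAR(50) NOT NULL,
    Value      FLOAT NOT NULL,
    ValidFrom  DATETIME2 GENERATED ALWAYS AS ROW START NOT NULL,
    ValidTo    DATETIME2 GENERATED ALWAYS AS ROW END NOT NULL,
    PERIOD FOR SYSTEM_TIME (ValidFrom, ValidTo)
)
WITH (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.DeviceThresholdHistory));
```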
Note that Stream Analytics runtime may periodically run the snapshot query in addition to the delta query
to store checkpoints.
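As an illustration, a snapshot query over the temporal table sketched above. The parameter name @snapshotTime is an assumption about the value Stream Analytics supplies when it runs the snapshot query; verify it against the SQL reference data documentation before use:
```SQL
-- Returns the reference data set as it existed at the snapshot time
SELECT DeviceId, SensorName, Value
FROM dbo.DeviceThreshold
FOR SYSTEM_TIME AS OF @snapshotTime
```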
FAQs
Will I incur additional cost by using SQL reference data input in Azure Stream Analytics?
There is no additional cost per streaming unit in the Stream Analytics job. However, the Stream Analytics job
must have an associated Azure storage account. The Stream Analytics job queries the SQL DB (during job start
and refresh interval) to retrieve the reference data set and stores that snapshot in the storage account. Storing
these snapshots will incur additional charges detailed in the pricing page for Azure storage account.
How do I know reference data snapshot is being queried from SQL DB and used in the Azure Stream
Analytics job?
There are two metrics filtered by Logical Name (under Metrics in the Azure portal) which you can use to monitor the
health of the SQL database reference data input.
InputEvents: This metric measures the number of records loaded in from the SQL database reference data set.
InputEventBytes: This metric measures the size of the reference data snapshot loaded in memory of the Stream
Analytics job.
The combination of both of these metrics can be used to infer if the job is querying the SQL database to fetch the
reference data set and then loading it to memory.
Will I require a special type of Azure SQL Database?
Azure Stream Analytics will work with any type of Azure SQL Database. However, it is important to understand
that the refresh rate set for your reference data input could impact your query load. To use the delta query option,
it is recommended to use temporal tables in Azure SQL Database.
Why does Azure Stream Analytics store snapshots in Azure Storage account?
Stream Analytics guarantees exactly once event processing and at least once delivery of events. In cases where
transient issues impact your job, a small amount of replay is necessary to restore state. To enable replay, it is
required to have these snapshots stored in an Azure Storage account. For more information on checkpoint replay,
see Checkpoint and replay concepts in Azure Stream Analytics jobs.
Next steps
Using reference data for lookups in Stream Analytics
Quickstart: Create a Stream Analytics job by using the Azure Stream Analytics tools for Visual Studio
Test live data locally using Azure Stream Analytics tools for Visual Studio (Preview )
Real-time Twitter sentiment analysis in Azure Stream
Analytics
IMPORTANT
Twitter application creation is no longer available through apps.twitter.com. This tutorial is in the process of being updated to
include the new Twitter API.
Learn how to build a sentiment analysis solution for social media analytics by bringing real-time Twitter events
into Azure Event Hubs. You can then write an Azure Stream Analytics query to analyze the data and either store
the results for later use or use a dashboard and Power BI to provide insights in real time.
Social media analytics tools help organizations understand trending topics. Trending topics are subjects and
attitudes that have a high volume of posts in social media. Sentiment analysis, which is also called opinion mining,
uses social media analytics tools to determine attitudes toward a product, idea, and so on.
Real-time Twitter trend analysis is a great example of an analytics tool, because the hashtag subscription model
enables you to listen to specific keywords (hashtags) and develop sentiment analysis of the feed.
Prerequisites
In this tutorial, you use a client application that connects to Twitter and looks for tweets that have certain hashtags
(which you can set). In order to run the application and analyze the tweets using Azure Streaming Analytics, you
must have the following:
An Azure subscription
A Twitter account
A Twitter application, and the OAuth access token for that application. We provide high-level instructions for
how to create a Twitter application later.
The TwitterWPFClient application, which reads the Twitter feed. To get this application, download the
TwitterWPFClient.zip file from GitHub and then unzip the package into a folder on your computer. If you want
to see the source code and run the application in a debugger, you can get the source code from GitHub.
4. When the namespace has finished deploying, find the event hub namespace in your list of Azure resources.
5. Click the new namespace, and in the namespace blade, click + Event Hub.
6. Name the new event hub socialtwitter-eh . You can use a different name. If you do, make a note of it,
because you need the name later. You don't need to set any other options for the event hub.
7. Click Create.
Grant access to the event hub
Before a process can send data to an event hub, the event hub must have a policy that allows appropriate access.
The access policy produces a connection string that includes authorization information.
1. In the event namespace blade, click Event Hubs and then click the name of your new event hub.
2. In the event hub blade, click Shared access policies and then click + Add.
NOTE
Make sure you're working with the event hub, not the event hub namespace.
4. Click Create.
5. After the policy has been deployed, click it in the list of shared access policies.
6. Find the box labeled CONNECTION STRING-PRIMARY KEY and click the copy button next to the
connection string.
7. Paste the connection string into a text editor. You need this connection string for the next section, after you
make some small edits to it.
The connection string looks like this:
Endpoint=sb://YOURNAME-socialtwitter-eh-ns.servicebus.windows.net/;SharedAccessKeyName=socialtwitter-
access;SharedAccessKey=Gw2NFZw6r...FxKbXaC2op6a0ZsPkI=;EntityPath=socialtwitter-eh
Notice that the connection string contains multiple key-value pairs, separated with semicolons: Endpoint ,
SharedAccessKeyName , SharedAccessKey , and EntityPath .
NOTE
For security, parts of the connection string in the example have been removed.
8. In the text editor, remove the EntityPath pair from the connection string (don't forget to remove the
semicolon that precedes it). When you're done, the connection string looks like this:
Endpoint=sb://YOURNAME-socialtwitter-eh-ns.servicebus.windows.net/;SharedAccessKeyName=socialtwitter-
access;SharedAccessKey=Gw2NFZw6r...FxKbXaC2op6a0ZsPkI=
NOTE
The exact process in Twitter for creating an application and getting the keys, secrets, and token might change. If these
instructions don't match what you see on the Twitter site, refer to the Twitter developer documentation.
Leave the callback field blank. The client application you use for this tutorial doesn't require callbacks.
3. Optionally, change the application's permissions to read-only.
4. When the application is created, go to the Keys and Access Tokens page.
5. Click the button to generate an access token and access token secret.
Keep this information handy, because you will need it in the next procedure.
NOTE
The keys and secrets for the Twitter application provide access to your Twitter account. Treat this information as sensitive, the
same as you do your Twitter password. For example, don't embed this information in an application that you give to others.
3. To set the values persistently, use a text editor to open the TwitterWpfClient.exe.config file. Then in the
<appSettings> element, do this:
NOTE
If you see errors, and you don't see a stream of tweets displayed in the lower part of the window, double-check the
keys and secrets. Also check the connection string (make sure that it does not include the EntityPath key and
value.)
Event hub: Select the event hub that you created earlier ( socialtwitter-eh ).
Event hub policy name: Select the access policy that you created earlier ( socialtwitter-access ).
3. Click Create.
Azure samples 3 minutes' worth of data from the input stream and notifies you when the sample data is
ready. (This takes a short while.)
The sample data is stored temporarily and is available while you have the query window open. If you close
the query window, the sample data is discarded, and you have to create a new set of sample data.
6. Change the query in the code editor to the following:
If you didn't use TwitterStream as the alias for the input, substitute your alias for TwitterStream in the query.
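A minimal sketch of a windowed count query consistent with the explanation that follows; the field names CreatedAt and Topic are assumptions about the tweet payload, so adjust them to the fields your client actually sends:
```SQL
SELECT System.Timestamp AS WindowEnd, Topic, COUNT(*) AS Mentions
FROM TwitterStream TIMESTAMP BY CreatedAt
GROUP BY TUMBLINGWINDOW(s, 5), Topic
```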
This query uses the TIMESTAMP BY keyword to specify a timestamp field in the payload to be used in the
temporal computation. If this field isn't specified, the windowing operation is performed by using the time
that each event arrived at the event hub. Learn more in the "Arrival Time vs Application Time" section of
Stream Analytics Query Reference.
This query also accesses a timestamp for the end of each window by using the System.Timestamp
property.
7. Click Test. The query runs against the data that you sampled.
8. Click Save. This saves the query as part of the Streaming Analytics job. (It doesn't save the sample data.)
3. In the Start job blade, for Job output start time, select Now and then click Start.
Azure notifies you when the job has started, and in the job blade, the status is displayed as Running.
4. Click Save.
5. Make sure that the TwitterWpfClient application is running.
6. Click Start to restart the job using the new query.
Get support
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Get started using Azure Stream Analytics: Real-
time fraud detection
This tutorial provides an end-to-end illustration of how to use Azure Stream Analytics. You learn how to:
Bring streaming events into an instance of Azure Event Hubs. In this tutorial, you'll use an app that
simulates a stream of mobile-phone metadata records.
Write SQL -like Stream Analytics queries to transform data, aggregating information or looking for
patterns. You will see how to use a query to examine the incoming stream and look for calls that might
be fraudulent.
Send the results to an output sink (storage) that you can analyze for additional insights. In this case,
you'll send the suspicious call data to Azure Blob storage.
This tutorial uses the example of real-time fraud detection based on phone-call data. The technique illustrated
is also suited for other types of fraud detection, such as credit card fraud or identity theft.
Prerequisites
In this tutorial, you'll simulate phone-call data by using a client app that generates sample phone call
metadata. Some of the records that the app produces look like fraudulent calls.
Before you start, make sure you have the following:
An Azure account.
The call-event generator app, TelcoGenerator.zip, which can be downloaded from the Microsoft
Download Center. Unzip this package into a folder on your computer. If you want to see the source
code and run the app in a debugger, you can get the app source code from GitHub.
NOTE
Windows might block the downloaded .zip file. If you can't unzip it, right-click the file and select Properties. If
you see the "This file came from another computer and might be blocked to help protect this computer"
message, select the Unblock option and then click Apply.
If you want to examine the results of the Streaming Analytics job, you also need a tool for viewing the
contents of an Azure Blob Storage container. If you use Visual Studio, you can use Azure Tools for Visual
Studio or Visual Studio Cloud Explorer. Alternatively, you can install standalone tools like Azure Storage
Explorer or Cerulean.
Create an Azure Event Hubs to ingest events
To analyze a data stream, you ingest it into Azure. A typical way to ingest data is to use Azure Event Hubs,
which lets you ingest millions of events per second and then process and store the event information. For this
tutorial, you will create an event hub and then have the call-event generator app send call data to that event
hub. For more about event hubs, see the Azure Service Bus documentation.
NOTE
For a more detailed version of this procedure, see Create an Event Hubs namespace and an event hub using the Azure
portal.
6. Name the new event hub asa-eh-frauddetection-demo . You can use a different name. If you do, make a
note of it, because you need the name later. You don't need to set any other options for the event hub
right now.
7. Click Create.
Grant access to the event hub and get a connection string
Before a process can send data to an event hub, the event hub must have a policy that allows appropriate
access. The access policy produces a connection string that includes authorization information.
1. In the event namespace pane, click Event Hubs and then click the name of your new event hub.
2. In the event hub pane, click Shared access policies and then click + Add.
NOTE
Make sure you're working with the event hub, not the event hub namespace.
4. Click Create.
5. After the policy has been deployed, click it in the list of shared access policies.
6. Find the box labeled CONNECTION STRING-PRIMARY KEY and click the copy button next to the
connection string.
7. Paste the connection string into a text editor. You need this connection string for the next section, after
you make some small edits to it.
The connection string looks like this:
Endpoint=sb://YOURNAME-eh-ns-demo.servicebus.windows.net/;SharedAccessKeyName=asa-policy-manage-
demo;SharedAccessKey=Gw2NFZwU1Di+rxA2T+6hJYAtFExKRXaC2oSQa0ZsPkI=;EntityPath=asa-eh-frauddetection-
demo
Notice that the connection string contains multiple key-value pairs, separated with semicolons:
Endpoint , SharedAccessKeyName , SharedAccessKey , and EntityPath .
```cmd
telcodatagen.exe 1000 0.2 2
```
RECORD DEFINITION
SwitchNum: The telephone switch used to connect the call. For this example, the switches are strings that
represent the country/region of origin (US, China, UK, Germany, or Australia).
3. Click Create.
The job is created and the portal displays job details. Nothing is running yet, though—you have to
configure the job before it can be started.
Configure job input
1. In the dashboard or the All resources pane, find and select the asa_frauddetection_job_demo Stream
Analytics job.
2. In the Overview section of the Stream Analytics job pane, click the Input box.
3. Click Add stream input and select Event Hub. Then fill the New input page with the following
information:
SETTING                  SUGGESTED VALUE               DESCRIPTION
Event Hub namespace      asa-eh-ns-demo                Enter the name of the Event Hub namespace.
Event Hub name           asa-eh-frauddetection-demo    Select the name of your Event Hub.
Event Hub policy name    asa-policy-manage-demo        Select the access policy that you created earlier.
4. Click Create.
3. Click the Query box. Azure lists the inputs and outputs that are configured for the job, and lets you
create a query that transforms the input stream as it is sent to the output.
4. In the Query pane, click the dots next to the CallStream input and then select Sample data from
input.
Azure samples 3 minutes' worth of data from the input stream and notifies you when the sample data
is ready. (This takes a short while.)
The sample data is stored temporarily and is available while you have the query window open. If you close
the query window, the sample data is discarded, and you'll have to create a new set of sample data.
As an alternative, you can get a .json file that has sample data in it from GitHub, and then upload that .json file
to use as sample data for the CallStream input.
Test using a pass-through query
If you want to archive every event, you can use a pass-through query to read all the fields in the payload of
the event.
1. In the query window, enter this query:
SELECT
*
FROM
CallStream
NOTE
As with SQL, keywords are not case-sensitive, and whitespace is not significant.
In this query, CallStream is the alias that you specified when you created the input. If you used a
different alias, use that name instead.
2. Click Test.
The Stream Analytics job runs the query against the sample data and displays the output at the bottom
of the window. The results indicate that the Event Hub and the Streaming Analytics job are configured
correctly. (As noted, later you'll create an output sink that the query can write data to.)
The exact number of records you see will depend on how many records were captured in your 3-
minute sample.
Reduce the number of fields using a column projection
In many cases, your analysis doesn't need all the columns from the input stream. You can use a query to
project a smaller set of returned fields than in the pass-through query.
1. Change the query in the code editor to the following:
```SQL
SELECT
System.Timestamp as WindowEnd, SwitchNum, COUNT(*) as CallCount
FROM
CallStream TIMESTAMP BY CallRecTime
GROUP BY TUMBLINGWINDOW(s, 5), SwitchNum
```
This query uses the Timestamp By keyword in the FROM clause to specify which timestamp field in the
input stream to use to define the Tumbling window. In this case, the window divides the data into
segments by the CallRecTime field in each record. (If no field is specified, the windowing operation
uses the time that each event arrives at the event hub. See "Arrival Time Vs Application Time" in
Stream Analytics Query Language Reference.)
The projection includes System.Timestamp , which returns a timestamp for the end of each window.
To specify that you want to use a Tumbling window, you use the TUMBLINGWINDOW function in the
GROUP BY clause. In the function, you specify a time unit (anywhere from a microsecond to a day) and a
window size (how many units). In this example, the Tumbling window consists of 5-second intervals,
so you will get a count by country/region for every 5 seconds' worth of calls.
2. Click Test again. In the results, notice that the timestamps under WindowEnd are in 5-second
increments.
Detect SIM fraud using a self-join
For this example, consider fraudulent usage to be calls that originate from the same user but in different
locations within 5 seconds of one another. For example, the same user can't legitimately make a call from the
US and Australia at the same time.
To check for these cases, you can use a self-join of the streaming data to join the stream to itself based on the
CallRecTime value. You can then look for call records where the CallingIMSI value (the originating number)
is the same, but the SwitchNum value (country/region of origin) is not the same.
When you use a join with streaming data, the join must provide some limits on how far the matching rows
can be separated in time. (As noted earlier, the streaming data is effectively endless.) The time bounds for the
relationship are specified inside the ON clause of the join, using the DATEDIFF function. In this case, the join is
based on a 5-second interval of call data.
1. Change the query in the code editor to the following:
```SQL
SELECT System.Timestamp as Time,
CS1.CallingIMSI,
CS1.CallingNum as CallingNum1,
CS2.CallingNum as CallingNum2,
CS1.SwitchNum as Switch1,
CS2.SwitchNum as Switch2
FROM CallStream CS1 TIMESTAMP BY CallRecTime
JOIN CallStream CS2 TIMESTAMP BY CallRecTime
ON CS1.CallingIMSI = CS2.CallingIMSI
AND DATEDIFF(ss, CS1, CS2) BETWEEN 1 AND 5
WHERE CS1.SwitchNum != CS2.SwitchNum
```
This query is like any SQL join except for the DATEDIFF function in the join. This version of DATEDIFF is
specific to Streaming Analytics, and it must appear in the ON...BETWEEN clause. The parameters are a
time unit (seconds in this example) and the aliases of the two sources for the join. This is different from
the standard SQL DATEDIFF function.
The WHERE clause includes the condition that flags the fraudulent call: the originating switches are not
the same.
2. Click Test again.
3. Click Save to save the self-join query as part of the Streaming Analytics job. (It doesn't save the
sample data.)
5. Click Save.
Clean up resources
There are additional articles that continue with the fraud-detection scenario and use the resources you've
created in this tutorial. If you want to continue, see the suggestions under Next steps.
However, if you're done and you don't need the resources you've created, you can delete them so that you
don't incur unnecessary Azure charges. In that case, we suggest that you do the following:
1. Stop the Streaming Analytics job. In the Jobs pane, click Stop at the top.
2. Stop the Telco Generator app. In the command window where you started the app, press Ctrl+C.
3. If you created a new blob storage account just for this tutorial, delete it.
4. Delete the Streaming Analytics job.
5. Delete the event hub.
6. Delete the event hub namespace.
Get support
For further assistance, try the Azure Stream Analytics forum.
Next steps
You can continue this tutorial with the following article:
Stream Analytics and Power BI: A real-time analytics dashboard for streaming data. This article shows you
how to send the TelCo output of the Stream Analytics job to Power BI for real-time visualization and
analysis.
For more information about Stream Analytics in general, see these articles:
Introduction to Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Azure Stream Analytics on IoT Edge
Azure Stream Analytics (ASA) on IoT Edge empowers developers to deploy near-real-time analytical intelligence
closer to IoT devices so that they can unlock the full value of device-generated data. Azure Stream Analytics is
designed for low latency, resiliency, efficient use of bandwidth, and compliance. Enterprises can now deploy
control logic close to the industrial operations and complement Big Data analytics done in the cloud.
Azure Stream Analytics on IoT Edge runs within the Azure IoT Edge framework. Once the job is created in ASA,
you can deploy and manage it using IoT Hub.
Scenarios
Low-latency command and control: For example, manufacturing safety systems must respond to
operational data with ultra-low latency. With ASA on IoT Edge, you can analyze sensor data in near real-time,
and issue commands when you detect anomalies to stop a machine or trigger alerts.
Limited connectivity to the cloud: Mission critical systems, such as remote mining equipment, connected
vessels, or offshore drilling, need to analyze and react to data even when cloud connectivity is intermittent.
With ASA, your streaming logic runs independently of the network connectivity and you can choose what you
send to the cloud for further processing or storage.
Limited bandwidth: The volume of data produced by jet engines or connected cars can be so large that data
must be filtered or pre-processed before sending it to the cloud. Using ASA, you can filter or aggregate the
data that needs to be sent to the cloud.
Compliance: Regulatory compliance may require some data to be locally anonymized or aggregated before
being sent to the cloud.
Installation instructions
The high-level steps are described in the following table. More details are given in the following sections.
STEP NOTES
4. Deploy ASA on your IoT Edge device(s). The ASA job definition is exported to the storage container created earlier.
You can follow this step-by-step tutorial to deploy your first ASA job on IoT Edge. The following video should help
you understand the process to run a Stream Analytics job on an IoT edge device:
NOTE
This tutorial focuses on ASA job creation using the Azure portal. You can also use the Visual Studio plugin to create an ASA Edge job.
1. From the Azure portal, create a new "Stream Analytics job". Direct link to create a new ASA job here.
2. In the creation screen, select Edge as hosting environment (see the following picture)
3. Job Definition
a. Define Input Stream(s). Define one or several input streams for your job.
b. Define Reference data (optional).
c. Define Output Stream(s). Define one or several output streams for your job.
d. Define query. Define the ASA query in the cloud using the inline editor. The compiler automatically
checks the syntax enabled for ASA edge. You can also test your query by uploading sample data.
4. Set the storage container information in the IoT Edge settings menu.
5. Set optional settings
a. Event ordering. You can configure out-of-order policy in the portal. Documentation is available here.
b. Locale. Set the internationalization format.
NOTE
When a deployment is created, ASA exports the job definition to a storage container. This job definition remains the same
for the duration of a deployment. As a consequence, if you want to update a job running on the edge, you need to edit
the job in ASA, and then create a new deployment in IoT Hub.
In the Azure portal, open IoT Hub, navigate to IoT Edge and click on the device you want to target for this
deployment.
Select Set modules, then select + Add and choose Azure Stream Analytics Module.
Select the subscription and the ASA Edge job that you created. Click Save.
NOTE
During this step, ASA creates a folder named "EdgeJobs" in the storage container (if it does not exist already). For each
deployment, a new subfolder is created in the "EdgeJobs" folder. In order to deploy your job to edge devices, ASA creates a
shared access signature (SAS) for the job definition file. The SAS key is securely transmitted to the IoT Edge devices using
device twin. The expiration of this key is three years from the day of its creation.
For more information about IoT Edge deployments, see this page.
Configure routes
IoT Edge provides a way to declaratively route messages between modules, and between modules and IoT Hub.
The full syntax is described here. Names of the inputs and outputs created in the ASA job can be used as
endpoints for routing.
Example
{
    "routes": {
        "sensorToAsa": "FROM /messages/modules/tempSensor/* INTO BrokeredEndpoint(\"/modules/ASA/inputs/temperature\")",
        "alertsToCloud": "FROM /messages/modules/ASA/* INTO $upstream",
        "alertsToReset": "FROM /messages/modules/ASA/* INTO BrokeredEndpoint(\"/modules/tempSensor/inputs/control\")"
    }
}
This example shows the routes for the scenario described in the following picture. It contains an edge job called
"ASA", with an input named "temperature" and an output named "alert".
Technical information
Current limitations for IoT Edge jobs compared to cloud jobs
The goal is to have parity between IoT Edge jobs and cloud jobs. Most SQL query language features are
supported, enabling you to run the same logic on both cloud and IoT Edge. However, the following features are not yet
supported for edge jobs:
User-defined functions (UDF) in JavaScript. UDFs are available in C# for IoT Edge jobs (preview).
User-defined aggregates (UDA).
Azure ML functions
Using more than 14 aggregates in a single step.
AVRO format for input/output. At this time, only CSV and JSON are supported.
The following SQL operators:
PARTITION BY
GetMetadataPropertyValue
Runtime and hardware requirements
To run ASA on IoT Edge, you need devices that can run Azure IoT Edge.
ASA and Azure IoT Edge use Docker containers to provide a portable solution that runs on multiple host
operating systems (Windows, Linux).
ASA on IoT Edge is made available as Windows and Linux images, running on both x86-64 and ARM (Advanced
RISC Machines) architectures.
Input and output
Input and Output Streams
ASA Edge jobs can get inputs and outputs from other modules running on IoT Edge devices. To connect from and
to specific modules, you can set the routing configuration at deployment time. More information is described on
the IoT Edge module composition documentation.
For both inputs and outputs, CSV and JSON formats are supported.
For each input and output stream you create in your ASA job, a corresponding endpoint is created on your
deployed module. These endpoints can be used in the routes of your deployment.
At present, the only supported stream input and stream output types are Edge Hub. Reference input supports
reference file type. Other outputs can be reached using a cloud job downstream. For example, a Stream Analytics
job hosted in Edge sends output to Edge Hub, which can then send output to IoT Hub. You can use a second cloud
hosted Azure Stream Analytics job with input from IoT Hub and output to Power BI or another output type.
Reference data
Reference data (also known as a lookup table) is a finite data set that is static or slow changing in nature. It is used
to perform a lookup or to correlate with your data stream. To make use of reference data in your Azure Stream
Analytics job, you will generally use a reference data JOIN in your query, as sketched below. For more information, see Using
reference data for lookups in Stream Analytics.
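As a minimal sketch of such a join, assume a streaming input named telemetry (coming from Edge Hub) and a local reference data input named deviceCatalog; the field names are likewise hypothetical.
SELECT
    t.deviceId,
    t.temperature,
    d.deviceName,
    d.maxAllowedTemperature
FROM telemetry t TIMESTAMP BY eventTime
JOIN deviceCatalog d
    ON t.deviceId = d.deviceId
WHERE t.temperature > d.maxAllowedTemperature
Note that a reference data JOIN does not use a DATEDIFF constraint, because the lookup table is not a temporal stream.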
Only local reference data is supported. When a job is deployed to an IoT Edge device, it loads reference data from the
user-defined file path.
To create a job with reference data on Edge:
1. Create a new input for your job.
2. Choose Reference data as the Source Type.
3. Have a reference data file ready on the device. For a Windows container, put the reference data file on the
local drive and share the local drive with the Docker container. For a Linux container, create a Docker
volume and populate the data file to the volume.
4. Set the file path. For a Windows host OS and Windows container, use the absolute path:
E:\<PathToFile>\v1.csv. For a Windows host OS and Linux container, or a Linux OS and Linux container,
use the path in the volume: <VolumeName>/file1.txt.
An update of the reference data on IoT Edge is triggered by a deployment. Once triggered, the ASA module picks up the
updated data without stopping the running job.
There are two ways to update the reference data:
Update reference data path in your ASA job from Azure portal.
Update the IoT Edge deployment.
Get help
For further assistance, try the Azure Stream Analytics forum.
Next steps
More information on Azure IoT Edge
ASA on IoT Edge tutorial
Develop Stream Analytics Edge jobs using Visual Studio tools
Implement CI/CD for Stream Analytics using APIs
Build an IoT solution by using Stream Analytics
1/15/2019 • 12 minutes to read • Edit Online
Introduction
In this solution, you learn how to use Azure Stream Analytics to get real-time insights from your data. Developers
can easily combine streams of data, such as click-streams, logs, and device-generated events, with historical
records or reference data to derive business insights. As a fully managed, real-time stream computation service
that's hosted in Microsoft Azure, Azure Stream Analytics provides built-in resiliency, low latency, and scalability to
get you up and running in minutes.
After completing this solution, you are able to:
Familiarize yourself with the Azure Stream Analytics portal.
Configure and deploy a streaming job.
Articulate real-world problems and solve them by using the Stream Analytics query language.
Develop streaming solutions for your customers by using Stream Analytics with confidence.
Use the monitoring and logging experience to troubleshoot issues.
Prerequisites
You need the following prerequisites to complete this solution:
An Azure subscription
Incoming data
This solution works with two streams of data. Sensors installed in the entrance and exit of the toll stations produce
the first stream. The second stream is a static lookup dataset that has vehicle registration data.
Entry data stream
The entry data stream contains information about cars as they enter toll stations. These entry events are live
streamed into an Event Hub queue from a web app included in the sample app.
COLUMN DESCRIPTION
EntryTime The date and time of entry of the vehicle to the toll booth in
UTC
COLUMN DESCRIPTION
ExitTime The date and time of exit of the vehicle from toll booth in UTC
To paraphrase the intent of the query, let's say that you need to count the number of vehicles that enter a toll
booth. Because a highway toll booth has a continuous stream of vehicles entering, those entrance events
are analogous to a stream that never stops. To quantify the stream, you have to define a "period of time" to
measure over. Let's refine the question further, to "How many vehicles enter a toll booth every three
minutes?" This is commonly referred to as the tumbling count. (A sketch of such a query appears after this list.)
As you can see, Azure Stream Analytics uses a query language that's like SQL and adds a few extensions to
specify time-related aspects of the query. For more details, read about Time Management and Windowing
constructs used in the query.
3. Examine the Inputs of the TollApp sample job. Only the EntryStream input is used in the current query.
EntryStream input is an Event Hub connection that queues data representing each time a car enters a
tollbooth on the highway. A web app that is part of the sample is creating the events, and that data is
queued in this Event Hub. Note that this input is queried in the FROM clause of the streaming query.
ExitStream input is an Event Hub connection that queues data representing each time a car exits a
tollbooth on the highway. This streaming input is used in later variations of the query syntax.
Registration input is an Azure Blob storage connection, pointing to a static registration.json file, used for
lookups as needed. This reference data input is used in later variations of the query syntax.
4. Examine the Outputs of the TollApp sample job.
Cosmos DB output is an Azure Cosmos DB collection that receives the output sink events. Note that this
output is used in the INTO clause of the streaming query.
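A sketch of the tumbling-count query discussed above, using the EntryStream input and its EntryTime column; the TollId field is an assumption based on the sample output shown later.
SELECT
    TollId,
    System.Timestamp AS WindowEnd,
    COUNT(*) AS VehicleCount
FROM EntryStream TIMESTAMP BY EntryTime
GROUP BY TollId, TumblingWindow(minute, 3)
TumblingWindow(minute, 3) partitions time into contiguous, non-overlapping three-minute intervals, and the count is emitted once per interval.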
1. Repeat the steps in the preceding section to update the TollApp streaming job query syntax.
2. Repeat the steps in the preceding section to review the CosmosDB output data from the streaming job.
Example output:
{
"entrytime": "2018-04-05T08:01:28.0252168Z",
"licenseplate": "GMT 3221",
"tollid": 1,
"registrationid": "763220582",
"id": "47db0535-9716-4eb2-db58-de7886966cbf",
"_rid": "y+F8AJ9QWACSAQAAAAAAAA==",
"_self": "dbs/y+F8AA==/colls/y+F8AJ9QWAA=/docs/y+F8AJ9QWACSAQAAAAAAAA==/",
"_etag": "\"88007d8d-0000-0000-0000-5ac5d7e20000\"",
"_attachments": "attachments/",
"_ts": 1522915298
}
You can access Activity Logs from the job dashboard Settings area as well.
Conclusion
This solution introduced you to the Azure Stream Analytics service. It demonstrated how to configure inputs and
outputs for the Stream Analytics job. Using the Toll Data scenario, the solution explained common types of
problems that arise in the space of data in motion and how they can be solved with simple SQL -like queries in
Azure Stream Analytics. The solution described SQL extension constructs for working with temporal data. It
showed how to join data streams, how to enrich the data stream with static reference data, and how to scale out a
query to achieve higher throughput.
Although this solution provides a good introduction, it is not complete by any means. You can find more query
patterns using the SAQL language at Query examples for common Stream Analytics usage patterns.
Azure Stream Analytics JavaScript user-defined
aggregates (Preview)
1/28/2019 • 5 minutes to read • Edit Online
Azure Stream Analytics supports user-defined aggregates (UDA) written in JavaScript, which enable you to implement
complex stateful business logic. Within a UDA you have full control of the state data structure, state accumulation,
state decumulation, and aggregate result computation. The article introduces the two different JavaScript UDA
interfaces, the steps to create a UDA, and how to use a UDA with window-based operations in a Stream Analytics query.
this.computeResult = function () {
return this.state;
}
}
AccumulateDeaccumulate aggregates
AccumulateDeaccumulate aggregates allow deaccumulation of a previously accumulated value from the state; for
example, removing a key-value pair from a list of event values, or subtracting a value from the state of a sum aggregate.
The following is the JavaScript template for AccumulateDeaccumulate aggregates:
// Sample UDA whose state can be accumulated and deaccumulated.
function main() {
this.init = function () {
this.state = 0;
}
// Add a newly arriving value to the state.
this.accumulate = function (value, timestamp) {
this.state += value;
}
// Remove a previously accumulated value from the state.
this.deaccumulate = function (value, timestamp) {
this.state -= value;
}
this.computeResult = function () {
return this.state;
}
}
this.computeResult = function () {
if(this.totalValue == 0) {
result = 0;
}
else {
result = this.totalValue/this.totalWeight;
}
return result;
}
}
6. Once you click the Save button, your UDA shows up on the function list.
7. Click the new function TWA to check the function definition.
WITH value AS
(
SELECT
NoiseLevelDB as level,
DurationSecond as weight
FROM
[YourInputAlias] TIMESTAMP BY EntryTime
)
SELECT
System.Timestamp as ts,
uda.TWA(value) as NoseDoseTWA
FROM value
GROUP BY TumblingWindow(minute, 5)
[
{"EntryTime": "2017-06-10T05:01:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 22.0},
{"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 81, "DurationSecond": 37.8},
{"EntryTime": "2017-06-10T05:02:00-07:00", "NoiseLevelDB": 85, "DurationSecond": 26.3},
{"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 95, "DurationSecond": 13.7},
{"EntryTime": "2017-06-10T05:03:00-07:00", "NoiseLevelDB": 88, "DurationSecond": 10.3},
{"EntryTime": "2017-06-10T05:05:00-07:00", "NoiseLevelDB": 103, "DurationSecond": 5.5},
{"EntryTime": "2017-06-10T05:06:00-07:00", "NoiseLevelDB": 99, "DurationSecond": 23.0},
{"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 1.76},
{"EntryTime": "2017-06-10T05:07:00-07:00", "NoiseLevelDB": 79, "DurationSecond": 17.9},
{"EntryTime": "2017-06-10T05:08:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 27.1},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 91, "DurationSecond": 17.1},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 115, "DurationSecond": 7.9},
{"EntryTime": "2017-06-10T05:09:00-07:00", "NoiseLevelDB": 80, "DurationSecond": 28.3},
{"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 55, "DurationSecond": 18.2},
{"EntryTime": "2017-06-10T05:10:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 25.8},
{"EntryTime": "2017-06-10T05:11:00-07:00", "NoiseLevelDB": 83, "DurationSecond": 11.4},
{"EntryTime": "2017-06-10T05:12:00-07:00", "NoiseLevelDB": 89, "DurationSecond": 7.9},
{"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 112, "DurationSecond": 3.7},
{"EntryTime": "2017-06-10T05:15:00-07:00", "NoiseLevelDB": 93, "DurationSecond": 9.7},
{"EntryTime": "2017-06-10T05:18:00-07:00", "NoiseLevelDB": 96, "DurationSecond": 3.7},
{"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 108, "DurationSecond": 0.99},
{"EntryTime": "2017-06-10T05:20:00-07:00", "NoiseLevelDB": 113, "DurationSecond": 25.1},
{"EntryTime": "2017-06-10T05:22:00-07:00", "NoiseLevelDB": 110, "DurationSecond": 5.3}
]
Get help
For additional help, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics query language reference
Azure Stream Analytics management REST API reference
High-frequency trading simulation with Stream
Analytics
12/7/2018 • 12 minutes to read • Edit Online
The combination of SQL language and JavaScript user-defined functions (UDFs) and user-defined aggregates
(UDAs) in Azure Stream Analytics enables users to perform advanced analytics. Advanced analytics might include
online machine learning training and scoring, as well as stateful process simulation. This article describes how to
perform linear regression in an Azure Stream Analytics job that does continuous training and scoring in a high-
frequency trading scenario.
High-frequency trading
The logical flow of high-frequency trading is:
1. Getting real-time quotes from a security exchange.
2. Building a predictive model around the quotes, so we can anticipate the price movement.
3. Placing buy or sell orders to make money from the successful prediction of the price movements.
As a result, we need:
A real-time quote feed.
A predictive model that can operate on the real-time quotes.
A trading simulation that demonstrates the profit or loss of the trading algorithm.
Real-time quote feed
IEX offers free real-time bid and ask quotes by using socket.io. A simple console program can be written to receive
real-time quotes and push them to Azure Event Hubs as a data source. The following code is a skeleton of the program.
The code omits error handling for brevity. You also need to include the SocketIoClientDotNet and
WindowsAzure.ServiceBus NuGet packages in your project.
using Quobject.SocketIoClientDotNet.Client;
using Microsoft.ServiceBus.Messaging;
using System.Text;

// connectionString and eventHubName must be defined with your Event Hubs connection string and name.
var symbols = "msft,fb,amzn,goog";
var eventHubClient = EventHubClient.CreateFromConnectionString(connectionString, eventHubName);
var socket = IO.Socket("https://ws-api.iextrading.com/1.0/tops");
socket.On(Socket.EVENT_MESSAGE, (message) =>
{
eventHubClient.Send(new EventData(Encoding.UTF8.GetBytes((string)message)));
});
socket.On(Socket.EVENT_CONNECT, () =>
{
socket.Emit("subscribe", symbols);
});
NOTE
The time stamp of the event is lastUpdated, in epoch time.
Now, let's express the training and prediction operations in an Azure Stream Analytics job.
First, the inputs are cleaned up. Epoch time is converted to datetime via DATEADD. TRY_CAST is used to coerce
data types without failing the query. It's always a good practice to cast input fields to the expected data types, so
there is no unexpected behavior in manipulation or comparison of the fields.
WITH
typeconvertedquotes AS (
/* convert all input fields to proper types */
SELECT
System.Timestamp AS lastUpdated,
symbol,
DATEADD(millisecond, CAST(lastSaleTime as bigint), '1970-01-01T00:00:00Z') AS lastSaleTime,
TRY_CAST(bidSize as bigint) AS bidSize,
TRY_CAST(bidPrice as float) AS bidPrice,
TRY_CAST(askSize as bigint) AS askSize,
TRY_CAST(askPrice as float) AS askPrice,
TRY_CAST(volume as bigint) AS volume,
TRY_CAST(lastSaleSize as bigint) AS lastSaleSize,
TRY_CAST(lastSalePrice as float) AS lastSalePrice
FROM quotes TIMESTAMP BY DATEADD(millisecond, CAST(lastUpdated as bigint), '1970-01-01T00:00:00Z')
),
timefilteredquotes AS (
/* filter between 7am and 1pm PST, 14:00 to 20:00 UTC */
/* clean up invalid data points */
SELECT * FROM typeconvertedquotes
WHERE DATEPART(hour, lastUpdated) >= 14 AND DATEPART(hour, lastUpdated) < 20 AND bidSize > 0 AND askSize > 0
AND bidPrice > 0 AND askPrice > 0
),
Next, we use the LAG function to get values from the last tick. A LIMIT DURATION value of one hour is chosen
arbitrarily. Given the quote frequency, it's safe to assume that you can find the previous tick by looking back one hour.
shiftedquotes AS (
/* get previous bid/ask price and size in order to calculate VOI */
SELECT
symbol,
(bidPrice + askPrice)/2 AS midPrice,
bidPrice,
bidSize,
askPrice,
askSize,
LAG(bidPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidPricePrev,
LAG(bidSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS bidSizePrev,
LAG(askPrice) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askPricePrev,
LAG(askSize) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS askSizePrev
FROM timefilteredquotes
),
We can then compute VOI value. We filter out the null values if the previous tick doesn't exist, just in case.
currentPriceAndVOI AS (
/* calculate VOI */
SELECT
symbol,
midPrice,
(CASE WHEN (bidPrice < bidPricePrev) THEN 0
ELSE (CASE WHEN (bidPrice = bidPricePrev) THEN (bidSize - bidSizePrev) ELSE bidSize END)
END) -
(CASE WHEN (askPrice < askPricePrev) THEN askSize
ELSE (CASE WHEN (askPrice = askPricePrev) THEN (askSize - askSizePrev) ELSE 0 END)
END) AS VOI
FROM shiftedquotes
WHERE
bidPrice IS NOT NULL AND
bidSize IS NOT NULL AND
askPrice IS NOT NULL AND
askSize IS NOT NULL AND
bidPricePrev IS NOT NULL AND
bidSizePrev IS NOT NULL AND
askPricePrev IS NOT NULL AND
askSizePrev IS NOT NULL
),
Now, we use LAG again to create a sequence with 2 consecutive VOI values, followed by 10 consecutive mid-price
values.
shiftedPriceAndShiftedVOI AS (
/* get 10 future prices and 2 previous VOIs */
SELECT
symbol,
midPrice AS midPrice10,
LAG(midPrice, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice9,
LAG(midPrice, 2) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice8,
LAG(midPrice, 3) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice7,
LAG(midPrice, 4) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice6,
LAG(midPrice, 5) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice5,
LAG(midPrice, 6) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice4,
LAG(midPrice, 7) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice3,
LAG(midPrice, 8) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice2,
LAG(midPrice, 9) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice1,
LAG(midPrice, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS midPrice,
LAG(VOI, 10) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI1,
LAG(VOI, 11) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
FROM currentPriceAndVOI
),
We then reshape the data into inputs for a two-variable linear model. Again, we filter out the events where we
don't have all the data.
modelInput AS (
/* create feature vector, x being VOI, y being delta price */
SELECT
symbol,
(midPrice1 + midPrice2 + midPrice3 + midPrice4 + midPrice5 + midPrice6 + midPrice7 + midPrice8 + midPrice9 +
midPrice10)/10.0 - midPrice AS y,
VOI1 AS x1,
VOI2 AS x2
FROM shiftedPriceAndShiftedVOI
WHERE
midPrice1 IS NOT NULL AND
midPrice2 IS NOT NULL AND
midPrice3 IS NOT NULL AND
midPrice4 IS NOT NULL AND
midPrice5 IS NOT NULL AND
midPrice6 IS NOT NULL AND
midPrice7 IS NOT NULL AND
midPrice8 IS NOT NULL AND
midPrice9 IS NOT NULL AND
midPrice10 IS NOT NULL AND
midPrice IS NOT NULL AND
VOI1 IS NOT NULL AND
VOI2 IS NOT NULL
),
Because Azure Stream Analytics doesn't have a built-in linear regression function, we use SUM and AVG
aggregates to compute the coefficients for the linear model.
modelagg AS (
/* get aggregates for linear regression calculation,
http://faculty.cas.usf.edu/mbrannick/regression/Reg2IV.html */
SELECT
symbol,
SUM(x1 * x1) AS x1x1,
SUM(x2 * x2) AS x2x2,
SUM(x1 * y) AS x1y,
SUM(x2 * y) AS x2y,
SUM(x1 * x2) AS x1x2,
AVG(y) AS avgy,
AVG(x1) AS avgx1,
AVG(x2) AS avgx2
FROM modelInput
GROUP BY symbol, TumblingWindow(hour, 24, -4)
),
modelparambs AS (
/* calculate b1 and b2 for the linear model */
SELECT
symbol,
(x2x2 * x1y - x1x2 * x2y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b1,
(x1x1 * x2y - x1x2 * x1y)/(x1x1 * x2x2 - x1x2 * x1x2) AS b2,
avgy,
avgx1,
avgx2
FROM modelagg
),
model AS (
/* calculate a for the linear model */
SELECT
symbol,
avgy - b1 * avgx1 - b2 * avgx2 AS a,
b1,
b2
FROM modelparambs
),
To use the previous day's model for current event's scoring, we want to join the quotes with the model. But instead
of using JOIN, we UNION the model events and quote events. Then we use LAG to pair the events with previous
day's model, so we can get exactly one match. Because of the weekend, we have to look back three days. If we used
a straightforward JOIN, we would get three models for every quote event.
shiftedVOI AS (
/* get two consecutive VOIs */
SELECT
symbol,
midPrice,
VOI AS VOI1,
LAG(VOI, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 1)) AS VOI2
FROM currentPriceAndVOI
),
VOIAndModel AS (
/* combine VOIs and models */
SELECT
'voi' AS type,
symbol,
midPrice,
VOI1,
VOI2,
0.0 AS a,
0.0 AS b1,
0.0 AS b2
FROM shiftedVOI
UNION
SELECT
'model' AS type,
symbol,
0.0 AS midPrice,
0 AS VOI1,
0 AS VOI2,
a,
b1,
b2
FROM model
),
VOIANDModelJoined AS (
/* match VOIs with the latest model within 3 days (72 hours, to take the weekend into account) */
SELECT
symbol,
midPrice,
VOI1 as x1,
VOI2 as x2,
LAG(a, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS a,
LAG(b1, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b1,
LAG(b2, 1) OVER (PARTITION BY symbol LIMIT DURATION(hour, 72) WHEN type = 'model') AS b2
FROM VOIAndModel
WHERE type = 'voi'
),
Now, we can make predictions and generate buy/sell signals based on the model, with a 0.02 threshold value. A
trade value of 10 is buy. A trade value of -10 is sell.
prediction AS (
/* make prediction if there is a model */
SELECT
symbol,
midPrice,
a + b1 * x1 + b2 * x2 AS efpc
FROM VOIANDModelJoined
WHERE
a IS NOT NULL AND
b1 IS NOT NULL AND
b2 IS NOT NULL AND
x1 IS NOT NULL AND
x2 IS NOT NULL
),
tradeSignal AS (
/* generate buy/sell signals */
SELECT
DateAdd(hour, -7, System.Timestamp) AS time,
symbol,
midPrice,
efpc,
CASE WHEN (efpc > 0.02) THEN 10 ELSE (CASE WHEN (efpc < -0.02) THEN -10 ELSE 0 END) END AS trade,
DATETIMEFROMPARTS(DATEPART(year, System.Timestamp), DATEPART(month, System.Timestamp), DATEPART(day,
System.Timestamp), 0, 0, 0, 0) as date
FROM prediction
),
Trading simulation
After we have the trading signals, we want to test how effective the trading strategy is, without trading for real.
We achieve this test by using a UDA, with a hopping window, hopping every minute. The additional grouping
on date and the HAVING clause ensure that the window only accounts for events that belong to the same day. For a
hopping window spanning two days, the GROUP BY date separates the grouping into previous day and current day.
The HAVING clause filters out the windows that end on the current day but group on the previous day.
simulation AS
(
/* perform trade simulation for the past 7 hours to cover an entire trading day, and generate output every
minute */
SELECT
DateAdd(hour, -7, System.Timestamp) AS time,
symbol,
date,
uda.TradeSimulation(tradeSignal) AS s
FROM tradeSignal
GROUP BY HoppingWindow(minute, 420, 1), symbol, date
Having DateDiff(day, date, time) < 1 AND DATEPART(hour, time) < 13
)
The JavaScript UDA initializes all accumulators in the init function, computes the state transition with every
event added to the window, and returns the simulation results at the end of the window. The general trading
process is to:
Buy stock when a buy signal is received and there is no stock holding.
Sell stock when a sell signal is received and there is stock holding.
Short the stock when a sell signal is received and there is no stock holding.
If there's a short position, and a buy signal is received, we buy to cover. We never hold or short more than 10 shares of a stock
in this simulation. The transaction cost is a flat $8.
function main() {
var TRADE_COST = 8.0;
var SHARES = 10;
this.init = function () {
this.own = false;
this.pos = 0;
this.pnl = 0.0;
this.tradeCosts = 0.0;
this.buyPrice = 0.0;
this.sellPrice = 0.0;
this.buySize = 0;
this.sellSize = 0;
this.buyTotal = 0.0;
this.sellTotal = 0.0;
}
this.accumulate = function (tradeSignal, timestamp) {
if(!this.own && tradeSignal.trade == 10) {
// Buy to open
this.own = true;
this.pos = 1;
this.buyPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.buySize += SHARES;
this.buyTotal += SHARES * tradeSignal.midprice;
} else if(!this.own && tradeSignal.trade == -10) {
// Sell to open
this.own = true;
this.pos = -1
this.sellPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.sellSize += SHARES;
this.sellTotal += SHARES * tradeSignal.midprice;
} else if(this.own && this.pos == 1 && tradeSignal.trade == -10) {
// Sell to close
this.own = false;
this.pos = 0;
this.sellPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
this.sellSize += SHARES;
this.sellTotal += SHARES * tradeSignal.midprice;
// Sell to open
this.own = true;
this.pos = -1;
this.sellPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.sellSize += SHARES;
this.sellTotal += SHARES * tradeSignal.midprice;
} else if(this.own && this.pos == -1 && tradeSignal.trade == 10) {
// Buy to close
this.own = false;
this.pos = 0;
this.buyPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.pnl += (this.sellPrice - this.buyPrice)*SHARES - 2*TRADE_COST;
this.buySize += SHARES;
this.buyTotal += SHARES * tradeSignal.midprice;
// Buy to open
this.own = true;
this.pos = 1;
this.buyPrice = tradeSignal.midprice;
this.tradeCosts += TRADE_COST;
this.buySize += SHARES;
this.buyTotal += SHARES * tradeSignal.midprice;
}
}
this.computeResult = function () {
var result = {
"pnl": this.pnl,
"buySize": this.buySize,
"buySize": this.buySize,
"sellSize": this.sellSize,
"buyTotal": this.buyTotal,
"sellTotal": this.sellTotal,
"tradeCost": this.tradeCost
};
return result;
}
}
In this tutorial, you will learn how to create stream-processing logic to gather data from Internet of Things (IoT)
devices. We will use a real-world IoT use case to demonstrate how to build your solution quickly and economically.
Prerequisites
Azure subscription
Sample query and data files downloadable from GitHub
Scenario
Contoso, a company in the industrial automation space, has completely automated its manufacturing
process. The machinery in this plant has sensors that are capable of emitting streams of data in real time. In this
scenario, a production floor manager wants to have real-time insights from the sensor data to look for patterns and
take actions on them. We will use the Stream Analytics Query Language (SAQL) over the sensor data to find
interesting patterns in the incoming stream of data.
Here data is being generated from a Texas Instruments sensor tag device. The payload of the data is in JSON
format and looks like the following:
{
"time": "2016-01-26T20:47:53.0000000",
"dspl": "sensorE",
"temp": 123,
"hmdt": 34
}
In a real-world scenario, you could have hundreds of these sensors generating events as a stream. Ideally, a
gateway device would run code to push these events to Azure Event Hubs or Azure IoT Hubs. Your Stream
Analytics job would ingest these events from Event Hubs and run real-time analytics queries against the streams.
Then, you could send the results to one of the supported outputs.
For ease of use, this getting started guide provides a sample data file, which was captured from real sensor tag
devices. You can run queries on the sample data and see results. In subsequent tutorials, you will learn how to
connect your job to inputs and outputs and deploy them to the Azure service.
NOTE
You should create this storage account only once per region. This storage will be shared across all Stream Analytics
jobs that are created in that region.
4. Check the box to place your job on your dashboard and then click CREATE.
5. You should see a 'Deployment started...' notification displayed in the top right of your browser window. Soon it will
change to a completed notification as shown below.
In the Job Topology pane click the QUERY box to go to the Query Editor. The QUERY editor allows you to enter
a T-SQL query that performs the transformation over the incoming event data.
Query: Archive your raw data
The simplest form of query is a pass-through query that archives all input data to its designated output. Download
the sample data file from GitHub to a location on your computer.
1. Paste the query from the PassThrough.txt file.
2. Click the three dots next to your input and select Upload sample data from file box.
3. A pane opens on the right as a result, in it select the HelloWorldASA-InputStream.json data file from your
downloaded location and click OK at the bottom of the pane.
4. Then click the Test gear in the top left area of the window and process your test query against the sample
dataset. A results window will open below your query as the processing is complete.
You should now see results that contain only 245 rows and the names of sensors where the average temperature is
greater than 100. This query groups the stream of events by dspl, which is the sensor name, over a Tumbling
Window of 30 seconds. Temporal queries must state how we want time to progress. By using the TIMESTAMP
BY clause, we have specified the OUTPUTTIME column to associate times with all temporal calculations. For
detailed information, read the MSDN articles about Time Management and Windowing functions.
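A minimal sketch of such a filtering and aggregation query, assuming the field names from the payload shown earlier (dspl, temp) and using the payload's time column for TIMESTAMP BY; input is a placeholder for your input alias, and the downloaded query file may instead use the OUTPUTTIME column mentioned above.
SELECT
    dspl AS SensorName,
    System.Timestamp AS WindowEnd,
    AVG(temp) AS AvgTemperature
FROM input TIMESTAMP BY time
GROUP BY dspl, TumblingWindow(second, 30)
HAVING AVG(temp) > 100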
Query: Detect absence of events
How can we write a query to find a lack of input events? Let’s find the last time that a sensor sent data and then did
not send events for the next 5 seconds. The query is in the AbsenceOfEvent.txt file.
Here we use a LEFT OUTER join to the same data stream (self-join). For an INNER join, a result is returned only
when a match is found. For a LEFT OUTER join, if an event from the left side of the join is unmatched, a row that
has NULL for all the columns of the right side is returned. This technique is very useful to find an absence of
events. See our MSDN documentation for more information about JOIN.
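A sketch of the absence-of-events pattern, under the same assumptions about field names; input is again a placeholder for your input alias.
SELECT
    t1.time AS LastEventTime,
    t1.dspl AS SensorName
FROM input t1 TIMESTAMP BY time
LEFT OUTER JOIN input t2 TIMESTAMP BY time
    ON t1.dspl = t2.dspl
    AND DATEDIFF(second, t1, t2) BETWEEN 1 AND 5
WHERE t2.dspl IS NULL
Rows where the right side is NULL are exactly the events that had no follow-up event from the same sensor within the next 5 seconds.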
Conclusion
The purpose of this tutorial is to demonstrate how to write different Stream Analytics Query Language queries and
see results in the browser. However, this is just getting started. You can do so much more with Stream Analytics.
Stream Analytics supports a variety of inputs and outputs and can even use functions in Azure Machine Learning
to make it a robust tool for analyzing data streams. You can start to explore more about Stream Analytics by using
our learning map. For more information about how to write queries, read the article about common query
patterns.
Process configurable threshold-based rules in Azure
Stream Analytics
2/22/2019 • 6 minutes to read • Edit Online
This article describes how to use reference data to achieve an alerting solution that uses configurable threshold-
based rules in Azure Stream Analytics.
Reference data
This example reference data shows how a threshold-based rule could be represented. A JSON file holds the
reference data and is saved into Azure blob storage, and that blob storage container is used as a reference data
input named rules. You could overwrite this JSON file and replace the rule configuration as time goes on, without
stopping or starting the streaming job.
The example rule is used to represent an adjustable alert when CPU exceeds (average is greater than or equal
to) the value 90 percent. The value field is configurable as needed.
Notice that the rule has an operator field, which is dynamically interpreted in the query syntax later on as
AVGGREATEROREQUAL.
The rule filters the data on a certain dimension key 2 with value C1. Other fields are empty strings, indicating
not to filter the input stream by those event fields. You could set up additional CPU rules to filter other
matching fields as needed.
Not all columns are to be included in the output alert event. In this case, includedDim key number 2 is set to
TRUE to indicate that field number 2 of the event data in the stream will be included in the qualifying output
events. The other fields are not included in the alert output, but the field list can be adjusted.
{
"ruleId": 1234,
"deviceId" : "978648",
"metricName": "CPU",
"alertName": "hot node AVG CPU over 90",
"operator" : "AVGGREATEROREQUAL",
"value": 90,
"includeDim": {
"0": "FALSE",
"1": "FALSE",
"2": "TRUE",
"3": "FALSE",
"4": "FALSE"
},
"filter": {
"0": "",
"1": "",
"2": "C1",
"3": "",
"4": ""
}
}
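The alert query shown next selects from a step named transformedInput, which joins the metric stream with the rules reference data input so that each event carries its rule's operator, threshold, and alert name. A simplified sketch of that step follows; the stream input alias metrics is an assumption, and the full transformation additionally extracts the dim0 through dim4 values from metrics.custom.dimensions and applies the rule's filter and includeDim settings, which are omitted here for brevity.
WITH transformedInput AS
(
    /* join each metric event with its matching threshold rule */
    SELECT
        metrics.deviceId AS deviceId,
        rules.ruleId AS ruleId,
        rules.metricName AS metricName,
        rules.alertName AS alertName,
        rules.operator AS ruleOperator,
        rules.[value] AS ruleValue,
        metrics.metric.value AS metric
    FROM metrics
    JOIN rules
        ON metrics.deviceId = rules.deviceId
        AND metrics.metric.name = rules.metricName
)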
SELECT
System.Timestamp as time,
transformedInput.deviceId as deviceId,
transformedInput.ruleId as ruleId,
transformedInput.metricName as metric,
transformedInput.alertName as alert,
AVG(metric) as avg,
MIN(metric) as min,
MAX(metric) as max,
dim0, dim1, dim2, dim3, dim4
FROM
transformedInput
GROUP BY
transformedInput.deviceId,
transformedInput.ruleId,
transformedInput.metricName,
transformedInput.alertName,
dim0, dim1, dim2, dim3, dim4,
ruleOperator,
ruleValue,
TumblingWindow(second, 60)
HAVING
(
(ruleOperator = 'AVGGREATEROREQUAL' AND avg(metric) >= ruleValue) OR
(ruleOperator = 'AVGEQUALORLESS' AND avg(metric) <= ruleValue)
)
{
"eventTime": "2018-04-30T14:50:23.1324132Z",
"deviceId": "978648",
"custom": {
"dimensions": {
"0": {
"name": "NodeType",
"value": "N1"
},
"1": {
"name": "Cluster",
"value": "C1"
},
"2": {
"name": "NodeName",
"value": "N024"
}
},
"filters": {
"0": {
"name": "application",
"value": "A1"
},
"1": {
"name": "deviceType",
"value": "T1"
},
"2": {
"name": "cluster",
"value": "C1"
},
"3": {
"name": "nodeType",
"value": "N1"
}
}
},
"metric": {
"name": "CPU",
"value": 98,
"count": 1.0,
"min": 98,
"max": 98,
"stdDev": 0.0
}
}
{
"eventTime": "2018-04-30T14:50:24.1324138Z",
"deviceId": "978648",
"custom": {
"dimensions": {
"0": {
"name": "NodeType",
"value": "N2"
},
"1": {
"name": "Cluster",
"value": "C1"
},
"2": {
"name": "NodeName",
"value": "N024"
}
},
"filters": {
"0": {
"name": "application",
"value": "A1"
},
"1": {
"name": "deviceType",
"value": "T1"
},
"2": {
"name": "cluster",
"value": "C1"
},
"3": {
"name": "nodeType",
"value": "N2"
}
}
},
"metric": {
"name": "CPU",
"value": 95,
"count": 1,
"min": 95,
"max": 95,
"stdDev": 0
}
}
{
"eventTime": "2018-04-30T14:50:37.1324130Z",
"deviceId": "978648",
"custom": {
"dimensions": {
"0": {
"name": "NodeType",
"value": "N3"
},
"1": {
"name": "Cluster",
"value": "C1 "
},
"2": {
"name": "NodeName",
"value": "N014"
}
},
"filters": {
"0": {
"name": "application",
"value": "A1"
},
"1": {
"name": "deviceType",
"value": "T1"
},
"2": {
"name": "cluster",
"value": "C1"
},
"3": {
"name": "nodeType",
"value": "N3"
}
}
},
"metric": {
"name": "CPU",
"value": 80,
"count": 1,
"min": 80,
"max": 80,
"stdDev": 0
}
}
Example output
This example output JSON data shows that a single alert event was produced, based on the CPU threshold rule defined
in the reference data. The output event contains the name of the alert as well as the aggregated values (average, min,
max) of the fields considered. The output event data includes field key number 2, NodeName, with value N024, due to the
rule configuration. (The JSON was altered to show line breaks for readability.)
{"time":"2018-05-01T02:03:00.0000000Z","deviceid":"978648","ruleid":1234,"metric":"CPU",
"alert":"hot node AVG CPU over 90","avg":96.5,"min":95.0,"max":98.0,
"dim0":null,"dim1":null,"dim2":"N024","dim3":null,"dim4":null}
Process Apache Kafka for Event Hubs events using
Stream Analytics
3/6/2019 • 5 minutes to read • Edit Online
This article shows how to stream data into Kafka-enabled Event Hubs and process it with Azure Stream Analytics.
It walks you through the following steps:
1. Create a Kafka enabled Event Hubs namespace.
2. Create a Kafka client that sends messages to the event hub.
3. Create a Stream Analytics job that copies data from the event hub into an Azure blob storage.
You do not need to change your protocol clients or run your own clusters when you use the Kafka endpoint
exposed by an event hub. Azure Event Hubs supports Apache Kafka version 1.0 and above.
Prerequisites
To complete this quickstart, make sure you have the following prerequisites:
An Azure subscription. If you do not have one, create a free account before you begin.
Java Development Kit (JDK) 1.7+.
Download and install a Maven binary archive.
Git
An Azure Storage account. If you don't have one, create one before proceeding further. The Stream Analytics
job in this walkthrough stores the output data in an Azure blob storage.
You can now stream events from your applications that use the Kafka protocol into Event Hubs.
This code sends the event data in JSON format. When you configure input for a Stream Analytics job, you
specify JSON as the format for the input data.
7. Run the producer and stream into Kafka-enabled Event Hubs. On a Windows machine, when using a
Node.js command prompt, switch to the azure-event-hubs-for-kafka/quickstart/java/producer folder
before running these commands.
4. Wait until the status of the job changes from Starting to Running.
The Azure Stream Analytics job received input data from the event hub and stored it in the Azure blob
storage in this scenario.
Next steps
In this article, you learned how to stream into Kafka-enabled Event Hubs without changing your protocol clients or
running your own clusters. To learn more about Event Hubs and Event Hubs for Kafka, see the following topic:
Learn about Event Hubs
Event Hubs for Apache Kafka
How to create Kafka enabled Event Hubs
Stream into Event Hubs from your Kafka applications
Mirror a Kafka broker in a Kafka-enabled event hub
Connect Apache Spark to a Kafka-enabled event hub
Connect Apache Flink to a Kafka-enabled event hub
Integrate Kafka Connect with a Kafka-enabled event hub
Connect Akka Streams to a Kafka-enabled event hub
Explore samples on our GitHub
Geofencing and geospatial aggregation scenarios
with Azure Stream Analytics
4/1/2019 • 4 minutes to read • Edit Online
With built-in geospatial functions, you can use Azure Stream Analytics to build applications for scenarios such as
fleet management, ride sharing, connected cars, and asset tracking.
Geofencing
Azure Stream Analytics supports low latency real-time geofencing computations in the cloud and on the IoT Edge
runtime.
Geofencing scenario
A manufacturing company needs to track assets in its buildings. The company equipped every device with a GPS and
wants to receive notifications if a device leaves a certain area.
Reference data used in this example has the geofence information for the buildings and the devices that are
allowed in each of the buildings. Remember that reference data could either be static or slow changing. Static
reference data is used for this scenario. A stream of data continuously emits the device ID and its current position.
Define geofences in reference data
A geofence can be defined using a GeoJSON object. For jobs with compatibility version 1.2 and higher, geofences
can also be defined using Well Known Text (WKT) as NVARCHAR(MAX) . WKT is an Open Geospatial Consortium
(OGC ) standard that is used to represent spatial data in a textual format.
The built-in geospatial functions can use defined geofences to find out if an element is in or out of a specific
geofence polygon.
The following table is an example of geofence reference data that could be stored in Azure blob storage or an
Azure SQL table. Every site is represented by a geospatial polygon, and every device is associated with an allowed
site ID.
DEVICEID GEOPOSITION
You can write a query that joins the device stream with the geofence reference data and generates an alert every
time a device is outside of an allowed building.
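A sketch of such a query follows. The input names (DeviceStreamInput for the device stream and SiteReferenceData for the geofence reference data) and the field names are assumptions for illustration; GeoPosition is assumed to already be a geography point (for example, produced with CreatePoint from the device's latitude and longitude), and Geofence is the site polygon from the reference data.
SELECT
    DeviceStreamInput.DeviceID,
    SiteReferenceData.SiteName
FROM DeviceStreamInput
JOIN SiteReferenceData
    ON DeviceStreamInput.DeviceID = SiteReferenceData.AllowedDeviceID
WHERE ST_WITHIN(DeviceStreamInput.GeoPosition, SiteReferenceData.Geofence) = 0
ST_WITHIN returns 1 when the point lies inside the polygon, so a value of 0 flags a device that is outside its allowed site.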
The following image represents the geofences. You can see where the devices are in accordance to the stream data
input.
Device "C" is located inside building ID 2, which is not allowed according to the reference data. This device should
be located inside building ID 3. Running this job will generate an alert for this specific violation.
Site with multiple allowed devices
If a site allows multiple devices, an array of device IDs can be defined in AllowedDeviceID, and a user-defined
function can be used in the WHERE clause to verify whether the stream device ID matches any device ID in that list. For
more information, view the JavaScript UDF tutorial for cloud jobs and the C# UDF tutorial for edge jobs.
Geospatial aggregation
Azure Stream Analytics supports low latency real-time geospatial aggregation in the cloud and on the IoT Edge
runtime.
Geospatial aggregation scenario
A cab company wants to build a real-time application to guide its cab drivers looking for a ride toward the areas
of the city currently experiencing higher demand.
The company stores logical regions of the city as reference data. Each region is defined by a RegionID,
RegionName, and Geofence.
Define the geofences
The following table is an example of geofence reference data that could be stored in Azure blob storage or an
Azure SQL table. Every region is represented by a geospatial polygon, which is used to correlate with the requests
coming from streaming data.
These polygons are for reference only and do not represent actual city logical or physical separations.
1 "SoHo" "POLYGON((-74.00279525078275
40.72833625216264,-
74.00547745979765
40.721929158663244,-
74.00125029839018
40.71893680218994,-
73.9957785919998
40.72521409075776,-
73.9972377137039
40.72557184584898,-
74.00279525078275
40.72833625216264))"
2 "Chinatown" "POLYGON((-73.99712367114876
40.71281582267133,-
73.9901070123658
40.71336881907936,-
73.99023575839851
40.71452359088633,-
73.98976368961189
40.71554823078944,-
73.99551434573982
40.717337246783735,-
73.99480624255989
40.718491949759304,-
73.99652285632942
40.719109951574,-
73.99776740131233
40.7168005470334,-
73.99903340396736
40.71727219249899,-
74.00193018970344
40.71938642421256,-
74.00409741458748
40.71688186545551,-
74.00051398334358
40.71517415773184,-
74.0004281526551
40.714377212470005,-
73.99849696216438
40.713450141693166,-
73.99748845157478
40.71405192594819,-
73.99712367114876
40.71281582267133))"
3 "Tribeca" "POLYGON((-74.01091641815208
40.72583120006787,-
74.01338405044578
40.71436586362705,-
74.01370591552757
40.713617702123415,-
74.00862044723533
40.711308107057235,-
74.00194711120628
40.7194238654018,-
74.01091641815208
40.72583120006787))"
The following query joins the device stream with the geofence reference data and calculates the number of
requests per region on a time window of 15 minutes every minute.
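A sketch of such a query, under assumed input names (UserRequestStreamDataInput for the ride requests and RegionsRefDataInput for the region reference data, with FromLocation as a geography point and Geofence as the region polygon), might look like this:
SELECT
    RegionsRefDataInput.RegionName,
    COUNT(*) AS NumberOfRequests,
    System.Timestamp AS WindowEnd
FROM UserRequestStreamDataInput
JOIN RegionsRefDataInput
    ON ST_WITHIN(UserRequestStreamDataInput.FromLocation, RegionsRefDataInput.Geofence) = 1
GROUP BY RegionsRefDataInput.RegionName, HoppingWindow(minute, 15, 1)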
This query outputs a count of requests every minute for the last 15 minutes by each region within the city. This
information can be displayed easily on a Power BI dashboard, or can be broadcast to all drivers as SMS text
messages through integration with services like Azure Functions.
The image below illustrates the output of the query on a Power BI dashboard.
Next steps
Introduction to Stream Analytics geospatial functions
GeoSpatial Functions (Azure Stream Analytics)
Understand Stream Analytics job monitoring and
how to monitor queries
5/9/2019 • 3 minutes to read • Edit Online
Backlogged Input Events Number of input events that are backlogged. A non-zero
value for this metric implies that your job isn't able to keep up
with the number of incoming events. If this value is slowly
increasing or consistently non-zero, you should scale out your
job. You can learn more by visiting Understand and adjust
Streaming Units.
Data Conversion Errors Number of output events that could not be converted to the
expected output schema. Error policy can be changed to
'Drop' to drop events that encounter this scenario.
Early Input Events Events whose application timestamp is earlier than their
arrival time by more than 5 minutes.
Failed Function Requests Number of failed Azure Machine Learning function calls (if
present).
Function Requests Number of calls to the Azure Machine Learning function (if
present).
Input Deserialization Errors Number of input events that could not be deserialized.
Input Event Bytes Amount of data received by the Stream Analytics job, in bytes.
This can be used to validate that events are being sent to the
input source.
Input Events Number of records deserialized from the input events. This
count does not include incoming events that result in
deserialization errors.
Input Sources Received Number of messages received by the job. For Event Hub, a
message is a single EventData. For Blob, a message is a single
blob. Please note that Input Sources are counted before
deserialization. If there are deserialization errors, input sources
can be greater than input events. Otherwise, it can be less
than or equal to input events since each message can contain
multiple events.
Late Input Events Events that arrived later than the configured late arrival
tolerance window. Learn more about Azure Stream Analytics
event order considerations.
Out-of-Order Events Number of events received out of order that were either
dropped or given an adjusted timestamp, based on the Event
Ordering Policy. This can be impacted by the configuration of
the Out of Order Tolerance Window setting.
Output Events Amount of data sent by the Stream Analytics job to the
output target, in number of events.
Watermark Delay The maximum watermark delay across all partitions of all
outputs in the job.
You can use these metrics to monitor the performance of your Stream Analytics job.
Latest output
Another interesting data point to monitor your job is the time of the last output, shown in the Overview page. This
time is the application time (i.e. the time using the timestamp from the event data) of the latest output of your job.
Get help
For further assistance, try our Azure Stream Analytics forum
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Monitor and manage Stream Analytics jobs with
Azure PowerShell cmdlets
4/4/2019 • 11 minutes to read • Edit Online
Learn how to monitor and manage Stream Analytics resources with Azure PowerShell cmdlets and PowerShell
scripting that executes basic Stream Analytics tasks.
NOTE
This article has been updated to use the new Azure PowerShell Az module. You can still use the AzureRM module, which will
continue to receive bug fixes until at least December 2020. To learn more about the new Az module and AzureRM
compatibility, see Introducing the new Azure PowerShell Az module. For Az module installation instructions, see Install Azure
PowerShell.
NOTE
Stream Analytics jobs created programmatically do not have monitoring enabled by default. You can manually enable
monitoring in the Azure Portal by navigating to the job’s Monitor page and clicking the Enable button or you can do this
programmatically by following the steps located at Azure Stream Analytics - Monitor Stream Analytics Jobs Programmatically.
Azure PowerShell cmdlets for Stream Analytics
The following Azure PowerShell cmdlets can be used to monitor and manage Azure Stream Analytics jobs. Note
that Azure PowerShell has different versions. In the examples listed the first command is for Azure
PowerShell 0.9.8, the second command is for Azure PowerShell 1.0. The Azure PowerShell 1.0 commands
will always have "Az" in the command.
Get-AzureStreamAnalyticsJob | Get-AzStreamAnalyticsJob
Lists all Stream Analytics jobs defined in the Azure subscription or specified resource group, or gets job
information about a specific job within a resource group.
Example 1
Azure PowerShell 0.9.8:
Get-AzureStreamAnalyticsJob
Get-AzStreamAnalyticsJob
This PowerShell command returns information about all the Stream Analytics jobs in the Azure subscription.
Example 2
Azure PowerShell 0.9.8:
This PowerShell command returns information about all the Stream Analytics jobs in the resource group
StreamAnalytics-Default-Central-US.
Example 3
Azure PowerShell 0.9.8:
This PowerShell command returns information about the Stream Analytics job StreamingJob in the resource
group StreamAnalytics-Default-Central-US.
Get-AzureStreamAnalyticsInput | Get-AzStreamAnalyticsInput
Lists all of the inputs that are defined in a specified Stream Analytics job, or gets information about a specific input.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command returns information about all the inputs defined in the job StreamingJob.
Example 2
Azure PowerShell 0.9.8:
This PowerShell command returns information about the input named EntryStream defined in the job
StreamingJob.
Get-AzureStreamAnalyticsOutput | Get-AzStreamAnalyticsOutput
Lists all of the outputs that are defined in a specified Stream Analytics job, or gets information about a specific
output.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command returns information about the outputs defined in the job StreamingJob.
Example 2
Azure PowerShell 0.9.8:
This PowerShell command returns information about the quota and usage of streaming units in the Central US
region.
Get-AzureStreamAnalyticsTransformation | Get-AzStreamAnalyticsTransformation
Gets information about a specific transformation defined in a Stream Analytics job.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command returns information about the transformation called StreamingJobTransform in the job
StreamingJob.
New-AzureStreamAnalyticsInput | New-AzStreamAnalyticsInput
Creates a new input within a Stream Analytics job, or updates an existing specified input.
The name of the input can be specified in the .json file or on the command line. If both are specified, the name on
the command line must be the same as the one in the file.
If you specify an input that already exists and do not specify the –Force parameter, the cmdlet will ask whether or
not to replace the existing input.
If you specify the –Force parameter and specify an existing input name, the input will be replaced without
confirmation.
For detailed information on the JSON file structure and contents, refer to the Create Input (Azure Stream
Analytics) section of the Stream Analytics Management REST API Reference Library.
Example 1
Azure PowerShell 0.9.8:
New-AzureStreamAnalyticsInput -ResourceGroupName StreamAnalytics-Default-Central-US -JobName StreamingJob –File
"C:\Input.json"
This PowerShell command creates a new input from the file Input.json. If an existing input with the name specified
in the input definition file is already defined, the cmdlet will ask whether or not to replace it.
Example 2
Azure PowerShell 0.9.8:
This PowerShell command creates a new input in the job called EntryStream. If an existing input with this name is
already defined, the cmdlet will ask whether or not to replace it.
Example 3
Azure PowerShell 0.9.8:
This PowerShell command replaces the definition of the existing input source called EntryStream with the
definition from the file.
New-AzureStreamAnalyticsJob | New-AzStreamAnalyticsJob
Creates a new Stream Analytics job in Microsoft Azure, or updates the definition of an existing specified job.
The name of the job can be specified in the .json file or on the command line. If both are specified, the name on the
command line must be the same as the one in the file.
If you specify a job name that already exists and do not specify the –Force parameter, the cmdlet will ask whether
or not to replace the existing job.
If you specify the –Force parameter and specify an existing job name, the job definition will be replaced without
confirmation.
For detailed information on the JSON file structure and contents, refer to the Create Stream Analytics Job section
of the Stream Analytics Management REST API Reference Library.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command creates a new job from the definition in JobDefinition.json. If an existing job with the
name specified in the job definition file is already defined, the cmdlet will ask whether or not to replace it.
Example 2
Azure PowerShell 0.9.8:
This PowerShell command creates a new output called "output" in the job StreamingJob. If an existing output with
this name is already defined, the cmdlet will ask whether or not to replace it.
Example 2
Azure PowerShell 0.9.8:
This PowerShell command replaces the definition for "output" in the job StreamingJob.
New-AzureStreamAnalyticsTransformation | New-AzStreamAnalyticsTransformation
Creates a new transformation within a Stream Analytics job, or updates the existing transformation.
The name of the transformation can be specified in the .json file or on the command line. If both are specified, the
name on the command line must be the same as the one in the file.
If you specify a transformation that already exists and do not specify the –Force parameter, the cmdlet will ask
whether or not to replace the existing transformation.
If you specify the –Force parameter and specify an existing transformation name, the transformation will be
replaced without confirmation.
For detailed information on the JSON file structure and contents, refer to the Create Transformation (Azure Stream
Analytics) section of the Stream Analytics Management REST API Reference Library.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command creates a new transformation called StreamingJobTransform in the job StreamingJob. If
an existing transformation is already defined with this name, the cmdlet will ask whether or not to replace it.
Example 2
Azure PowerShell 0.9.8:
New-AzureStreamAnalyticsTransformation -ResourceGroupName StreamAnalytics-Default-Central-US -File "C:\Transformation.json" -JobName StreamingJob -Name StreamingJobTransform -Force
This PowerShell command replaces the definition of StreamingJobTransform in the job StreamingJob.
Remove-AzureStreamAnalyticsInput | Remove-AzStreamAnalyticsInput
Asynchronously deletes a specific input from a Stream Analytics job in Microsoft Azure.
If you specify the –Force parameter, the input will be deleted without confirmation.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command removes the input EventStream in the job StreamingJob.
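A sketch of the command (parameter names follow the other cmdlets in this article):
Remove-AzureStreamAnalyticsInput -ResourceGroupName StreamAnalytics-Default-Central-US -JobName StreamingJob -Name EventStream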
Remove-AzureStreamAnalyticsJob | Remove-AzStreamAnalyticsJob
Asynchronously deletes a specific Stream Analytics job in Microsoft Azure.
If you specify the –Force parameter, the job will be deleted without confirmation.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command removes the job StreamingJob.
Remove-AzureStreamAnalyticsOutput | Remove-AzStreamAnalyticsOutput
Asynchronously deletes a specific output from a Stream Analytics job in Microsoft Azure.
If you specify the –Force parameter, the output will be deleted without confirmation.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command removes the output Output in the job StreamingJob.
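Sketches of the two removal commands described above (resource group and names are illustrative):
Remove-AzureStreamAnalyticsJob -ResourceGroupName StreamAnalytics-Default-Central-US -Name StreamingJob
Remove-AzureStreamAnalyticsOutput -ResourceGroupName StreamAnalytics-Default-Central-US -JobName StreamingJob -Name Output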
Start-AzureStreamAnalyticsJob | Start-AzStreamAnalyticsJob
Asynchronously deploys and starts a Stream Analytics job in Microsoft Azure.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command starts the job StreamingJob with a custom output start time set to December 12, 2012,
12:12:12 UTC.
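The command isn't shown above. Assuming the cmdlet accepts -OutputStartMode and -OutputStartTime parameters, it would look something like:
Start-AzureStreamAnalyticsJob -ResourceGroupName StreamAnalytics-Default-Central-US -Name StreamingJob -OutputStartMode CustomTime -OutputStartTime 2012-12-12T12:12:12Z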
Stop-AzureStreamAnalyticsJob | Stop-AzStreamAnalyticsJob
Asynchronously stops a Stream Analytics job from running in Microsoft Azure and de-allocates the resources that
were being used. The job definition and metadata remain available within your subscription through
both the Azure portal and management APIs, such that the job can be edited and restarted. You will not be charged
for a job in the stopped state.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command stops the job StreamingJob.
Test-AzureStreamAnalyticsInput | Test-AzStreamAnalyticsInput
Tests the ability of Stream Analytics to connect to a specified input.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command tests the connection status of the input EntryStream in the job StreamingJob.
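Sketches of the stop and input-test commands described above:
Stop-AzureStreamAnalyticsJob -ResourceGroupName StreamAnalytics-Default-Central-US -Name StreamingJob
Test-AzureStreamAnalyticsInput -ResourceGroupName StreamAnalytics-Default-Central-US -JobName StreamingJob -Name EntryStream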
Test-AzureStreamAnalyticsOutput | Test-AzStreamAnalyticsOutput
Tests the ability of Stream Analytics to connect to a specified output.
Example 1
Azure PowerShell 0.9.8:
This PowerShell command tests the connection status of the output Output in StreamingJob.
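A sketch of the command, mirroring the input test:
Test-AzureStreamAnalyticsOutput -ResourceGroupName StreamAnalytics-Default-Central-US -JobName StreamingJob -Name Output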
Get support
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Programmatically create a Stream Analytics job
monitor
5/29/2019 • 3 minutes to read • Edit Online
This article demonstrates how to enable monitoring for a Stream Analytics job. Stream Analytics jobs that are
created via REST APIs, Azure SDK, or PowerShell do not have monitoring enabled by default. You can manually
enable it in the Azure portal by going to the job’s Monitor page and clicking the Enable button or you can
automate this process by following the steps in this article. The monitoring data will show up in the Metrics area of
the Azure portal for your Stream Analytics job.
Prerequisites
Before you begin this process, you must have the following prerequisites:
Visual Studio 2019 or 2015
Azure .NET SDK downloaded and installed
An existing Stream Analytics job that needs to have monitoring enabled
Create a project
1. Create a Visual Studio C# .NET console application.
2. In the Package Manager Console, run the following commands to install the NuGet packages. The first one
is the Azure Stream Analytics Management .NET SDK. The second one is the Azure Monitor SDK that will
be used to enable monitoring. The last one is the Azure Active Directory client that will be used for
authentication.
Install-Package Microsoft.Azure.Management.StreamAnalytics
Install-Package Microsoft.Azure.Insights -Pre
Install-Package Microsoft.IdentityModel.Clients.ActiveDirectory
3. Add the following appSettings section to the App.config file in your project:
<appSettings>
<!--CSM Prod related values-->
<add key="ResourceGroupName" value="RESOURCE GROUP NAME" />
<add key="JobName" value="YOUR JOB NAME" />
<add key="StorageAccountName" value="YOUR STORAGE ACCOUNT"/>
<add key="ActiveDirectoryEndpoint" value="https://login.microsoftonline.com/" />
<add key="ResourceManagerEndpoint" value="https://management.azure.com/" />
<add key="WindowsManagementUri" value="https://management.core.windows.net/" />
<add key="AsaClientId" value="1950a258-227b-4e31-a9cf-717495945fc2" />
<add key="RedirectUri" value="urn:ietf:wg:oauth:2.0:oob" />
<add key="SubscriptionId" value="YOUR AZURE SUBSCRIPTION ID" />
<add key="ActiveDirectoryTenantId" value="YOUR TENANT ID" />
</appSettings>
Replace values for SubscriptionId and ActiveDirectoryTenantId with your Azure subscription and tenant IDs.
You can get these values by running the following PowerShell cmdlet:
Get-AzureAccount
4. Add the following using statements to the source file (Program.cs) in the project.
using System;
using System.Configuration;
using System.Threading;
using Microsoft.Azure;
using Microsoft.Azure.Management.Insights;
using Microsoft.Azure.Management.Insights.Models;
using Microsoft.Azure.Management.StreamAnalytics;
using Microsoft.Azure.Management.StreamAnalytics.Models;
using Microsoft.IdentityModel.Clients.ActiveDirectory;
thread.SetApartmentState(ApartmentState.STA);
thread.Name = "AcquireTokenThread";
thread.Start();
thread.Join();
if (result != null)
{
return result.AccessToken;
}
throw new InvalidOperationException("Failed to acquire token");
}
WARNING
If you have previously enabled monitoring for a different Stream Analytics job, either through the Azure portal or
programmatically via the below code, we recommend that you provide the same storage account name that you used
when you previously enabled monitoring.
The storage account is linked to the region that you created your Stream Analytics job in, not specifically to the job itself.
All Stream Analytics jobs (and all other Azure resources) in that same region share this storage account to store monitoring
data. If you provide a different storage account, it might cause unintended side effects in the monitoring of your other
Stream Analytics jobs or other Azure resources.
The storage account name that you use to replace <YOUR STORAGE ACCOUNT NAME> in the following code should be a
storage account that is in the same subscription as the Stream Analytics job that you are enabling monitoring for.
// Enable monitoring
ServiceDiagnosticSettingsPutParameters insightPutParameters = new ServiceDiagnosticSettingsPutParameters()
{
Properties = new ServiceDiagnosticSettings()
{
StorageAccountName = "<YOUR STORAGE ACCOUNT NAME>"
}
};
insightsClient.ServiceDiagnosticSettingsOperations.Put(jobGetResponse.Job.Id, insightPutParameters);
Get support
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Monitor and manage Stream Analytics jobs with
Visual Studio
12/7/2018 • 2 minutes to read • Edit Online
This article demonstrates how to monitor your Stream Analytics job in Visual Studio. Azure Stream Analytics tools
for Visual Studio provide a monitoring experience similar to the Azure portal without having to leave the IDE. You
can begin to monitor a job as soon as you Submit to Azure from your Script.asaql file, or you can monitor existing
jobs regardless of how they were created.
Job summary
The Job Summary and Job Metrics give a quick snapshot of your job. At a glance, you can determine a job's
status and event information.
Job metrics
You can collapse the Job Summary and click the Job Metrics tab to view a graph with important metrics. Check
and uncheck metric types to add and remove them from the graph.
Error monitoring
You can also monitor errors by clicking on the Errors tab.
Get support
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Create an Azure Stream Analytics job with Visual Studio
Install Azure Stream Analytics tools for Visual Studio
Understand and adjust Streaming Units
5/20/2019 • 8 minutes to read • Edit Online
Streaming Units (SUs) represent the computing resources that are allocated to execute a job. The higher the
number of SUs, the more CPU and memory resources are allocated for your job. This capacity lets you focus on
the query logic and abstracts the need to manage the hardware to run your Stream Analytics job in a timely
manner.
To achieve low-latency stream processing, Azure Stream Analytics jobs perform all processing in memory. If a job
runs out of memory, the streaming job fails. As a result, for a production job, it's important to monitor a
streaming job's resource usage, and make sure there are enough resources allocated to keep the jobs running 24/7.
The SU % utilization metric, which ranges from 0% to 100%, describes the memory consumption of your
workload. For a streaming job with a minimal footprint, this metric is usually between 10% and 20%. If SU%
utilization is low and input events get backlogged, your workload likely requires more compute resources, which
requires you to increase the number of SUs. It’s best to keep the SU metric below 80% to account for occasional
spikes. Microsoft recommends setting an alert at 80% on the SU % utilization metric to prevent resource exhaustion. For
more information, see Tutorial: Set up alerts for Azure Stream Analytics jobs.
4. Use the slider to set the SUs for the job. Notice that you are limited to specific SU settings.
NOTE
Choosing how many SUs are required for a particular job depends on the partition configuration for the inputs and on the
query defined for the job. You can select up to your quota in SUs for a job. By default, each Azure subscription has a quota
of up to 500 SUs for all the analytics jobs in a specific region. To increase SUs for your subscriptions beyond this quota,
contact Microsoft Support. Valid values for SUs per job are 1, 3, 6, and up in increments of 6.
Windowed aggregates
The memory consumed (state size) for a windowed aggregate is not always directly proportional to the window
size. Instead, the memory consumed is proportional to the cardinality of the data, or the number of groups in
each time window.
For example, in the following query, the number of distinct clusterid values is the cardinality of the query.
SELECT count(*)
FROM input
GROUP BY clusterid, tumblingwindow (minutes, 5)
In order to ameliorate issues caused by high cardinality in the previous query, you can send events to Event Hub
partitioned by clusterid , and scale out the query by allowing the system to process each input partition
separately using PARTITION BY as shown in the example below:
SELECT count(*)
FROM input PARTITION BY PartitionId
GROUP BY PartitionId, clusterid, tumblingwindow (minutes, 5)
Once the query is partitioned out, it is spread out over multiple nodes. As a result, the number of clusterid
values coming into each node is reduced thereby reducing the cardinality of the group by operator.
Event Hub partitions should be partitioned by the grouping key to avoid the need for a reduce step. For more
information, see Event Hubs overview.
Temporal joins
The memory consumed (state size) of a temporal join is proportional to the number of events in the temporal
wiggle room of the join, which is the event input rate multiplied by the wiggle room size. In other words, the memory
consumed by joins is proportional to the DateDiff time range multiplied by average event rate.
The number of unmatched events in the join affects the memory utilization for the query. The following query is
looking to find the ad impressions that generate clicks:
SELECT clicks.id
FROM clicks
INNER JOIN impressions ON impressions.id = clicks.id AND DATEDIFF(hour, impressions, clicks) between 0 AND 10
In this example, it is possible that many ads are shown while only a few are clicked, and the job must keep all the
events in the time window. Memory consumed is proportional to the window size and the event rate.
To remediate this, send events to Event Hub partitioned by the join keys (id in this case), and scale out the query
by allowing the system to process each input partition separately using PARTITION BY as shown:
SELECT clicks.id
FROM clicks PARTITION BY PartitionId
INNER JOIN impressions PARTITION BY PartitionId
ON impressions.PartitionId = clicks.PartitionId AND impressions.id = clicks.id
AND DATEDIFF(hour, impressions, clicks) between 0 AND 10
Once the query is partitioned out, it is spread out over multiple nodes. As a result, the number of events coming
into each node is reduced thereby reducing the size of the state kept in the join window.
Reference data
Reference data in ASA is loaded into memory for fast lookup. With the current implementation, each join
operation with reference data keeps a copy of the reference data in memory, even if you join with the same
reference data multiple times. For queries with PARTITION BY, each partition has a copy of the reference data,
so the partitions are fully decoupled. With the multiplier effect, memory usage can quickly get very high if you
join with reference data multiple times with multiple partitions.
Use of UDF functions
When you add a UDF, Azure Stream Analytics loads the JavaScript runtime into memory. This affects SU%
utilization.
Next steps
Create parallelizable queries in Azure Stream Analytics
Scale Azure Stream Analytics jobs to increase throughput
Leverage query parallelization in Azure Stream
Analytics
1/3/2019 • 9 minutes to read • Edit Online
This article shows you how to take advantage of parallelization in Azure Stream Analytics. You learn how to scale
Stream Analytics jobs by configuring input partitions and tuning the analytics query definition. As a prerequisite,
you may want to be familiar with the notion of Streaming Unit described in Understand and adjust Streaming
Units.
SELECT TollBoothId
FROM Input1 Partition By PartitionId
WHERE TollBoothId > 100
This query is a simple filter. Therefore, we don't need to worry about partitioning the input that is being sent to
the event hub. Notice that the query includes PARTITION BY PartitionId, so it fulfills requirement #2 from
earlier. For the output, we need to configure the event hub output in the job to have the partition key set to
PartitionId. One last check is to make sure that the number of input partitions is equal to the number of output
partitions.
Query with a grouping key
Input: Event hub with 8 partitions
Output: Blob storage
Query:
This query has a grouping key. Therefore, the events grouped together must be sent to the same Event Hub
partition. Since in this example we group by TollBoothID, we should be sure that TollBoothID is used as the
partition key when the events are sent to Event Hub. Then in ASA, we can use PARTITION BY PartitionId to
inherit from this partition scheme and enable full parallelization. Since the output is blob storage, we don't need
to worry about configuring a partition key value, as per requirement #4.
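The query itself isn't reproduced above. A sketch consistent with the description, reusing the toll-booth pattern shown later in this article (names are illustrative), would be:
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId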
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId, PartitionId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
As you can see, the second step uses TollBoothId as the partitioning key. This step is not the same as the first
step, and it therefore requires us to do a shuffle.
The preceding examples show some Stream Analytics jobs that conform (and some that don't) to an embarrassingly
parallel topology. If they do conform, they have the potential for maximum scale. For jobs that don't fit one of these
profiles, scaling guidance will be available in future updates. For now, use the general guidance in the following
sections.
Calculate the maximum streaming units of a job
The total number of streaming units that can be used by a Stream Analytics job depends on the number of steps
in the query defined for the job and the number of partitions for each step.
Steps in a query
A query can have one or many steps. Each step is a subquery defined by the WITH keyword. The query that is
outside the WITH keyword (one query only) is also counted as a step, such as the SELECT statement in the
following query:
Query:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute,3), TollBoothId
NOTE
This query is discussed in more detail later in the article.
Partition a step
Partitioning a step requires the following conditions:
The input source must be partitioned.
The SELECT statement of the query must read from a partitioned input source.
The query within the step must have the PARTITION BY keyword.
When a query is partitioned, the input events are processed and aggregated in separate partition groups, and
outputs events are generated for each of the groups. If you want a combined aggregate, you must create a second
non-partitioned step to aggregate.
Calculate the max streaming units for a job
All non-partitioned steps together can scale up to six streaming units (SUs) for a Stream Analytics job. In addition
to this, you can add 6 SUs for each partition in a partitioned step. You can see some examples in the table below.
Example: the query contains two steps. The input step is partitioned and the second step is not, and the SELECT
statement reads from the partitioned input. The input data stream is partitioned by 3.
Maximum SUs for the job: 24 (3 partitions x 6 SUs for the partitioned step + 6 SUs for the non-partitioned step).
Examples of scaling
The following query calculates the number of cars within a three-minute window going through a toll station that
has three tollbooths. This query can be scaled up to six SUs.
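The query isn't shown above; a non-partitioned sketch of that toll-booth count would be:
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1
GROUP BY TumblingWindow(minute, 3), TollBoothId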
To use more SUs for the query, both the input data stream and the query must be partitioned. Since the data
stream partition is set to 3, the following modified query can be scaled up to 18 SUs:
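A partitioned sketch, consistent with the 18-SU calculation (3 partitions x 6 SUs):
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId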
When a query is partitioned, the input events are processed and aggregated in separate partition groups. Output
events are also generated for each of the groups. Partitioning can cause some unexpected results when the
GROUP BY field is not the partition key in the input data stream. For example, the TollBoothId field in the
previous query is not the partition key of Input1. The result is that the data from TollBooth #1 can be spread in
multiple partitions.
Each of the Input1 partitions will be processed separately by Stream Analytics. As a result, multiple records of the
car count for the same tollbooth in the same Tumbling window will be created. If the input partition key can't be
changed, this problem can be fixed by adding a non-partition step to aggregate values across partitions, as in the
following example:
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
NOTE
If you are joining two streams, make sure that the streams are partitioned by the partition key of the column that you use
to create the joins. Also make sure that you have the same number of partitions in both streams.
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Scale an Azure Stream Analytics job to increase
throughput
12/17/2018 • 6 minutes to read • Edit Online
This article shows you how to tune a Stream Analytics query to increase throughput for Streaming
Analytics jobs. You can use the following guide to scale your job to handle higher load and take advantage
of more system resources (such as more bandwidth, more CPU resources, more memory). As a
prerequisite, you may need to read the following articles:
Understand and adjust Streaming Units
Create parallelizable jobs
WITH Step1 AS (
SELECT COUNT(*) AS Count, TollBoothId, PartitionId
FROM Input1 Partition By PartitionId
GROUP BY TumblingWindow(minute, 3), TollBoothId, PartitionId
)
SELECT SUM(Count) AS Count, TollBoothId
FROM Step1
GROUP BY TumblingWindow(minute, 3), TollBoothId
In the query above, you are counting cars per toll booth per partition, and then adding the count from all
partitions together.
Once the query is partitioned, each partition of the step can be allocated up to 6 SUs. Because 6 SUs per partition
is the maximum, each partition can be placed on its own processing node.
NOTE
If your query cannot be partitioned, adding additional SUs to a multi-step query may not always improve
throughput. One way to gain performance is to reduce volume on the initial steps by using the local/global aggregate
pattern, as described above in step 5.
NOTE
How many tenants should you put in each job? This query pattern often has a large number of subqueries and results in a
very large and complex topology. The controller of the job may not be able to handle such a large topology. As a rule of
thumb, stay under 40 tenants for a 1 SU job, and under 60 tenants for 3 SU and 6 SU jobs. If you exceed the capacity
of the controller, the job will not start successfully.
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Scale your Stream Analytics job with Azure Machine
Learning functions
4/8/2019 • 8 minutes to read • Edit Online
It's straightforward to set up a Stream Analytics job and run some sample data through it. What should you do
when you need to run the same job with a higher data volume? You need to understand how to configure the
Stream Analytics job so that it scales. In this document, we focus on the special aspects of scaling Stream Analytics
jobs with Machine Learning functions. For information on how to scale Stream Analytics jobs in general, see the
article Scaling jobs.
An additional consideration is the 'max concurrent calls' setting on the Machine Learning web service side; it's
recommended to set this to the maximum value (200 currently).
For more information on this setting, review the Scaling article for Machine Learning Web Services.
WITH subquery AS (
SELECT text, sentiment(text) AS result FROM input
)
SELECT text, result
FROM subquery
Consider the following scenario: with a throughput of 10,000 tweets per second, a Stream Analytics job must be
created to perform sentiment analysis of the tweets (events). Using 1 SU, can this Stream Analytics job handle the
traffic? Using the default batch size of 1,000, the job should be able to keep up with the input. Further, the added
Machine Learning function should generate no more than a second of latency, which is the general default
latency of the sentiment analysis Machine Learning web service (with a default batch size of 1,000). The Stream
Analytics job's overall or end-to-end latency would typically be a few seconds. Take a more detailed look into this
Stream Analytics job, especially the Machine Learning function calls. With a batch size of 1,000, a throughput of
10,000 events takes about 10 requests to the web service. Even with one SU, there are enough concurrent
connections to accommodate this input traffic.
If the input event rate increases by 100x, then the Stream Analytics job needs to process 1,000,000 tweets per
second. There are two options to accomplish the increased scale:
1. Increase the batch size, or
2. Partition the input stream to process the events in parallel
With the first option, the job latency increases.
With the second option, more SUs would need to be provisioned and therefore generate more concurrent Machine
Learning web service requests. This means the job cost increases.
Assume the latency of the sentiment analysis Machine Learning web service is 200 ms for batches of 1,000 events or
fewer, 250 ms for 5,000-event batches, 300 ms for 10,000-event batches, and 500 ms for 25,000-event batches.
1. Using the first option (not provisioning more SUs), the batch size could be increased to 25,000. This in turn
would allow the job to process 1,000,000 events with 20 concurrent connections to the Machine Learning web
service (with a latency of 500 ms per call). So the additional latency of the Stream Analytics job due to the
sentiment function requests against the Machine Learning web service would increase from 200 ms to 500 ms.
However, batch size cannot be increased infinitely: the Machine Learning web service requires the payload size
of a request to be 4 MB or smaller, and web service requests time out after 100 seconds of operation.
2. Using the second option, the batch size is left at 1,000. With 200-ms web service latency, every 20 concurrent
connections to the web service would be able to process 1,000 * 20 * 5 events = 100,000 events per second. So to
process 1,000,000 events per second, the job would need 60 SUs. Compared to the first option, the Stream
Analytics job would make more web service batch requests, in turn generating an increased cost.
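The arithmetic behind these two options can be written out explicitly. This is only a back-of-the-envelope sketch; the figure of 20 concurrent connections per 6 SUs is an assumption drawn from the numbers in this section:
# events per second = batch size * concurrent connections * calls per connection per second
$batchSize   = 1000
$latencyMs   = 200
$connections = 20                                   # assumed: roughly what 6 SUs provide in this example
$callsPerConnectionPerSecond = 1000 / $latencyMs    # 5 calls per second at 200 ms latency
$eventsPerSecond = $batchSize * $connections * $callsPerConnectionPerSecond   # 100,000 events per second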
Below is a table for the throughput of the Stream Analytics job for different SUs and batch sizes (in number of
events per second).
By now, you should already have a good understanding of how Machine Learning functions in Stream Analytics
work. You likely also understand that Stream Analytics jobs "pull" data from data sources and each "pull" returns a
batch of events for the Stream Analytics job to process. How does this pull model impact the Machine Learning
web service requests?
Normally, the batch size we set for Machine Learning functions won’t exactly be divisible by the number of events
returned by each Stream Analytics job "pull". When this occurs, the Machine Learning web service is called with
"partial" batches. This is done to not incur additional job latency overhead in coalescing events from pull to pull.
Key Takeaways
To summarize the main points, in order to scale a Stream Analytics job with Machine Learning functions, the
following items must be considered:
1. The input event rate
2. The tolerated latency for the running Stream Analytics job (and thus the batch size of the Machine Learning
web service requests)
3. The provisioned Stream Analytics SUs and the number of Machine Learning web service requests (the
additional function-related costs)
A fully partitioned Stream Analytics query was used as an example. If a more complex query is needed, the Azure
Stream Analytics forum is a great resource for getting additional help from the Stream Analytics team.
Next steps
To learn more about Stream Analytics, see:
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Management .NET SDK: Set up and run analytics jobs
using the Azure Stream Analytics API for .NET
5/29/2019 • 7 minutes to read • Edit Online
Learn how to set up and run analytics jobs using the Stream Analytics API for .NET using the Management .NET
SDK. Set up a project, create input and output sources, transformations, and start and stop jobs. For your analytics
jobs, you can stream data from Blob storage or from an event hub.
See the management reference documentation for the Stream Analytics API for .NET.
Azure Stream Analytics is a fully managed service providing low-latency, highly available, scalable, complex event
processing over streaming data in the cloud. Stream Analytics enables customers to set up streaming jobs to
analyze data streams, and allows them to drive near real-time analytics.
NOTE
We have updated the sample code in this article to use the Azure Stream Analytics Management .NET SDK v2.x. For
sample code that uses the legacy (1.x) SDK version, see Use the Management .NET SDK v1.x for Stream Analytics.
Prerequisites
Before you begin this article, you must have the following requirements:
Install Visual Studio 2019 or 2015.
Download and install Azure .NET SDK.
Create an Azure Resource Group in your subscription. The following example is a sample Azure PowerShell
script. For Azure PowerShell information, see Install and configure Azure PowerShell;
# Select the Azure subscription you want to use to create the resource group
Select-AzureSubscription -SubscriptionName <subscription name>
# If Stream Analytics has not been registered to the subscription, remove the remark symbol (#) to run the
# Register-AzProvider cmdlet to register the provider namespace
#Register-AzProvider -Force -ProviderNamespace 'Microsoft.StreamAnalytics'
Set up an input source and output target for the job to connect to.
Set up a project
To create an analytics job by using the Stream Analytics API for .NET, first set up your project.
1. Create a Visual Studio C# .NET console application.
2. In the Package Manager Console, run the following commands to install the NuGet packages. The first one
is the Azure Stream Analytics Management .NET SDK. The second one is for Azure client authentication.
Install-Package Microsoft.Azure.Management.StreamAnalytics -Version 2.0.0
Install-Package Microsoft.Rest.ClientRuntime.Azure.Authentication -Version 2.3.1
<appSettings>
<add key="ClientId" value="1950a258-227b-4e31-a9cf-717495945fc2" />
<add key="RedirectUri" value="urn:ietf:wg:oauth:2.0:oob" />
<add key="SubscriptionId" value="YOUR SUBSCRIPTION ID" />
<add key="ActiveDirectoryTenantId" value="YOUR TENANT ID" />
</appSettings>
Replace values for SubscriptionId and ActiveDirectoryTenantId with your Azure subscription and tenant
IDs. You can get these values by running the following Azure PowerShell cmdlet:
Get-AzureAccount
5. Add the following using statements to the source file (Program.cs) in the project:
using System;
using System.Collections.Generic;
using System.Configuration;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Management.StreamAnalytics;
using Microsoft.Azure.Management.StreamAnalytics.Models;
using Microsoft.Rest.Azure.Authentication;
using Microsoft.Rest;
return credentials;
}
SynchronizationContext.SetSynchronizationContext(new SynchronizationContext());
// Get credentials
ServiceClientCredentials credentials = GetCredentials().Result;
The resourceGroupName variable's value should be the same as the name of the resource group you created or
picked in the prerequisite steps.
To automate the credential presentation aspect of job creation, refer to Authenticating a service principal with
Azure Resource Manager.
The remaining sections of this article assume that this code is at the beginning of the Main method.
Input sources, whether from Blob storage or an event hub, are tied to a specific job. To use the same input source
for different jobs, you must call the method again and specify a different job name.
// Create a transformation
Transformation transformation = new Transformation()
{
    // Replace '<your input name>' with the value you used for the 'inputName' variable in a previous step
    Query = "Select Id, Name from <your input name>",
    StreamingUnits = 1
};

Transformation createTransformationResult =
    streamAnalyticsManagementClient.Transformations.CreateOrReplace(transformation, resourceGroupName, streamingJobName, transformationName);
Like input and output, a transformation is also tied to the specific Stream Analytics job it was created under.
Get support
For further assistance, try our Azure Stream Analytics forum.
Next steps
You've learned the basics of using a .NET SDK to create and run analytics jobs. To learn more, see the following
articles:
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Management .NET SDK.
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Implement CI/CD for Stream Analytics on IoT Edge
using APIs
2/14/2019 • 3 minutes to read • Edit Online
You can enable continuous integration and deployment for Azure Stream Analytics jobs using REST APIs. This
article provides examples on which APIs to use and how to use them. REST APIs aren't supported on Azure Cloud
Shell.
Windows
For Windows, use PowerShell:
$user = "<username>"
$pass = "<password>"
$encodedCreds = [Convert]::ToBase64String([Text.Encoding]::ASCII.GetBytes(("{0}:{1}" -f $user,$pass)))
$basicAuthValue = "Basic $encodedCreds"
$headers = New-Object "System.Collections.Generic.Dictionary[[String],[String]]"
$headers.Add("Content-Type", 'application/json')
$headers.Add("Authorization", $basicAuthValue)
$content = "<request body>"
$response = Invoke-RestMethod <url> -Method <method> -Body $content -Headers $headers
echo $response
PUT https://management.azure.com/subscriptions/{subscription-id}/resourcegroups/{resource-group-name}/providers/Microsoft.StreamAnalytics/streamingjobs/{job-name}?api-version=2017-04-01-preview

POST https://management.azure.com/subscriptions/{subscriptionid}/resourceGroups/{resourcegroupname}/providers/Microsoft.StreamAnalytics/streamingjobs/{jobname}/publishedgepackage?api-version=2017-04-01-preview
This asynchronous operation returns a status of 202 until the job has been successfully published. The location
response header contains the URI used to get the status of the process. While the process is running, a call to the
URI in the location header returns a status of 202. When the process finishes, the URI in the location header
returns a status of 200.
Example of an Edge package publish call using curl:
curl -d -X POST https://management.azure.com/subscriptions/{subscriptionid}/resourceGroups/{resourcegroupname}/providers/Microsoft.StreamAnalytics/streamingjobs/{jobname}/publishedgepackage?api-version=2017-04-01-preview
After making the POST call, you should expect a response with an empty body. Look for the URL located in the
HEAD of the response and record it for further use.
Example of the URL from the HEAD of response:
https://management.azure.com/subscriptions/{subscriptionid}/resourcegroups/{resourcegroupname}/providers/Microsoft.StreamAnalytics/StreamingJobs/{resourcename}/OperationResults/023a4d68-ffaf-4e16-8414-cb6f2e14fe23?api-version=2017-04-01-preview
Wait one to two minutes before running the following command to make an API call with the URL you found in the
HEAD of the response. Retry the command if you do not get a 200 response.
Example of making API call with returned URL with curl:
curl -d -X GET https://management.azure.com/subscriptions/{subscriptionid}/resourceGroups/{resourcegroupname}/providers/Microsoft.StreamAnalytics/streamingjobs/{resourcename}/publishedgepackage?api-version=2017-04-01-preview
The response includes the information you need to add to the Edge deployment script. The examples below show
what information you need to collect and where to add it in the deployment manifest.
Sample response body after publishing successfully:
{
edgePackageUrl : null
error : null
manifest : "{"supportedPlatforms":[{"os":"linux","arch":"amd64","features":[]},{"os":"linux","arch":"arm","features":[]},{"os":"windows","arch":"amd64","features":[]}],"schemaVersion":"2","name":"{jobname}","version":"1.0.0.0","type":"docker","settings":{"image":"{imageurl}","createOptions":null},"endpoints":{"inputs":["{inputnames}"],"outputs":["{outputnames}"]},"twin":{"contentType":"assignments","content":{"properties.desired":{"ASAJobInfo":"{asajobsasurl}","ASAJobResourceId":"{asajobresourceid}","ASAJobEtag":"{etag}","PublishTimeStamp":"{publishtimestamp}"}}}}"
status : "Succeeded"
}
Next steps
Azure Stream Analytics on IoT Edge
ASA on IoT Edge tutorial
Develop Stream Analytics Edge jobs using Visual Studio tools
Install Azure Stream Analytics tools for Visual Studio
2/18/2019 • 2 minutes to read • Edit Online
Azure Stream Analytics tools support Visual Studio 2017, 2015, and 2013. This article describes how to install
and uninstall the tools.
For more information on using the tools, see Stream Analytics tools for Visual Studio.
Install
Recommended: Visual Studio 2019 and 2017
Download Visual Studio 2019 (Preview 2 or above) and Visual Studio 2017 (15.3 or above). Enterprise
(Ultimate/Premium), Professional, and Community editions are supported. Express edition is not supported.
Visual Studio 2017 on Mac is not supported.
Stream Analytics tools are part of the Azure development and Data storage and processing workloads in
Visual Studio 2017. Enable either one of these two workloads as part of your Visual Studio installation.
Enable the Data storage and processing workload as shown:
In the Tools menu, choose Extensions and Updates. Find Azure Data Lake and Stream Analytics tools in the
installed extensions and click Update to install the latest extension.
Visual Studio 2015, 2013
Install Visual Studio 2015 or Visual Studio 2013 Update 4. Enterprise (Ultimate/Premium), Professional, and
Community editions are supported. Express edition is not supported.
Install the Microsoft Azure SDK for .NET version 2.7.1 or above by using the Web platform installer.
Install Azure Stream Analytics tools for Visual Studio.
Update
Visual Studio 2019 and 2017
The new version reminder shows up in the Visual Studio notification.
Uninstall
Visual Studio 2019 and 2017
Double-click the Visual Studio installer, and select Modify. Clear the Azure Data Lake and Stream Analytics
Tools check box from either the Data storage and processing workload or the Azure development workload.
Visual Studio 2015 and 2013
Go to Control Panel, and uninstall Microsoft Azure Data Lake and Stream Analytics tools for Visual
Studio.
Test Stream Analytics queries locally with Visual
Studio
9/10/2018 • 2 minutes to read • Edit Online
You can use Azure Stream Analytics tools for Visual Studio to test your Stream Analytics jobs locally with sample
data.
Use this Quickstart to learn how to create a Stream Analytics job using Visual Studio.
In the pop-up window, select sample data from your local path and Save.
A file named local_EntryStream.json is added automatically to your inputs folder.
Select Run Locally in the query editor. Or you can press F5.
The output can be viewed in a table format directly from Visual Studio.
You can find the output path from the console output. Press any key to open the result folder.
You can only sample data streaming from Event Hubs or IoT Hubs. Other input sources are not supported. In the
pop-up dialog box, fill in the local path to save the sample data and select Sample.
Next steps
Use Visual Studio to view Azure Stream Analytics jobs
Quickstart: Create a Stream Analytics job using Visual Studio
Tutorial: Deploy an Azure Stream Analytics job with CI/CD using Azure DevOps
Continuously integrate and develop with Stream Analytics tools
Test live data locally using Azure Stream Analytics
tools for Visual Studio (Preview)
12/7/2018 • 2 minutes to read • Edit Online
Azure Stream Analytics tools for Visual Studio allows you to test jobs locally from the IDE using live event
streams from Azure Event Hub, IoT Hub, and Blob Storage. Live data local testing can't replace the performance
and scalability testing you can perform in the cloud, but you can save time during functional testing by not having
to submit to the cloud each time you want to test your Stream Analytics job. This feature is in preview and
shouldn't be used for production workloads.
Testing options
The following local testing options are supported:
2. To test live data, choose Use Cloud Input from the dropdown box.
3. Set the Start Time to define when the job will start processing input data. The job might need to read input
data ahead of time to ensure accurate results. The default time is set to 30 minutes ahead of the current
time.
4. Click Run Locally. A console window will appear with the running progress and job metrics. If you want to
stop the process, you can do so manually.
The output results are refreshed every three seconds with the first 500 output rows in the local run result
window, and the output files are placed in your project path ASALocalRun folder. You can also open the
output files by clicking Open Results Folder button in the local run result window.
5. If you want to output the results to your cloud output sinks, choose Output to Cloud from the second
dropdown box. Power BI and Azure Data Lake Storage are not supported output sinks.
Limitations
Power BI and Azure Data Lake Storage are not supported output sinks due to authentication model
limitations.
Only cloud input options have time policies support, while local input options do not.
Next steps
Create a Stream Analytics job by using the Azure Stream Analytics tools for Visual Studio
Install Azure Stream Analytics tools for Visual Studio
Test Stream Analytics queries locally with Visual Studio
Use Visual Studio to view Azure Stream Analytics jobs
Use Visual Studio to view Azure Stream Analytics
jobs
9/26/2018 • 2 minutes to read • Edit Online
Azure Stream Analytics tools for Visual Studio makes it easy for developers to manage their Stream Analytics
jobs directly from the IDE. With Azure Stream Analytics tools, you can:
Create new jobs
Start, stop, and monitor jobs
Check job results
Export existing jobs to a project
Test input and output connections
Run queries locally
Learn how to install Azure Stream Analytics tools for Visual Studio.
2. Expand your job node, and double-click on the Job View node to open a job view.
Start and stop jobs
Azure Stream Analytics jobs can be fully managed from the job view in Visual Studio. Use the controls to start,
stop, or delete a job.
Next steps
Monitor and manage Azure Stream Analytics jobs using Visual Studio
Quickstart: Create a Stream Analytics job using Visual Studio
Tutorial: Deploy an Azure Stream Analytics job with CI/CD using Azure Pipelines
Continuously integrate and develop with Stream Analytics tools
Develop Stream Analytics Data Box Edge jobs using
Visual Studio tools
5/29/2019 • 4 minutes to read • Edit Online
In this tutorial, you learn how to use Stream Analytics tools for Visual Studio. You learn how to author, debug, and
create your Stream Analytics Data Box Edge jobs. After you create and test the job, you can go to the Azure portal
to deploy it to your devices.
Prerequisites
You need the following prerequisites to complete this tutorial:
Install Visual Studio 2019, Visual Studio 2015, or Visual Studio 2013 Update 4. Enterprise
(Ultimate/Premium), Professional, and Community editions are supported. Express edition isn't supported.
Follow the installation instructions to install Stream Analytics tools for Visual Studio.
After the project gets created, navigate to the Solution Explorer to view the folder hierarchy.
Choose the correct subscription
1. From your Visual Studio View menu, select Server Explorer.
2. Right click on Azure > Select Connect to Microsoft Azure Subscription > and then sign in with your
Azure account.
Define inputs
1. From the Solution Explorer, expand the Inputs node. You should see an input named EdgeInput.json.
Double-click it to view its settings.
2. Set Source Type to Data Stream. Then set Source to Edge Hub, Event Serialization Format to Json, and
Encoding to UTF8. Optionally, you can rename the Input Alias; leave it as is for this example. If you rename
the input alias, use the new name when defining the query. Select Save to save the settings.
Define outputs
1. From the Solution Explorer, expand the Outputs node. You should see an output named EdgeOutput.json.
Double-click it to view its settings.
2. Set Sink to Edge Hub, Event Serialization Format to Json, Encoding to UTF8, and Format to Array.
Optionally, you can rename the Output Alias; leave it as is for this example. If you rename the output alias,
use the new name when defining the query. Select Save to save the settings.
When you create a Stream Analytics Data Box Edge job in the portal, the compiler will automatically warn you if
you aren't using a supported operator.
From Visual Studio, define the transformation query in the query editor (the script.asaql file).
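The tutorial's query isn't reproduced here. A minimal pass-through sketch, assuming the default EdgeInput and EdgeOutput aliases defined above, would be:
SELECT * INTO EdgeOutput
FROM EdgeInput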
5. The command prompt window shows the status of the job. When the job runs successfully, it creates a
folder that looks like "2018-02-23-11-31-42" in your project folder path "Visual Studio
2015\Projects\MyASAEdgejob\MyASAEdgejob\ASALocalRun\2018-02-23-11-31-42". Navigate to the
folder path to view the results in the local folder:
You can also sign in to the Azure portal and verify that the job is created.
Submit the job to Azure
1. Before you submit the job to Azure, you must connect to your Azure Subscription. Open Server Explorer
> right click on Azure > Connect to Microsoft Azure subscription > sign in to your Azure subscription.
2. To submit the job to Azure, navigate to the query editor > select Submit to Azure.
3. A pop-up window opens. Choose to update an existing Stream Analytics Data Box Edge job or create a new
one. When you update an existing job, it replaces all of the job configuration. In this scenario, you'll publish
a new job. Select Create a New Azure Stream Analytics Job > enter a name for your job such as
MyASAEdgeJob > choose the required Subscription, Resource Group, and Location > Select Submit.
Now your Stream Analytics Data Box Edge job has been created. You can refer to the Run jobs on IoT Edge
tutorial to learn how to deploy it to your devices.
Next steps
More information on Azure IoT Edge
ASA on IoT Edge tutorial
Send feedback to the team using this survey
Continuously integrate and develop with Azure
Stream Analytics CI/CD NuGet package
5/16/2019 • 2 minutes to read • Edit Online
This article describes how to use the Azure Stream Analytics CI/CD NuGet package to set up a continuous
integration and deployment process.
Use version 2.3.0000.0 or above of Stream Analytics tools for Visual Studio to get support for MSBuild.
A NuGet package is available: Microsoft.Azure.StreamAnalytics.CICD. It provides the MSBuild, local run, and
deployment tools that support the continuous integration and deployment process of Stream Analytics Visual
Studio projects.
NOTE
The NuGet package can be used only with the 2.3.0000.0 or above version of Stream Analytics Tools for Visual Studio. If you
have projects created in previous versions of Visual Studio tools, just open them with the 2.3.0000.0 or above version and
save. Then the new capabilities are enabled.
For more information, see Stream Analytics tools for Visual Studio.
MSBuild
Like the standard Visual Studio MSBuild experience, to build a project you have two options. You can right-click
the project, and then choose Build. You also can use MSBuild in the NuGet package from the command line.
When a Stream Analytics Visual Studio project builds successfully, it generates the following two Azure Resource
Manager template files under the bin/[Debug/Retail]/Deploy folder:
Resource Manager template file: [ProjectName].JobTemplate.json
Resource Manager parameters file: [ProjectName].JobTemplate.parameters.json
The default parameters in the parameters.json file are from the settings in your Visual Studio project. If you want
to deploy to another environment, replace the parameters accordingly.
NOTE
For all the credentials, the default values are set to null. You are required to set the values before you deploy to the cloud.
"Input_EntryStream_sharedAccessPolicyKey": {
"value": null
},
Learn more about how to deploy with a Resource Manager template file and Azure PowerShell. Learn more
about how to use an object as a parameter in a Resource Manager template.
To use Managed Identity for Azure Data Lake Store Gen1 as output sink, you need to provide Access to the
service principal using PowerShell before deploying to Azure. Learn more about how to deploy ADLS Gen1 with
Managed Identity with Resource Manager template.
Command-line tool
Build the project
The NuGet package has a command-line tool called SA.exe. It supports project build and local testing on an
arbitrary machine, which you can use in your continuous integration and continuous delivery process.
The deployment files are placed under the current directory by default. You can specify the output path by using
the following -OutputPath parameter:
Generate a job definition file to use with the Stream Analytics PowerShell API
The arm command takes the job template and job template parameter files generated through build as input.
Then it combines them into a job definition JSON file that can be used with the Stream Analytics PowerShell API.
Example:
Next steps
Quickstart: Create an Azure Stream Analytics cloud job in Visual Studio
Test Stream Analytics queries locally with Visual Studio
Explore Azure Stream Analytics jobs with Visual Studio
Develop .NET Standard user-defined functions for
Azure Stream Analytics Edge jobs (Preview)
4/25/2019 • 5 minutes to read • Edit Online
Azure Stream Analytics offers a SQL-like query language for performing transformations and computations over
streams of event data. There are many built-in functions, but some complex scenarios require additional flexibility.
With .NET Standard user-defined functions (UDFs), you can invoke your own functions written in any .NET
Standard language (C#, F#, etc.) to extend the Stream Analytics query language. UDFs allow you to perform
complex math computations, import custom ML models using ML.NET, and use custom imputation logic for
missing data. The UDF feature for Stream Analytics Edge jobs is currently in preview and shouldn't be used in
production workloads.
Overview
Visual Studio tools for Azure Stream Analytics make it easy for you to write UDFs, test your jobs locally (even
offline), and publish your Stream Analytics job to Azure. Once published to Azure, you can deploy your job to IoT
devices using IoT Hub.
There are three ways to implement UDFs:
CodeBehind files in an ASA project
UDF from a local project
An existing package from an Azure storage account
Package path
The format of any UDF package has the path /UserCustomCode/CLR/* . Dynamic Link Libraries (DLLs) and resources
are copied under the /UserCustomCode/CLR/* folder, which helps isolate user DLLs from system and Azure Stream
Analytics DLLs. This package path is used for all functions regardless of the method used to employ them.
UDF TYPE (C#)        AZURE STREAM ANALYTICS TYPE
long                 bigint
double               double
string               nvarchar(max)
dateTime             dateTime
struct               IRecord
object               IRecord
Array<object>        IArray
CodeBehind
You can write user-defined functions in the Script.asaql CodeBehind. Visual Studio tools will automatically compile
the CodeBehind file into an assembly file. The assemblies are packaged as a zip file and uploaded to your storage
account when you submit your job to Azure. You can learn how to write a C# UDF using CodeBehind by following
the C# UDF for Stream Analytics Edge jobs tutorial.
Local project
User-defined functions can be written in an assembly that is later referenced in an Azure Stream Analytics query.
This is the recommended option for complex functions that require the full power of a .NET Standard language
beyond its expression language, such as procedural logic or recursion. UDFs from a local project might also be
used when you need to share the function logic across several Azure Stream Analytics queries. Adding UDFs to
your local project gives you the ability to debug and test your functions locally from Visual Studio.
To reference a local project:
1. Create a new class library in your solution.
2. Write the code in your class. Remember that the classes must be defined as public and objects must be defined
as static public.
3. Build your project. The tools will package all the artifacts in the bin folder to a zip file and upload the zip file to
the storage account. For external references, use assembly reference instead of the NuGet package.
4. Reference the new class in your Azure Stream Analytics project.
5. Add a new function in your Azure Stream Analytics project.
6. Configure the assembly path in the job configuration file, JobConfig.json . Set the Assembly Path to Local
Project Reference or CodeBehind.
7. Rebuild both the function project and the Azure Stream Analytics project.
Example
In this example, UDFTest is a C# class library project and ASAEdgeUDFDemo is the Azure Stream Analytics
Edge project, which will reference UDFTest.
1. Build your C# project, which will enable you to add a reference to your C# UDF from the Azure Stream
Analytics query.
2. Add the reference to the C# project in the ASA Edge project. Right-click the References node and choose
Add Reference.
4. You should see the UDFTest listed under References in Solution Explorer.
5. Right click on the Functions folder and choose New Item.
Existing packages
You can author .NET Standard UDFs in any IDE of your choice and invoke them from your Azure Stream Analytics
query. First compile your code and package all the DLLs. The format of the package has the path
/UserCustomCode/CLR/* . Then, upload UserCustomCode.zip to the root of the container in your Azure storage
account.
Once assembly zip packages have been uploaded to your Azure storage account, you can use the functions in
Azure Stream Analytics queries. All you need to do is include the storage information in the Stream Analytics Edge
job configuration. You can't test the function locally with this option because Visual Studio tools will not download
your package. The package path is parsed directly to the service.
To configure the assembly path in the job configuration file, JobConfig.json :
Expand the User-Defined Code Configuration section, and fill out the configuration with the following
suggested values:
SETTING SUGGESTED VALUE
Limitations
The UDF preview currently has the following limitations:
.NET Standard languages can only be used for Azure Stream Analytics on IoT Edge. For cloud jobs, you can
write JavaScript user-defined functions. To learn more, visit the Azure Stream Analytics JavaScript UDF
tutorial.
.NET Standard UDFs can only be authored in Visual Studio and published to Azure. Read-only versions of
.NET Standard UDFs can be viewed under Functions in the Azure portal. Authoring of .NET Standard
functions is not supported in the Azure portal.
The Azure portal query editor shows an error when using .NET Standard UDF in the portal.
Because the custom code shares context with Azure Stream Analytics engine, custom code can't reference
anything that has a conflicting namespace/dll_name with Azure Stream Analytics code. For example, you
can't reference Newtonsoft Json.
Next steps
Tutorial: Write a C# user-defined function for an Azure Stream Analytics Edge job (Preview)
Tutorial: Azure Stream Analytics JavaScript user-defined functions
Use Visual Studio to view Azure Stream Analytics jobs
Test Stream Analytics queries locally with Visual
Studio Code
5/16/2019 • 2 minutes to read • Edit Online
You can use Azure Stream Analytics tools for Visual Studio Code to test your Stream Analytics jobs locally with
sample data.
Use this quickstart to learn how to create a Stream Analytics job using Visual Studio Code.
4. Enter the same input alias that you used in your query.
5. In the LocalInput_DefaultLocalStream.json file, enter the file path where your local data file is located.
6. Return to your query editor, and select Run locally.
Next steps
Create an Azure Stream Analytics cloud job in Visual Studio Code (Preview)
Explore Azure Stream Analytics jobs with Visual Studio Code (Preview)
Continuously integrate and develop with Stream
Analytics CI/CD npm package
5/22/2019 • 2 minutes to read • Edit Online
This article describes how to use the Azure Stream Analytics CI/CD npm package to set up a continuous
integration and deployment process.
When a Stream Analytics Visual Studio Code project builds successfully, it generates the following two Azure
Resource Manager template files under the bin/[Debug/Retail]/Deploy folder:
Resource Manager template file: [ProjectName].JobTemplate.json
Resource Manager parameters file: [ProjectName].JobTemplate.parameters.json
The default parameters in the parameters.json file are from the settings in your Visual Studio Code project. If you
want to deploy to another environment, replace the parameters accordingly.
NOTE
For all the credentials, the default values are set to null. You are required to set the values before you deploy to the cloud.
"Input_EntryStream_sharedAccessPolicyKey": {
"value": null
},
Learn more about how to deploy with a Resource Manager template file and Azure PowerShell. Learn more about
how to use an object as a parameter in a Resource Manager template.
To use Managed Identity for Azure Data Lake Store Gen1 as output sink, you need to provide Access to the service
principal using PowerShell before deploying to Azure. Learn more about how to deploy ADLS Gen1 with Managed
Identity with Resource Manager template.
Next steps
Quickstart: Create an Azure Stream Analytics cloud job in Visual Studio Code (Preview)
Test Stream Analytics queries locally with Visual Studio Code (Preview)
Explore Azure Stream Analytics with Visual Studio Code (Preview)
Explore Azure Stream Analytics with Visual Studio
Code (Preview)
5/16/2019 • 2 minutes to read
The Azure Stream Analytics for Visual Studio Code extension gives developers a lightweight experience for
managing their Stream Analytics jobs. It can be used on Windows, Mac and Linux. With the Azure Stream
Analytics extension, you can:
Create, start, and stop jobs
Export existing jobs to a local project
List jobs and view job entities
Next steps
Create an Azure Stream Analytics cloud job in Visual Studio Code (Preview)
Troubleshoot input connections
4/11/2019 • 5 minutes to read
This page describes common issues with input connections and how to troubleshoot them.
Enable the diagnostics logs to view the details of the warning. For malformed input events, the execution logs
contain an entry with a message that looks like:
Could not deserialize the input event(s) from resource <blob URI> as json.
NOTE
When the number of readers changes during a job upgrade, transient warnings are written to audit logs. Stream Analytics
jobs automatically recover from these transient issues.
7. When you created the input in the Stream Analytics job to point to the Event Hub, you specified the
consumer group there. $Default is used when none is specified. Once you create a new consumer group,
edit the Event Hub input in the Stream Analytics job and specify the name of the new consumer group.
SELECT bar
INTO output2
FROM inputEventHub
…
WITH data AS (
SELECT * FROM inputEventHub
)
SELECT foo
INTO output1
FROM data
SELECT bar
INTO output2
FROM data
…
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Troubleshoot Azure Stream Analytics outputs
12/10/2018 • 5 minutes to read
This page describes common issues with output connections and how to troubleshoot and address them.
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Troubleshoot Azure Stream Analytics queries
12/10/2018 • 3 minutes to read
This article describes common issues with developing Stream Analytics queries and how to troubleshoot them.
In this situation, you can add a few extra SELECT INTO statements to "log" the intermediate JOIN results and the
data that's read from the input.
In this example, we've added two new "temporary outputs." They can be any sink you like. Here we use Azure
Storage as an example:
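The added statements might look like the following sketch. The names here (datainput, refData1, refData2, temp1, temp2, output1) are placeholders rather than the exact names used in this job:

WITH step1 AS (
    -- First JOIN: enrich the stream with the name column from the first reference data set
    SELECT datainput.deviceId, datainput.[from], refData1.name
    FROM datainput
    JOIN refData1 ON datainput.id = refData1.id
)
-- Log the raw data read from the input
SELECT * INTO temp1 FROM datainput
-- Log the result of the first JOIN
SELECT * INTO temp2 FROM step1
-- Second JOIN: the step under suspicion in this example
SELECT step1.deviceId, step1.name, refData2.threshold
INTO output1
FROM step1
JOIN refData2 ON step1.[from] = refData2.deviceGuid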
temp2 table
As you can see, temp1 and temp2 both have data, and the name column is populated correctly in temp2. However,
because there is still no data in output, something is wrong:
By sampling the data, you can be almost certain that the issue is with the second JOIN. You can download the
reference data from the blob and take a look:
As you can see, the format of the GUID in this reference data is different from the format of the [from] column in
temp2. That’s why the data didn’t arrive in output1 as expected.
You can fix the data format, upload it to reference blob, and try again:
This time, the data in the output is formatted and populated as expected.
Get help
For further assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Troubleshoot Azure Stream Analytics by using
diagnostics logs
6/4/2019 • 6 minutes to read
Occasionally, an Azure Stream Analytics job unexpectedly stops processing. It's important to be able to
troubleshoot this kind of event. Failures can be caused by an unexpected query result, by connectivity to devices,
or by an unexpected service outage. The diagnostics logs in Stream Analytics can help you identify the cause of
issues when they occur and reduce recovery time.
Log types
Stream Analytics offers two types of logs:
Activity logs (always on), which give insights into operations performed on jobs.
Diagnostics logs (configurable), which provide richer insights into everything that happens with a job.
Diagnostics logs start when the job is created and end when the job is deleted. They cover events when the
job is updated and while it’s running.
NOTE
You can use services like Azure Storage, Azure Event Hubs, and Azure Monitor logs to analyze nonconforming data. You are
charged based on the pricing model for those services.
NOTE
This article was recently updated to use the term Azure Monitor logs instead of Log Analytics. Log data is still stored in a
Log Analytics workspace and is still collected and analyzed by the same Log Analytics service. We are updating the
terminology to better reflect the role of logs in Azure Monitor. See Azure Monitor terminology changes for details.
2. You can see a list of operations that have been performed. Any operation that caused your job to fail has a
red info bubble.
3. Click an operation to see its summary view. Information here is often limited. To learn more details about
the operation, click JSON.
4. Scroll down to the Properties section of the JSON, which provides details of the error that caused the
failed operation. In this example, the failure was due to a runtime error from out of bound latitude values.
Discrepancy in the data that is processed by a Stream Analytics job causes a data error. You can learn about
different input and output data errors and why they occur.
5. You can take corrective actions based on the error message in JSON. In this example, checks to ensure the
latitude value is between -90 degrees and 90 degrees need to be added to the query (see the sketch after this list).
6. If the error message in the Activity logs isn’t helpful in identifying root cause, enable diagnostic logs and
use Azure Monitor logs.
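A hedged sketch of such a check is shown below; the column and alias names (input, output, latitude, and so on) are hypothetical and should be replaced with the ones used in your job:

-- Pass through only events whose latitude is within the valid range
SELECT deviceId, latitude, longitude
INTO output
FROM input
WHERE latitude >= -90.0 AND latitude <= 90.0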
3. When your Stream Analytics job starts, diagnostic logs are routed to your Log Analytics workspace.
Navigate to the Log Analytics workspace and choose Logs under the General section.
4. You can write your own query to search for terms, identify trends, analyze patterns, and provide insights
based on your data. For example, you can write a query to filter only diagnostic logs that have the message
“The streaming job failed.” Diagnostic logs from Azure Stream Analytics are stored in the
AzureDiagnostics table.
5. When you have a query that is searching for the right logs, save it by selecting Save and provide a Name
and Category. You can then create an alert by selecting New alert rule. Next, specify the alert condition.
Select Condition and enter the threshold value and the frequency at which this custom log search is
evaluated.
6. Choose the action group and specify alert details, like name and description, before you can create the alert
rule. You can route the diagnostic logs of various jobs to the same Log Analytics workspace. This allows you
to set up alerts once that work across all jobs.
NAME DESCRIPTION
resourceId ID of the resource that the operation took place on, in upper case. It includes the subscription ID, the resource group, and the job name. For example, /SUBSCRIPTIONS/6503D296-DAC1-4449-9B03-609A1F4A1C87/RESOURCEGROUPS/MY-RESOURCE-GROUP/PROVIDERS/MICROSOFT.STREAMANALYTICS/STREAMINGJOBS/MYSTREAMINGJOB.
NAME DESCRIPTION
Source Name of the job input or output where the error occurred.
Depending on the operationName value, data errors have the following schema:
Serialize events occur during event read operations. They occur when the data at the input does not
satisfy the query schema for one of these reasons:
Type mismatch during event (de)serialize: Identifies the field that's causing the error.
Cannot read an event, invalid serialization: Lists information about the location in the input data
where the error occurred. Includes blob name for blob input, offset, and a sample of the data.
Send events occur during write operations. They identify the streaming event that caused the error.
Generic events
Generic events cover everything else.
NAME DESCRIPTION
Correlation ID GUID that uniquely identifies the job execution. All execution
log entries from the time the job starts until the job stops
have the same Correlation ID value.
Next steps
Introduction to Stream Analytics
Get started with Stream Analytics
Scale Stream Analytics jobs
Stream Analytics query language reference
Stream Analytics management REST API reference
Azure Stream Analytics data errors
5/14/2019 • 5 minutes to read
When there's a discrepancy in the data that is processed by an Azure Stream Analytics job, Stream Analytics sends
a data error event to the diagnostic logs. Stream Analytics writes detailed information and example events to its
diagnostic logs when data errors occur. A summary of this information is also provided through portal notifications
for some errors.
This article outlines the different error types, causes, and diagnostic log details for input and output data errors.
{
"Source": "InputTelemetryData",
"Type": "DataError",
"DataErrorType": "InputDeserializerError.InvalidData",
"BriefMessage": "Json input stream should either be an array of objects or line separated objects. Found
token type: Integer",
"Message": "Input Message Id: https:\\/\\/exampleBlob.blob.core.windows.net\\/inputfolder\\/csv.txt Error:
Json input stream should either be an array of objects or line separated objects. Found token type: Integer",
"ExampleEvents": "[\"1,2\\\\u000d\\\\u000a3,4\\\\u000d\\\\u000a5,6\"]",
"FromTimestamp": "2019-03-22T22:34:18.5664937Z",
"ToTimestamp": "2019-03-22T22:34:18.5965248Z",
"EventCount": 1
}
InputDeserializerError.InvalidHeader
Cause: The header of input data is invalid. For example, a CSV has columns with duplicate names.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Input message identifier.
Actual payload up to a few kilobytes.
Error message
InputDeserializerError.MissingColumns
Cause: The input columns defined with CREATE TABLE or through TIMESTAMP BY don't exist.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Input message identifier.
Names of the columns that are missing.
Actual payload up to a few kilobytes.
Error messages
"Message": "Missing fields specified in query or in create table. Fields expected:ColumnA Fields found:ColumnB"
InputDeserializerError.TypeConversionError
Cause: Unable to convert the input to the type specified in the CREATE TABLE statement.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Input message identifier.
Name of the column and expected type.
Error messages
InputDeserializerError.InvalidData
Cause: Input data is not in the right format. For example, the input isn't valid JSON.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Input message identifier.
Actual payload up to a few kilobytes.
Error messages
"BriefMessage": "Json input stream should either be an array of objects or line separated objects. Found token
type: String"
"Message": "Json input stream should either be an array of objects or line separated objects. Found token type:
String"
InvalidInputTimeStamp
Cause: The value of the TIMESTAMP BY expression can't be converted to datetime.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Input message identifier.
Error message.
Actual payload up to a few kilobytes.
Error message
InvalidInputTimeStampKey
Cause: The value of TIMESTAMP BY OVER timestampColumn is NULL.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
The actual payload up to a few kilobytes.
Error message
LateInputEvent
Cause: The difference between application time and arrival time is greater than the late arrival tolerance window.
Portal notification provided: No
Diagnostic log level: Information
Log details
Application time and arrival time.
Actual payload up to a few kilobytes.
Error message
"BriefMessage": "Input event with application timestamp '2019-01-01' and arrival time '2019-01-02' was sent
later than configured tolerance."
EarlyInputEvent
Cause: The difference between Application time and Arrival time is greater than 5 minutes.
Portal notification provided: No
Diagnostic log level: Information
Log details
Application time and arrival time.
Actual payload up to a few kilobytes.
Error message
"BriefMessage": "Input event arrival time '2019-01-01' is earlier than input event application timestamp '2019-
01-02' by more than 5 minutes."
OutOfOrderEvent
Cause: Event is considered out of order according to the out of order tolerance window defined.
Portal notification provided: No
Diagnostic log level: Information
Log details
Actual payload up to a few kilobytes.
Error message
"Message": "The output record does not contain primary key property: [deviceId] Ensure the query output
contains the column [deviceId] with a unique non-empty string less than '255' characters."
OutputDataConversionError.ColumnNameInvalid
Cause: The column value doesn't conform with the output. For example, the column name isn't a valid Azure
table column.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Name of the column and either record identifier or part of the record.
Error message
"Message": "Invalid property name #deviceIdValue. Please refer MSDN for Azure table property naming
convention."
OutputDataConversionError.TypeConversionError
Cause: A column can't be converted to a valid type in the output. For example, the value of column is
incompatible with constraints or type defined in SQL table.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Name of the column.
Either record identifier or part of the record.
Error message
"Message": "The column [id] value null or its type is invalid. Ensure to provide a unique non-empty string less
than '255' characters."
OutputDataConversionError.RecordExceededSizeLimit
Cause: The value of the message is greater than the supported output size. For example, a record is larger than
1 MB for an Event Hub output.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Either record identifier or part of the record.
Error message
"BriefMessage": "Single output event exceeds the maximum message size limit allowed (262144 bytes) by Event
Hub."
OutputDataConversionError.DuplicateKey
Cause: A record already contains a column with the same name as a system column. For example, a Cosmos DB
output with a column named ID when the document ID is set to a different column.
Portal notification provided: Yes
Diagnostic log level: Warning
Log details
Name of the column.
Either record identifier or part of the record.
Next steps
Troubleshoot Azure Stream Analytics by using diagnostics logs
Understand Stream Analytics job monitoring and how to monitor queries
Performing sentiment analysis by using Azure Stream
Analytics and Azure Machine Learning
3/15/2019 • 7 minutes to read
This article describes how to quickly set up a simple Azure Stream Analytics job that integrates Azure Machine
Learning. You use a Machine Learning sentiment analytics model from the Cortana Intelligence Gallery to analyze
streaming text data and determine the sentiment score in real time. Using the Cortana Intelligence Suite lets you
accomplish this task without worrying about the intricacies of building a sentiment analytics model.
You can apply what you learn from this article to scenarios such as these:
Analyzing real-time sentiment on streaming Twitter data.
Analyzing records of customer chats with support staff.
Evaluating comments on forums, blogs, and videos.
Many other real-time, predictive scoring scenarios.
In a real-world scenario, you would get the data directly from a Twitter data stream. To simplify the tutorial, it's
written so that the Stream Analytics job gets tweets from a CSV file in Azure Blob storage. You can create your
own CSV file, or you can use a sample CSV file, as shown in the following image:
The Stream Analytics job that you create applies the sentiment analytics model as a user-defined function
(UDF) on the sample text data from the blob store. The output (the result of the sentiment analysis) is written to
the same blob store in a different CSV file.
The following figure demonstrates this configuration. As noted, for a more realistic scenario, you can replace blob
storage with streaming Twitter data from an Azure Event Hubs input. Additionally, you could build a Microsoft
Power BI real-time visualization of the aggregate sentiment.
Prerequisites
Before you start, make sure you have the following:
An active Azure subscription.
A CSV file with some data in it. You can download the file shown earlier from GitHub, or you can create your
own file. For this article, it is assumed that you're using the file from GitHub.
At a high level, to complete the tasks demonstrated in this article, you do the following:
1. Create an Azure storage account and a blob storage container, and upload a CSV-formatted input file to the
container.
2. Add a sentiment analytics model from the Cortana Intelligence Gallery to your Azure Machine Learning
workspace and deploy this model as a web service in the Machine Learning workspace.
3. Create a Stream Analytics job that calls this web service as a function in order to determine sentiment for the
text input.
4. Start the Stream Analytics job and check the output.
6. In the Containers blade, select the new container, which opens the blade for that container.
7. Click Upload.
8. In the Upload blob blade, upload the sampleinput.csv file that you downloaded earlier. For Blob type,
select Block blob and set the block size to 4 MB, which is sufficient for this tutorial.
9. Click the Upload button at the bottom of the blade.
Add the sentiment analytics model from the Cortana Intelligence
Gallery
Now that the sample data is in a blob, you can enable the sentiment analysis model in Cortana Intelligence
Gallery.
1. Go to the predictive sentiment analytics model page in the Cortana Intelligence Gallery.
2. Click Open in Studio.
5. After the process has run successfully, select Deploy Web Service at the bottom of the page.
6. To validate that the sentiment analytics model is ready to use, click the Test button. Provide text input such
as "I love Microsoft".
If the test works, you see a result similar to the following example:
7. In the Apps column, click the Excel 2010 or earlier workbook link to download an Excel workbook. The
workbook contains the API key and the URL that you need later to set up the Stream Analytics job.
Create a Stream Analytics job that uses the Machine Learning model
You can now create a Stream Analytics job that reads the sample tweets from the CSV file in blob storage.
Create the job
1. Go to the Azure portal.
2. Click Create a resource > Internet of Things > Stream Analytics job.
3. Name the job azure-sa-ml-demo , specify a subscription, specify an existing resource group or create a new
one, and select the location for the job.
Configure the job input
The job gets its input from the CSV file that you uploaded earlier to blob storage.
1. After the job has been created, under Job Topology in the job blade, click the Inputs option.
2. In the Inputs blade, click Add Stream Input > Blob storage
3. Fill out the Blob Storage blade with these values:
FIELD VALUE
Input alias Use the name datainput and select Select blob storage from your subscription
FIELD VALUE
Output alias Use the name datamloutput and select Select blob storage from your subscription
FIELD VALUE
Function alias Use the name sentiment and select Provide Azure Machine Learning function settings manually, which gives you an option to enter the URL and key.
WITH sentiment AS (
SELECT text, sentiment1(text) as result
FROM datainput
)
The query invokes the function you created earlier (sentiment) in order to perform sentiment analysis on
each tweet in the input. (A sketch of the complete query, including the output routing, appears after these steps.)
4. Click Save to save the query.
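For reference, here is a sketch of what the complete query might look like once the scored results are routed to the blob output. It assumes the output alias datamloutput created earlier and that the function result includes a Score field; adjust the field name to match your web service's output schema:

WITH sentiment AS (
SELECT text, sentiment1(text) AS result
FROM datainput
)
-- Write the original text and its sentiment score to the blob output
SELECT text, result.[Score]
INTO datamloutput
FROM sentiment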
3. Open the generated CSV file. You see something like the following example:
View metrics
You also can view Azure Machine Learning function-related metrics. The following function-related metrics are
displayed in the Monitoring box in the job blade:
Function Requests indicates the number of requests sent to a Machine Learning web service.
Function Events indicates the number of events in the request. By default, each request to a Machine
Learning web service contains up to 1,000 events.
Next steps
Introduction to Azure Stream Analytics
Azure Stream Analytics Query Language Reference
Integrate REST API and Machine Learning
Azure Stream Analytics Management REST API Reference
Anomaly detection in Azure Stream Analytics
6/4/2019 • 3 minutes to read
Available in both the cloud and Azure IoT Edge, Azure Stream Analytics offers built-in machine learning based
anomaly detection capabilities that can be used to monitor the two most commonly occurring anomalies:
temporary and persistent. With the AnomalyDetection_SpikeAndDip and AnomalyDetection_ChangePoint
functions, you can perform anomaly detection directly in your Stream Analytics job.
The machine learning models assume a uniformly sampled time series. If the time series is not uniform, you may
insert an aggregation step with a tumbling window prior to calling anomaly detection.
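For example, here is a minimal sketch of such an aggregation step, assuming a temperature field and the same input, output, and window settings used in the spike-and-dip example later in this article:

WITH UniformInput AS (
    -- Average readings into one-second buckets so the series is uniformly sampled
    SELECT
        System.Timestamp() AS time,
        AVG(CAST(temperature AS float)) AS temp
    FROM input
    GROUP BY TumblingWindow(second, 1)
)
SELECT
    time,
    temp,
    AnomalyDetection_SpikeAndDip(temp, 95, 120, 'spikesanddips')
        OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
INTO output
FROM UniformInput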
The machine learning operations do not support seasonality trends or multi-variate correlations.
In the same sliding window, if a second spike is smaller than the first one, the computed score for the smaller spike
is probably not significant enough compared to the score for the first spike within the confidence level specified.
You can try decreasing the model's confidence level setting to catch such anomalies. However, if you start to get
too many alerts, you can use a higher confidence level.
The following example query assumes a uniform input rate of one event per second in a 2-minute sliding window
with a history of 120 events. The final SELECT statement extracts and outputs the score and anomaly status with a
confidence level of 95%.
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_SpikeAndDip(CAST(temperature AS float), 95, 120, 'spikesanddips')
OVER(LIMIT DURATION(second, 120)) AS SpikeAndDipScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'Score') AS float) AS SpikeAndDipScore,
CAST(GetRecordPropertyValue(SpikeAndDipScores, 'IsAnomaly') AS bigint) AS IsSpikeAndDipAnomaly
INTO output
FROM AnomalyDetectionStep
Change point
Persistent anomalies in a time series event stream are changes in the distribution of values in the event stream,
like level changes and trends. In Stream Analytics, such anomalies are detected using the Machine Learning based
AnomalyDetection_ChangePoint operator.
Persistent changes last much longer than spikes and dips and could indicate catastrophic event(s). Persistent
changes are not usually visible to the naked eye, but can be detected with the AnomalyDetection_ChangePoint
operator.
The following image is an example of a level change:
WITH AnomalyDetectionStep AS
(
SELECT
EVENTENQUEUEDUTCTIME AS time,
CAST(temperature AS float) AS temp,
AnomalyDetection_ChangePoint(CAST(temperature AS float), 80, 1200)
OVER(LIMIT DURATION(minute, 20)) AS ChangePointScores
FROM input
)
SELECT
time,
temp,
CAST(GetRecordPropertyValue(ChangePointScores, 'Score') AS float) AS ChangePointScore,
CAST(GetRecordPropertyValue(ChangePointScores, 'IsAnomaly') AS bigint) AS IsChangePointAnomaly
INTO output
FROM AnomalyDetectionStep
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Machine Learning integration in Stream Analytics
(Preview)
5/8/2019 • 4 minutes to read
Stream Analytics supports user-defined functions that call out to Azure Machine Learning endpoints. REST API
support for this feature is detailed in the Stream Analytics REST API library. This article provides supplemental
information needed for successful implementation of this capability in Stream Analytics. A tutorial has also been
posted and is available here.
Configure a Stream Analytics and Machine Learning UDF via REST API
By using REST APIs, you can configure your job to call Azure Machine Learning functions. The steps are as
follows:
1. Create a Stream Analytics job
2. Define an input
3. Define an output
4. Create a user-defined function (UDF)
5. Write a Stream Analytics transformation that calls the UDF
6. Start the job
{
"name": "newudf",
"properties": {
"type": "Scalar",
"properties": {
"binding": {
"type": "Microsoft.MachineLearning/WebService",
"properties": {
"endpoint":
"https://ussouthcentral.services.azureml.net/workspaces/f80d5d7a77fb4b46bf2a30c63c078dca/services/b7be5e40fd19
4258796fb402c1958eaf/execute ",
"apiKey": "replacekeyhere"
}
}
}
}
}
POST:
/subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>/RetrieveDefaultDefinition?api-version=<apiVersion>
{
"bindingType": "Microsoft.MachineLearning/WebService",
"bindingRetrievalProperties": {
"executeEndpoint": null,
"udfType": "Scalar"
}
}
PATCH:
/subscriptions/<subscriptionId>/resourceGroups/<resourceGroup>/providers/Microsoft.StreamAnalytics/streamingjobs/<streamingjobName>/functions/<udfName>?api-version=<apiVersion>
{
"name": "transformation",
"properties": {
"streamingUnits": null,
"query": "select *,scoreTweet(Tweet) TweetSentiment into blobOutput from blobInput"
}
}
Get help
For further assistance, try our Azure Stream Analytics forum
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference
Data-driven debugging by using the job diagram
4/17/2018 • 3 minutes to read
The job diagram on the Monitoring blade in the Azure portal can help you visualize your job pipeline. It shows
inputs, outputs, and query steps. You can use the job diagram to examine the metrics for each step, to more quickly
isolate the source of a problem when you troubleshoot issues.
Select each query step to see the corresponding section in a query editing pane. A metric chart for the step is
displayed in a lower pane on the page.
To see the partitions of the Azure Event Hubs input, select the ellipsis (...). A context menu appears. You also can see the input
merger.
To see the metric chart for only a single partition, select the partition node. The metrics are shown at the bottom of
the page.
To see the metrics chart for a merger, select the merger node. The following chart shows that no events were
dropped or adjusted.
To see the details of the metric value and time, point to the chart.
Troubleshoot by using metrics
The QueryLastProcessedTime metric indicates when a specific step received data. By looking at the topology,
you can work backward from the output processor to see which step is not receiving data. If a step is not getting
data, go to the query step just before it. Check whether the preceding query step has a time window, and if enough
time has passed for it to output data. (Note that time windows are snapped to the hour.)
If the preceding query step is an input processor, use the input metrics to help answer the following targeted
questions. They can help you determine whether a job is getting data from its input sources. If the query is
partitioned, examine each partition.
How much data is being read?
InputEventsSourcesTotal is the number of data units read. For example, the number of blobs.
InputEventsTotal is the number of events read. This metric is available per partition.
InputEventsInBytesTotal is the number of bytes read.
InputEventsLastArrivalTime is updated with every received event's enqueued time.
Is time moving forward? If actual events are read, punctuation might not be issued.
InputEventsLastPunctuationTime indicates when a punctuation was issued to keep time moving forward. If
punctuation is not issued, data flow can get blocked.
Are there any errors in the input?
InputEventsEventDataNullTotal is a count of events that have null data.
InputEventsSerializerErrorsTotal is a count of events that could not be deserialized correctly.
InputEventsDegradedTotal is a count of events that had an issue other than with deserialization.
Are events being dropped or adjusted?
InputEventsEarlyTotal is the number of events that have an application timestamp before the high
watermark.
InputEventsLateTotal is the number of events that have an application timestamp after the high watermark.
InputEventsDroppedBeforeApplicationStartTimeTotal is the number of events dropped before the job start
time.
Are we falling behind in reading data?
Input Events Backlogged (Total) tells you how many more messages need to be read for Event Hubs and
Azure IoT Hub inputs. When this number is greater than 0, it means your job can't process the data as fast as it
is coming in. In this case, you may need to increase the number of Streaming Units and/or make sure your job
can be parallelized. For more information, see the query parallelization page.
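If the job can be partitioned, a sketch of an embarrassingly parallel query looks like the following; it assumes the input is partitioned (for example, an Event Hub input) and that PartitionId is available on the events:

-- Compute per-device counts independently within each input partition
SELECT PartitionId, deviceId, COUNT(*) AS eventCount
INTO output
FROM input
PARTITION BY PartitionId
GROUP BY PartitionId, deviceId, TumblingWindow(minute, 1)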
Get help
For additional assistance, try our Azure Stream Analytics forum.
Next steps
Introduction to Stream Analytics
Get started with Stream Analytics
Scale Stream Analytics jobs
Stream Analytics query language reference
Stream Analytics management REST API reference
Parse JSON and Avro data in Azure Stream Analytics
6/4/2019 • 2 minutes to read
Azure Stream Analytics supports processing events in CSV, JSON, and Avro data formats. Both JSON and Avro
data can contain complex types such as nested objects (records) and arrays.
Examples
Select array element at a specified index (selecting the first array element):
SELECT
GetArrayElement(arrayField, 0) AS firstElement
FROM input
Select the number of elements in an array:
SELECT
GetArrayLength(arrayField) AS arrayLength
FROM input
Select all array elements as individual events. The APPLY operator together with the GetArrayElements built-in
function extracts all array elements as individual events:
SELECT
arrayElement.ArrayIndex,
arrayElement.ArrayValue
FROM input as event
CROSS APPLY GetArrayElements(event.arrayField) AS arrayElement
Examples
Use dot notation (.) to access nested fields. For example, this query selects the Latitude and Longitude coordinates
under the Location property in the preceding JSON data:
SELECT
DeviceID,
Location.Lat,
Location.Long
FROM input
Use the GetRecordPropertyValue function if the property name is unknown. For example, imagine a sample data
stream needs to be joined with reference data containing thresholds for each device sensor:
{
"DeviceId" : "12345",
"SensorName" : "Temperature",
"Value" : 75
}
SELECT
input.DeviceID,
thresholds.SensorName
FROM input
JOIN thresholds
ON
input.DeviceId = thresholds.DeviceId
WHERE
GetRecordPropertyValue(input.SensorReadings, thresholds.SensorName) > thresholds.Value
To convert record fields into separate events, use the APPLY operator together with the GetRecordProperties
function. For example, to convert a sample stream into a stream of events with individual sensor readings, this
query could be used:
SELECT
event.DeviceID,
sensorReading.PropertyName,
sensorReading.PropertyValue
FROM input as event
CROSS APPLY GetRecordProperties(event.SensorReadings) AS sensorReading
You can select all the properties of a nested record using the '*' wildcard. Consider the following example:
SELECT input.SensorReadings.*
FROM input
The result contains all the properties of the SensorReadings record:
{
"Temperature" : 80,
"Humidity" : 70,
"CustomSensor01" : 5,
"CustomSensor022" : 99
}
See Also
Data Types in Azure Stream Analytics
Azure Stream Analytics preview features
5/30/2019 • 2 minutes to read
This article summarizes all the features currently in preview for Azure Stream Analytics. Using preview features in
a production environment isn't recommended.
Public previews
The following features are in public preview. You can take advantage of these features today, but don't use them in
your production environment.
Visual Studio Code for Azure Stream Analytics (Released May 2019)
Azure Stream Analytics jobs can be authored in Visual Studio Code. See our VS Code getting started tutorial.
Anomaly Detection
Azure Stream Analytics introduces new machine learning models with support for spike and dips detection in
addition to bi-directional, slow positive, and slow negative trends detection. For more information, visit Anomaly
detection in Azure Stream Analytics.
Integration with Azure Machine Learning
You can scale Stream Analytics jobs with Machine Learning (ML) functions. To learn more about how you can use
ML functions in your Stream Analytics job, visit Scale your Stream Analytics job with Azure Machine Learning
functions. Check out a real-world scenario with Performing sentiment analysis by using Azure Stream Analytics
and Azure Machine Learning.
JavaScript user-defined aggregate
Azure Stream Analytics supports user-defined aggregates (UDA) written in JavaScript, which enable you to
implement complex stateful business logic. Learn how to use UDAs from the Azure Stream Analytics JavaScript
user-defined aggregates documentation.
Live data testing in Visual Studio
Visual Studio tools for Azure Stream Analytics enhance the local testing feature that allows you to test your queries
against live event streams from cloud sources such as Event Hub or IoT hub. Learn how to Test live data locally
using Azure Stream Analytics tools for Visual Studio.
.NET user-defined functions on IoT Edge
With .NET standard user-defined functions, you can run .NET Standard code as part of your streaming pipeline.
You can create simple C# classes or import full projects and libraries. The full authoring and debugging experience is
supported in Visual Studio. For more information, visit Develop .NET Standard user-defined functions for Azure
Stream Analytics Edge jobs.
Other previews
The following features are also available in preview.
C# custom deserializer for Azure Stream Analytics on IoT Edge and Cloud (Announced May 2019)
Developers can implement custom deserializers in C# to deserialize events received by Azure Stream Analytics.
Examples of formats that can be deserialized include Parquet, Protobuf, XML, or any binary format. Sign up for this
preview here.
Parquet Output (Announced May 2019)
Parquet is a columnar format enabling efficient big data processing. By outputting data in Parquet format in a data
lake, you can take advantage of Azure Stream Analytics to power large scale streaming ETL and run batch
processing, train machine learning algorithms or run interactive queries on your historical data. Sign up for this
preview here.
One-click integration with Event Hubs (Announced May 2019)
With this integration, you will now be able to visualize incoming data and start to write a Stream Analytics query
with one click from the Event Hub portal. Once your query is ready, you will be able to productize it in a few clicks
and start to get real-time insights. This will significantly reduce the time and cost to develop real-time analytics
solutions. Sign up for this preview here.
Support for Azure Stack (Announced May 2019)
This feature, enabled on the Azure IoT Edge runtime, leverages custom Azure Stack features, such as native support
for local inputs and outputs running on Azure Stack (for example Event Hubs, IoT Hub, Blob Storage). This new
integration enables you to build hybrid architectures that can analyze your data close to where it is generated,
lowering latency and maximizing insights. Sign up for this preview here.
Reference architecture: Real-time event processing
with Microsoft Azure Stream Analytics
12/17/2018 • 2 minutes to read
The reference architecture for real-time event processing with Azure Stream Analytics is intended to provide a
generic blueprint for deploying a real-time platform as a service (PaaS) stream-processing solution with Microsoft
Azure.
Summary
Traditionally, analytics solutions have been based on capabilities such as ETL (extract, transform, load) and data
warehousing, where data is stored prior to analysis. Changing requirements, including more rapidly arriving data,
are pushing this existing model to the limit. The ability to analyze data within moving streams prior to storage is
one solution, and while it is not a new capability, the approach has not been widely adopted across all industry
verticals.
Microsoft Azure provides an extensive catalog of analytics technologies that are capable of supporting an array of
different solution scenarios and requirements. Selecting which Azure services to deploy for an end-to-end solution
can be a challenge given the breadth of offerings. This paper is designed to describe the capabilities and
interoperation of the various Azure services that support an event-streaming solution. It also explains some of the
scenarios in which customers can benefit from this type of approach.
Contents
Executive Summary
Introduction to Real-Time Analytics
Value Proposition of Real-Time Data in Azure
Common Scenarios for Real-Time Analytics
Architecture and Components
Data Sources
Data-Integration Layer
Real-time Analytics Layer
Data Storage Layer
Presentation / Consumption Layer
Conclusion
Author: Charles Feddersen, Solution Architect, Data Insights Center of Excellence, Microsoft Corporation
Published: January 2015
Revision: 1.0
Download: Real-Time Event Processing with Microsoft Azure Stream Analytics
Get help
For further assistance, try the Azure Stream Analytics forum
Next steps
Introduction to Azure Stream Analytics
Get started using Azure Stream Analytics
Scale Azure Stream Analytics jobs
Azure Stream Analytics Query Language Reference
Azure Stream Analytics Management REST API Reference