KPI Development (Java)
Analytic Development in Java
Use the Operations Performance Management analytic management service to build your computation logic and artifacts. The service includes runtime services for deploying to the analytic framework.
As an analytic developer, you can create analytic definitions that read data from a specific input data source and write the analytic calculations to a persistent data store. The default persistent data store for the current analytic management services is the APM time series.
The computation artifact includes implementation logic for computing the key performance indicators using data from defined input data sources.
Download and Explore Sample Java Spark Analytic
As an analytic developer, you can use the analytics sample package, a starter Java project containing sample analytic code. The following information helps you understand and develop your computation logic in Java.
Before You Begin
You have downloaded and set up your OPM Sandbox Docker environment to enable development of the analytic in Java.
Understand the Computation Logic in Java
This is a reference document for developers who want to understand the expected structure of the computation logic in Java.
After you download the Java sample code, you can start developing the custom logic for your KPI.
Find the complete implementation example code at Andromeda_Simple_KPIs/JavaTimes10/src/main/java/com/ge/opm/kpi/SampleAnalytic.java
OPM Data Providers
OPM data providers construct the incoming data into collections of data tables that you can manipulate using filters and queries.
IRuntimeDataset<SQLContext> assetDS = (IRuntimeDataset<SQLContext>) inputDatasets.get("assetReadDS");
IRuntimeDataset<SQLContext> timeseriesDS = (IRuntimeDataset<SQLContext>) inputDatasets.get("timeseriesReadDS");
In the above code, assetReadDS and timeseriesReadDS are the input data source provider names used in the KPI definition.
Getting specific input data into data frames
OPM data providers use data frames to construct the incoming and outgoing data. These data frames enable you to use simple query statements to get the required data for your computation. You can also perform simple join operations to filter for specific data to run the computation. The computed KPIs and tag data are produced as data frames to write back into the corresponding output data providers.
IRuntimeDataset tsInput = (IRuntimeDataset)inputDatasets.get("timeseriesReadDS");
IRuntimeDataset configDS = (IRuntimeDataset)inputDatasets.get("configDS");
+--------------------+-------------+-------+-------+
|tag                 |timestamp    |value  |quality|
+--------------------+-------------+-------+-------+
|OO_Tag_Pressure_ID84|1478764800000|300.555|3 |
|OO_Tag_Pressure_ID84|1478854800000|301.666|3 |
|OO_Tag_Pressure_ID84|1478948400000|401.001|3 |
|OO_Tag_Pressure_ID83|1478764800000|200.555|3 |
|OO_Tag_Pressure_ID83|1478854800000|201.666|3 |
|OO_Tag_Pressure_ID83|1478948400000|201.001|3 |
+--------------------+-------------+-------+-------+
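The computation over rows like these is per-row arithmetic: the sample analytic multiplies each value by 10 and re-tags it to the output tag, carrying timestamp and quality through unchanged. A plain-Java sketch of that logic, independent of the Spark API (the TsRow type and the times10 method are illustrative, not part of OPM):

```java
import java.util.List;
import java.util.stream.Collectors;

public class Times10Sketch {
    // Illustrative row type; the real analytic operates on Spark Dataset rows.
    record TsRow(String tag, long timestamp, double value, int quality) {}

    // Re-tag each row to the output tag and multiply its value by 10,
    // keeping timestamp and quality unchanged.
    static List<TsRow> times10(List<TsRow> input, String outputTag) {
        return input.stream()
                .map(r -> new TsRow(outputTag, r.timestamp(), r.value() * 10, r.quality()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<TsRow> in = List.of(
                new TsRow("OO_Tag_Pressure_ID84", 1478764800000L, 300.555, 3));
        List<TsRow> out = times10(in, "OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power");
        System.out.println(out);
    }
}
```

In the real analytic, the same transformation is expressed as a Spark SQL statement against the input data frame, as shown later in this topic.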
This data frame contains the input, constants, and output parameters mapped to the asset tags.
+--------------+------------+-------------------------------------------------+---------------------+
|MappingType   |MappingKey  |MappingValue                                     |AssetSourceKey       |
+--------------+------------+-------------------------------------------------+---------------------+
|InputMappings |temperature |OO_Tag_Pressure_ID84 |OO-CA-SIMUL-ASSET-ID0|
|InputMappings |pressure |OO_Tag_Pressure_ID83 |OO-CA-SIMUL-ASSET-ID0|
|Constants |threshold |100 |null |
|OutputMappings|availability|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|power |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power |OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|heatrate |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate |OO-CA-SIMUL-ASSET-ID0|
+--------------+------------+-------------------------------------------------+---------------------+
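The analytic reads values out of this frame by filtering on MappingType and MappingKey. As a hedged sketch, the SQL string you would pass to the SQLContext to read the threshold constant could be built like this (the frame name "configDS" is illustrative; at runtime you would use configDS.getName()):

```java
public class ConstantQuerySketch {
    // Builds the SQL used to read a constant (such as threshold) from the
    // config frame; frameName would be configDS.getName() at runtime.
    static String constantQuery(String frameName, String key) {
        return "select MappingValue from `" + frameName
                + "` where MappingType='Constants' and MappingKey='" + key + "'";
    }

    public static void main(String[] args) {
        System.out.println(constantQuery("configDS", "threshold"));
    }
}
```

You would execute the resulting statement the same way as the InputMappings and OutputMappings queries shown later in this topic, reading the value with head().getString(0).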
This data frame contains the tag data with attributes.
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
| sourceKey| name| type| tagSourceKey| tagName|tagUnit| attributes| tagAttributes|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
|Analytics-CA-ASSE...|Analytics CA Asse...|/assetTypes/01b15...|Analytics-CA-ASSE...|Analytics Asset T...| psi|[[Boolean,Wrapped...|[[Boolean,Wrapped...|
|Analytics-CA-ASSE...|Analytics CA Asse...|/assetTypes/01b15...|Analytics-CA-ASSE...|Analytics Asset T...| psi|[[Boolean,Wrapped...|[[Boolean,Wrapped...|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
Saving Filtered Data to the Data Frame
You can perform simple join operations to combine the contents of two or more data sets into a single data frame. Save the result of the select query to a string (for example, String outputTag). The snippet below queries the contents of the configDS and timeseriesDS data sets.
String outputTag = ((SQLContext)configDS.getContext()).sql("select MappingValue from `" + configDS.getName()
+ "` where MappingType='OutputMappings'").head().getString(0);
String inputTag = ((SQLContext)configDS.getContext()).sql("select MappingValue from `" + configDS.getName()
+ "` where MappingType='InputMappings'").head().getString(0);
Dataset tsDF = ((SQLContext)tsInput.getContext()).sql("select * from `" + tsInput.getName() + "` where tag='" + inputTag + "'");
Querying with Attributes
You can also filter on asset attribute values when joining data sets. Save the query to a string (for example, String assetFilterWithAttributes). The snippet below joins the assetDS and timeseriesDS data sets and filters on an attribute value.
String assetFilterWithAttributes = "select '" + outputTag + "' as tag, s.timestamp as timestamp, "
+ "s.value*2 as value, "
+ "s.quality as quality "
+ "from `" + assetDS.getName()
+ "` a join `" + timeseriesDS.getName()
+ "` s on a.tagSourceKey = s.tag "
+ "where s.tag='" + inputTag + "' "
+ "and a.attributes." + attributeName + ".value > 2000000000";
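Because the filter is assembled by string concatenation, it is easy to drop a space between clauses; printing the assembled statement before executing it is a quick sanity check. A self-contained sketch that builds the same statement from sample names (all frame, tag, and attribute names here are illustrative):

```java
public class AttributeFilterSketch {
    // Mirrors the attribute-filter query above; at runtime the frame names
    // come from assetDS.getName() and timeseriesDS.getName().
    static String buildFilter(String outputTag, String assetFrame, String tsFrame,
                              String inputTag, String attributeName) {
        return "select '" + outputTag + "' as tag, s.timestamp as timestamp, "
                + "s.value*2 as value, "
                + "s.quality as quality "
                + "from `" + assetFrame
                + "` a join `" + tsFrame
                + "` s on a.tagSourceKey = s.tag "
                + "where s.tag='" + inputTag + "' "
                + "and a.attributes." + attributeName + ".value > 2000000000";
    }

    public static void main(String[] args) {
        // Print the assembled SQL to verify clause spacing before running it.
        System.out.println(buildFilter("outTag", "assetReadDS", "timeseriesReadDS",
                "OO_Tag_Pressure_ID84", "someAttribute"));
    }
}
```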
Logging Information
Use LOGGER.info to log information related to the KPI computation, such as input, output, and joined data frame schemas and values, constant definitions and values, and log section labels. This helps you troubleshoot errors after the KPI job runs.
LOGGER.info(" HEATRATE DATAFRAME @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
LOGGER.info(heatRateDf.schema().treeString());
LOGGER.info(heatRateDf.showString(100, false));
LOGGER.info(" POWER DATAFRAME @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
LOGGER.info(powerDf.schema().treeString());
LOGGER.info(powerDf.showString(100, false));
Writing Time-series Results to the Data Frame
After performing your computation, write the results from your result data frame to the output data source provider.
Dataset outputDf = null;
String analyticSQL = null;
analyticSQL = "select '" + outputTag + "' as tag, timestamp, value*10 as value, quality from `tsFiltered`";
outputDf = ((SQLContext)tsInput.getContext()).sql(analyticSQL).cache();
outputs.put("timeseriesWriteDS", outputDf);
return outputs;
+-------------------------------------------------+------------------+-------+-------------+
|tag                                              |value             |quality|timestamp    |
+-------------------------------------------------+------------------+-------+-------------+
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate |4349.443333333333 |3 |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power |2174.7216666666664|3 |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|0.0 |3 |1480637708419|
+-------------------------------------------------+------------------+-------+-------------+
Package the Java Analytics
Package the Java analytic so that you can upload it to the analytics catalog for deployment to the Spark runtime.
POM Dependencies
<dependencies>
<dependency>
<groupId>com.ge.arf</groupId>
<artifactId>arf-runtime-api</artifactId>
<version>1.0.0-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.10</artifactId>
<version>${spark.sql.version}</version>
</dependency>
</dependencies>
Directory Structure
To package the analytic, use a directory structure similar to the following:
pom.xml
src
main
java
com
ge
opm
kpi
SampleAnalytic.java
resources
META-INF
services
com.ge.arf.rt.IAnalytics
Package
The services file com.ge.arf.rt.IAnalytics must contain the fully qualified name of your analytic implementation class:
com.ge.opm.kpi.SampleAnalytic
This allows the service loader in the runtime framework to obtain your analytic implementation, as long as it implements the IAnalytics interface.
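The lookup uses the standard java.util.ServiceLoader mechanism: the framework reads the file named after the interface and instantiates each class listed in it. A minimal, self-contained sketch of that mechanism, using a stand-in interface because the real com.ge.arf.rt.IAnalytics ships with the arf-runtime-api dependency:

```java
import java.util.ServiceLoader;

public class LoaderSketch {
    // Stand-in for com.ge.arf.rt.IAnalytics (provided by arf-runtime-api).
    public interface IAnalytics {}

    public static void main(String[] args) {
        // The runtime framework does the equivalent of this: scan
        // META-INF/services/<interface name> on the classpath and
        // instantiate each implementation class listed there.
        ServiceLoader<IAnalytics> loader = ServiceLoader.load(IAnalytics.class);
        int found = 0;
        for (IAnalytics a : loader) {
            found++;
        }
        // With no services file on the classpath, nothing is discovered.
        System.out.println("implementations found: " + found);
    }
}
```

If the services file is missing from your .jar, or its name does not exactly match the interface's fully qualified name, the runtime silently finds no implementations.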
Use the mvn clean install --settings mvnsettings.xml
command to build your analytic .jar file. Make sure you point to the mvnsettings.xml file in the sample analytic folder; it is located in the same directory as the POM file.
Zip the .jar file to complete the packaging process.
Simple Java Analytic
This reference document describes how to develop, build, and deploy a simple Java Analytic on Spark runtime.
Define the Job Specification
The sample analytic takes a single input parameter for a single timeseries tag, multiplies it by 10, and writes the values back to a single output tag with the same timestamp. Refer to the analytic code itself to see how these data sources are handled, and note in particular that the keys used to access the SQLContext for each data frame correspond to those provided in the JSON job specification (java_job.json).
{
"inputs": [
{
"name": "timeseriesReadDS",
"id": "fileInputDS-1234567",
"provider": "predix.apm.ts",
"parameters": {
"url": "${apm.timeseriesUrl}",
"Content-Type": "application/json",
"tenant": "${tenant}",
"Authorization": "Bearer ${apm.token}",
"filter": "operation=raw&startTime=1495217624000&sampleCount=30&useOnlyGoodData=true&tagList=OO_Tag_Temperature_ID26",
"InputMappings": {
"OO-CA-SIMUL-ASSET-ID1": {
"input1": "OO_Tag_Temperature_ID26"
}
}
}
}
],
"outputs": [
{
"name": "timeseriesWriteDS",
"id": "fileOutputDS-1234567",
"provider": "predix.apm.ts",
"parameters": {
"url": "${apm.timeseriesUrl}",
"Content-Type": "application/json",
"tenant": "${tenant}",
"Authorization": "Bearer ${apm.token}",
"OutputMappings": {
"OO-CA-SIMUL-ASSET-ID1": {
"output1": "OO_Tag_Temperature_ID28"
}
}
},
"streaming": false,
"schema": null
}
],
"language": "JAVA"
}
Upload and Deploy a Simple Java Analytic
This procedure describes how to upload and deploy a sample Java analytic on the Spark runtime.
Before You Begin
- You have uploaded the assets, tags, and time series data required for this analytic.
- You are signed in to an OPM tenant with access privileges to manage analytics on Spark runtime.
- You have downloaded a copy of sparkAnalytic_simple_java_version_1_0_0.zip.
- You have downloaded and extracted the contents of sparkAnalytic_simple_java_io-definitions.zip locally.