KPI Development (Java)

Analytic Development in Java

Use the Operations Performance Management (OPM) analytic management service to build your computation logic and artifacts. The service includes runtime services for deploying analytics to the analytic framework.

As an analytic developer, you can create analytic definitions to read data coming from a specific input data source and write the analytic calculations into a persistent data store. The default persistent data store for the current analytic management services is the APM time series.

The computation artifact includes implementation logic for computing the key performance indicators using data from defined input data sources.

Download and Explore Sample Java Spark Analytic

As an analytic developer, you can use the analytics sample package to explore and understand a starter Java package containing sample analytic code. The following information will help you understand and develop your computation logic in Java.

Before You Begin

You have downloaded and set up your OPM Sandbox Docker environment to enable development of the analytic in Java.

Note: The OPM Sandbox Docker folder on Box is currently available to internal users only. If you do not have view or download access to the folder and would like to request access, contact the OPM development support team.

Procedure

  1. Download the oo-analytics repository locally.
    If you do not have access to the repo, contact the OPM development support team.
  2. Extract the contents of the repo locally.
  3. Navigate to the Andromeda_Simple_KPIs folder in the extracted contents.
  4. In the JavaTimes10 folder, open the SampleAnalytic.java file to view and understand the sample computation logic.
    The example code sample is shown below.
    package com.ge.opm.kpi;
    
    import java.util.Collections;
    import java.util.HashMap;
    import java.util.Map;
    import com.ge.arf.rt.IAnalytics;
    import com.ge.arf.rt.IRuntimeDataset;
    import com.ge.arf.rt.config.HasConfig;
    
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.SQLContext;
    
    /**
     *
     * KPI Analytic Template to run in the OO KPI Framework
     *
     */
     public class SampleAnalytic extends HasConfig implements IAnalytics<SQLContext, Dataset> {

         @Override
         public Map<String, Dataset> runAnalysis(Map<String, IRuntimeDataset<SQLContext>> inputDatasets)
         {
           // Output data frames, keyed by the output data provider names from the KPI definition.
           Map outputs = new HashMap();

           // Input data providers, keyed by the input data provider names from the KPI definition.
           IRuntimeDataset tsInput = (IRuntimeDataset)inputDatasets.get("timeseriesReadDS");
           IRuntimeDataset configDS = (IRuntimeDataset)inputDatasets.get("configDS");

           // Read the output and input tag names from the configuration data frame.
           String outputTag = ((SQLContext)configDS.getContext()).sql("select MappingValue from `" + configDS.getName()
                                        + "` where MappingType='OutputMappings'").head().getString(0);
           String inputTag = ((SQLContext)configDS.getContext()).sql("select MappingValue from `" + configDS.getName()
                                        + "` where MappingType='InputMappings'").head().getString(0);

           // Filter the time-series data frame down to the rows for the input tag.
           Dataset tsDF = ((SQLContext)tsInput.getContext()).sql("select * from `" + tsInput.getName() + "` where tag='" + inputTag + "'");

           tsDF.show(false);
           tsDF.printSchema();
           tsDF.registerTempTable("tsFiltered");
           Dataset outputDf = null;
           String analyticSQL = null;

           // Compute the KPI: multiply each value by 10 and relabel the rows with the output tag.
           analyticSQL = "select '" + outputTag + "' as tag, timestamp, value*10 as value, quality from `tsFiltered`";
           outputDf = ((SQLContext)tsInput.getContext()).sql(analyticSQL).cache();

           // Write the result to the time-series output data provider.
           outputs.put("timeseriesWriteDS", outputDf);
           return outputs;
         }
     }
  5. Navigate to and open the file oo-analytics/Andromeda_Simple_KPIs/JavaTimes10/src/main/resources/META-INF/services/com.ge.arf.rt.IAnalytics. Make sure the value matches the Java implementation class name, for example, com.ge.opm.kpi.SampleAnalytic.

What To Do Next

Package the computation logic and upload it to the Analytics Catalog.

Understand the Computation Logic in Java

This is a reference document for developers who want to understand the expected structure of the computation logic in Java.

After you download the Java sample code, you can start developing the custom logic for your KPI.

Find the complete implementation example code at Andromeda_Simple_KPIs/JavaTimes10/src/main/java/com/ge/opm/kpi/SampleAnalytic.java

OPM Data Providers

OPM data providers construct the incoming data into collections of data tables that you can manipulate using filters and queries.

The following example code snippet shows retrieving the asset and time-series data sources.
IRuntimeDataset<SQLContext> assetDS = (IRuntimeDataset)inputDatasets.get("assetReadDS");
IRuntimeDataset<SQLContext> timeseriesDS = inputDatasets.get("timeseriesReadDS");

In the above code, assetReadDS and timeseriesReadDS are the input data source provider names used in the KPI definition.
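Once retrieved, each data provider exposes a Spark SQLContext and a table name that you can query. The following is a minimal sketch, following the same pattern as the sample analytic, of listing the contents of the asset data provider for inspection (the exact columns depend on your asset model):

// Query the asset data provider's table through its SQLContext and print it for inspection.
Dataset assetDF = ((SQLContext) assetDS.getContext())
        .sql("select * from `" + assetDS.getName() + "`");
assetDF.show(false);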

Getting Specific Input Data into Data Frames

OPM data providers use data frames to construct the incoming and outgoing data. These data frames enable you to use simple query statements to get the required data for your computation. You can also perform simple join operations to filter for specific data to run the computation. The computed KPIs and tag data are produced as data frames to write back into the corresponding output data providers.

The following example code snippet shows setting your IRuntimeDataset objects.
IRuntimeDataset tsInput = (IRuntimeDataset)inputDatasets.get("timeseriesReadDS");
IRuntimeDataset configDS = (IRuntimeDataset)inputDatasets.get("configDS");
Time-Series Input Data Frame Example

|tag                 |timestamp    |value  |quality|
+--------------------+-------------+-------+-------+
|OO_Tag_Pressure_ID84|1478764800000|300.555|3      |
|OO_Tag_Pressure_ID84|1478854800000|301.666|3      |
|OO_Tag_Pressure_ID84|1478948400000|401.001|3      |
|OO_Tag_Pressure_ID83|1478764800000|200.555|3      |
|OO_Tag_Pressure_ID83|1478854800000|201.666|3      |
|OO_Tag_Pressure_ID83|1478948400000|201.001|3      |
+--------------------+-------------+-------+-------+
Configuration Data Frame Example

This data frame contains the input, constants, and output parameters mapped to the asset tags.


|MappingType   |MappingKey  |MappingValue                                     |AssetSourceKey       |
+--------------+------------+-------------------------------------------------+---------------------+
|InputMappings |temperature |OO_Tag_Pressure_ID84                             |OO-CA-SIMUL-ASSET-ID0|
|InputMappings |pressure    |OO_Tag_Pressure_ID83                             |OO-CA-SIMUL-ASSET-ID0|
|Constants     |threshold   |100                                              |null                 |
|OutputMappings|availability|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|power       |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power       |OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|heatrate    |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate    |OO-CA-SIMUL-ASSET-ID0|
+--------------+------------+-------------------------------------------------+---------------------+
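Constants defined for the KPI can be read from this data frame with the same query pattern used for the tag mappings. The following is a sketch (not part of the sample analytic) that reads the threshold constant shown above:

// Read the 'threshold' constant from the configuration data frame (illustrative helper code).
String threshold = ((SQLContext) configDS.getContext()).sql(
        "select MappingValue from `" + configDS.getName()
        + "` where MappingType='Constants' and MappingKey='threshold'")
        .head().getString(0);
double thresholdValue = Double.parseDouble(threshold);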
Example Data Frame with Tag Attributes

This data frame contains the tag data with attributes.


+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
|           sourceKey|                name|                type|        tagSourceKey|             tagName|tagUnit|          attributes|       tagAttributes|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+
|Analytics-CA-ASSE...|Analytics CA Asse...|/assetTypes/01b15...|Analytics-CA-ASSE...|Analytics Asset T...|    psi|[[Boolean,Wrapped...|[[Boolean,Wrapped...|
|Analytics-CA-ASSE...|Analytics CA Asse...|/assetTypes/01b15...|Analytics-CA-ASSE...|Analytics Asset T...|    psi|[[Boolean,Wrapped...|[[Boolean,Wrapped...|
+--------------------+--------------------+--------------------+--------------------+--------------------+-------+--------------------+--------------------+

Saving Filtered Data to the Data Frame

You can save the result of a select query to a string (for example, String outputTag). In the example snippet below, the output and input tag names are read from the configuration dataset (configDS) and then used to filter the time-series dataset (tsInput) into a new data frame.

String outputTag = ((SQLContext)configDS.getContext()).sql("select MappingValue from `" + configDS.getName()
                                    + "` where MappingType='OutputMappings'").head().getString(0);
String inputTag = ((SQLContext)configDS.getContext()).sql("select MappingValue from `" + configDS.getName()
                                    + "` where MappingType='InputMappings'").head().getString(0);

Dataset tsDF = ((SQLContext)tsInput.getContext()).sql("select * from `" + tsInput.getName() + "` where tag='" + inputTag + "'");
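To reference the filtered result in later queries, register it as a temporary table, as the sample analytic does with the name tsFiltered:

// Register the filtered time-series data frame so later SQL statements can query it by name.
tsDF.registerTempTable("tsFiltered");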

Querying with Attributes

You can perform simple join operations to combine the contents of two or more data sets into a single data frame. You can save the join query to a string (for example, String assetFilterWithAttributes). The example snippet below joins the contents of two datasets, assetDS and timeseriesDS, and filters on a tag attribute value (attributeName holds the name of the attribute to filter on).


String assetFilterWithAttributes = "select '" + outputTag + "' as tag, s.timestamp as timestamp, "
            + "s.value*2 as value, "
            + "s.quality as quality "
            + "from `" + assetDS.getName() 
            + "` a join `" + timeseriesDS.getName() 
            + "` s on a.tagSourceKey = s.tag "
            + "where s.tag='" + inputTag + "'"
            + "and a.attributes."+attributeName+".value > 2000000000";

Logging Information

Use LOGGER.info to log information related to the KPI computation, such as input, output, and joined data frame schemas and values, constant definitions and values, and log section labels. This helps you troubleshoot errors after the KPI job execution.

LOGGER.info("     HEATRATE DATAFRAME @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
	LOGGER.info(heatRateDf.schema().treeString());
	LOGGER.info(heatRateDf.showString(100, false));
		
	LOGGER.info("     POWER DATAFRAME @@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@");
	LOGGER.info(powerDf.schema().treeString());
	LOGGER.info(powerDf.showString(100, false));
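The snippet assumes a LOGGER field is declared on your analytic class. A minimal declaration sketch, assuming the SLF4J API is available on the classpath (use whichever logging facade your project standardizes on):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical logger declaration for the sample analytic class.
private static final Logger LOGGER = LoggerFactory.getLogger(SampleAnalytic.class);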

Writing Time-series Results to the Data Frame

After performing your computation, write the results from your result data frame to the output data source provider.

Dataset outputDf = null;
String analyticSQL = null;

analyticSQL = "select '" + outputTag + "' as tag, timestamp, value*10 as value, quality from `tsFiltered`";
outputDf = ((SQLContext)tsInput.getContext()).sql(analyticSQL).cache();
outputs.put("timeseriesWriteDS", outputDf);
return outputs;
Time-Series Output Data Frame Example

|tag                                              |value             |quality|timestamp    |
+-------------------------------------------------+------------------+-------+-------------+
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate    |4349.443333333333 |3      |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power       |2174.7216666666664|3      |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|0.0               |3      |1480637708419|
+-------------------------------------------------+------------------+-------+-------------+

Package the Java Analytics

Package the Java analytic to upload it to the analytics catalog for deployment to the Spark runtime.

POM Dependencies

The following dependencies must be listed in the POM.
<dependencies>
    <dependency>
        <groupId>com.ge.arf</groupId>
        <artifactId>arf-runtime-api</artifactId>
        <version>1.0.0-SNAPSHOT</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.sql.version}</version>
    </dependency>
    <!-- additional dependencies as required by your analytic -->
</dependencies>
Directory Structure

To package the analytic, you should have something like the following directory structure:

pom.xml
src
    main
        java
            com
                ge
                    opm
                        kpi
                            SampleAnalytic.java
        resources
            META-INF
                services
                    com.ge.arf.rt.IAnalytics

Package

The com.ge.arf.rt.IAnalytics file should simply contain the fully qualified class name of your analytic. For example, in the sample analytic, it would be as follows:
com.ge.opm.kpi.SampleAnalytic

This allows the service loader in the runtime framework to obtain your analytic implementation, as long as it implements the IAnalytics interface.
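For context, this is the standard Java ServiceLoader mechanism. The runtime framework, not your code, performs the lookup, roughly as in this illustrative sketch:

// Illustration only: how java.util.ServiceLoader resolves the class named in
// META-INF/services/com.ge.arf.rt.IAnalytics; the framework then calls runAnalysis(...).
ServiceLoader<IAnalytics> loader = ServiceLoader.load(IAnalytics.class);
for (IAnalytics analytic : loader) {
    // discovered implementation, e.g. com.ge.opm.kpi.SampleAnalytic
}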

Use the mvn clean install --settings mvnsettings.xml command to build your analytic .jar file. Make sure you point to the mvnsettings.xml file in the sample analytic folder; it is located in the same directory as the POM file.

Zip the .jar file to complete the packaging process.

Simple Java Analytic

This reference document describes how to develop, build, and deploy a simple Java Analytic on Spark runtime.

Define the Job Specification

The sample analytic takes a single input parameter for a single time-series tag, multiplies it by 10, and writes the values back to a single output tag with the same timestamp. Refer to the analytic code itself to see how these data sources are handled, and note in particular that the keys used to access the SQLContext for each data frame correspond to those provided in the JSON job specification (java_job.json).

Note: To use OPM Analytic Services directly to run this analytic on your data, you will need to change some of the parameters in this example JSON, specifically the values for filter, InputMappings, and OutputMappings.
{
  "inputs": [
    {
      "name": "timeseriesReadDS",
      "id": "fileInputDS-1234567",
      "provider": "predix.apm.ts",
      "parameters": {
        "url": "${apm.timeseriesUrl}",
        "Content-Type": "application/json",
        "tenant": "${tenant}",
        "Authorization": "Bearer ${apm.token}",
        "filter": "operation=raw&startTime=1495217624000&sampleCount=30&useOnlyGoodData=true&tagList=OO_Tag_Temperature_ID26",
        "InputMappings": {
          "OO-CA-SIMUL-ASSET-ID1": {
            "input1": "OO_Tag_Temperature_ID26"
          }
        }
      }
    }
  ],
  "outputs": [
    {
      "name": "timeseriesWriteDS",
      "id": "fileOutputDS-1234567",
      "provider": "predix.apm.ts",
      "parameters": {
        "url": "${apm.timeseriesUrl}",
        "Content-Type": "application/json",
        "tenant": "${tenant}",
        "Authorization": "Bearer ${apm.token}",
        "OutputMappings": {
          "OO-CA-SIMUL-ASSET-ID1": {
            "output1": "OO_Tag_Temperature_ID28"
          }
        }
      },
      "streaming": false,
      "schema": null
    }
  ],
  "language": "JAVA"
}
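For reference, the name values in this job specification are the same keys the sample analytic uses to read its input dataset and register its output dataset:

// "timeseriesReadDS" matches inputs[0].name; "timeseriesWriteDS" matches outputs[0].name.
IRuntimeDataset tsInput = (IRuntimeDataset) inputDatasets.get("timeseriesReadDS");
outputs.put("timeseriesWriteDS", outputDf);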

Upload and Deploy a Simple Java Analytic

This procedure provides the steps to upload and deploy a sample Java analytic on the Spark runtime.

Before You Begin
This procedure assumes that the following prerequisite tasks have been completed.
Procedure
  1. Upload the analytics template to the catalog.
    Configure the following information for the analytic.
    Option            Description
    ----------------  -------------------------------------------
    Name              Spark Simple Java Times 10
    Owner             Your Name
    Analytic Type     Java
    Type Version      1.8
    Analytic File     sparkAnalytic_simple_java_version_1_0_0.zip
    Analytic Version  1.0.0
    Primary Category  Forecasting
  2. In the Analytic Template, configure the input definition, constant, and output definition through .CSV upload.
  3. Add and configure the deployment as follows:
    1. Enter deployment_forecast_train in the Deployment Name box, and then select Submit.
    2. In the 1. Asset Selection step, select the asset defined in the analytic, and then select Save.
    3. Select Next to access the 2. I/O Mapping step.
    4. Select Tag, and then select Add Tags....
    5. In the tag browser, search for the tag used in the analytic. For example, as in the sample, search for OO_TAG_Temperature_ID12. After the search displays the tag, drag and drop it onto the input to map it.
    6. Select Save and Next to save the I/O Mapping configuration.
    7. In the 3. Schedule step, leave the Define how often a new run will be executed option set to Only Once.
    8. For Time Span, select the past 30 days.
    9. Leave the Sample Interval at the default value of 1 Minute.
    10. Select Save, and then select Deploy.
    The deployment is saved to the Spark runtime. After successful deployment, the status updates to Run Once.
What To Do Next
Visualize the analytic output tags using the Analysis app.