KPI Development (Python)

Download and Explore Sample Python Spark Analytic

As an analytic developer, you can use the analytic sample package to generate a starter Python package with some sample code. The following information will help you understand and develop your computation logic in Python.

Before You Begin

You have downloaded and set up your OPM Sandbox Docker environment to enable the development of Python analytics.

Note: The OPM Sandbox Docker folder on Box is currently available to internal users only. If you do not have view or download access to the folder and would like to request access, contact the OPM development support team.

Procedure

  1. Download the oo-analytics repository locally.
    If you do not have access to the repo, contact the OPM development support team.
  2. Extract the contents of the repo locally.
  3. Navigate to the Andromeda_Simple_KPIs folder in the extracted contents.
  4. In the PythonTimes10 folder, find and open the __init__.py file to view and understand the sample computation logic.

Results

The sample code is shown below.
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.dataframe import *
from pyspark.sql.functions import *
import sys
import time
from datetime import datetime

class SimpleJob():

    def run_job(self, spark_session, runtime_config, job_json, context_dict, logger):
        try:
            spark = spark_session

            logger.info("Starting analytic...")
            configContext = context_dict["configDS"]
            tsContext = context_dict["timeseriesReadDS"]

            configDF = configContext.sql("select * from " + context_dict["configDS"].table_name)
            configDF.createOrReplaceTempView("configDF")
            inputTag = configContext.sql("select MappingValue from configDF where MappingType='InputMappings'").head()[0]
            timeseriesDF = tsContext.sql("select * from " + context_dict["timeseriesReadDS"].table_name + " where tag='" + inputTag + "'")
            timeseriesDF.createOrReplaceTempView("timeseriesDF")

            outputTag = configContext.sql("select MappingValue from configDF where MappingType='OutputMappings'").head()[0]
            resultDF = tsContext.sql("select '" + outputTag + "' as tag, timestamp as timestamp, 10*value as value, quality as quality from timeseriesDF")

            logger.info("Returning result...")
            result = {"timeseriesWriteDS" : resultDF}
            return result
        except Exception as e:
            logger.info("Error: " + str(e))
            exc_tb = sys.exc_info()[2]
            logger.info("Line number: " + str(exc_tb.tb_lineno))

What To Do Next

You can now add computation logic to the project and package it.

Understand the Computation Logic in Python

This is a reference document for developers who want to develop their KPI computation logic in Python for the Spark runtime.

After you download the Python sample code, you can start developing the custom logic for your KPI.

Find the complete implementation example code at oo-analytics/Andromeda_Simple_KPIs/PythonTimes10/simple_py/__init__.py.

Define the input data sources

OPM data providers construct the incoming data into RDDs. You can manipulate these collections of data tables using filters and queries.

The following example code snippet shows how to use the data from the timeseriesReadDS and configDS data sources.
# timeseriesReadDS is dataset with timeseries read data source
tsContext = context_dict["timeseriesReadDS"]
# configDS has the user provided configurations and tag mappings
configContext = context_dict["configDS"]

In the above code, timeseriesReadDS is the input data source provider name used in the KPI definition.
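
If you are not sure which data source providers are passed to your analytic at run time, you can log the keys of context_dict. The following is a minimal sketch; it only assumes that context_dict behaves like a dictionary, as in the sample code:

# Log the names of the data source providers passed to the job
for provider_name in context_dict:
    logger.info("Available data source: " + provider_name)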

Fetch the input data using SparkSQL functions

OPM data providers construct the incoming and outgoing data into data frames. These data frames enable you to use simple query statements in SparkSQL to get the required data for your computation. You can also perform simple join operations to filter for specific data to run the computation. The computed KPIs and tag data are produced as data frames to write back into the corresponding output data providers.

Note: You cannot use PySpark data frame functions; use SparkSQL instead. For example, use sqlContext.sql(<select_statement>).cache() instead of sqlContext.createDataFrame(rdd, schema) to set up a data frame.
The following example code snippet shows the use of SparkSQL functions.
configDF = configContext.sql("select * from " + context_dict["configDS"].table_name)
configDF.createOrReplaceTempView("configDF")
inputTag = configContext.sql("select MappingValue from configDF where MappingType='InputMappings'").head()[0]
timeseriesDF = tsContext.sql("select * from " + context_dict["timeseriesReadDS"].table_name + " where tag='" + inputTag + "'")

The example data frame outputs are shown below:

Asset Data Frame Example
+---------------------+---------------+------------------------------------------------+---------------------+---------------------+--------------+-------+
|sourceKey            |name           |type                                            |tagSourceKey         |tagName              |tagDescription|tagUnit|
+---------------------+---------------+------------------------------------------------+---------------------+---------------------+--------------+-------+
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID153|OO_Tag_Pressure_ID153|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID57 |OO_Tag_Pressure_ID57 |description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID173|OO_Tag_Pressure_ID173|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO-CA-SIMUL-ASSET-ID0|OO-CA-SIMUL-ASSET-ID0|description   |null   |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID152|OO_Tag_Pressure_ID152|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID165|OO_Tag_Pressure_ID165|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID144|OO_Tag_Pressure_ID144|description   |atm    |
+---------------------+---------------+------------------------------------------------+---------------------+---------------------+--------------+-------+
Time-Series Input Data Frame Example

+--------------------+-------------+-------+-------+
|tag                 |timestamp    |value  |quality|
+--------------------+-------------+-------+-------+
|OO_Tag_Pressure_ID84|1478764800000|300.555|3      |
|OO_Tag_Pressure_ID84|1478854800000|301.666|3      |
|OO_Tag_Pressure_ID84|1478948400000|401.001|3      |
|OO_Tag_Pressure_ID83|1478764800000|200.555|3      |
|OO_Tag_Pressure_ID83|1478854800000|201.666|3      |
|OO_Tag_Pressure_ID83|1478948400000|201.001|3      |
+--------------------+-------------+-------+-------+
Configuration Data Frame Example

This data frame contains the input, constants, and output parameters mapped to the asset tags.


+--------------+------------+-------------------------------------------------+---------------------+
|MappingType   |MappingKey  |MappingValue                                     |AssetSourceKey       |
+--------------+------------+-------------------------------------------------+---------------------+
|InputMappings |temperature |OO_Tag_Pressure_ID84                             |OO-CA-SIMUL-ASSET-ID0|
|InputMappings |pressure    |OO_Tag_Pressure_ID83                             |OO-CA-SIMUL-ASSET-ID0|
|Constants     |threshold   |100                                              |null                 |
|OutputMappings|availability|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|power       |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power       |OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|heatrate    |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate    |OO-CA-SIMUL-ASSET-ID0|
+--------------+------------+-------------------------------------------------+---------------------+
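
For example, the threshold constant shown above can be read with the same SparkSQL pattern used for the input and output mappings. This is a minimal sketch that assumes configDF has already been registered as a temporary view, as in the sample code:

# Read a constant value from the configuration data frame
threshold = float(configContext.sql("select MappingValue from configDF where MappingType='Constants' and MappingKey='threshold'").head()[0])
logger.info("Threshold constant: " + str(threshold))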

Save Filtered Data to the Data Frame

You can perform simple join operations to combine the contents of two or more data sets into a single data frame. Save the select query to a string (for example, analyticSql). In the example snippet below, we join the contents of the two data sets assetDS and timeseriesDS on the tagSourceKey column of the asset data set and the tag column of the time-series data set.

# assetDS and timeseriesDS are the asset and time-series data source contexts.
# "assetReadDS" is an example provider name; use the provider names from your KPI definition.
assetDS = context_dict["assetReadDS"]
timeseriesDS = context_dict["timeseriesReadDS"]

analyticSql = ("select a.sourceKey, s.tag, s.timestamp, s.value as tagvalues "
               "from `" + assetDS.table_name + "` a "
               "join `" + timeseriesDS.table_name + "` s "
               "on a.tagSourceKey=s.tag")

joinedDF = timeseriesDS.sql(analyticSql).cache()
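
You can then register the joined data frame as a temporary view and query it further with SparkSQL. The following sketch builds on the join above; the per-tag average is only an illustration:

# Register the joined result so it can be queried with SparkSQL
joinedDF.createOrReplaceTempView("joinedDF")
avgDF = timeseriesDS.sql("select sourceKey, tag, avg(tagvalues) as avgValue from joinedDF group by sourceKey, tag")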

Logging Information

Use logger.info() to log any information related to the KPI computation, such as the input, output, and joined data frame schemas and values, constant definitions and values, and log section labels. This helps you troubleshoot errors after the KPI job runs.

logger.info("Starting analytic...")
logger.info("Returning result...")
except Exception as e:
            logger.info("Error: " + str(e))
            exc_tb = sys.exc_info()[2]
            logger.info("Line number: " + str(exc_tb.tb_lineno))
Make sure that logger debugging is enabled before showing data frames, to restrict expensive calls to DataFrame.show():
if logger.isDebugEnabled():
    # hrdmm is an example data frame computed earlier in the analytic
    hrdmm.show(100, False)
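
The following is a minimal sketch of the kind of information you might log, using the names defined earlier in the sample analytic. Logging the schema and restricting show() calls to debug runs keeps the log useful without slowing down normal executions:

logger.info("Input tag: " + inputTag)
logger.info("Time-series schema: " + str(timeseriesDF.schema))
if logger.isDebugEnabled():
    # Expensive preview of the input and result data frames
    timeseriesDF.show(20, False)
    resultDF.show(20, False)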

Writing Time-series Results to the Data Frame

After performing your computation, write the results from your result data frame to the output data source provider.

outputTag = configContext.sql("select MappingValue from configDF where MappingType='OutputMappings'").head()[0]
resultDF = tsContext.sql("select '" + outputTag + "' as tag, timestamp as timestamp, 10*value as value, quality as quality from timeseriesDF")
result = {"timeseriesWriteDS" : resultDF}
return result
except Exception as e:
    logger.info("Error: " + str(e))
    exc_tb = sys.exc_info()[2]
    logger.info("Line number: " + str(exc_tb.tb_lineno))
Time-Series Output Data Frame Example

+-------------------------------------------------+------------------+-------+-------------+
|tag                                              |value             |quality|timestamp    |
+-------------------------------------------------+------------------+-------+-------------+
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate    |4349.443333333333 |3      |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power       |2174.7216666666664|3      |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|0.0               |3      |1480637708419|
+-------------------------------------------------+------------------+-------+-------------+

Alarm Data

The following example code snippet shows how to write alarm or alert data into a data frame. The date for the alert is hard-coded in the analytic.

# The alarm schema uses the pyspark.sql.types imports shown in the sample;
# generating the sourceKey below also requires "import uuid".
alarm_schema = StructType([StructField("taskId", StringType(), True),
                           StructField("type", StringType(), True),
                           StructField("name", StringType(), True),
                           StructField("severity", IntegerType(), True),
                           StructField("sourceKey", StringType(), True),
                           StructField("eventStart", LongType(), True),
                           StructField("storageReceiveTime", LongType(), True),
                           StructField("associatedMonitoredEntitySourceKey", StringType(), True)])

alarm_data = [(
    'opm_arf_alarm_1', 'createAlarm', 'Analytics', 4, 'opm-alarms-' + str(uuid.uuid4()),
    1505857250000, 1505857250000, 'OO-CA-SIMUL-ASSET-ID1')]

alarm_rdd = spark_session.sparkContext.parallelize(alarm_data)
alarm_df = spark_session.createDataFrame(data=alarm_rdd, schema=alarm_schema)
result = {"timeseriesWriteDS" : resultDF, "alarmsWrite" : alarm_df}

Package the Analytic Computation Logic in Python

Add the analytic computation logic developed in Python and package it.

Before You Begin

Complete the following tasks:
  • Name the main package such that it matches the analytic definition name. For example, if your setup.py has the name heatrate_gap_kpi, make sure that the analytic saved to the catalog reflects the same name.
  • Name the classifiers with the convention [python package name].[class name].
  • Add the KPI computation logic to the main class in __init__.py.
  • Write the run_job() function in the main class of the __init__.py file of the main package, as shown in the skeleton below.
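
A minimal __init__.py skeleton that satisfies these requirements might look like the following; the package and class names are illustrative, and the sample uses the package simple_py with the class SimpleJob:

# simple_py/__init__.py
class SimpleJob():

    def run_job(self, spark_session, runtime_config, job_json, context_dict, logger):
        # Add your KPI computation logic here, then return a dictionary that maps
        # output data source provider names to result data frames.
        result = {}
        return result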

About This Task

After packaging the job implementation, you must add this to the respective analytic definition for deployment.

To package a Python analytic, you should have something like the following directory structure:

oopackage.sh
setup.py
simple_py
    __init__.py

Procedure

  1. Update the __init__.py file to contain your analytic class with logic in the function run_job, and update the setup.py to the following format:
    from setuptools import setup, find_packages
    import os
    
    setup(
            name='simple_py',
            version='1.0.0',
            description='OPM Python Simple (x10) Example',
            packages=find_packages(),
            install_requires=['pyhocon', 'py4j'],
            classifiers=[
                "MyAnalytic.ClassName"
            ],
    )
  2. Access the command line and navigate to the root folder for the Python module.
  3. Run the following command to build and package the artifact. For example, ./oopackage.sh heatratekpi.zip.
    ./oopackage.sh <name of the target zip file>

Results

A ZIP file containing the Python egg package is created.