KPI Development (Python)
Download and Explore Sample Python Spark Analytic
As an analytic developer, you can use the analytic sample package to generate a starter Python package with some sample code. The following information will help you understand and develop your computation logic in Python.
Before You Begin
You have downloaded and set up your OPM Sandbox Docker environment to enable the development of Python analytics.
Procedure
Results
The generated starter package contains sample code similar to the following:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql import functions
from pyspark.sql.types import *
from pyspark.sql.dataframe import *
from pyspark.sql.functions import *
import sys
import time
from datetime import datetime

class SimpleJob():
    def run_job(self, spark_session, runtime_config, job_json, context_dict, logger):
        try:
            spark = spark_session
            logger.info("Starting analytic...")
            # Data source contexts passed in by the OPM runtime
            configContext = context_dict["configDS"]
            tsContext = context_dict["timeseriesReadDS"]
            # Load the configuration (tag mappings and constants)
            configDF = configContext.sql("select * from " + context_dict["configDS"].table_name)
            configDF.createOrReplaceTempView("configDF")
            # Resolve the input tag from the mappings
            inputTag = configContext.sql("select MappingValue from configDF where MappingType='InputMappings'").head()[0]
            # Fetch the time-series data for the input tag
            timeseriesDF = tsContext.sql("select * from " + context_dict["timeseriesReadDS"].table_name + " where tag='" + inputTag + "'")
            timeseriesDF.createOrReplaceTempView("timeseriesDF")
            # Resolve the output tag and compute the KPI (value * 10)
            outputTag = configContext.sql("select MappingValue from configDF where MappingType='OutputMappings'").head()[0]
            resultDF = tsContext.sql("select '" + outputTag + "' as tag, timestamp as timestamp, 10*value as value, quality as quality from timeseriesDF")
            logger.info("Returning result...")
            result = {"timeseriesWriteDS" : resultDF}
            return result
        except Exception as e:
            logger.info("Error: " + str(e))
            exc_tb = sys.exc_info()[2]
            logger.info("Line number: " + str(exc_tb.tb_lineno))
What To Do Next
Understand the Computation Logic in Python
This is a reference document for developers who want to develop their KPI computation logic in Python for the Spark runtime.
After you download the Python sample code, you can start developing the custom logic for your KPI.
Find the complete implementation example code at oo-analytics/Andromeda_Simple_KPIs/PythonTimes10/simple_py/__init__.py.
Define the input data sources
OPM data providers construct the incoming data into RDDs. You can manipulate these collections of data tables using filters and queries.
# timeseriesReadDS is the dataset for the time-series read data source
tsContext = context_dict["timeseriesReadDS"]
# configDS has the user-provided configurations and tag mappings
configContext = context_dict["configDS"]
In the above code, timeseriesReadDS is the input data source provider name used in the KPI definition.
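Beyond SQL queries, you can also filter a data frame directly with the DataFrame API. The following is a minimal sketch, assuming a timeseriesDF like the one built in the sample above:
from pyspark.sql.functions import col

# Keep only good-quality samples (quality == 3 in the sample data)
goodQualityDF = timeseriesDF.filter(col("quality") == 3)

# Keep only samples inside a time window (epoch milliseconds)
windowedDF = goodQualityDF.filter(
    (col("timestamp") >= 1478764800000) & (col("timestamp") < 1478995200000))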
Fetch the input data using Spark SQL functions
OPM data providers construct the incoming and outgoing data into data frames. These data frames enable you to use simple Spark SQL query statements to get the required data for your computation. You can also perform simple join operations to filter for specific data to run the computation. The computed KPIs and tag data are produced as data frames that are written back to the corresponding output data providers.
Note: Use sqlContext.sql(<select_statement>).cache() instead of sqlContext.createDataFrame(rdd, schema) to set up a data frame.
configDF = configContext.sql("select * from " + context_dict["configDS"].table_name)
configDF.createOrReplaceTempView("configDF")
inputTag = configContext.sql("select MappingValue from configDF where MappingType='InputMappings'").head()[0]
timeseriesDF = tsContext.sql("select * from " + context_dict["timeseriesReadDS"].table_name + " where tag='" + inputTag + "'")
The example data frame outputs are shown below:
+---------------------+---------------+------------------------------------------------+---------------------+---------------------+--------------+-------+
|sourceKey            |name           |type                                            |tagSourceKey         |tagName              |tagDescription|tagUnit|
+---------------------+---------------+------------------------------------------------+---------------------+---------------------+--------------+-------+
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID153|OO_Tag_Pressure_ID153|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID57 |OO_Tag_Pressure_ID57 |description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID173|OO_Tag_Pressure_ID173|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO-CA-SIMUL-ASSET-ID0|OO-CA-SIMUL-ASSET-ID0|description   |null   |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID152|OO_Tag_Pressure_ID152|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID165|OO_Tag_Pressure_ID165|description   |atm    |
|OO-CA-SIMUL-ASSET-ID0|OO CA Asset One|/assetTypes/ebb91686-67f5-3612-b9c5-6381e99837e6|OO_Tag_Pressure_ID144|OO_Tag_Pressure_ID144|description   |atm    |
+---------------------+---------------+------------------------------------------------+---------------------+---------------------+--------------+-------+
+--------------------+-------------+-------+-------+
|tag                 |timestamp    |value  |quality|
+--------------------+-------------+-------+-------+
|OO_Tag_Pressure_ID84|1478764800000|300.555|3      |
|OO_Tag_Pressure_ID84|1478854800000|301.666|3      |
|OO_Tag_Pressure_ID84|1478948400000|401.001|3      |
|OO_Tag_Pressure_ID83|1478764800000|200.555|3      |
|OO_Tag_Pressure_ID83|1478854800000|201.666|3      |
|OO_Tag_Pressure_ID83|1478948400000|201.001|3      |
+--------------------+-------------+-------+-------+
The following data frame contains the input, constant, and output parameters mapped to the asset tags.
+--------------+------------+-------------------------------------------------+---------------------+
|MappingType   |MappingKey  |MappingValue                                     |AssetSourceKey       |
+--------------+------------+-------------------------------------------------+---------------------+
|InputMappings |temperature |OO_Tag_Pressure_ID84                             |OO-CA-SIMUL-ASSET-ID0|
|InputMappings |pressure    |OO_Tag_Pressure_ID83                             |OO-CA-SIMUL-ASSET-ID0|
|Constants     |threshold   |100                                              |null                 |
|OutputMappings|availability|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|power       |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power       |OO-CA-SIMUL-ASSET-ID0|
|OutputMappings|heatrate    |OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate    |OO-CA-SIMUL-ASSET-ID0|
+--------------+------------+-------------------------------------------------+---------------------+
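Constants defined for the KPI can be fetched from the same configuration view as the mappings. The following is a minimal sketch, assuming the configDF temporary view created earlier and the threshold constant from the table above:
# MappingValue is stored as a string, so cast the constant as needed
threshold = float(configContext.sql(
    "select MappingValue from configDF "
    "where MappingType='Constants' and MappingKey='threshold'").head()[0])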
Save Filtered Data to the Data Frame
You can perform simple join operations to combine the contents of two or more data sets into a single data frame. Save the select query to a string (for example, analyticSql). In the example snippet below, we join the contents of two data sets, assetDS and timeseriesDS (the asset and time-series data set objects from context_dict), on the tagSourceKey column of the asset data set and the tag column of the time-series data set.
analyticSql = ("select a.sourceKey, s.tag, s.timestamp, "
               "s.value as tagvalues "
               "from `" + assetDS.table_name + "` a "
               "join `" + timeseriesDS.table_name + "` s "
               "on a.tagSourceKey=s.tag")
joinedDf = timeseriesDS.sql(analyticSql).cache()
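You can then work with the joined data frame directly through the DataFrame API. A minimal sketch, assuming the joinedDf computed above:
from pyspark.sql.functions import avg

# Average tag value per asset and tag from the joined frame (illustrative)
avgDf = joinedDf.groupBy("sourceKey", "tag").agg(avg("tagvalues").alias("avgValue"))
avgDf.show()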
Logging Information
Use logger.info() to log any information related to the KPI computation, such as input, output, and joined data frame schemas and values, constant definitions and values, and log section labels as text. This helps you troubleshoot errors after the KPI job execution.
logger.info("Starting analytic...")
logger.info("Returning result...")
except Exception as e:
logger.info("Error: " + str(e))
exc_tb = sys.exc_info()[2]
logger.info("Line number: " + str(exc_tb.tb_lineno))
dataframe.show()
if LOGGER.isDebugEnabled():
hrdmm.show(100, False)
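To capture a data frame's schema and a few sample rows in the job log rather than on standard output, a minimal sketch:
# Log the schema and a small sample of the input data frame
logger.info("timeseriesDF schema: " + str(timeseriesDF.schema))
logger.info("timeseriesDF sample: " + str(timeseriesDF.head(5)))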
Writing Time-series Results to the Data Frame
After performing your computation, write the results from your result data frame to the output data source provider.
outputTag = configContext.sql("select MappingValue from configDF where MappingType='OutputMappings'").head()[0]
resultDF = tsContext.sql("select '" + outputTag + "' as tag, timestamp as timestamp, 10*value as value, quality as quality from timeseriesDF")
result = {"timeseriesWriteDS" : resultDF}
return result
+-------------------------------------------------+------------------+-------+-------------+
|tag                                              |value             |quality|timestamp    |
+-------------------------------------------------+------------------+-------+-------------+
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_heatrate    |4349.443333333333 |3      |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_power       |2174.7216666666664|3      |1480637708419|
|OO-CA-SIMUL-ASSET-ID0.Kpi100_Deploy1_availability|0.0               |3      |1480637708419|
+-------------------------------------------------+------------------+-------+-------------+
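The sample fetches a single output mapping with head(). When a KPI defines several output tags, as in the mappings table earlier, one approach is to build one result frame per output tag and union them. This is a sketch under that assumption, reusing the configDF and timeseriesDF views from the sample; the per-tag computation shown is illustrative:
# Collect every output mapping, then union one result frame per output tag
outputRows = configContext.sql(
    "select MappingValue from configDF "
    "where MappingType='OutputMappings'").collect()

resultDF = None
for row in outputRows:
    tagDF = tsContext.sql(
        "select '" + row["MappingValue"] + "' as tag, timestamp, "
        "10*value as value, quality from timeseriesDF")
    resultDF = tagDF if resultDF is None else resultDF.union(tagDF)

result = {"timeseriesWriteDS" : resultDF}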
Alarm Data
The following example code snippet writes alarm/alert data into a data frame. The date for the alert is hardcoded in the analytic.
import uuid

# Schema for the alarm records
alarm_schema = StructType([StructField("taskId", StringType(), True),
                           StructField("type", StringType(), True),
                           StructField("name", StringType(), True),
                           StructField("severity", IntegerType(), True),
                           StructField("sourceKey", StringType(), True),
                           StructField("eventStart", LongType(), True),
                           StructField("storageReceiveTime", LongType(), True),
                           StructField("associatedMonitoredEntitySourceKey", StringType(), True)])
# A single alarm record; the sourceKey is made unique with a UUID
alarm_data = [(
    'opm_arf_alarm_1', 'createAlarm', 'Analytics', 4, 'opm-alarms-'+str(uuid.uuid4()), 1505857250000, 1505857250000,
    'OO-CA-SIMUL-ASSET-ID1')]
alarm_rdd = spark_session.sparkContext.parallelize(alarm_data)
alarm_df = spark_session.createDataFrame(data=alarm_rdd, schema=alarm_schema)
# Return the alarm data frame along with the time-series results
result = {"timeseriesWriteDS" : resultDF, "alarmsWrite" : alarm_df}
Package the Analytic Computation Logic in Python
Add the analytic computation logic developed in Python and package it.
Before You Begin
- Name the main package such that it matches the analytic definition name. For example, if your setup.py has the name heatrate_gap_kpi, make sure that the analytic saved to the catalog reflects the same name (a setup.py sketch follows the directory structure below).
- Name the classifiers with the convention [python package name].[class name].
- Add the KPI computation logic to the main class in __init__.py.
- Write the run_job() method in the main class of the __init__.py of the main package.
About This Task
To package a Python analytic, you should have something like the following directory structure:
oopackage.sh
setup.py
simple_py/
    __init__.py
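Based on the naming rules above, a minimal setup.py for the sample package might look like the following. This is a sketch using the simple_py sample names; your analytic may require additional metadata.
from setuptools import setup

setup(
    # Must match the analytic definition name saved to the catalog
    name='simple_py',
    version='1.0.0',
    packages=['simple_py'],
    # Classifier convention: [python package name].[class name]
    classifiers=['simple_py.SimpleJob'],
)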