Data Sources
About Analytic Data Handling for Orchestration
The analytic template defines the format of the analytic's input and output JSON structures. The port-to-field map tells the runtime engine where to get the values to insert into the analytic's input JSON structure and where to write the values from the analytic's output JSON structure.
The port-to-field map maps the input/output entries from the analytic's template to data sources and sinks. The mapping uses a `fieldId` to represent a value in the data sources and sinks.
How Predix Time Series Data is Handled
The `fieldId` is a string that is mapped to a Predix Time Series tag id for a given asset id. At runtime, the orchestration engine acquires the Predix Time Series tag id for the `fieldId` and reads/writes the values as follows:
- If the orchestration is run for an asset group or a single asset, the engine looks in the port-to-field map for a `tagNameQuery`. If `tagNameQuery` exists, it uses this query to translate the `fieldId` to a time series tag id. If `tagNameQuery` does not exist, it uses the tenant's default tag query.
- If the orchestration is run without an asset identifier, the request must contain a map of `fieldId` to Predix Time Series tag ids. The runtime uses the Time Series tag ids from that map.
The integration with Predix Time Series provides the conversion of the Predix Time Series data values to Predix Analytics data structures supported by the analytic template. For more information about data type conversion, see Data Conversion with Predix Time Series.
How Persistent and Temporary Data is Handled
The port-to-field map also defines whether the runtime should read/write a value to a persistent store (for example, Predix Time Series or an external data source) or keep it as temporary data held in memory to pass from one analytic to another in the orchestration. The temporary in-memory data is available for the life of the orchestration. Each non-constant port map in the port-to-field map has a `dataSourceId` field to identify the data source. When the `dataSourceId` is `'Temporary'`, the runtime caches the value for use by other analytics in the orchestration. All ports with identical `fieldId` values across the orchestration's port-to-field maps resolve to a common cached value. When the `dataSourceId` is a persistent store, the runtime reads/writes the value to persistent storage.
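The `'Temporary'` behavior can be pictured with a short sketch. This is illustrative only, not framework code; the names `OrchestrationCache` and `resolve_read` are invented for the example. The point it shows: every port that shares a `fieldId` reads and writes a single in-memory value that lives only as long as one orchestration run.

```python
# Illustrative sketch of Temporary data handling (not framework code).
# Ports whose dataSourceId is 'Temporary' share one in-memory value per
# fieldId for the life of a single orchestration run.

class OrchestrationCache:
    """In-memory values scoped to one orchestration run."""

    def __init__(self):
        self._values = {}  # fieldId -> cached value

    def write(self, field_id, value):
        self._values[field_id] = value

    def read(self, field_id):
        return self._values[field_id]


def resolve_read(port, cache, persistent_store):
    """Route a port read to the run-scoped cache or a persistent store."""
    if port["dataSourceId"] == "Temporary":
        return cache.read(port["fieldId"])
    return persistent_store.read(port["fieldId"])
```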
About Asset Model Data Handling for Orchestration
Use a custom tag query to override the default tag query value in the port-to-field map. The `tagNameQuery` field is available for this purpose, and should be used for all input and output port definitions that require tag data from the Predix Asset service.
The `tagNameQuery` in the port-to-field map can be used to find the Predix Time Series tag id corresponding to the `fieldId` and the asset id. If the data source for the `tagNameQuery` is the Predix Asset service, the `tagNameQuery` is a GEL query. The `tagNameQuery` can have two parameters: `${ASSET_ID}` and `${FIELD_ID}`. During the orchestration run, `${ASSET_ID}` and `${FIELD_ID}` are replaced and the resulting query is issued to the Predix Asset service to retrieve the Predix Time Series tag id. The query response should be a JSON array with one element, whose value is the time series tag id.
The following is a sample `tagNameQuery` in the port-to-field map.
meters?filter=uri=${ASSET_ID}<jetEngineUri<compressorUri:name=${FIELD_ID}&fields=source
In this example, the asset id is `/jet_engines/1` and the `fieldId` is `low pressure sensor`. The following is the corresponding GEL query.
meters?filter=uri=/jet_engines/1<jetEngineUri<compressorUri:name=low pressure sensor&fields=source
The following is a sample response, where `SLEOX` is the time series tag id.
[
{
"source": "SLEOX"
}
]
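The parameter substitution and response handling can be sketched in a few lines of Python. This is a non-normative illustration; the engine's actual implementation is not shown in this guide.

```python
import json
from string import Template

# Sketch of the parameter substitution the runtime performs on a
# tagNameQuery before issuing it to the Predix Asset service.
tag_name_query = Template(
    "meters?filter=uri=${ASSET_ID}<jetEngineUri<compressorUri"
    ":name=${FIELD_ID}&fields=source"
)
gel_query = tag_name_query.substitute(
    ASSET_ID="/jet_engines/1",
    FIELD_ID="low pressure sensor",
)

# The Asset service response is a one-element JSON array; the element's
# "source" value is the Predix Time Series tag id.
response_body = '[{"source": "SLEOX"}]'
tag_id = json.loads(response_body)[0]["source"]
```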
About Analytics Using an External Data Source
Support for Predix Time Series as a persistent data source is integrated (built-in) with Predix Analytics Services framework. However, you can also run an orchestration request for an analytic that ingests data from an external data source.
External Data Source Use Cases
The Analytics Framework also supports using analytic data from an external source. For example, data stored in PostgreSQL or another RDBMS, Predix Asset, or Predix Blobstore can be read by and written from an analytic. This requires that you build a Custom Data Connector service to read data from and write data to the external source. The data connector service must be reachable from the Predix cloud using HTTPS. The framework calls your data connector service to read the analytic input data and to write the analytic output data to the data source.
The following scenarios are examples of when you will want to build your own Custom Data Connector service.
- Sensor data is stored in PostgreSQL DB and this data is used by the analytics.
- Image data is stored in Predix Blobstore and sensor data is stored in Predix Time Series. Both the image and time series data will be passed to the analytics.
- An analytic is programmed to self-learn and to produce model data every time it runs. The model data is stored in Blobstore and will be used to execute the next analytic in the orchestration.
- Legacy asset data is stored in PostgreSQL DB and more recent asset data is stored in Predix Asset. You would like to use data from both sources to pass to an analytic.
Developing Your Custom Data Connector Service
You can implement more than one Custom Data Connector service to work with different data sources. This service must meet the following requirements.
- The service must implement the following three Analytics Runtime APIs.
- HEALTHCHECK (/api/v1/analytics/customdata/healthcheck) — used to check the health of your Custom Data Connector service before executing an orchestration. Successful execution of the API must return a Status 200 response.
- READ (/api/v1/analytics/customdata/read) — used to read data passed to the analytic (input data) from the external data source. Successful execution of the API must return a Status 200 response.
- WRITE (/api/v1/analytics/customdata/write) — used to write the data produced by an analytic (output data) to the external data source. Successful execution of the API must return a Status 201 response.
- The REST end points are secured using custom UAA-based authentication. If custom credentials were not provided during runtime instance configuration, your data service API will be invoked using the same authorization token used during orchestration execution. Otherwise, the API will be invoked using a fresh token fetched using the provided custom credentials.
- The service must be deployed, run, and managed in your Predix org and space.
- The service must be reachable by the Analytics Framework via HTTPS.
A Java-based reference implementation with PostgreSQL DB support is available. Use Reference Data Connector for PostgreSQL as a starting point to develop a Custom Data Connector service that uses a PostgreSQL data source.
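Independent of the reference implementation, the three required endpoints can be sketched as plain handler functions. This is illustrative only: `fetch_rows` and `store_rows` are hypothetical stand-ins for your data-source access code, and wiring the handlers to HTTPS routes is left to your web framework.

```python
import json

def fetch_rows(query_criteria):
    # Hypothetical stub: query your external store using the field's
    # queryCriteria and return rows as [[epoch_ms, value, quality?]].
    return []

def store_rows(query_criteria, rows):
    # Hypothetical stub: persist analytic output rows to your store.
    pass

def healthcheck():
    # GET /api/v1/analytics/customdata/healthcheck -> must return 200.
    return 200, ""

def read(request_body):
    # POST /api/v1/analytics/customdata/read -> must return 200.
    req = json.loads(request_body)
    for field in req["field"]:
        field["data"] = fetch_rows(field.get("queryCriteria"))
    return 200, {
        "field": req["field"],
        "orchestrationExecutionContext": req["orchestrationExecutionContext"],
        "dataSourceId": req["dataSourceId"],
        "errorResponse": None,
    }

def write(request_body):
    # POST /api/v1/analytics/customdata/write -> must return 201.
    req = json.loads(request_body)
    for field in req["field"]:
        store_rows(field.get("queryCriteria"), field.get("data", []))
    return 201, {
        "field": req["field"],
        "orchestrationExecutionContext": req["orchestrationExecutionContext"],
        "dataSourceId": req["dataSourceId"],
        "errorResponse": None,
    }
```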
Implementing Your Custom Data Connector Service
The Analytics Framework executes the orchestration and its orchestration steps using the port-to-field map and your Custom Data Connector service. It reads data through your service just before executing an analytic and writes the analytic-produced output data back through your Custom Data Connector service.
- Develop your Custom Data Connector service.
- Deploy it to Predix cloud.
- Configure a port-to-field map to include the required input/output fields with a unique `dataSourceId`.
- Execute an orchestration with your data source definitions (`dataSourceId`, `baseUri`, `apiVersion`). The `baseUri` and `apiVersion` determine the unique endpoints for the READ, WRITE, and HEALTHCHECK APIs you implement.
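The data source definition supplied with the orchestration execution request carries the three values named in the last step. The following is an illustrative fragment only: the `baseUri` shown is a made-up example, and the exact request envelope is defined by the orchestration execution request schema.

```json
{
  "dataSourceId": "Postgres Reference External Data Connector",
  "baseUri": "https://my-data-connector.example.predix.io",
  "apiVersion": "v1"
}
```

The `dataSourceId` here must match the `dataSourceId` configured in the port-to-field map so the framework can route each field's read/write to your connector.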
Update Runtime with External Data Source Client Credentials
During orchestration execution, the credentials for the external data source must be passed to the runtime. Using an external data source requires that you update the runtime configuration with these credentials by providing the corresponding `externalService` configuration parameters. If they are not provided, access to the external data source defaults to the OAuth2 token used in the orchestration request.
For more information about how to do this, see afs-get-started.html#task_694d2085-e2e4-4097-a829-50eb9b42d43c.
Limitations
Note the following limitations:
- The READ data and WRITE data operations implemented by your Custom Data Connector service should complete and return within one minute. Note: Any HTTP/S call to a REST end point will timeout after one minute in Predix cloud.
- The Analytics Framework will invoke your service using the same authorization token used to run the orchestration execution. The Custom Data Connector service must be able to authenticate the same token.
- The Analytics Framework will not invoke your Custom Data Connector service with custom HTTP header details.
Custom Data Connector Reference
You will build a Custom Data Connector service when analytics in your orchestration rely upon an external data source. This data connector service must be reachable by the Predix cloud using HTTPS.
A Java-based reference implementation with PostgreSQL DB support is available. Use Reference Data Connector for PostgreSQL as a starting point to develop a Custom Data Connector service that uses a PostgreSQL data source.
- Data Format
- Type: DataRequest
- Type: DataResponse
- Type: Field
- Type: OrchestrationExecutionContext
- Type: ErrorResponse
- Sample AnalyticReadDataRequest
- Sample AnalyticReadDataResponse
- Sample AnalyticReadDataResponse with Error
- Sample AnalyticDataWriteRequest
- Sample AnalyticDataWriteResponse
- Sample AnalyticDataWriteResponse with Error
Data Format
This is the overall data format:
[ [ <EpochInMs>, <Measurement Value>, <Quality> ] ]
Where:
- <EpochInMs> — Epoch Time in milliseconds
- <Measurement Value> — Measurement data value
- <Quality> — Quality (Optional)
See the following table for the expected format for all the data types.
Data Type | Expected Data Format | Description |
---|---|---|
DOUBLE | [ [ null, 10.0 ] ] | Double is represented in standard time series format. Note: If the data type is DOUBLE, the system reads the measurement data value from index 1 (the second element). |
DOUBLE_ARRAY | [ [ null, 10.0 ], [ null, 11.0 ] ] | Each element is an [ <EpochInMs>, <Measurement Value> ] pair; the epoch may be null. |
TIMESERIES_ARRAY | [ [1435776300000, 2, 1], [1435776400000, null], [1435776500000, 10.5, 3] ] | Each element is an [ <EpochInMs>, <Measurement Value>, <Quality> ] triple; the quality is optional. |
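A small validator makes the expected row shape concrete. This is an illustrative sketch, not framework code: each row is an [ <EpochInMs>, <Measurement Value> ] pair with an optional third quality element, and the epoch may be null.

```python
def validate_rows(rows):
    """Check rows against the [[epoch_ms, value, quality?]] shape."""
    for row in rows:
        if not 2 <= len(row) <= 3:
            raise ValueError(
                "expected [epoch, value] or [epoch, value, quality]: %r" % (row,)
            )
        epoch = row[0]
        if epoch is not None and not isinstance(epoch, int):
            raise ValueError("epoch must be milliseconds or null: %r" % (epoch,))
    return True
```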
Payloads are expected as follows.
API | Request Payload Type | Response Payload Type |
---|---|---|
/api/v1/analytics/customdata/read | AnalyticReadDataRequest extends DataRequest | AnalyticReadDataResponse extends DataResponse |
/api/v1/analytics/customdata/write | AnalyticWriteDataRequest extends DataRequest | AnalyticWriteDataResponse extends DataResponse |
/api/v1/analytics/customdata/healthcheck | (none) | (none) |
Type: DataRequest
This is the overall structure of a DataRequest
object.
{
"field": List<Field>,
"customAttributes": Object,
"systemAttributes": Object,
"orchestrationExecutionContext": OrchestrationExecutionContext,
"dataSourceId": String
}
See the following table for a description of the elements in a DataRequest
.
Attribute | Description |
---|---|
field | List of `Field` objects. |
customAttributes | User-defined JSON object. |
systemAttributes | Map of analytics system-generated key/value pairs. Reserved for future use. |
orchestrationExecutionContext | Orchestration execution context with system-generated IDs to track the request within analytics services. |
dataSourceId | An external data connector service identifier, added for monitoring purposes. |
Type: DataResponse
This is the overall structure of a DataResponse
object.
{
"field": List<Field>,
"orchestrationExecutionContext": OrchestrationExecutionContext,
"errorResponse": ErrorResponse,
"dataSourceId": String
}
See the following table for a description of the elements in a DataResponse
object.
Attribute | Description |
---|---|
field | List of `Field` objects. |
orchestrationExecutionContext | Orchestration execution context with system-generated IDs to track the request within analytics services. |
dataSourceId | An external data connector service identifier, added for monitoring purposes. |
errorResponse | Error message details. |
Type: Field
This is the overall structure of a Field
object.
{
"fieldId": String,
"fullyQualifiedPortName": String,
"dataType": String,
"engUnit": String,
"data": Object,
"queryCriteria": Object,
"errorResponse": ErrorResponse
}
See the following table for a description of the elements in a Field
object.
Attribute | Description |
---|---|
fieldId | Field identifier defined in the port-to-field map. |
fullyQualifiedPortName | Unique port name identifying a port in the port-to-field map. |
dataType | Field data type. The supported analytic data types are DOUBLE, DOUBLE_ARRAY, and TIMESERIES_ARRAY. |
engUnit | Engineering unit as defined in the port-to-field map. |
queryCriteria | A custom object defining the criteria used to query the requested field, as defined in the port-to-field map. |
data | Data value, in the format described in Data Format. |
errorResponse | If there is an error in processing the request, this attribute is updated with the error details. |
Type: OrchestrationExecutionContext
This is the overall structure of an OrchestrationExecutionContext
object.
{
"assetId": String,
"orchestrationConfigurationID": String,
"orchestrationExecutionRequestID": String,
"analyticId": String,
"analyticName": String,
"analyticVersion": String,
"analyticExecutionRequestID": String
}
See the following table for a description of the elements in an OrchestrationExecutionContext
object.
Attribute | Description |
---|---|
assetId | The asset identifier. |
orchestrationConfigurationID | The orchestration configuration identifier. |
orchestrationExecutionRequestID | The orchestration execution request identifier. |
analyticId | The analytic catalog entry identifier. |
analyticName | The analytic name. |
analyticVersion | The analytic version. |
analyticExecutionRequestID | The analytic execution request identifier. |
Type: ErrorResponse
This is the overall structure of an ErrorResponse
object.
{
"code": String,
"severity": String,
"detail": String,
"message": String
}
See the following table for a description of the elements in an ErrorResponse
object.
Attribute | Description |
---|---|
code | The error code. |
severity | The error severity. |
message | A short error message. |
detail | A detailed error message with stack trace, etc. |
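Pulling the types together, a read handler might assemble its response as follows. This is illustrative: `build_read_response`, the `results` mapping, and the `"ERROR"` severity value are hypothetical, but the field shapes follow the `DataResponse`, `Field`, and `ErrorResponse` types described above.

```python
def error_response(code, severity, message, detail=""):
    """Build an ErrorResponse-shaped object."""
    return {"code": code, "severity": severity, "message": message, "detail": detail}

def build_read_response(request, results):
    """Assemble a DataResponse from a DataRequest.

    `results` maps fieldId either to fetched data rows or to an Exception
    explaining why the fetch failed.
    """
    fields = []
    for field in request["field"]:
        field = dict(field)  # do not mutate the incoming request
        outcome = results.get(field["fieldId"])
        if isinstance(outcome, Exception):
            field["data"] = []
            field["errorResponse"] = error_response(
                "FIELD_EXCEPTION", "ERROR",
                "Unable to retrieve field %s. %s" % (field["fieldId"], outcome),
            )
        else:
            field["data"] = outcome or []
            field["errorResponse"] = None
        fields.append(field)
    return {
        "field": fields,
        "orchestrationExecutionContext": request["orchestrationExecutionContext"],
        "dataSourceId": request["dataSourceId"],
        "errorResponse": None,
    }
```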
Sample AnalyticReadDataRequest
{
"field": [
{
"fieldId": "KW",
"fullyQualifiedPortName": "data.time_series.numberArray1",
"dataType": "DOUBLE_ARRAY",
"engUnit": "kw",
"data": [],
"queryCriteria": {
"columns": [
"recorded_at",
"data_value"
],
"table": "sensor_data",
"conditions": [
{
"key": "asset_id",
"value": "${ASSET_ID}",
"valueType": "string",
"relation": " = "
},
{
"key": "recorded_at",
"value": "current_timestamp",
"valueType": "none",
"relation": " < "
},
{
"key": "field_id",
"value": "KW",
"valueType": "string",
"relation": " = "
}
]
},
"errorResponse": null
},
{
"fieldId": "vibration",
"fullyQualifiedPortName": "data.time_series.numberArray2",
"dataType": "DOUBLE_ARRAY",
"engUnit": "hertz",
"data": [],
"queryCriteria": {
"columns": [
"recorded_at",
"data_value"
],
"table": "sensor_data",
"conditions": [
{
"key": "asset_id",
"value": "${ASSET_ID}",
"valueType": "string",
"relation": " = "
},
{
"key": "recorded_at",
"value": "current_timestamp",
"valueType": "none",
"relation": " < "
},
{
"key": "field_id",
"value": "vibration",
"valueType": "string",
"relation": " = "
}
]
},
"errorResponse": null
}
],
"customAttributes": {
"IS_GENERIC_SCHEMA": "TRUE"
},
"systemAttributes": null,
"orchestrationExecutionContext": {
"assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
"orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
"orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530",
"analyticId": "b9e999a1-8931-459d-8d34-2f76c2b9fd95",
"analyticName": null,
"analyticVersion": null,
"analyticExecutionRequestID": null
},
"dataSourceId": "Postgres Reference External Data Connector"
}
Sample AnalyticReadDataResponse
{
"field": [
{
"fieldId": "KW",
"fullyQualifiedPortName": "data.time_series.numberArray1",
"dataType": "DOUBLE_ARRAY",
"engUnit": "kw",
"data": [
[
1473366334967,
144.33
],
[
1473366334968,
244.33
],
[
1473366334969,
344.33
]
],
"queryCriteria": {
"columns": [
"recorded_at",
"data_value"
],
"table": "sensor_data",
"conditions": [
{
"key": "asset_id",
"value": "${ASSET_ID}",
"valueType": "string",
"relation": " = "
},
{
"key": "recorded_at",
"value": "current_timestamp",
"valueType": "none",
"relation": " < "
},
{
"key": "field_id",
"value": "KW",
"valueType": "string",
"relation": " = "
}
]
}
},
{
"fieldId": "vibration",
"fullyQualifiedPortName": "data.time_series.numberArray2",
"dataType": "DOUBLE_ARRAY",
"engUnit": "hertz",
"data": [
[
1473366334967,
1244.33
],
[
1473366334968,
2244.33
],
[
1473366334969,
3244.33
]
],
"queryCriteria": {
"columns": [
"recorded_at",
"data_value"
],
"table": "sensor_data",
"conditions": [
{
"key": "asset_id",
"value": "${ASSET_ID}",
"valueType": "string",
"relation": " = "
},
{
"key": "recorded_at",
"value": "current_timestamp",
"valueType": "none",
"relation": " < "
},
{
"key": "field_id",
"value": "vibration",
"valueType": "string",
"relation": " = "
}
]
}
}
],
"orchestrationExecutionContext": {
"assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
"orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
"orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530",
"analyticId": "b9e999a1-8931-459d-8d34-2f76c2b9fd95"
}
}
Sample AnalyticReadDataResponse with Error
{
"field": [
{
"fieldId": "KW",
"fullyQualifiedPortName": "data.time_series.numberArray1",
"dataType": "DOUBLE_ARRAY",
"engUnit": "kw",
"data": [],
"queryCriteria": {
"columns": [
"recorded_at",
"value"
],
"table": "sensor_data",
"conditions": [
{
"key": "asset_id",
"value": "${ASSET_ID}",
"valueType": "string",
"relation": " = "
},
{
"key": "recorded_at",
"value": "current_timestamp",
"valueType": "none",
"relation": " < "
},
{
"key": "field_id",
"value": "KW",
"valueType": "string",
"relation": " = "
}
]
},
"errorResponse": {
"code": "FIELD_EXCEPTION",
"message": "Unable to retrieve field KW. StatementCallback; bad SQL grammar [select recorded_at, value from sensor_data where asset_id = '/assets/32-3c686c25-2f57-4f13-8cf0-04bc6bb26866' and recorded_at < current_timestamp and field_id = 'KW']; nested exception is org.postgresql.util.PSQLException: ERROR: column \"value\" does not exist\n Position: 21",
"parameters": []
}
},
{
"fieldId": "vibration",
"fullyQualifiedPortName": "data.time_series.numberArray2",
"dataType": "DOUBLE_ARRAY",
"engUnit": "hertz",
"data": [
[
1473366334967,
1244.33
],
[
1473366334968,
2244.33
],
[
1473366334969,
3244.33
]
],
"queryCriteria": {
"columns": [
"recorded_at",
"data_value"
],
"table": "sensor_data",
"conditions": [
{
"key": "asset_id",
"value": "${ASSET_ID}",
"valueType": "string",
"relation": " = "
},
{
"key": "recorded_at",
"value": "current_timestamp",
"valueType": "none",
"relation": " < "
},
{
"key": "field_id",
"value": "vibration",
"valueType": "string",
"relation": " = "
}
]
}
}
],
"orchestrationExecutionContext": {
"assetId": "/assets/32-3c686c25-2f57-4f13-8cf0-04bc6bb26866",
"orchestrationConfigurationID": "6dfae6f5-8be1-4f90-b567-0970a34f2c53",
"orchestrationExecutionRequestID": "accab63f-da95-4ac4-8120-fcfca109011d",
"analyticId": "b9e999a1-8931-459d-8d34-2f76c2b9fd95"
}
}
Sample AnalyticDataWriteRequest
{
"field": [
{
"fieldId": "bearing temperature",
"fullyQualifiedPortName": "data.time_series.sum",
"dataType": "DOUBLE_ARRAY",
"engUnit": "Celsius",
"data": [
[
1473366334967,
1388.6599999999999,
"2"
],
[
1473366334968,
2488.66,
"2"
],
[
1473366334969,
3588.66,
"2"
]
],
"queryCriteria": {
"columns": [
"field_id",
"recorded_at",
"data_value"
],
"table": "sensor_data"
},
"errorResponse": null
}
],
"customAttributes": {
"IS_GENERIC_SCHEMA": "TRUE"
},
"systemAttributes": null,
"orchestrationExecutionContext": {
"assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
"orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
"orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530",
"analyticId": null,
"analyticName": null,
"analyticVersion": null,
"analyticExecutionRequestID": null
},
"dataSourceId": "Postgres Reference External Data Connector"
}
Sample AnalyticDataWriteResponse
{
"field": [
{
"fieldId": "bearing temperature",
"fullyQualifiedPortName": "data.time_series.sum",
"dataType": "DOUBLE_ARRAY",
"engUnit": "Celsius",
"data": [
[
1473366334967,
1388.6599999999999,
"2"
],
[
1473366334968,
2488.66,
"2"
],
[
1473366334969,
3588.66,
"2"
]
],
"queryCriteria": {
"columns": [
"field_id",
"recorded_at",
"data_value"
],
"table": "sensor_data"
}
}
],
"orchestrationExecutionContext": {
"assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
"orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
"orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530"
}
}
Sample AnalyticDataWriteResponse with Error
{
"field": [
{
"fieldId": "bearing temperature",
"fullyQualifiedPortName": "data.time_series.sum",
"dataType": "DOUBLE_ARRAY",
"engUnit": "Celsius",
"data": [
[
1473366334967,
1388.6599999999999,
"2"
],
[
1473366334968,
2488.66,
"2"
],
[
1473366334969,
3588.66,
"2"
]
],
"queryCriteria": {
"columns": [
"field_id",
"recorded_at",
"value"
],
"table": "sensor_data"
},
"errorResponse": {
"code": "FIELD_EXCEPTION",
"message": "Unable to update field bearing temperature. StatementCallback; bad SQL grammar [ insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334967::double precision/1000), 1388.6599999999999 ); insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334968::double precision/1000), 2488.66 ); insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334969::double precision/1000), 3588.66 )]; nested exception is java.sql.BatchUpdateException: Batch entry 0 insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334967::double precision/1000), 1388.6599999999999 ) was aborted. Call getNextException to see the cause.",
"parameters": []
}
}
],
"orchestrationExecutionContext": {
"assetId": "/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5",
"orchestrationConfigurationID": "eecc3b4a-274a-4fba-804a-c2d47f85e9f8",
"orchestrationExecutionRequestID": "a418c284-774a-4080-8c59-b2afd5d0cc27"
}
}