Data Sources

About Analytic Data Handling for Orchestration

The analytic template defines the format of the analytic's input and output JSON structures. The port-to-field map tells the runtime engine how to get the values to insert into the analytic's input JSON structure, and where to write the values from the analytic's output JSON structure.

The port-to-field map maps the input/output entries from the analytic's template to data sources and sinks. The mapping uses a fieldId to represent a value in those data sources and sinks.
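
For orientation, a single input entry in a port-to-field map might look like the following simplified sketch. The attribute names match those used throughout this section, but the surrounding map structure is omitted and the dataSourceId value shown is illustrative.

{
    "fullyQualifiedPortName": "data.time_series.numberArray1",
    "fieldId": "KW",
    "engUnit": "kw",
    "dataSourceId": "PredixTimeSeries"
}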

How Predix Time Series Data is Handled

The fieldId is a string that is mapped to a Predix Time Series tag id for a given asset id. At runtime, the orchestration engine acquires the Predix Time Series tag id for the fieldId and reads/writes the values as follows:

  • If the orchestration is run for an asset group or a single asset, the processing looks in the port-to-field map for a tagNameQuery. If tagNameQuery exists, the processing uses this query to translate the fieldId to a time series tag id. If tagNameQuery does not exist, the processing uses the tenant's default tag query.
  • If the orchestration is run without an asset identifier, the request must contain a map of fieldIds to Predix Time Series tag ids. The runtime uses the Time Series tag ids from that map, as in the sketch following this list.
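
For example, an orchestration request without an asset identifier might carry a map like the following. The attribute name fieldTagMap and the enclosing request structure are hypothetical; what matters is the pairing of each fieldId with its Predix Time Series tag id.

{
    "fieldTagMap": {
        "low pressure sensor": "SLEOX",
        "bearing temperature": "BT-0001"
    }
}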

The integration with Predix Time Series converts Predix Time Series data values to the Predix Analytics data structures supported by the analytic template. For more information about data type conversion, see Data Conversion with Predix Time Series.

How Persistent and Temporary Data is Handled

The port-to-field map also defines whether the runtime should read/write a value from/to a persistent store (for example, Predix Time Series or an external data source) or keep it as temporary data held in memory to pass from one analytic to another in the orchestration. Temporary data is available for the life of the orchestration run. Each non-constant port map in the port-to-field map has a dataSourceId field identifying the data source. When the dataSourceId is 'Temporary', the runtime caches the value for use by other analytics in the orchestration; all ports with identical fieldId values across the orchestration's port-to-field maps resolve to a common cached value. When the dataSourceId names a persistent store, the runtime reads/writes the value from/to that store.
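
For example, if a producing step's output port and a consuming step's input port both carry an entry like the following (simplified as in the earlier sketch), the downstream analytic reads the value the upstream analytic cached, and no persistent store is touched for this value.

{
    "fullyQualifiedPortName": "data.intermediate_result",
    "fieldId": "step1_output",
    "dataSourceId": "Temporary"
}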

About Asset Model Data Handling for Orchestration

Use a custom tag query to override the default tag query value in the port-to-field map. The tagNameQuery field is available for this purpose and should be used for all input and output port definitions that require tag data from the Predix Asset service.

The tagNameQuery in the port-to-field map can be used to find the Predix Time Series tag id corresponding to the fieldId and the asset id. If the data source for the tagNameQuery is the Predix Asset service, the tagNameQuery is a GEL query. The tagNameQuery can take two parameters: ${ASSET_ID} and ${FIELD_ID}. During the orchestration run, ${ASSET_ID} and ${FIELD_ID} are replaced and the resulting query is issued to the Predix Asset service to retrieve the Predix Time Series tag id. The query response should be a JSON array containing a single element whose value is the time series tag id.

The following is a sample tagNameQuery in the port-to-field map.

meters?filter=uri=${ASSET_ID}<jetEngineUri<compressorUri:name=${FIELD_ID}&fields=source

In this example, the asset id is /jet_engines/1 and the fieldId is low pressure sensor. The following is a corresponding GEL query.

meters?filter=uri=/jet_engines/1<jetEngineUri<compressorUri:name=low pressure sensor&fields=source

The following is a sample response where SLEOX is the time series tag id.

[
  {
    "source": "SLEOX"
  }
]

About Analytics Using an External Data Source

Support for Predix Time Series as a persistent data source is built into the Predix Analytics Services framework. However, you can also run an orchestration request for an analytic that ingests data from an external data source.

External Data Source Use Cases

The Analytics Framework also supports using analytic data from an external source. For example, data stored in a PostgreSQL database or another RDBMS, Predix Asset, or Predix Blobstore can be read by, and written from, an analytic. This requires that you build a Custom Data Connector service to read data from and write data to the external source. This data connector service must be reachable from the Predix cloud using HTTPS. The framework calls your data connector service to read the analytic input data and to write the analytic output data to the data source.

The following scenarios are examples of when you may want to build your own Custom Data Connector service.

  • Sensor data is stored in PostgreSQL DB and this data is used by the analytics.
  • Image data is stored in Predix Blobstore and sensor data is stored in Predix Time Series. Both the image and time series data will be passed to the analytics.
  • An analytic is programmed to self-learn and to produce model data every time it runs. The model data is stored in Blobstore and will be used to execute the next analytic in the orchestration.
  • Legacy asset data is stored in PostgreSQL DB and more recent asset data is stored in Predix Asset. You would like to use data from both sources to pass to an analytic.

Developing Your Custom Data Connector Service

You can implement more than one Custom Data Connector service to work with different data sources. This service must meet the following requirements.

  • The service must implement the following three Analytics Runtime APIs.
    • HEALTHCHECK (/api/v1/analytics/customdata/healthcheck) — used to check the health of your Custom Data Connector service before executing an orchestration. Successful execution of the API must return a Status 200 response.
    • READ (/api/v1/analytics/customdata/read) — used to read data passed to the analytic (input data) from the external data source. Successful execution of the API must return a Status 200 response.
    • WRITE (/api/v1/analytics/customdata/write) — used to write the data produced by an analytic (output data) to the external data source. Successful execution of the API must return a Status 201 response.
  • The REST endpoints are secured using custom UAA-based authentication. If custom credentials were not provided during runtime instance configuration, your data service API will be invoked using the same authorization token used during orchestration execution. Otherwise, the API will be invoked using a fresh token fetched using the provided custom credentials.
  • The service must be deployed, run, and managed in your Predix org and space.
  • The service must be reachable by the Analytics Framework via HTTPS.

A Java-based reference implementation with PostgreSQL DB support is available. Use Reference Data Connector for PostgreSQL as a starting point to develop a Custom Data Connector service that uses a PostgreSQL data source.
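
To make the contract concrete, the following is a minimal sketch of the three endpoints, assuming a Spring Boot implementation in the style of the reference connector. The controller shape, the HTTP methods chosen for each endpoint, and the helper methods readFromStore and writeToStore are assumptions; only the endpoint paths and required status codes come from the list above. The DataRequest and DataResponse payload types are described under Custom Data Connector Reference below.

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

// Illustrative Custom Data Connector controller (Spring Boot assumed).
@RestController
@RequestMapping("/api/v1/analytics/customdata")
public class CustomDataConnectorController {

    // Invoked before an orchestration runs; must return Status 200 when healthy.
    @GetMapping("/healthcheck")
    public ResponseEntity<String> healthcheck() {
        return ResponseEntity.ok("OK");
    }

    // Reads analytic input data from the external store; must return Status 200.
    @PostMapping("/read")
    public ResponseEntity<DataResponse> read(@RequestBody DataRequest request) {
        return ResponseEntity.ok(readFromStore(request));
    }

    // Writes analytic output data to the external store; must return Status 201.
    @PostMapping("/write")
    public ResponseEntity<DataResponse> write(@RequestBody DataRequest request) {
        return ResponseEntity.status(HttpStatus.CREATED).body(writeToStore(request));
    }

    // Hypothetical helpers: query or update the external data source using each
    // Field's queryCriteria, populating data or errorResponse per field.
    private DataResponse readFromStore(DataRequest request) { return new DataResponse(); }
    private DataResponse writeToStore(DataRequest request) { return new DataResponse(); }
}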

Implementing Your Custom Data Connector Service

The Analytics Framework executes the orchestration and its orchestration steps using the port-to-field map and your Custom Data Connector service. It calls your connector to read input data just before executing an analytic, and to write the analytic's output data back to the external source.

The high-level process to implement your service is as follows.
  • Develop your Custom Data Connector service.
  • Deploy it to the Predix cloud.
  • Configure a port-to-field map to include the required input/output fields with a unique dataSourceId.
  • Execute an orchestration with your data source definitions (dataSourceId, baseUri, apiVersion). The baseUri and apiVersion together determine the unique endpoints for the READ, WRITE, and HEALTHCHECK APIs you implement. A sample definition follows this list.
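
For illustration, a data source definition in the orchestration execution request might look like the following. The dataSourceId, baseUri, and apiVersion attribute names come from the step above; the enclosing attribute name and the URI value are assumptions.

"dataSource": [
    {
        "dataSourceId": "Postgres Reference External Data Connector",
        "baseUri": "https://my-custom-data-connector.example.predix.io",
        "apiVersion": "v1"
    }
]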

Update Runtime with External Data Source Client Credentials

During orchestration execution, the credentials for the external data source must be passed to the runtime. Using an external data source requires that you update the runtime configuration with these credentials by providing the corresponding externalService configuration parameters. If they are not provided, access to the external data source defaults to the OAuth2 token used in the orchestration request.

For more information about how to do this, see afs-get-started.html#task_694d2085-e2e4-4097-a829-50eb9b42d43c.

Limitations

Note the following limitations:

  • The READ data and WRITE data operations implemented by your Custom Data Connector service should complete and return within one minute.
    Note: Any HTTP/S call to a REST endpoint in the Predix cloud will time out after one minute.
  • The Analytics Framework will invoke your service using the same authorization token used to run the orchestration execution. The Custom Data Connector service must be able to authenticate the same token.
  • The Analytics Framework will not invoke your Custom Data Connector service with custom HTTP header details.

Custom Data Connector Reference

You will build a Custom Data Connector service when analytics in your orchestration rely upon an external data source. This data connector service must be reachable by the Predix cloud using HTTPS.

A Java-based reference implementation with PostgreSQL DB support is available. Use Reference Data Connector for PostgreSQL as a starting point to develop a Custom Data Connector service that uses a PostgreSQL data source.

Data Format

This is the overall data format:

 [ [ <EpochInMs>, <Measurement Value>, <Quality> ] ] 

Where:

  • <EpochInMs> — Epoch Time in milliseconds
  • <Measurement Value> — Measurement data value
  • <Quality> — Quality (Optional)

The expected data format for each data type is as follows.

  • DOUBLE: [ [ null, 10.0 ] ]. A double is represented in standard time series format. Note: if the data type is DOUBLE, the system reads the measurement data value from index 1 of each entry.
  • DOUBLE_ARRAY: [ [ null, 10.0 ], [ null, 11.0 ] ]
  • TIMESERIES_ARRAY: [ [ 1435776300000, 2, 1 ], [ 1435776400000, null ], [ 1435776500000, 10.5, 3 ] ]

Payloads are expected as follows.

  • /api/v1/analytics/customdata/read: the request payload is an AnalyticReadDataRequest (extends DataRequest); the response payload is an AnalyticReadDataResponse (extends DataResponse).
  • /api/v1/analytics/customdata/write: the request payload is an AnalyticWriteDataRequest (extends DataRequest); the response payload is an AnalyticWriteDataResponse (extends DataResponse).
  • /api/v1/analytics/customdata/healthcheck: no request or response payload is defined.

Type: DataRequest

This is the overall structure of a DataRequest object.

{
  "field": List<Field>,
  "customAttributes": Object,
  "systemAttributes": Object,
  "orchestrationExecutionContext": OrchestrationExecutionContext,
  "dataSourceId": String
}

The elements in a DataRequest object are described below.

  • field: A list of Field objects.
  • customAttributes: A user-defined JSON object.
  • systemAttributes: A map of system-generated key/value pairs. Reserved for future use.
  • orchestrationExecutionContext: The orchestration execution context, containing system-generated IDs used to track the request within analytics services.
  • dataSourceId: An external data connector service identifier, added for monitoring purposes.

Type: DataResponse

This is the overall structure of a DataResponse object.

{
  "field": List<Field>,
  "orchestrationExecutionContext": OrchestrationExecutionContext,
  "errorResponse": ErrorResponse,
  "dataSourceId": String
}

The elements in a DataResponse object are described below.

  • field: A list of Field objects.
  • orchestrationExecutionContext: The orchestration execution context, containing system-generated IDs used to track the request within analytics services.
  • dataSourceId: An external data connector service identifier, added for monitoring purposes.
  • errorResponse: Error message details.

Type: Field

This is the overall structure of a Field object.

{
  "fieldId": String,
  "fullyQualifiedPortName": String,
  "dataType": String,
  "engUnit": String,
  "data": Object,
  "queryCriteria": Object,
  "errorResponse": ErrorResponse
}

The elements in a Field object are described below.

  • fieldId: The field identifier defined in the port-to-field map.
  • fullyQualifiedPortName: The unique port name identifying a port in the port-to-field map.
  • dataType: The field data type. The following analytic data types are supported.
    • LONG
    • INTEGER
    • DOUBLE
    • FLOAT
    • STRING
    • BOOLEAN
    • LONG_ARRAY
    • INTEGER_ARRAY
    • DOUBLE_ARRAY
    • FLOAT_ARRAY
    • STRING_ARRAY
    • BOOLEAN_ARRAY
    • TIMESERIES_ARRAY
  • engUnit: The engineering unit, as defined in the port-to-field map.
  • queryCriteria: A custom object defining the criteria used to query the requested field, as defined in the port-to-field map.
  • data: The data value. For a read request, provide the data for this field in the AnalyticReadDataResponse. For a write request, the analytic-generated output data is carried in this attribute.
  • errorResponse: If there is an error in processing the request, this attribute is updated with the error details.

Type: OrchestrationExecutionContext

This is the overall structure of an OrchestrationExecutionContext object.

{
  "assetId": String,
  "orchestrationConfigurationID": String,
  "orchestrationExecutionRequestID": String,
  "analyticId": String,
  "analyticName": String,
  "analyticVersion": String,
  "analyticExecutionRequestID": String
}

The elements in an OrchestrationExecutionContext object are described below.

  • assetId: The asset identifier.
  • orchestrationConfigurationID: The orchestration configuration identifier.
  • orchestrationExecutionRequestID: The orchestration execution request identifier.
  • analyticId: The analytic catalog entry identifier.
  • analyticName: The analytic name.
  • analyticVersion: The analytic version.
  • analyticExecutionRequestID: The analytic execution request identifier.

Type: ErrorResponse

This is the overall structure of an ErrorResponse object.

{
  "code": String,
  "severity": String,
  "detail": String,
  "message": String
}

The elements in an ErrorResponse object are described below.

  • code: The error code.
  • severity: The error severity.
  • message: A short error message.
  • detail: A detailed error message, including a stack trace where available.
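
For a Java connector, these payload types can be modeled as plain classes bound with a JSON library such as Jackson. The following is a minimal sketch derived from the JSON structures above; it is illustrative rather than taken from a published SDK, and getters, setters, and constructors are elided (public fields are used for brevity).

import java.util.List;

// Illustrative POJOs mirroring the payload types documented above.
class DataRequest {
    public List<Field> field;             // fields to read or write
    public Object customAttributes;       // user-defined JSON object
    public Object systemAttributes;       // reserved for future use
    public OrchestrationExecutionContext orchestrationExecutionContext;
    public String dataSourceId;           // connector identifier, for monitoring
}

class DataResponse {
    public List<Field> field;             // fields with data or errorResponse populated
    public OrchestrationExecutionContext orchestrationExecutionContext;
    public ErrorResponse errorResponse;   // top-level error details, if any
    public String dataSourceId;
}

class Field {
    public String fieldId;                // field identifier from the port-to-field map
    public String fullyQualifiedPortName; // unique port name in the port-to-field map
    public String dataType;               // e.g. "DOUBLE_ARRAY" or "TIMESERIES_ARRAY"
    public String engUnit;                // engineering unit from the port-to-field map
    public Object data;                   // rows of [ epochInMs, measurementValue, quality ]
    public Object queryCriteria;          // store-specific query definition
    public ErrorResponse errorResponse;   // populated when this field fails to process
}

class OrchestrationExecutionContext {
    public String assetId;
    public String orchestrationConfigurationID;
    public String orchestrationExecutionRequestID;
    public String analyticId;
    public String analyticName;
    public String analyticVersion;
    public String analyticExecutionRequestID;
}

class ErrorResponse {
    public String code;                   // error code
    public String severity;               // error severity
    public String message;                // short error message
    public String detail;                 // detailed message, e.g. stack trace
}

With Jackson, for example, a READ or WRITE request body can then be deserialized directly with new ObjectMapper().readValue(body, DataRequest.class).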

Sample AnalyticReadDataRequest

{
    "field": [
        {
            "fieldId": "KW",
            "fullyQualifiedPortName": "data.time_series.numberArray1",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "kw",
            "data": [],
            "queryCriteria": {
                "columns": [
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data",
                "conditions": [
                    {
                        "key": "asset_id",
                        "value": "${ASSET_ID}",
                        "valueType": "string",
                        "relation": " = "
                    },
                    {
                        "key": "recorded_at",
                        "value": "current_timestamp",
                        "valueType": "none",
                        "relation": " < "
                    },
                    {
                        "key": "field_id",
                        "value": "KW",
                        "valueType": "string",
                        "relation": " = "
                    }
                ]
            },
            "errorResponse": null
        },
        {
            "fieldId": "vibration",
            "fullyQualifiedPortName": "data.time_series.numberArray2",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "hertz",
            "data": [],
            "queryCriteria": {
                "columns": [
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data",
                "conditions": [
                    {
                        "key": "asset_id",
                        "value": "${ASSET_ID}",
                        "valueType": "string",
                        "relation": " = "
                    },
                    {
                        "key": "recorded_at",
                        "value": "current_timestamp",
                        "valueType": "none",
                        "relation": " < "
                    },
                    {
                        "key": "field_id",
                        "value": "vibration",
                        "valueType": "string",
                        "relation": " = "
                    }
                ]
            },
            "errorResponse": null
        }
    ],
    "customAttributes": {
        "IS_GENERIC_SCHEMA": "TRUE"
    },
    "systemAttributes": null,
    "orchestrationExecutionContext": {
        "assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
        "orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
        "orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530",
        "analyticId": "b9e999a1-8931-459d-8d34-2f76c2b9fd95",
        "analyticName": null,
        "analyticVersion": null,
        "analyticExecutionRequestID": null
    },
    "dataSourceId": "Postgres Reference External Data Connector"
}

Sample AnalyticReadDataResponse

{
    "field": [
        {
            "fieldId": "KW",
            "fullyQualifiedPortName": "data.time_series.numberArray1",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "kw",
            "data": [
                [
                    1473366334967,
                    144.33
                ],
                [
                    1473366334968,
                    244.33
                ],
                [
                    1473366334969,
                    344.33
                ]
            ],
            "queryCriteria": {
                "columns": [
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data",
                "conditions": [
                    {
                        "key": "asset_id",
                        "value": "${ASSET_ID}",
                        "valueType": "string",
                        "relation": " = "
                    },
                    {
                        "key": "recorded_at",
                        "value": "current_timestamp",
                        "valueType": "none",
                        "relation": " < "
                    },
                    {
                        "key": "field_id",
                        "value": "KW",
                        "valueType": "string",
                        "relation": " = "
                    }
                ]
            }
        },
        {
            "fieldId": "vibration",
            "fullyQualifiedPortName": "data.time_series.numberArray2",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "hertz",
            "data": [
                [
                    1473366334967,
                    1244.33
                ],
                [
                    1473366334968,
                    2244.33
                ],
                [
                    1473366334969,
                    3244.33
                ]
            ],
            "queryCriteria": {
                "columns": [
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data",
                "conditions": [
                    {
                        "key": "asset_id",
                        "value": "${ASSET_ID}",
                        "valueType": "string",
                        "relation": " = "
                    },
                    {
                        "key": "recorded_at",
                        "value": "current_timestamp",
                        "valueType": "none",
                        "relation": " < "
                    },
                    {
                        "key": "field_id",
                        "value": "vibration",
                        "valueType": "string",
                        "relation": " = "
                    }
                ]
            }
        }
    ],
    "orchestrationExecutionContext": {
        "assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
        "orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
        "orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530",
        "analyticId": "b9e999a1-8931-459d-8d34-2f76c2b9fd95"
    }
}

Sample AnalyticReadDataResponse with Error

{
    "field": [
        {
            "fieldId": "KW",
            "fullyQualifiedPortName": "data.time_series.numberArray1",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "kw",
            "data": [],
            "queryCriteria": {
                "columns": [
                    "recorded_at",
                    "value"
                ],
                "table": "sensor_data",
                "conditions": [
                    {
                        "key": "asset_id",
                        "value": "${ASSET_ID}",
                        "valueType": "string",
                        "relation": " = "
                    },
                    {
                        "key": "recorded_at",
                        "value": "current_timestamp",
                        "valueType": "none",
                        "relation": " < "
                    },
                    {
                        "key": "field_id",
                        "value": "KW",
                        "valueType": "string",
                        "relation": " = "
                    }
                ]
            },
            "errorResponse": {
                "code": "FIELD_EXCEPTION",
                "message": "Unable to retrieve field KW. StatementCallback; bad SQL grammar [select recorded_at, value from sensor_data where  asset_id  =  '/assets/32-3c686c25-2f57-4f13-8cf0-04bc6bb26866' and  recorded_at  <  current_timestamp and  field_id  =  'KW']; nested exception is org.postgresql.util.PSQLException: ERROR: column \"value\" does not exist\n  Position: 21",
                "parameters": []
            }
        },
        {
            "fieldId": "vibration",
            "fullyQualifiedPortName": "data.time_series.numberArray2",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "hertz",
            "data": [
                [
                    1473366334967,
                    1244.33
                ],
                [
                    1473366334968,
                    2244.33
                ],
                [
                    1473366334969,
                    3244.33
                ]
            ],
            "queryCriteria": {
                "columns": [
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data",
                "conditions": [
                    {
                        "key": "asset_id",
                        "value": "${ASSET_ID}",
                        "valueType": "string",
                        "relation": " = "
                    },
                    {
                        "key": "recorded_at",
                        "value": "current_timestamp",
                        "valueType": "none",
                        "relation": " < "
                    },
                    {
                        "key": "field_id",
                        "value": "vibration",
                        "valueType": "string",
                        "relation": " = "
                    }
                ]
            }
        }
    ],
    "orchestrationExecutionContext": {
        "assetId": "/assets/32-3c686c25-2f57-4f13-8cf0-04bc6bb26866",
        "orchestrationConfigurationID": "6dfae6f5-8be1-4f90-b567-0970a34f2c53",
        "orchestrationExecutionRequestID": "accab63f-da95-4ac4-8120-fcfca109011d",
        "analyticId": "b9e999a1-8931-459d-8d34-2f76c2b9fd95"
    }
}

Sample AnalyticWriteDataRequest

{
    "field": [
        {
            "fieldId": "bearing temperature",
            "fullyQualifiedPortName": "data.time_series.sum",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "Celsius",
            "data": [
                [
                    1473366334967,
                    1388.6599999999999,
                    "2"
                ],
                [
                    1473366334968,
                    2488.66,
                    "2"
                ],
                [
                    1473366334969,
                    3588.66,
                    "2"
                ]
            ],
            "queryCriteria": {
                "columns": [
                    "field_id",
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data"
            },
            "errorResponse": null
        }
    ],
    "customAttributes": {
        "IS_GENERIC_SCHEMA": "TRUE"
    },
    "systemAttributes": null,
    "orchestrationExecutionContext": {
        "assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
        "orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
        "orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530",
        "analyticId": null,
        "analyticName": null,
        "analyticVersion": null,
        "analyticExecutionRequestID": null
    },
    "dataSourceId": "Postgres Reference External Data Connector"
}

Sample AnalyticWriteDataResponse

{
    "field": [
        {
            "fieldId": "bearing temperature",
            "fullyQualifiedPortName": "data.time_series.sum",
            "dataType": "DOUBLE_ARRAY",
            "engUnit": "Celsius",
            "data": [
                [
                    1473366334967,
                    1388.6599999999999,
                    "2"
                ],
                [
                    1473366334968,
                    2488.66,
                    "2"
                ],
                [
                    1473366334969,
                    3588.66,
                    "2"
                ]
            ],
            "queryCriteria": {
                "columns": [
                    "field_id",
                    "recorded_at",
                    "data_value"
                ],
                "table": "sensor_data"
            }
        }
    ],
    "orchestrationExecutionContext": {
        "assetId": "/assets/32-90effe42-eb21-4611-b734-83f707d89d7a",
        "orchestrationConfigurationID": "e06982f5-f446-46d2-bc21-9fdf64c111ab",
        "orchestrationExecutionRequestID": "8f990980-b208-4418-99b1-f46d26cd4530"
    }
}

Sample AnalyticWriteDataResponse with Error

  "field": [
    {
      "fieldId": "bearing temperature",
      "fullyQualifiedPortName": "data.time_series.sum",
      "dataType": "DOUBLE_ARRAY",
      "engUnit": "Celsius",
      "data": [
        [
          1473366334967,
          1388.6599999999999,
          "2"
        ],
        [
          1473366334968,
          2488.66,
          "2"
        ],
        [
          1473366334969,
          3588.66,
          "2"
        ]
      ],
      "queryCriteria": {
        "columns": [
          "field_id",
          "recorded_at",
          "value"
        ],
        "table": "sensor_data"
      },
      "errorResponse": {
        "code": "FIELD_EXCEPTION",
        "message": "Unable to update field bearing temperature. StatementCallback; bad SQL grammar [ insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334967::double precision/1000), 1388.6599999999999 );  insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334968::double precision/1000), 2488.66 );  insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334969::double precision/1000), 3588.66 )]; nested exception is java.sql.BatchUpdateException: Batch entry 0  insert into sensor_data(asset_id, field_id, recorded_at, value) values( '/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5', 'bearing temperature', to_timestamp(1473366334967::double precision/1000), 1388.6599999999999 ) was aborted.  Call getNextException to see the cause.",
        "parameters": []
      }
    }
  ],
  "orchestrationExecutionContext": {
    "assetId": "/assets/37-a57b74c1-28e1-44a5-b59f-7456411a7ab5",
    "orchestrationConfigurationID": "eecc3b4a-274a-4fba-804a-c2d47f85e9f8",
    "orchestrationExecutionRequestID": "a418c284-774a-4080-8c59-b2afd5d0cc27"
  }
}