App Flows
App Flows is a feature in Amorphic that allows users to ingest data from SaaS applications into Amorphic Datasets for further ETL/analytics. Amorphic uses Amazon AppFlow in the backend to transfer the data. AppFlow supports both AWS managed connectors and custom connectors created by the user. Using App Flows involves three steps:
- Choose an inbuilt connector or create your own Custom Connector.
- Create an App Flow using the selected connector.
- Define a flow on the App Flow and run the flow.
Amorphic currently supports Slack, Zendesk, Salesforce, Google Analytics, and ServiceNow as AWS managed connectors and also allows the use of Custom Connectors. The Custom Connector section covers all functions related to custom connectors. If you are using an AWS managed connector, you can skip to the App Flows section.
Use Case: Ingest data from Slack
The use case is to ingest data from Slack for further downstream analysis. The App Flow UI relies on uploading API payloads as JSON files.
App Flow has two components: App Flows and Flows.
App Flows are used to authenticate with third-party services like Slack and establish a connection with them. Once an App Flow is created, any number of Flows can be defined using it.
A Flow defines the specific data points to be ingested from the third-party service, such as User, Timestamp, Text, Threads, Replies, and Reactions for Slack. A flow can have three types of triggers: on-demand, scheduled, or event-based (only for Salesforce).
To create an App Flow, follow these steps:
- Go to Ingestion > App Flows
- Click on + Create App Flow
- Upload a JSON file containing the payload to create an App Flow.
- The required parameters vary slightly for each SaaS app. Refer to the specific SaaS app's documentation for more information. For Slack, the payload looks like this:
{
"ConnAppName": "slack_test_9",
"Description": "This is a test connection to ingest data from slack",
"SourceType": "slack",
"ConnectorProfileConfig": {
"clientId": "3567161620994.3567146437507",
"clientSecret": "80673273e291fc237f2a5dd41c64d07c",
"accessToken": "xoxp-3567161620994-3564254695269-3579807622033-c452d46e9e8fbac855852e7926e2b9cf",
"instanceUrl": "https://testorg-jvq2634.slack.com"
},
"Keywords": [
"test",
"slack",
"post"
]
}
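As a minimal sketch, the payload file can also be generated with a short script instead of being written by hand. The function names and the output file name are illustrative assumptions; only the JSON keys come from the example above.

```python
import json
from pathlib import Path


def build_slack_app_flow_payload(name, description, client_id, client_secret,
                                 access_token, instance_url, keywords):
    """Return an App Flow payload shaped like the Slack example above."""
    return {
        "ConnAppName": name,
        "Description": description,
        "SourceType": "slack",
        "ConnectorProfileConfig": {
            "clientId": client_id,
            "clientSecret": client_secret,
            "accessToken": access_token,
            "instanceUrl": instance_url,
        },
        "Keywords": list(keywords),
    }


def write_payload(payload, path):
    """Serialize the payload to a JSON file that can be uploaded in the UI."""
    target = Path(path)
    target.write_text(json.dumps(payload, indent=2), encoding="utf-8")
    return target
```

Reading the credentials from a secrets store or environment variables, rather than hard-coding them in the script, keeps them out of version control.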
Once the App Flow is created, you can create a Flow by following these steps:
- Go to the Flows tab
- Click on + Create Flow
- Upload a JSON file containing the payload to create a Flow.
- Define the TriggerType, the dataset you want to write to, the file format, and specify the actual data points you want to read from the SaaS App.
These data points are specified in the Tasks parameter, which is specific to that SaaS App. For example, in this case, the data points are ts, type, and text.
{
"ConnAppFlowName": "slack_flow_11",
"TriggerType": "scheduled",
"TriggerProperties": {
"scheduleStartTime": "2022-12-25T01:45:00",
"scheduleEndTime": "2022-12-26T02:10:00",
"scheduleExpression": "rate(3days)",
"dataPullMode": "Complete"
},
"ApiVersion": "v1.0",
"SourceConnectorProperties": {
"Object": "conversations/C03HCTC8B0Q"
},
"CreateDataset": false,
"DatasetDetails": {
"DatasetId": "bb2d571e-65b3-4f22-b18a-39b8dc7f2ce0"
},
"DataFormat": "csv",
"Tasks": [
{
"taskType": "Filter",
"sourceFields": [
"ts",
"type",
"text"
],
"taskProperties": {},
"connectorOperator": {
"Slack": "PROJECTION"
}
},
{
"taskType": "Map",
"sourceFields": [
"ts"
],
"taskProperties": {
"SOURCE_DATA_TYPE": "DateTime",
"DESTINATION_DATA_TYPE": "DateTime"
},
"destinationField": "ts",
"connectorOperator": {
"Slack": "NO_OP"
}
},
{
"taskType": "Map",
"sourceFields": [
"type"
],
"taskProperties": {
"SOURCE_DATA_TYPE": "String",
"DESTINATION_DATA_TYPE": "String"
},
"destinationField": "type",
"connectorOperator": {
"Slack": "NO_OP"
}
},
{
"taskType": "Map",
"sourceFields": [
"text"
],
"taskProperties": {
"SOURCE_DATA_TYPE": "String",
"DESTINATION_DATA_TYPE": "String"
},
"destinationField": "text",
"connectorOperator": {
"Slack": "NO_OP"
}
},
{
"taskType": "Validate",
"sourceFields": [
"ts"
],
"taskProperties": {
"VALIDATION_ACTION": "DropRecord"
},
"connectorOperator": {
"Slack": "VALIDATE_NON_NULL"
}
}
],
"tags": {}
}
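The Tasks list above follows a regular pattern: one projection Filter over all fields, one Map per field, and a Validate for fields that must not be null. A small helper can generate it from a field-to-type mapping. This is a sketch under that assumption; the helper name is hypothetical, and the operator names are copied from the Slack example rather than from a complete list of supported operators.

```python
def build_slack_tasks(field_types, non_null_fields=()):
    """Build a Tasks list: one projection Filter, one Map per field,
    and one Validate per field that must not be null."""
    tasks = [{
        "taskType": "Filter",
        "sourceFields": list(field_types),
        "taskProperties": {},
        "connectorOperator": {"Slack": "PROJECTION"},
    }]
    for name, data_type in field_types.items():
        # Source and destination use the same name and type, as in the example.
        tasks.append({
            "taskType": "Map",
            "sourceFields": [name],
            "taskProperties": {
                "SOURCE_DATA_TYPE": data_type,
                "DESTINATION_DATA_TYPE": data_type,
            },
            "destinationField": name,
            "connectorOperator": {"Slack": "NO_OP"},
        })
    for name in non_null_fields:
        if name not in field_types:
            raise ValueError(f"{name!r} is not one of the projected fields")
        tasks.append({
            "taskType": "Validate",
            "sourceFields": [name],
            "taskProperties": {"VALIDATION_ACTION": "DropRecord"},
            "connectorOperator": {"Slack": "VALIDATE_NON_NULL"},
        })
    return tasks
```

Calling `build_slack_tasks({"ts": "DateTime", "type": "String", "text": "String"}, ["ts"])` produces the same five tasks as the payload above.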
Click on View in the same drop-down menu to go to the Flow Runs page and check the status of the flow run.
Once the flow run is complete, the data will be available in the dataset specified in step 4. This dataset can be used to perform further analysis, such as in an ETL job.
Custom Connector
Custom Connector allows ingestion of data from any third-party SaaS application. This is enabled by the AppFlow Custom Connector SDK. The connector developer needs to implement three main interfaces from the SDK:
Interface | Description |
---|---|
Configuration Handler | Defines all functionality related to authentication and configuration. |
Metadata Handler | Retrieves metadata and parses it into the AppFlow-specific format. |
Record Handler | Defines all functionality for record-related CRUD operations. |
Connector developers are therefore essentially writing the wiring code that translates the vendor's API to the generic AppFlow custom connector interface. The custom connector implementation code, along with the SDK and any third-party dependencies, needs to be bundled into a zip file. This zip file can be used to register a custom connector in Amorphic.
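The bundling step can be sketched with the Python standard library. This assumes the connector code, the SDK, and the dependencies have already been placed in one build directory; the directory layout and the function name are assumptions for illustration, not something Amorphic prescribes.

```python
import zipfile
from pathlib import Path


def bundle_connector(source_dir, zip_path):
    """Zip every file under source_dir, keeping paths relative to it."""
    root = Path(source_dir)
    target = Path(zip_path)
    with zipfile.ZipFile(target, "w", zipfile.ZIP_DEFLATED) as archive:
        for path in sorted(root.rglob("*")):
            # Skip directories and the archive itself if it lives under source_dir.
            if not path.is_file() or path.resolve() == target.resolve():
                continue
            archive.write(path, path.relative_to(root).as_posix())
    return target
```

Keeping the paths relative to the build directory matters because the LambdaHandler value is a fully qualified module path resolved from the root of the archive.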
Below is the list of APIs and their corresponding methods used to create, update, and delete Custom Connectors in Amorphic:
API List
/connections/apps/connectors
- GET -- Returns details about all custom & inbuilt connectors.
- POST -- Saves custom connector metadata & returns a signed URL for file upload.
/connections/apps/connectors/{connectorId}
- GET -- Returns details about a single custom connector.
- POST -- Starts the asynchronous process for creating or updating a custom connector.
- PUT -- Updates custom connector metadata & returns a signed URL for file upload.
- DELETE -- Deletes a custom connector.
Please replace {connectorId} with the ConnectorId received from the backend response.
API Request Payload Details
To retrieve details about all custom/inbuilt connectors as well as to download the Custom Connector template
/connections/apps/connectors & GET method
The response depends on the query parameters (if any).
Query parameter | Response |
---|---|
None | Returns details about all inbuilt connectors. |
connector_type = custom | Returns details about all custom connectors. |
download_template = yes | Returns a download link for a Custom Connector sample template. |
To create a custom connector
This is a multi-step process: save the metadata, upload the zip file, and then run the asynchronous process for registering a custom connector.
/connections/apps/connectors & POST method
Request payload for saving custom connector metadata
{
"ConnectorName": <String>, (must be unique across an AWS account)
"Description": <String>,
"LambdaHandler": <String>,
"Keywords": <String>
}
- LambdaHandler: The fully qualified name of the lambda handler used in the Custom Connector code. For example, in the provided Sample Connector Template, it will be: custom_connector_example.handlers.lambda_handler.app_lambda_handler
Note: The response will contain a signed URL for uploading the custom connector zip file. Make a PUT request using this link. Make the next API request only after this file is uploaded.
/connections/apps/connectors/{connectorId} & POST method
Request payload for registering custom connector
{
"ConnectorAction": "create-connector"
}
To update a custom connector
Similar to create, this is also a multi-step process: update the metadata, upload the zip file, and then run the asynchronous process for registering a custom connector.
/connections/apps/connectors/{connectorId} & PUT method
Request payload for saving custom connector metadata
{
"Description": <String>,
"LambdaHandler": <String>,
"Keywords": <String>
}
- LambdaHandler: The fully qualified name of the lambda handler used in the Custom Connector code. For example, in the provided Sample Connector Template, it will be: custom_connector_example.handlers.lambda_handler.app_lambda_handler
Note: The response will contain a signed URL for uploading the custom connector zip file. Make a PUT request using this link. Make the next API request only after this file is uploaded.
/connections/apps/connectors/{connectorId} & POST method
Request payload for registering custom connector
{
"ConnectorAction": "update-connector"
}
Note: If the update fails for any reason, the connector will automatically roll back to the previous implementation.
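The create and update sequences share the same three calls, which can be sketched with `urllib.request` from the standard library. The base URL and the authentication headers are placeholders, and the names of the response fields that carry the signed URL and the ConnectorId are not given on this page, so the sketch only builds the requests and leaves reading the responses to the caller.

```python
import json
import urllib.request

CONNECTORS_PATH = "/connections/apps/connectors"


def metadata_request(base_url, headers, metadata, connector_id=None):
    """Step 1: POST saves new connector metadata; PUT on /{connectorId} updates it."""
    url, method = f"{base_url}{CONNECTORS_PATH}", "POST"
    if connector_id is not None:
        url, method = f"{url}/{connector_id}", "PUT"
    return urllib.request.Request(
        url,
        data=json.dumps(metadata).encode("utf-8"),
        headers={**headers, "Content-Type": "application/json"},
        method=method,
    )


def upload_request(signed_url, zip_bytes):
    """Step 2: PUT the connector zip to the signed URL from the step 1 response."""
    return urllib.request.Request(signed_url, data=zip_bytes, method="PUT")


def action_request(base_url, headers, connector_id, action):
    """Step 3: POST the asynchronous create-connector or update-connector action."""
    if action not in ("create-connector", "update-connector"):
        raise ValueError(f"unsupported ConnectorAction: {action!r}")
    return urllib.request.Request(
        f"{base_url}{CONNECTORS_PATH}/{connector_id}",
        data=json.dumps({"ConnectorAction": action}).encode("utf-8"),
        headers={**headers, "Content-Type": "application/json"},
        method="POST",
    )
```

Each request would be sent in order with `urllib.request.urlopen`, and step 3 should only be issued once the upload in step 2 has completed.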
App Flows
Below is the list of APIs and their corresponding methods used to create, update, and delete App Flows and their corresponding data flows:
API List
- /connections/apps
  - POST -- Create App Flow
  - GET -- Returns all App Flows that a user has access to
- /connections/apps/{id}
  - GET -- Returns details of a single App Flow
  - PUT -- Edit a single App Flow
  - DELETE -- Delete a single App Flow
- /connections/apps/{id}/flows
  - POST -- Create a data flow
  - GET -- Returns all data flows associated with an App Flow
- /connections/apps/{id}/flows/{flowid}
  - POST -- Start or Stop a data flow
  - PUT -- Edit a data flow
  - GET -- Returns details of a single data flow
  - DELETE -- Delete a data flow
- /connections/apps/{id}/flows/{flowid}/runs
  - GET -- Returns run history of a single data flow
- /connections-apps/{id}/grants
  - GET -- Returns all authorized users and tags of an App Flow
  - POST -- Grant access to a user/tag
  - DELETE -- Revoke access from a user/tag
Please replace variables enclosed in {} with the corresponding values (e.g. ConnAppId or ConnAppFlowId, which are received from the backend response).
API Request Payload Details
To create an App Flow
/connections/apps & POST method
Request payload to create App Flow
{
"ConnAppName": <String>,
"Description": <String>,
"ConnectorName": <String>, (Applicable only for customconnector)
"SourceType": <String>, (slack or zendesk or salesforce or googleanalytics or servicenow or customconnector)
"ConnectorProfileConfig": { # For Inbuilt Connectors
"clientId": <String>, (Applicable only for slack or zendesk or googleanalytics)
"clientSecret": <String>, (Applicable only for slack or zendesk or googleanalytics)
"instanceUrl": <String>, (Applicable only for slack or zendesk or salesforce or servicenow)
"accessToken": <String>, (Applicable only for slack or zendesk or salesforce or googleanalytics)
"refreshToken": <String>, (Applicable only for salesforce or googleanalytics)
"username": <String>, (Applicable only for servicenow)
"password": <String> (Applicable only for servicenow)
},
"ConnectorProfileConfig": { # For Custom Connectors
"authenticationType": "OAUTH2"|"APIKEY"|"BASIC"|"CUSTOM", (required field, corresponding details need to be provided)
"basic": { (optional field, required if authenticationType is "BASIC")
"username": <String>, (required field)
"password": <String> (required field)
},
"oauth2": { (optional field, required if authenticationType is "OAUTH2")
"clientId": <String>,
"clientSecret": <String>,
"accessToken": <String>,
"refreshToken": <String>
},
"apiKey": { (optional field, required if authenticationType is "APIKEY")
"apiKey": <String>, (required field)
"apiSecretKey": <String>
},
"custom": { (optional field, required if authenticationType is "CUSTOM")
"customAuthenticationType": <String>, (required field)
"credentialsMap": {
"customProperty1": <String>,
"customProperty2": <String>
}
},
"profileProperties": { (optional runtime parameters)
"runtimeProperty1": <String>,
"runtimeProperty2": <String>
},
"oAuth2Properties": { (optional field, required if authenticationType is "OAUTH2")
"tokenUrl": <String>, (required field)
"oAuth2GrantType": <String> (required field, 'CLIENT_CREDENTIALS' or 'AUTHORIZATION_CODE')
}
},
"Keywords": ["test", "new"]
}
The above payload can also be uploaded through the UI, and the created App Flow can be viewed on the App Flow details page in Amorphic.
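Because the custom connector ConnectorProfileConfig requires a different block for each authenticationType, a local check before upload can catch a missing block early. The sketch below encodes only the rules stated in the payload comments above. Treating authenticationType as case-insensitive is an assumption, made because the schema lists upper-case values while the sample payload on this page uses "oauth2".

```python
# Maps each authenticationType to the block that must accompany it.
AUTH_BLOCKS = {"BASIC": "basic", "OAUTH2": "oauth2", "APIKEY": "apiKey", "CUSTOM": "custom"}
# Fields marked "required field" in the payload description.
REQUIRED_FIELDS = {
    "basic": ("username", "password"),
    "apiKey": ("apiKey",),
    "custom": ("customAuthenticationType",),
}


def check_custom_profile_config(config):
    """Return a list of problems found in a custom connector ConnectorProfileConfig."""
    auth_type = str(config.get("authenticationType", "")).upper()
    block_name = AUTH_BLOCKS.get(auth_type)
    if block_name is None:
        return [f"authenticationType must be one of {sorted(AUTH_BLOCKS)}"]
    problems = []
    block = config.get(block_name)
    if not isinstance(block, dict):
        problems.append(f"'{block_name}' is required when authenticationType is {auth_type}")
    else:
        for field in REQUIRED_FIELDS.get(block_name, ()):
            if not block.get(field):
                problems.append(f"'{block_name}.{field}' is required")
    if auth_type == "OAUTH2":
        props = config.get("oAuth2Properties")
        if not isinstance(props, dict):
            problems.append("'oAuth2Properties' is required when authenticationType is OAUTH2")
        else:
            for field in ("tokenUrl", "oAuth2GrantType"):
                if not props.get(field):
                    problems.append(f"'oAuth2Properties.{field}' is required")
    return problems
```

An empty list means the config satisfies the documented rules; it does not guarantee that the backend or the SaaS application will accept the credentials.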
To create a data flow
/connections/apps/{id}/flows & POST method
Request payload to create App Flow data flow
{
"ConnAppFlowName": <String>,
"TriggerType": <String>, (scheduled or ondemand or event (applicable only for salesforce))
"TriggerProperties": {
"scheduleStartTime": <String>, (Timestamp in the format of YYYY-MM-DDThh:mm:ss)
"scheduleEndTime": <String>, (Timestamp in the format of YYYY-MM-DDThh:mm:ss)
"scheduleExpression": <String>, (cron and rate expressions are supported. Ex:- rate(2days), cron(0 12 * * ? *), etc)
"scheduleOffset": <String>, (Specifies the optional offset that is added to the time interval in the format of hh:mm:ss)
"dataPullMode": <String> (Incremental or Complete)
},
"IncrementalPullConfig": <String>, (optional field for customconnector)
"ApiVersion": <String>, (optional field for customconnector)
"CreateDataset": Boolean (true or false),
"SourceConnectorProperties": {
"Object": <String> (Object whose data needs to be transferred from the source)
},
"DatasetDetails": {
"DatasetName" : <String>,
"Description": <String>,
"Keywords": <List of strings>,
"Domain": <String>,
"FileType": <String>,
"TargetLocation": <String>,
"DatasetId": <String>,
"SkipLZProcess": Boolean (true or false)
},
"DataFormat": <String>, (Only csv or parquet or others is supported)
"Tasks": <List of objects>
}
- TriggerProperties: This field is applicable only when the TriggerType is scheduled.
- ApiVersion: This is an optional field for custom connectors, which specifies the SaaS application's API version.
- IncrementalPullConfig: This is an optional field for custom connectors, which specifies the field used to identify records that have changed since the last flow run, for incremental flows.
- CreateDataset: This is a Boolean key with possible values of true or false.
- DatasetDetails: The fields in this object are determined by the key CreateDataset. When it is set to true, all the metadata related to dataset creation is required in the input body, as stated in the above payload; otherwise only DatasetId is required.
- SourceConnectorProperties: Specifies which data from the source needs to be transferred to the target (e.g. conversations/C0234C1JTUP).
- DataFormat: Format in which the data needs to be transferred ("csv", "parquet" and "others" are supported).
- Tasks: List of objects. Every column needed from the source, its data type, and the corresponding target name and data type must be specified in the Tasks list as an individual object. An example is provided at the end of this page.
The above payload can also be uploaded through the UI. The Flow listing page in Amorphic shows the Flows of an App Flow along with the various options.
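The dependencies between the data flow fields described above can be checked locally before the payload is submitted. The following is a sketch, not the backend's validation: the function name is hypothetical, and the list of fields treated as required when CreateDataset is true is an assumption taken from the sample payload at the end of this page.

```python
TRIGGER_TYPES = ("scheduled", "ondemand", "event")
DATA_FORMATS = ("csv", "parquet", "others")
# Assumption: the dataset fields present in the CreateDataset=true sample payload.
DATASET_CREATE_FIELDS = ("DatasetName", "Description", "Keywords", "Domain",
                         "FileType", "TargetLocation")


def check_flow_payload(payload):
    """Return a list of problems found in a data flow payload."""
    problems = []
    trigger = payload.get("TriggerType")
    if trigger not in TRIGGER_TYPES:
        problems.append(f"TriggerType must be one of {TRIGGER_TYPES}")
    if trigger != "scheduled" and "TriggerProperties" in payload:
        problems.append("TriggerProperties only applies to scheduled flows")
    details = payload.get("DatasetDetails", {})
    if payload.get("CreateDataset") is True:
        missing = [key for key in DATASET_CREATE_FIELDS if key not in details]
        if missing:
            problems.append(f"DatasetDetails is missing {missing}")
    elif "DatasetId" not in details:
        problems.append("DatasetDetails.DatasetId is required when CreateDataset is false")
    if payload.get("DataFormat") not in DATA_FORMATS:
        problems.append(f"DataFormat must be one of {DATA_FORMATS}")
    return problems
```

Returning a list of messages instead of raising on the first problem lets all issues in a payload be reported in one pass.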
To start or stop a data flow
/connections/apps/{id}/flows/{flowid} & POST method
Request payload to start or stop a data flow
{
"ConnsAppsFlowAction": <String> (Allowed values are start-flow, stop-flow)
}
- For an ondemand data flow, start-flow starts a run and stop-flow stops the data flow.
- For a scheduled data flow, start-flow activates the data flow if it is in the suspended state; the stop-flow action is not supported.
Flow runs can be reviewed on the Flow Runs details page in Amorphic.
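The start/stop rules above can be captured in a small payload builder that rejects the unsupported combination before a request is made. The function name is a hypothetical choice for this sketch.

```python
def flow_action_payload(trigger_type, action):
    """Build the start/stop payload, enforcing the rules listed above."""
    if action not in ("start-flow", "stop-flow"):
        raise ValueError(f"unsupported ConnsAppsFlowAction: {action!r}")
    if trigger_type == "scheduled" and action == "stop-flow":
        raise ValueError("stop-flow is not supported for scheduled data flows")
    return {"ConnsAppsFlowAction": action}
```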
To grant App Flow access to a tag
/connections-apps/{id}/grants & POST method
Grant access to a tag
{
"TagKey": <tag_key>,
"TagValue": <tag_value>,
"AccessType": <String> (Allowed values are "owner" or "editor" or "read-only")
}
To grant access to an individual user
/connections-apps/{id}/grants & POST method
Grant access to a user
{
"TagKey": "user",
"TagValue": <user_id>,
"AccessType": <String> (Allowed values are "owner" or "editor" or "read-only")
}
The DELETE method on /connections-apps/{id}/grants revokes access from the user or tag.
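Both grant payloads have the same shape and differ only in how TagKey and TagValue are filled, so one helper can build either. This is a sketch; the function and parameter names are illustrative.

```python
ACCESS_TYPES = ("owner", "editor", "read-only")


def grant_payload(access_type, *, user_id=None, tag=None):
    """Build a grant payload for a user (user_id) or a tag ((key, value) pair)."""
    if access_type not in ACCESS_TYPES:
        raise ValueError(f"AccessType must be one of {ACCESS_TYPES}")
    if (user_id is None) == (tag is None):
        raise ValueError("pass exactly one of user_id or tag")
    if user_id is not None:
        # A user grant is a tag grant with the fixed key "user".
        return {"TagKey": "user", "TagValue": user_id, "AccessType": access_type}
    key, value = tag
    return {"TagKey": key, "TagValue": value, "AccessType": access_type}
```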
To retrieve metadata about objects.
/connections/apps/{id}?request_type=list_connector_entities & GET method
- The above call will return all the top level objects for that connector.
- If the response for the desired object contains hasNestedEntities = true, then it means there are lower level objects.
- In this case, make a second request with an additional query parameter, entity_type = "top_level_object".
- Include a query parameter, api_version = "version", when using Custom Connectors. This specifies the API version of the underlying SaaS application.
To retrieve metadata about attributes of an object.
/connections/apps/{id}?request_type=describe_connector_entity&entity_type="object_name" & GET method
- The above call will return all the fields associated with the object.
- Include a query parameter, api_version = "version", when using Custom Connectors. This specifies the API version of the underlying SaaS application.
To retrieve all datasets associated with an App Flow.
/connections/apps/{id}?request_type=get_dependent_resources & GET method
- The above call will return all the datasets associated with the App Flow.
- Dataset details include Id, Name and Domain.
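The three metadata requests above differ only in their query parameters, so the URLs can be assembled with `urllib.parse.urlencode`. The base URL is a placeholder and the function name is hypothetical; the request_type values and parameter names are the ones listed above.

```python
from urllib.parse import urlencode

REQUEST_TYPES = ("list_connector_entities", "describe_connector_entity",
                 "get_dependent_resources")


def metadata_url(base_url, app_flow_id, request_type, entity_type=None, api_version=None):
    """Build the GET URL for a metadata request on an App Flow."""
    if request_type not in REQUEST_TYPES:
        raise ValueError(f"request_type must be one of {REQUEST_TYPES}")
    params = {"request_type": request_type}
    if entity_type is not None:
        params["entity_type"] = entity_type
    if api_version is not None:
        # Only relevant for Custom Connectors.
        params["api_version"] = api_version
    return f"{base_url}/connections/apps/{app_flow_id}?{urlencode(params)}"
```

For a top-level object whose response has hasNestedEntities = true, the same function is called again with request_type `list_connector_entities` and entity_type set to that object's name.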
Sample payloads
{
"ConnAppName": "slack-conn-profile",
"Description": "This is a test connection to ingest data from slack",
"SourceType": "slack",
"ConnectorProfileConfig": {
"clientId": "214567891798.2144988765890",
"clientSecret": "21216f9185b2ff73905aa5676589a",
"instanceUrl": "https://abcd-hq.slack.com",
"accessToken": "xoxp-2114398501798-2144980258432-456788970-cc3b194d3f0e05620bafdbdc77870285"
},
"Keywords": ["test", "new"]
}
{
"ConnAppName": "custom-conn-profile",
"ConnectorName": "custom-connector-name",
"Description": "This is a test connection to ingest data from custom connector",
"SourceType": "customconnector",
"ConnectorProfileConfig": {
"authenticationType": "oauth2",
"oauth2": {
"clientId": "3MVG9pRzvMkjMb6mEZ2PKzbg7vJrNdV3PGMXkTNi1Oh4O1AXhPY6htzxylytj9W.ZsqCneFQZLEC3JIbIAd",
"clientSecret": "996205D1E0E9C830D0B2A951BF4243EA43F20A6452455EF2B04D042376AF5FB4",
"accessToken": "00D5j00000AFMHW!ARgAQKvyyO2i0hXUV163vl00JOcPlKhI0tOexI7mXiLVzvoo6YKvZ.oORG.CnwWqRu9pFfbM8v__KKdyFeYdIrNxsjxlN",
"refreshToken": "5Aep861mdLLi91HqFfChWtIcJFTpP_yPr4TdBUsgTw2Cw7smtNx9KIxIOU874xUg7QgwsNkNmOd1eJrTbTk0Uma"
},
"profileProperties": {
"api_version": "v51.0",
"instanceUrl":"https://cloudwick-dev-dev-ed.my.salesforce.com"
},
"oAuth2Properties":{
"tokenUrl":"https://cloudwick-dev-dev-ed.my.salesforce.com/services/oauth2/token",
"oAuth2GrantType":"AUTHORIZATION_CODE"
}
},
"Keywords": ["test", "custom","post"]
}
{
"ConnAppFlowName": "slack-data-flow-ondemand",
"TriggerType": "ondemand",
"SourceType": "slack",
"SourceConnectorProperties": {
"Object": "conversations/C0234C1JTUP"
},
"CreateDataset": true,
"DatasetDetails": {
"DatasetName" : "slack_dataset",
"Description": "This is a test dataset to ingest slack data",
"Keywords": ["Test", "SlackData"],
"Domain": "testapr22",
"FileType": "csv",
"TargetLocation": "s3",
"SkipLZProcess" : true
},
"DataFormat": "csv",
"Tasks": [
{
"connectorOperator" : { "Slack": "GREATER_THAN"},
"sourceFields" : [ "ts" ],
"taskProperties" : {
"DATA_TYPE" : "date",
"VALUE" : "1592159400000"
},
"taskType" : "Filter"
},
{
"destinationField" : "attachments",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "attachments" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "object",
"SOURCE_DATA_TYPE" : "object"
},
"taskType" : "Map"
},
{
"destinationField" : "bot_id",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "bot_id" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
},
{
"destinationField" : "blocks",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "blocks" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "object",
"SOURCE_DATA_TYPE" : "object"
},
"taskType" : "Map"
},
{
"destinationField" : "client_msg_id",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "client_msg_id" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
},
{
"destinationField" : "is_starred",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "is_starred" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "boolean",
"SOURCE_DATA_TYPE" : "boolean"
},
"taskType" : "Map"
}, {
"destinationField" : "last_read",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "last_read" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "latest_reply",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "latest_reply" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "reactions",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "reactions" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "object",
"SOURCE_DATA_TYPE" : "object"
},
"taskType" : "Map"
}, {
"destinationField" : "replies",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "replies" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "object",
"SOURCE_DATA_TYPE" : "object"
},
"taskType" : "Map"
}, {
"destinationField" : "reply_count",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "reply_count" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "integer",
"SOURCE_DATA_TYPE" : "integer"
},
"taskType" : "Map"
}, {
"destinationField" : "reply_users",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "reply_users" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "object",
"SOURCE_DATA_TYPE" : "object"
},
"taskType" : "Map"
}, {
"destinationField" : "reply_users_count",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "reply_users_count" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "integer",
"SOURCE_DATA_TYPE" : "integer"
},
"taskType" : "Map"
}, {
"destinationField" : "subscribed",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "subscribed" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "boolean",
"SOURCE_DATA_TYPE" : "boolean"
},
"taskType" : "Map"
}, {
"destinationField" : "subtype",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "subtype" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "text",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "text" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "team",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "team" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "thread_ts",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "thread_ts" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "ts",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "ts" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "date",
"SOURCE_DATA_TYPE" : "date"
},
"taskType" : "Map"
}, {
"destinationField" : "type",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "type" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"destinationField" : "user",
"connectorOperator" : { "Slack": "NO_OP"},
"sourceFields" : [ "user" ],
"taskProperties" : {
"DESTINATION_DATA_TYPE" : "string",
"SOURCE_DATA_TYPE" : "string"
},
"taskType" : "Map"
}, {
"connectorOperator" : { "Slack": "VALIDATE_NUMERIC"},
"sourceFields" : [ "client_msg_id" ],
"taskProperties" : {
"VALIDATION_ACTION" : "DropRecord"
},
"taskType" : "Validate"
}
]
}
For a scheduled flow, TriggerProperties looks like this:
{
"TriggerProperties": {
"scheduleStartTime": "2021-08-25T01:45:00",
"scheduleEndTime": "2021-08-26T02:10:00",
"scheduleExpression": "rate(1days)",
"dataPullMode": "Complete"
}
}
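A scheduled TriggerProperties block can be sanity-checked locally before submission. The sketch below assumes that both timestamps are present, as in the samples on this page, and that rate expressions follow the `rate(<number><unit>)` shape seen in those samples. The accepted units (minutes, hours, days) are partly an assumption, since only minutes and days appear here, and cron expressions are only checked for their `cron(...)` wrapper.

```python
import re
from datetime import datetime

TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"  # YYYY-MM-DDThh:mm:ss
RATE_PATTERN = re.compile(r"^rate\((\d+)\s*(minutes?|hours?|days?)\)$")
CRON_PATTERN = re.compile(r"^cron\(.+\)$")


def check_trigger_properties(props):
    """Return a list of problems found in a scheduled TriggerProperties block."""
    problems = []
    try:
        start = datetime.strptime(props["scheduleStartTime"], TIMESTAMP_FORMAT)
        end = datetime.strptime(props["scheduleEndTime"], TIMESTAMP_FORMAT)
    except (KeyError, ValueError):
        problems.append("scheduleStartTime and scheduleEndTime must use YYYY-MM-DDThh:mm:ss")
    else:
        if end <= start:
            problems.append("scheduleEndTime must be later than scheduleStartTime")
    expression = props.get("scheduleExpression", "")
    if not (RATE_PATTERN.match(expression) or CRON_PATTERN.match(expression)):
        problems.append("scheduleExpression must be a rate(...) or cron(...) expression")
    if props.get("dataPullMode") not in ("Incremental", "Complete"):
        problems.append("dataPullMode must be Incremental or Complete")
    return problems
```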
{
"ConnAppFlowName": "slack-custom-flow",
"TriggerType": "scheduled",
"TriggerProperties": {
"scheduleStartTime": "2022-05-17T09:44:00",
"scheduleEndTime": "2022-05-17T09:50:00",
"scheduleExpression": "rate(2minutes)",
"dataPullMode": "Incremental"
},
"SourceConnectorProperties": {
"Object": "conversations/C035Y07CFJQ"
},
"IncrementalPullConfig":"ts",
"CreateDataset": false,
"DatasetDetails": {
"DatasetId": "2de2497c-8b96-47c1-b0b0-aabd80cb524e"
},
"DataFormat": "csv",
"Tasks": [
{
"taskType": "Filter",
"sourceFields": [
"ts",
"type",
"text"
],
"taskProperties": {},
"connectorOperator": {
"Slack": "PROJECTION"
}
},
{
"taskType": "Map",
"sourceFields": [
"ts"
],
"taskProperties": {
"SOURCE_DATA_TYPE": "DateTime",
"DESTINATION_DATA_TYPE": "DateTime"
},
"destinationField": "ts",
"connectorOperator": {
"Slack": "NO_OP"
}
},
{
"taskType": "Map",
"sourceFields": [
"type"
],
"taskProperties": {
"SOURCE_DATA_TYPE": "String",
"DESTINATION_DATA_TYPE": "String"
},
"destinationField": "type",
"connectorOperator": {
"Slack": "NO_OP"
}
},
{
"taskType": "Map",
"sourceFields": [
"text"
],
"taskProperties": {
"SOURCE_DATA_TYPE": "String",
"DESTINATION_DATA_TYPE": "String"
},
"destinationField": "text",
"connectorOperator": {
"Slack": "NO_OP"
}
},
{
"taskType": "Validate",
"sourceFields": [
"ts"
],
"taskProperties": {
"VALIDATION_ACTION": "DropRecord"
},
"connectorOperator": {
"Slack": "VALIDATE_NON_NULL"
}
}
],
"tags": {}
}
{
"ConnsAppsFlowAction": "start-flow"
}