Building Extensible Data Processing Pipeline Using Snowflake External Functions

Susmit Sircar
8 min readApr 13, 2022
Snowflake Integrated Data Platform

External Functions

External functions are user-defined functions (UDFs)that are stored and executed outside of Snowflake.

External functions make it easier to access external API services such as geocoders, data transformation, machine learning models, and other custom code running outside of Snowflake. This feature eliminates the need to export and reimport data when using third-party services, significantly simplifying the data pipeline.

How does an External function work?

An external function calls code that is executed outside Snowflake. Snowflake stores security-related external function information in the API integration object.

Information flow from a client program

The external function is a type of UDF. Unlike other UDFs, an external function does not contain its own code instead, the external function calls code that is stored and executed outside Snowflake.

Inside Snowflake, the external function is stored as a database object that contains information that Snowflake uses to call the remote service. This stored information includes the URL of the proxy service that relays information to and from the remote service.

Snowflake does not call a remote service directly. Instead, Snowflake calls a proxy service, which delegates the data to the remote service. The proxy service is basically an API Gateway, which can increase security by authenticating requests to the remote service using a Lambda authorizer or IAM Roles and Policies.

Examples of proxy services include:

  • Amazon API Gateway
  • Google Cloud API Gateway
  • Microsoft Azure API Management service

The main steps involved calling an external function:

  1. Open a Snowflake session, typically a Snowflake web interface session or developers can use Snowflake JDBC diver. Trigger the below command to create an API integration

2. Record the API_AWS_IAM_USER_ARN and API_AWS_EXTERNAL_ID. Execute the query below

3. Now we have to link the API Integration for AWS to the Proxy Service (API Gateway) in the AWS Management Console. We do this by creating a TRUST relationship between Snowflake and the IAM (identity and access management) role, in the AWS management console. For details please follow here.

4. We need to create the External Function for AWS in Snowflake. For more information refer

5. Lastly, we need to trigger the select query using the external function on the column person of the type array, and we finally get the JSON response back from the remote code.

Designing a Batch Processing ELT data pipeline

What is ELT?

ELT stands for “extract, load, and transform” — the processes a data pipeline uses to replicate data from a source system into a target system such as a cloud data warehouse (Snowflake in our case).

  • Extraction: This first step involves copying data from the source system. In our use case, we will be extracting the data (in semi-structured form) from different data sources: Kafka, AWS Kinesis Streams, Oracle DB, Cassandra No-SQL, etc. and will be dumping it in AWS S3 buckets by means of connectors.
  • Loading: During the loading step, the pipeline replicates data from the source into the target system, which might be a data warehouse or data lake. In our use case, once the data is in the AWS S3 object storage, we will load the same to the Snowflake warehouse and will be using Snowflake Storage Integration
  • Transformation: Once the data is in the target system, organizations can run whatever transformations they need. Often organizations will transform raw data in different ways for use with different tools or business processes. In this stage, we will be using the Snowflake External Functions to interact with the remote code, where we will be transforming and doing aggregations on our data.

Data flow and different components

Batch Processing ELT Data Pipeline using Snowflake as a Data Warehouse
  1. Data from different data sources will be pushed into the S3 buckets by means of connectors, at different time intervals
  2. Once the data resides inside the S3 object storage we can create a data pipeline scheduled on a daily interval to pull the data from S3 to the Snowflake data warehouse. Orchestration tools like Apache Airflow can be used to serve the purpose. This blog perfectly sums up how to use Airflow to transfer data from AWS S3 to Snowflake, or we can trigger manually as well by means of Snowflake Storage Integration
  3. Data is now loaded to the Snowflake data warehouse, Business Analysts, Data Engineers, and Data Scientists can execute a high volume of workloads (in batches) in Snowflake data warehouses via External User Defined Functions (UDF)
  4. The call to the remote service can be synchronous or asynchronous. The blog will focus on the asynchronous request-reply pattern for the following reasons:
  • An asynchronous remote service can be polled while the caller waits for results.
  • Asynchronous handling reduces sensitivity to timeouts (29 seconds time-out for AWS API gateway and 15 minutes for AWS Lambda functions), the primary reason for choosing Asynchronous Request-Reply Pattern over Synchronous

The following diagram shows a typical flow:

Asynchronous Request-Reply pattern
  • When the remote service initially receives an HTTP POST for a specific batch of rows, the remote service returns HTTP code 202 (“Processing not completed”).
  • If the remote service receives any HTTP GET requests from Snowflake after the POST but before the output is ready, the remote service returns HTTP code 202 again.
  • After the remote service has generated all of the output rows, it waits for the next HTTP GET with the same batch ID and then returns the rows received, along with HTTP code 200 (“Successful completion”).

5. Once the query is triggered, the request is first authenticated via the AWS IAM role trust relationship and then delegated to the AWS API gateway, every request (POST/GET)will have additional headers: API key and secret which will be needed while generating the access tokens to invoke the 3rd party Enrichment APIs (as shown in the workflow above)

6. From the API Gateway the call is delegated to the AWS Lambda function, the Publish Handler. The responsibility of the handler is to push the incoming events to the AWS SQS and send HTTP 202 back to the Snowflake client.

Note:

Once Snowflake gets an HTTP 202 response, then Snowflake loops until one of the following are true:

  • Snowflake receives the data and an HTTP 200.
  • Snowflake’s internal timeout is reached.
  • Snowflake receives an error (e.g. HTTP response code 5XX).

In each iteration of the loop, Snowflake delays, then issue an HTTP GET that contains the same batch ID as the corresponding HTTP POST’s batch ID, so that the remote service can return information for the correct batch.

The delay inside the loop starts out short but grows longer for each HTTP 202 response received until Snowflake’s timeout is reached (which is 10 minutes and cannot be changed).

7. Meanwhile the processing handler pulls the event from the queue, extracts the header information the API Key, and the secret, and invokes a REST API call to the Okta Identity Providers API (OAuth2.0). If the access token request is valid and authorized, the authorization server issues an access token as described below.

For example, the client makes the following HTTP request using transport-layer security (with extra line breaks for display purposes only):

POST /token HTTP/1.1
Host: server.example.com
Authorization: Basic czZCaGRSa3F0MzpnWDFmQmF0M2JW
Content-Type: application/x-www-form-urlencoded

grant_type=client_credentials

An example successful response:

HTTP/1.1 200 OK
Content-Type: application/json;charset=UTF-8
Cache-Control: no-store
Pragma: no-cache
{
"access_token":"2YotnFZFEjr1zCsicMWpAA",
"token_type":"bearer",
"expires_in":3600,
"org_id":"ABC Consultant"
}

8. The Processing handler then issues an HTTP POST call to the Data Enrichment APIs along with the bearer token acquired from Step 7.

The Data Enrichment API is being deployed in K8s and is responsible for performing aggregation of the incoming request and sending it back to the handler. The handler finally persists the JSON response to the DynamoDB along with the query batch Id as the key.

9. Once Snowflake gets an HTTP 202 response, from the Publish Handler (mentioned in Step 6), it then issues an HTTP GET, the API mapping for the GET is being maintained in the API Gateway mapping template, and thereafter the call gets delegated to the Processing Handler as shown in the architecture above.

10. The Processing Handler then queries the DynamoDB for that query batch Id get’s the JSON value and sends it back to the Snowflake client with a status HTTP code 200 (“Successful completion…”).

Scalability

The AWS Lambda Function, API Gateway, DynamoDB, Okta Server, Data Enrichment APIs, and any other components involved between the Snowflake and the remote service, must be able to handle the peak workloads sent to them.

Larger Snowflake warehouse sizes can increase the concurrency with which requests are sent, which might exceed the API Gateway’s quota for that account.

By default API gateway account level throttle limits to 10,000 Requests per second (RPS), any requests above that will start throttling

The remote service provider is responsible for providing enough capacity to handle peak workloads from Snowflake. Different techniques can be used to scale the service, such as hosting in an auto-scaled/elastic service, such as AWS Lambda. We can use Provisioned Concurrency to avoid the “Cold Start” problem for the initial burst of requests from Snowflake to Lambda.

For DynamoDB it’s better to use On-Demand mode so that DynamoDB can instantly accommodate customers’ workloads as they ramp up or down to any previously observed traffic level. If the level of traffic hits a new peak, DynamoDB adapts rapidly to accommodate the workload.

The memory usage for the Lambda function can be fine-tuned to an optimum range based on the traffic (3 GB per instance will suffice)

Concurrency

Several factors affect the number of concurrent calls from Snowflake to a proxy service or remote service, including:

  • The number of concurrent users who are running queries with external functions.
  • The batch size of each query
  • The amount of compute resources in the virtual warehouse (how powerful the compute machines are, for instance, AWS EC2 machines)

The developer needs to be very sure in handling concurrency properly, it can be particularly complex if external functions have side effects.

If you found this blog helpful please give a clap and please do share with a larger community,

Cheers & Happy Reading!

--

--