Data Engineers often need to write workflows that read data, process it, and then output the results. When the volume of data handled is very large, it's common to use cluster-based tools, such as Apache Spark. These tools increase both running costs and solution complexity.
Often, the data volumes are high, the data is in a queryable format, and the required processing is very straightforward. In these cases, the increased complexity and costs of using Apache Spark may not be worth it.
In this post, we present a lightweight, scalable method for processing large volumes of data at a low cost.
Problem: Making simple things easy
The majority of our team's data processing needs are simple enough that they do not require a complex tool like Apache Spark. The challenge that we faced was finding a way to express simple workflows with ease.
We wanted our solution to have the following properties:
- Writing, understanding, and maintaining workflows should be easy.
- Workflows should be able to scale to very large volumes of data.
- Workflows should be re-runnable. Each time a workflow is re-run, it should leave the output data in a consistent state that reflects the latest run rather than the accumulation of all runs.
- Each workflow should track its running costs.
- Engineers should be able to exert fine-grained control over running costs.
Solution: Apache Airflow, SQL templates, and Amazon Athena
Our first task in designing a solution was to analyze our data processing needs. We concluded that we would be able to do all our processing using SQL. SQL has the benefit of being powerful, well-known, easy to write, and easy to understand. Using SQL as our starting point, we knew we'd need to have something to query the data and something to orchestrate the workflows. We landed on Amazon Athena and Apache Airflow to handle those duties.
Amazon Athena
Amazon Athena is a serverless, petabyte-scale SQL query execution engine for data on S3. The pricing model for Athena is based on the number of bytes of data scanned. This allows us to control query costs by limiting the data scanned.
It is worth noting that undisciplined use of Athena can result in very high costs. For this reason, we recommend using CloudWatch Alarms to monitor the data scanned and Athena Workgroups to limit the data scanned.
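For example, a workgroup can cancel any single query that scans more than a set amount of data, and a CloudWatch alarm can watch the workgroup's ProcessedBytes metric. The following is a minimal sketch using boto3; the workgroup name, limits, and alarm settings are placeholders, and the exact dimension set Athena publishes for the metric should be verified in your account:
import boto3

athena = boto3.client("athena")
cloudwatch = boto3.client("cloudwatch")

# Hypothetical workgroup: cancel any single query that scans more than ~100 GB
# and publish per-query metrics (such as ProcessedBytes) to CloudWatch.
athena.create_work_group(
    Name="reporting-workflows",
    Configuration={
        "BytesScannedCutoffPerQuery": 100 * 1024**3,
        "PublishCloudWatchMetricsEnabled": True,
    },
)

# Alarm if the workgroup scans more than ~1 TB of data in a day.
cloudwatch.put_metric_alarm(
    AlarmName="athena-reporting-daily-scan",
    Namespace="AWS/Athena",
    MetricName="ProcessedBytes",
    Dimensions=[{"Name": "WorkGroup", "Value": "reporting-workflows"}],
    Statistic="Sum",
    Period=86400,
    EvaluationPeriods=1,
    Threshold=float(1024**4),
    ComparisonOperator="GreaterThanThreshold",
)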
One of the primary means of limiting the amount of data scanned is to use time-based partitions when creating tables. This allows queries to only scan the data for a specified date range.
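As an illustration, the element_tap_events table queried later in this post could be declared with a date partition column roughly as follows; the column list, S3 locations, and storage format are assumptions:
import boto3

# Hive-style date partitions mean a query that filters on partition_utc_date
# only scans the S3 prefixes for the matching dates.
create_table_sql = """
CREATE EXTERNAL TABLE IF NOT EXISTS element_tap_events (
    event_id          string,
    element_type_name string
)
PARTITIONED BY (partition_utc_date date)
STORED AS PARQUET
LOCATION 's3://example-bucket/element_tap_events/'
"""

boto3.client("athena").start_query_execution(
    QueryString=create_table_sql,
    QueryExecutionContext={"Database": "user_activity"},
    ResultConfiguration={"OutputLocation": "s3://example-bucket/athena-results/"},
)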
Athena can insert the results of a query into an existing table and update the partitions of that table by using INSERT INTO. Athena generates output files from INSERT INTO on S3, but it will never delete files on S3. If you run the same INSERT INTO query multiple times, Athena will generate new output each time, but it will never delete the output from previous runs.
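As a rough sketch of that behavior, the query below appends a day of aggregated results into a hypothetical element_tap_counts table; running it twice for the same date leaves two copies of the output files under that partition's S3 prefix:
import boto3

# INSERT INTO writes new files under the destination table's S3 location and
# registers any new partitions, but files from earlier runs are left in place.
insert_sql = """
INSERT INTO element_tap_counts
SELECT
    partition_utc_date,
    element_type_name,
    COUNT(DISTINCT event_id) AS tap_cnt
FROM element_tap_events
WHERE partition_utc_date = DATE('2023-11-01')
GROUP BY partition_utc_date, element_type_name
"""

boto3.client("athena").start_query_execution(
    QueryString=insert_sql,
    QueryExecutionContext={"Database": "user_activity"},
    ResultConfiguration={"OutputLocation": "s3://example-bucket/athena-results/"},
)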
Given the above, we can use Athena to perform our data processing in a simple and scalable way, and its pay-per-byte-scanned billing model lets us manage costs in a fine-grained manner.
Apache Airflow
We use Apache Airflow to schedule and orchestrate our workflows. Workflows are defined as directed acyclic graphs (DAGs) of dependent tasks. Each task is a type of Airflow operator and specifies some unit of work to be done. For example, Amazon provides the AthenaOperator, which executes the provided SQL query.
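For reference, a task built on the stock operator looks roughly like the following; the query and output location are placeholders, and the task would normally be declared inside a with DAG(...) block:
from airflow.providers.amazon.aws.operators.athena import AthenaOperator

# Submits the query to Athena and waits for it to complete.
count_taps = AthenaOperator(
    task_id="count_element_taps",
    database="user_activity",
    query="SELECT COUNT(*) FROM element_tap_events "
          "WHERE partition_utc_date = DATE('{{ ds }}')",
    output_location="s3://example-bucket/athena-results/",
)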
Airflow allows us to re-run previously scheduled DAGs or even subsets of tasks. By ensuring that each task execution only updates a single partition, it becomes easy to understand the outcome of re-running multiple tasks.
SQL templates
We use Airflow's Macros and Templates to substitute the workflow's scheduled execution date into our SQL query strings. This allows us to ensure that each execution of the workflow only scans a specific subset of table partitions.
For example, the following query string includes a date that varies with the workflow's scheduled execution date:
SELECT
partition_utc_date,
element_type_name,
COUNT(DISTINCT event_id) as tap_cnt
FROM element_tap_events
-- `ds` is a variable provided by Airflow that represents the execution date of a DAG run in the format YYYY-MM-DD.
WHERE partition_utc_date = DATE('{{ ds }}')
GROUP BY
partition_utc_date,
element_type_name;
A workflow execution scheduled for November 1st, 2023 would therefore render the query as follows:
SELECT
partition_utc_date,
element_type_name,
COUNT(DISTINCT event_id) as tap_cnt
FROM element_tap_events
WHERE partition_utc_date = DATE('2023-11-01')
GROUP BY
partition_utc_date,
element_type_name;
Re-runnable tasks
We now have all the required components:
- SQL to express our data processing.
- Amazon Athena to execute the SQL and output the results to a table.
- Airflow to orchestrate our workflows and provide date-specific SQL.
What is missing is the ability to have a consistent state after re-running tasks. If we re-run a task using the AthenaOperator for a single date, the output is appended to the existing data for that date. Further, the AthenaOperator does not publish CloudWatch metrics for specific tasks, and such metrics are what let us easily identify expensive workflows. We needed a solution that both keeps the output in a consistent state and publishes these CloudWatch metrics.
Custom Airflow operator: AthenaLoadPartitionOperator
We wrote our own Airflow operator to provide the full set of functionality that we needed. The operator does the following:
- Executes the provided SQL template as an INSERT INTO query.
- Deletes old S3 data from the overwritten partitions after the query execution succeeds.
- Publishes CloudWatch metrics using the workflow and task name.
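The operator's source isn't included in this post, but the following sketch shows one way such an operator could be structured. The AthenaHook and the boto3 Glue, S3, Athena, and CloudWatch clients are standard; the constructor parameters, the OVERWRITE_PARTITIONS constant, the partition column default, and the CloudWatch namespace are assumptions made for illustration:
import boto3
from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.athena import AthenaHook

OVERWRITE_PARTITIONS = "OVERWRITE_PARTITIONS"


class AthenaLoadPartitionOperator(BaseOperator):
    """Sketch: load one date partition, replacing the output of earlier runs."""

    # Render *.sql query files and the partition value as Airflow templates.
    template_fields = ("query", "partition_value")
    template_ext = (".sql",)

    def __init__(self, *, database, table, query,
                 write_disposition=OVERWRITE_PARTITIONS,  # only this mode is sketched
                 partition_column="partition_utc_date",
                 partition_value="{{ ds }}",
                 workgroup="primary", **kwargs):
        super().__init__(**kwargs)
        self.database = database
        self.table = table
        self.query = query
        self.write_disposition = write_disposition
        self.partition_column = partition_column
        self.partition_value = partition_value
        self.workgroup = workgroup

    def execute(self, context):
        s3 = boto3.client("s3")

        # Resolve the table's S3 location from the Glue catalog and remember the
        # keys currently stored under the target partition's prefix.
        location = boto3.client("glue").get_table(
            DatabaseName=self.database, Name=self.table
        )["Table"]["StorageDescriptor"]["Location"]
        bucket, _, table_prefix = location.removeprefix("s3://").partition("/")
        prefix = (f"{table_prefix.rstrip('/')}/"
                  f"{self.partition_column}={self.partition_value}/")
        old_keys = [obj["Key"]
                    for page in s3.get_paginator("list_objects_v2").paginate(
                        Bucket=bucket, Prefix=prefix)
                    for obj in page.get("Contents", [])]

        # 1. Run the rendered SQL as an INSERT INTO the destination table.
        #    (Assumes the workgroup defines a query result location.)
        hook = AthenaHook()
        query_id = hook.run_query(
            query=f"INSERT INTO {self.table}\n{self.query}",
            query_context={"Database": self.database},
            result_configuration={},
            workgroup=self.workgroup,
        )
        if hook.poll_query_status(query_id) != "SUCCEEDED":
            raise RuntimeError(f"Athena query {query_id} did not succeed")

        # 2. Only after the query succeeds, delete the files written by previous
        #    runs so the partition reflects this run alone.
        for key in old_keys:
            s3.delete_object(Bucket=bucket, Key=key)

        # 3. Publish the bytes scanned as a custom metric tagged with the DAG and
        #    task ids so expensive workflows are easy to spot.
        stats = boto3.client("athena").get_query_execution(
            QueryExecutionId=query_id)["QueryExecution"]["Statistics"]
        boto3.client("cloudwatch").put_metric_data(
            Namespace="DataWorkflows/Athena",  # hypothetical custom namespace
            MetricData=[{
                "MetricName": "DataScannedInBytes",
                "Dimensions": [{"Name": "dag_id", "Value": self.dag_id},
                               {"Name": "task_id", "Value": self.task_id}],
                "Value": float(stats["DataScannedInBytes"]),
                "Unit": "Bytes",
            }],
        )
One design point in this sketch: the partition's existing keys are listed before the query runs but deleted only after it succeeds, so a failed re-run leaves the previous output untouched.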
Example
The following script aggregates impression and click events to calculate click rates for each element and for each element type:
with DAG("element_click_rates",
schedule_interval="0 1 * * *",
start_date=datetime.datetime(2023, 10, 1)) as dag:
load_element_rates = AthenaLoadPartitionOperator(task_id="load_element_click_rates",
database="user_activity",
table="element_click_rate",
write_disposition=OVERWRITE_PARTITIONS,
query="element_click_rates.sql")
load_element_type_rates = AthenaLoadPartitionOperator(task_id="load_element_type_click_rates",
database="user_activity",
table="element_type_click_rate",
write_disposition=OVERWRITE_PARTITIONS,
query="element_type_click_rates.sql")
load_element_rates >> load_element_type_rates
Airflow infers that the values of query for each operator refer to files and attempts to load the files with those names. This allows us to keep our SQL separate from the Python source files.
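This behavior comes from Airflow's templating: because the operator's query field is templated with a .sql extension, Airflow reads and renders a file with that name, searching relative to the DAG file and any template_searchpath configured on the DAG. A minimal sketch (the directory path is an assumption):
import datetime
from airflow import DAG

# Point Airflow at the directory holding the *.sql files referenced by the tasks.
with DAG("element_click_rates",
         schedule_interval="0 1 * * *",
         start_date=datetime.datetime(2023, 10, 1),
         template_searchpath="/opt/airflow/dags/sql") as dag:
    ...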
The element_impression_event table is very large because users can view hundreds of elements in each interaction. Therefore, it is much cheaper to only incur the cost of querying that table once per workflow run.
The results of the following SQL file, element_click_rates.sql, will be written to a partition of the element_click_rate table:
SELECT
impression.element_id,
impression.element_type,
COUNT(DISTINCT click.event_id) as click_cnt,
COUNT(DISTINCT impression.event_id) as impression_cnt,
-- Cast to DOUBLE so the division isn't truncated to an integer.
CAST(COUNT(DISTINCT click.event_id) AS DOUBLE)/COUNT(DISTINCT impression.event_id) as element_click_rate,
DATE('{{ ds }}') as partition_utc_date
FROM element_impression_event impression
LEFT JOIN element_clicked_event click
ON impression.element_id = click.element_id
AND click.partition_utc_date = DATE('{{ ds }}')
WHERE impression.partition_utc_date = DATE('{{ ds }}')
GROUP BY
impression.element_id,
impression.element_type;
The results of the following SQL file, element_type_click_rates.sql, will be written to a partition of the element_type_click_rate table:
SELECT
element_type,
SUM(click_cnt) as total_click_cnt,
SUM(impression_cnt) as total_impression_cnt,
CAST(SUM(click_cnt) AS DOUBLE)/SUM(impression_cnt) as element_type_click_rate,
partition_utc_date
FROM element_click_rate
WHERE partition_utc_date = DATE('{{ ds }}')
GROUP BY
partition_utc_date,
element_type;
Consider the scenario in which we discover that an upstream error resulted in only a subset of element click events being loaded for a previous execution date. With the above, it is trivial to re-run the whole workflow for the affected date. The partition for that date in both element_click_rate and element_type_click_rate would then contain only the newest, most correct values.
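For example, the affected date can be cleared and re-run from the Airflow UI, the CLI, or the stable REST API. A hedged sketch using the REST API (the base URL and credentials are placeholders):
import requests

# Clear both tasks for the affected date; Airflow then re-schedules the cleared run.
requests.post(
    "http://localhost:8080/api/v1/dags/element_click_rates/clearTaskInstances",
    auth=("user", "password"),  # placeholder credentials
    json={
        "start_date": "2023-11-01T00:00:00Z",
        "end_date": "2023-11-01T23:59:59Z",
        "dry_run": False,
        "only_failed": False,
        "reset_dag_runs": True,
    },
)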
Conclusion
The process we implemented to read, process, and output data has proven to be very simple to use yet quite powerful. It can also be extremely cost-effective. Our team manages 50TB of data, but our monthly Athena processing costs are less than $40 (though our S3 costs are billed separately and on a different basis). Sometimes the simple and least expensive solutions are the best ones.