A previous post explored how to deal with Amazon Athena queries asynchronously. The function presented is a beast, though it is on purpose (to provide options for folks).
In reality, nobody really wants to use rJava
wrappers much anymore and dealing with icky Python library calls directly just feels wrong, plus Python functions often return truly daft/ugly data structures. R users deserve better than that.
R is not a first-class citizen in Amazon-land and while the cloudyr
project does a fine job building native-R packages for various Amazon services, the fact remains that the official Amazon SDKs are in other languages. The reticulate
package provides an elegant interface to Python so it seemed to make sense to go ahead and wrap the boto3
Athena client into something more R-like and toss in the collect_async()
function for good measure.
Dependencies
I forced a dependency on Python 3.5 because friends don’t let friends rely on dated, fragmented ecosystems. Python versions can gracefully (mostly) coexist so there should be no pain/angst associated with keeping an updated Python 3 environment around. As noted in the package, I highly recommend adding RETICULATE_PYTHON=/usr/local/bin/python3
to your R environment (~/.Renviron
is a good place to store it) since it will help reticulate
find the proper target.
If boto3
is not installed, you will need to do pip3 install boto3
to ensure you have the necessary Python module available and associated with your Python 3 installation.
It may seem obvious, but an Amazon AWS account is also required and you should be familiar with the Athena service and AWS services in general. Most of the roto.athena
functions have a set of optional parameters:
aws_access_key_id
aws_secret_access_key
aws_session_token
region_name
profile_name
Ideally, these should be in setup in the proper configuration files and you should let boto3
handle the details of retrieving them. One parameter you will see used in many of my examples is profile_name = "personal"
. I have numerous AWS accounts and manage them via the profile ids. By ensuring the AWS configuration files are thoroughly populated, I avoid the need to load and pass around the various keys and/or tokens most AWS SDK API calls require. You can read more about profile management in the official docs: 1, 2.
Usage
The project README and package manual pages are populated and have a smattering of usage examples. It is likely you will really just want to execute a manually prepared SQL query and retrieve the results or do the dplyr
dance and collect the results asynchronously. We’ll cover both of those use-cases now, starting with a manual SQL query.
If you have not deleted it, your Athena instance comes with a sampledb
that contains an elb_logs
table. We’ll use that for our example queries. First, let’s get the packages we’ll be using out of the way:
library(odbc)
library(DBI) # for dplyr access later
library(odbc) # for dplyr access later
library(roto.athena) # hrbrmstr/roto.athena on gh or gl
library(tidyverse) # b/c it rocks
Now, we’ll prepare and execute the query. This is a super-simple one:
query <- "SELECT COUNT(requestip) AS ct FROM elb_logs"
start_query_execution(
query = query,
database = "sampledb",
output_location = "s3://aws-athena-query-results-redacted",
profile = "personal"
) -> qex_id
The qex_id
contains the query execution id. We can pass that along to get information on the status of the query:
get_query_execution(qex_id, profile = "personal") %>%
glimpse()
## Observations: 1
## Variables: 10
## $ query_execution_id "7f8d8bd6-9fe6-4a26-a021-ee10470c1048"
## $ query "SELECT COUNT(requestip) AS ct FROM elb_logs"
## $ output_location "s3://aws-athena-query-results-redacted/7f...
## $ database "sampledb"
## $ state "RUNNING"
## $ state_change_reason NA
## $ submitted "2018-07-20 11:06:06.468000-04:00"
## $ completed NA
## $ execution_time_ms NA
## $ bytes_scanned NA
If the state
is not SUCCEEDED
then you’ll need to be patient before trying to retrieve the results.
get_query_results(qex_id, profile = "personal")
## # A tibble: 1 x 1
## ct
##
## 1 4229
Now, we’ll use dplyr
via the Athena ODBC driver:
DBI::dbConnect(
odbc::odbc(),
driver = "/Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib",
Schema = "sampledb",
AwsRegion = "us-east-1",
AwsProfile = "personal",
AuthenticationType = "IAM Profile",
S3OutputLocation = "s3://aws-athena-query-results-redacted"
) -> con
elb_logs <- tbl(con, "elb_logs")
I've got the ODBC DBI fragment in a parameterized RStudio snippet and others may find that as a time-saver if you're not doing that already.
Now to build and submit the query:
mutate(elb_logs, tsday = substr(timestamp, 1, 10)) %>%
filter(tsday == "2014-09-29") %>%
select(requestip, requestprocessingtime) %>%
collect_async(
database = "sampledb",
output_location = "s3://aws-athena-query-results-redacted",
profile_name = "personal"
) -> qex_id
As noted in the previous blog post, collect_async()
turn the dplyr
chain into a SQL query then fires off the whole thing to start_query_execution()
for you and returns the query execution id:
get_query_execution(qex_id, profile = "personal") %>%
glimpse()
## Observations: 1
## Variables: 10
## $ query_execution_id "95bd158b-7790-42ba-aa83-e7436c3470fe"
## $ query "SELECT \"requestip\", \"requestprocessing...
## $ output_location "s3://aws-athena-query-results-redacted/95...
## $ database "sampledb"
## $ state "RUNNING"
## $ state_change_reason NA
## $ submitted "2018-07-20 11:06:12.817000-04:00"
## $ completed NA
## $ execution_time_ms NA
## $ bytes_scanned NA
Again, you'll need to be patient and wait for the state
to be SUCCEEDED
to retrieve the results.
get_query_results(qex_id, profile = "personal")
## # A tibble: 774 x 2
## requestip requestprocessingtime
##
## 1 255.48.150.122 0.0000900
## 2 249.213.227.93 0.0000970
## 3 245.108.120.229 0.0000870
## 4 241.112.203.216 0.0000940
## 5 241.43.107.223 0.0000760
## 6 249.117.98.137 0.0000830
## 7 250.134.112.194 0.0000630
## 8 250.200.171.222 0.0000540
## 9 248.193.76.218 0.0000820
## 10 250.57.61.131 0.0000870
## # ... with 764 more rows
You can also use the query execution id to sync the resultant CSV from S3. Which one is more performant is definitely something you'll need to test since it varies with AWS region, result set size, your network connection and other environment variables. One benefit of using get_query_results()
is that it uses the column types to set the data frame column types appropriately (I still need to setup a full test of all possible types so not all are handled yet).
Kick the tyres
The package is up on both GitLab and GitHub and any and all feedback (i.e. Issues) or tweaks (i.e. PRs) are most welcome.