
Category Archives: drill

(For first-timers, R⁶ tagged posts are short & sweet with minimal expository; R⁶ feed)

At work-work I mostly deal with medium-to-large-ish data. I often want to poke at new or existing data sets w/o working across billions of rows. I also use Apache Drill for much of my exploratory work.

Here’s how to uniformly sample data from Apache Drill using the sergeant package:

library(sergeant)

db <- src_drill("sonar")
tbl <- tbl(db, "dfs.dns.`aaaa.parquet`")

summarise(tbl, n=n())
## # Source:   lazy query [?? x 1]
## # Database: DrillConnection
##          n
##      <int>
## 1 19977415

mutate(tbl, r=rand()) %>% 
  filter(r <= 0.01) %>% 
  summarise(n=n())
## # Source:   lazy query [?? x 1]
## # Database: DrillConnection
##        n
##    <int>
## 1 199808

mutate(tbl, r=rand()) %>% 
  filter(r <= 0.50) %>% 
  summarise(n=n())
## # Source:   lazy query [?? x 1]
## # Database: DrillConnection
##         n
##     <int>
## 1 9988797
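
Why the filter behaves this way: rand() hands every row its own uniform draw between 0 and 1, so keeping rows where the draw is at most p retains roughly a fraction p of them. Here is a minimal local sketch of the same idea in base R, using runif() as a stand-in for Drill's rand(). The table size and seed are arbitrary assumptions, not values from the queries above.

```r
set.seed(2018)            # arbitrary seed so the sketch is repeatable

n_rows <- 1e6             # hypothetical table size
r      <- runif(n_rows)   # stand-in for Drill's rand(): one uniform draw per row

# Fraction of rows kept at each threshold; each should sit close to p
kept_fraction <- sapply(c(0.01, 0.50), function(p) mean(r <= p))
kept_fraction

# The same check against the counts Drill reported above
c(199808, 9988797) / 19977415
```

The sample size is only approximately p times the row count, which is usually fine for exploratory work but is not the same as drawing an exact number of rows.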

And, for groups (using a different/larger “database”):

fdns <- tbl(db, "dfs.fdns.`201708`")

summarise(fdns, n=n())
## # Source:   lazy query [?? x 1]
## # Database: DrillConnection
##            n
##        <int>
## 1 1895133100

filter(fdns, type %in% c("cname", "txt")) %>% 
  count(type)
## # Source:   lazy query [?? x 2]
## # Database: DrillConnection
##    type        n
##   <chr>    <int>
## 1 cname 15389064
## 2   txt 67576750

filter(fdns, type %in% c("cname", "txt")) %>% 
  group_by(type) %>% 
  mutate(r=rand()) %>% 
  ungroup() %>% 
  filter(r <= 0.15) %>% 
  count(type)
## # Source:   lazy query [?? x 2]
## # Database: DrillConnection
##    type        n
##   <chr>    <int>
## 1 cname  2307604
## 2   txt 10132672
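
A quick sanity check on those grouped counts, as a sketch: since the random draw happens per row, each type should land near the 15% threshold no matter how large the group is. The numbers below are copied from the two outputs above and nothing else is assumed.

```r
# Row counts per type before and after sampling (from the outputs above)
full    <- c(cname = 15389064, txt = 67576750)
sampled <- c(cname =  2307604, txt = 10132672)

# Observed sampling fraction per group; both should be close to 0.15
observed <- sampled / full
round(observed, 4)
```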

I will (hopefully) be better at cranking out these bite-sized posts more frequently in 2018.

Putting this here to make it easier for others who try to Google this topic to find it w/o having to find and tediously search through other UDFs (user-defined functions).

I was/am making a custom UDF for base64 decoding/encoding and ran into:

SYSTEM ERROR: IndexOutOfBoundsException: index: 0, length: #### (expected: range(0, 256))

It’s incredibly easy to “fix” (and, if my Java weren’t so rusty I’d likely have seen it sooner) but I found this idiom in the spatial UDFs for Drill that enables increasing the default buffer size:

buffer = out.buffer = buffer.reallocIfNeeded(outputSize);

Hopefully this will prevent someone else from spinning a few minutes trying to tackle this use-case. I even had looked at the source for the DrillBuf class and did not manage to put 2 + 2 together for some reason.

It’s no secret that I’m a fan of Apache Drill. One big strength of the platform is that it normalizes the access to diverse data sources down to ANSI SQL calls, which means that I can pull data from parquet, Hive, HBase, Kudu, CSV, JSON, MongoDB and MariaDB with the same SQL syntax. This also means that I get access to all those platforms in R centrally through the sergeant package that rests atop d[b]plyr. It further means that when support for a new file type is added, I get that same functionality without any extra effort.

Why am I calling this out?

Well, the intrepid Drill developers are in the process of finalizing the release candidate for version 1.11.0 and one feature they’ve added is the ability to query individual and entire directories full of PCAP files from within Drill. While I provided a link to the Wikipedia article on PCAP files, the TL;DR on them is that it’s an optimized binary file format for recording network activity. If you’re on macOS or a linux-ish system go do something like this:

sudo tcpdump -ni en0 -s0 -w capture01.pcap

And, wait a bit.

NOTE: Some of you may have to change the en0 to your main network interface name (a quick google for that for your platform should get you to the right one to use).

That command will passively record all network activity on your system until you ctrl-c it. The longer it goes the larger it gets.

When you’ve recorded a minute or two of packets, ctrl-c the program and then try to look at the PCAP file. It’s a binary mess. You can re-read it with tcpdump or Wireshark and there are many C[++] libraries and other utilities that can read them. You can even convert them to CSV or XML, but PCAP files themselves require custom tools to work with effectively. I had started creating crafter to work with these files but my use case/project dried up and I haven’t gone back to it.

Adding the capability into Drill means I don’t really have to work any further on that specialized package as I can do this:

library(sergeant)
library(iptools)
library(tidyverse)
library(cymruservices)

db <- src_drill("localhost")

my_pcaps <- tbl(db, "dfs.caps.`/capture02.pcap`")

glimpse(my_pcaps)
## Observations: 25
## Variables: 12
## $ src_ip          <chr> "192.168.10.100", "54.159.166.81", "192.168.10...
## $ src_port        <int> 60025, 443, 60025, 443, 60025, 58976, 443, 535...
## $ tcp_session     <dbl> -2.082796e+17, -2.082796e+17, -2.082796e+17, -...
## $ packet_length   <int> 129, 129, 66, 703, 66, 65, 75, 364, 65, 65, 75...
## $ data            <chr> "...g9B..c.<..O..@=,0R.`........K..EzYd=.........
## $ src_mac_address <chr> "78:4F:43:77:02:00", "D4:8C:B5:C9:6C:1B", "78:...
## $ dst_port        <int> 443, 60025, 443, 60025, 443, 443, 58976, 5353,...
## $ type            <chr> "TCP", "TCP", "TCP", "TCP", "TCP", "UDP", "UDP...
## $ dst_ip          <chr> "54.159.166.81", "192.168.10.100", "54.159.166...
## $ dst_mac_address <chr> "D4:8C:B5:C9:6C:1B", "78:4F:43:77:02:00", "D4:...
## $ network         <int> 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1...
## $ timestamp       <dttm> 2017-07-27 23:54:58, 2017-07-27 23:54:59, 201...

summarise(my_pcaps, max = max(timestamp), min = min(timestamp)) %>% 
  collect() %>% 
  summarise(max - min)
## # A tibble: 1 x 1
##     `max - min`
##          <time>
## 1 1.924583 mins

count(my_pcaps, type)
## # Source:   lazy query [?? x 2]
## # Database: DrillConnection
##    type     n
##   <chr> <int>
## 1   TCP  4974
## 2   UDP   774

filter(my_pcaps, type=="TCP") %>% 
  count(dst_port, sort=TRUE)
## # Source:     lazy query [?? x 2]
## # Database:   DrillConnection
## # Ordered by: desc(n)
##    dst_port     n
##       <int> <int>
##  1      443  2580
##  2    56202   476
##  3    56229   226
##  4    56147   169
##  5    56215   103
##  6    56143    94
##  7    56085    89
##  8    56203    56
##  9    56205    39
## 10    56209    39
## # ... with more rows

filter(my_pcaps, type=="TCP") %>% 
  count(dst_ip, sort=TRUE) %>% 
  collect() -> dst_ips

filter(dst_ips, !is.na(dst_ip)) %>%
  left_join(ips_in_cidrs(.$dst_ip, c("10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16")),
            by = c("dst_ip"="ips")) %>%
  filter(!in_cidr) %>%
  left_join(distinct(bulk_origin(.$dst_ip), ip, .keep_all=TRUE), c("dst_ip" = "ip")) %>%
  select(dst_ip, n, as_name)
## # A tibble: 37 x 3
##            dst_ip     n                              as_name
##             <chr> <int>                                <chr>
##  1   104.244.42.2   862           TWITTER - Twitter Inc., US
##  2 104.244.46.103   556           TWITTER - Twitter Inc., US
##  3  104.20.60.241   183 CLOUDFLARENET - CloudFlare, Inc., US
##  4     31.13.80.8   160        FACEBOOK - Facebook, Inc., US
##  5  52.218.160.76   100     AMAZON-02 - Amazon.com, Inc., US
##  6  104.20.59.241    79 CLOUDFLARENET - CloudFlare, Inc., US
##  7  52.218.160.92    66     AMAZON-02 - Amazon.com, Inc., US
##  8  199.16.156.81    58           TWITTER - Twitter Inc., US
##  9 104.244.42.193    47           TWITTER - Twitter Inc., US
## 10  52.86.113.212    42    AMAZON-AES - Amazon.com, Inc., US
## # ... with 27 more rows
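
The ips_in_cidrs() step above is what drops destinations in the private address ranges before the ASN lookup. To make that filter less of a black box, here is a rough base R sketch of the arithmetic behind a CIDR membership test. The helper names (ip_to_num, ip_in_cidr, is_private) are made up for illustration and this is not how iptools implements it; it also only handles IPv4.

```r
# Convert dotted-quad IPv4 strings to plain numbers (base R only)
ip_to_num <- function(ip) {
  vapply(strsplit(ip, ".", fixed = TRUE), function(octets) {
    sum(as.numeric(octets) * 256^(3:0))
  }, numeric(1))
}

# TRUE when an address falls inside a CIDR block such as "10.0.0.0/8"
ip_in_cidr <- function(ip, cidr) {
  parts <- strsplit(cidr, "/", fixed = TRUE)[[1]]
  size  <- 2^(32 - as.integer(parts[2]))             # addresses in the block
  base  <- floor(ip_to_num(parts[1]) / size) * size  # first address in the block
  x     <- ip_to_num(ip)
  x >= base & x < base + size
}

# The three private ranges used in the query above
is_private <- function(ip) {
  ip_in_cidr(ip, "10.0.0.0/8") |
    ip_in_cidr(ip, "172.16.0.0/12") |
    ip_in_cidr(ip, "192.168.0.0/16")
}

flags <- is_private(c("192.168.10.100", "54.159.166.81", "172.31.255.255", "172.32.0.1"))
flags
```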

No custom R code. No modification to the sergeant package. Just query it like any other data source.

One really cool part of this is that — while similar functionality has been available in various Hadoop contexts for a few years — we’re doing this query from a local file system outside of a Hadoop context.

I had to add "pcap": { "type": "pcap" } to the formats section of the dfs storage configuration (#ty to the Drill community for helping me figure that out) and I set up a directory that defaults to the pcap type. But after that, it just works.

Well, kinda.

The Java code that the plugin is based on doesn’t like busted PCAP files (which we get quite a bit of in infosec- & honeypot-lands) and it seems to bork on IPv6 packets a bit. And, my sergeant package (for now) can’t do much with the data component (neither can Drill proper). But, it’s a great start and I can use it to do bulk parquet file creation of basic protocols & connection information or take a quick look at some honeypot captures whenever I need to, right from R, without converting them first.

Drill 1.11.0 is only at RC0 right now, so some of these issues may be gone by the time the full release is baked. Some fixes may have to wait for 1.12.0. And, much work needs to be done on the UDF-side and sergeant side to help make the data element more useful.

Even with the issues and limitations, this is an amazing new feature that’s been added to an incredibly useful tool and much thanks goes out to the Drill dev team for sneaking this in to 1.11.0.

If you have cause to work with PCAP files, give this a go and see if it helps speed up parts of your workflow.

I’m extremely pleased to announce that the sergeant package is now on CRAN or will be hitting your local CRAN mirror soon.

sergeant provides JDBC, DBI and dplyr/dbplyr interfaces to Apache Drill. I’ve also wrapped a few goodies into the dplyr custom functions that work with Drill and if you have Drill UDFs that don’t work “out of the box” with sergeant’s dplyr interface, file an issue and I’ll make a special one for it in the package.

I’ve written about Drill on the blog before so check out those posts for some history and stay tuned for more examples. The README should get you started using sergeant and/or Drill (if you aren’t running Drill now, take a look and you’ll likely get hooked).

I’d like to take a moment to call out special thanks to Edward Visel for bootstrapping the dbplyr update to sergeant when the dplyr/dbplyr interfaces split. It saved me loads of time and really helped the progress of this package move faster towards a CRAN release.

The Apache Drill folks have a nice walk-through tutorial on how to analyze the Yelp Academic Dataset with Drill. It’s a bit out of date (the current Yelp data set structure is different enough that the tutorial will error out at various points), but it’s a great example of how to work with large, nested JSON files as a SQL data source. By ‘large’ I mean around 4GB of JSON data spread across 5 files.

If you have enough memory and wanted to work with “flattened” versions of the files in R you could use my ndjson package (there are other JSON “flattener” packages as well, and a new one — corpus::read_ndjson — is even faster than mine, but it fails to read this file). Drill doesn’t necessarily load the entire JSON structure into memory (you can check out the query profiles after the fact to see how much each worker component ended up using) and I’m only mentioning that “R can do this w/o Drill” to stave off some of those types of comments.

The main reasons for replicating their Yelp example were both to have a more robust test suite for sergeant (it’s hitting CRAN soon now that dplyr 0.7.0 is out) and to show some Drill SQL to R conversions. Part of the latter reason is also to show how to use SQL calls to create a tbl that you can then use dplyr verbs to manipulate.

The full tutorial replication is at https://rud.is/rpubs/yelp.html.

I used reading a directory of CSVs as the foundational example in my recent post on idioms.

During my exchange with Matt, Hadley and a few others (in the crazy Twitter thread that spawned said post) I mentioned that I’d personally “just use Drill”.

I’ll use this post as a bit of a teaser trailer for the actual post (or, more likely, series of posts) that goes into detail on where to get Apache Drill, basic setup of Drill for standalone workstation use and then organizing data with it.

You can get ahead of those posts by doing two things:

  1. Download, install and test your Apache Drill setup (it’s literally 10 minutes on any platform)
  2. Review the U.S. EPA annual air quality data archive (they have individual, annual CSVs that are perfect for the example)

My goals for this post are really just to pique your interest enough in Drill and parquet files (yes, I’m ultimately trying to socially engineer you into using parquet files) to convince you to read the future post(s) and show that it’s worth your time to do Step #1 above.

Getting EPA Air Quality Data

The EPA has air quality data going back to 1990 (so, 27 files as of this post). They’re ~1-4MB ZIP compressed and ~10-30MB uncompressed.

You can use the following code to grab them all, with the caveat that the libcurl method of performing simultaneous downloads caused some pretty severe issues (like R crashing) for some of my students who use Windows. There are plenty of examples out there for doing sequential downloads of a list of URLs, so you should be able to get all the files even if this succinct method does not work on your platform.

dir.create("airq")

urls <- sprintf("https://aqsdr1.epa.gov/aqsweb/aqstmp/airdata/annual_all_%d.zip", 1990L:2016L)
fils <- sprintf("airq/%s", basename(urls))

download.file(urls, fils, method = "libcurl")

I normally shy away from this particular method since it really hammers the remote server, but this is a beefy U.S. government server, the files are relatively small in number and size and I’ve got a super-fast internet connection (no long-lived sockets) so it should be fine.
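
If the simultaneous libcurl download misbehaves on your platform, a sequential loop is the gentler fallback. The following is only a sketch: download_sequentially() and its pause argument are hypothetical names introduced here, and the URLs are the same ones built above.

```r
urls <- sprintf("https://aqsdr1.epa.gov/aqsweb/aqstmp/airdata/annual_all_%d.zip", 1990L:2016L)
fils <- file.path("airq", basename(urls))

# Hypothetical helper: fetch one file at a time, skipping files already on disk
download_sequentially <- function(urls, fils, pause = 1) {
  for (d in unique(dirname(fils))) dir.create(d, showWarnings = FALSE, recursive = TRUE)
  ok <- logical(length(urls))
  for (i in seq_along(urls)) {
    if (file.exists(fils[i])) {
      ok[i] <- TRUE
      next
    }
    # mode = "wb" keeps the ZIP files intact on Windows
    ok[i] <- tryCatch(
      download.file(urls[i], fils[i], mode = "wb", quiet = TRUE) == 0,
      error = function(e) FALSE
    )
    Sys.sleep(pause)  # be polite to the remote server
  }
  invisible(ok)
}
```

Calling download_sequentially(urls, fils) returns (invisibly) a logical vector marking which files made it, so a failed year can be retried without re-fetching the rest.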

Putting all those files under the “control” of Drill is what the next post is for. For now, I’m going to show the basic code and benchmarks for reading in all those files and performing a basic query for all the distinct years. Yes, we know that information already, but it’s a nice, compact task that’s easy to walk through and illustrates the file reading and querying in all three idioms: Drill, tidyverse and data.table.

Data Setup

I’ve converted the EPA annual ZIP files into bzip2 format. ZIP is fine for storage and downloads but it’s not a great format for data analysis tasks. gzip would be slightly faster but it’s not easily splittable and — even though I’m not using the data in a Hadoop context — I think it’s wiser to not have to re-process data later on if I ever had to move raw CSV or JSON data into Hadoop. Uncompressed CSVs are the most portable, but there’s no need to waste space.
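
The post doesn't show how that conversion was done, so treat the following as one possible way to do it from R rather than the method actually used. csv_to_bz2() is a hypothetical helper, and it assumes the CSV has already been pulled out of its ZIP archive (e.g. with unzip()).

```r
# Hypothetical helper: write a bzip2-compressed copy of a CSV next to the original
csv_to_bz2 <- function(csv_path, bz2_path = paste0(csv_path, ".bz2")) {
  raw_bytes <- readBin(csv_path, what = "raw", n = file.size(csv_path))
  con <- bzfile(bz2_path, open = "wb")
  on.exit(close(con))
  writeBin(raw_bytes, con)
  invisible(bz2_path)
}

# Tiny round trip with a throwaway file
tmp_csv <- tempfile(fileext = ".csv")
writeLines(c("Year,Value", "1990,1", "1991,2"), tmp_csv)

bz2_file   <- csv_to_bz2(tmp_csv)
round_trip <- read.csv(bz2_file)  # base R reads bzip2-compressed text transparently
round_trip
```

This reads each file fully into memory, which is fine for files in the ~10-30MB range mentioned above but would want a chunked approach for much larger ones.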

All the following files are in a regular filesystem directory accessible to both Drill and R:

> (epa_annual_fils <- dir("~/Data/csv/epa/annual", "*.csv.bz2"))
 [1] "annual_all_1990.csv.bz2" "annual_all_1991.csv.bz2" "annual_all_1992.csv.bz2"
 [4] "annual_all_1993.csv.bz2" "annual_all_1994.csv.bz2" "annual_all_1995.csv.bz2"
 [7] "annual_all_1996.csv.bz2" "annual_all_1997.csv.bz2" "annual_all_1998.csv.bz2"
[10] "annual_all_1999.csv.bz2" "annual_all_2000.csv.bz2" "annual_all_2001.csv.bz2"
[13] "annual_all_2002.csv.bz2" "annual_all_2003.csv.bz2" "annual_all_2004.csv.bz2"
[16] "annual_all_2005.csv.bz2" "annual_all_2006.csv.bz2" "annual_all_2007.csv.bz2"
[19] "annual_all_2008.csv.bz2" "annual_all_2009.csv.bz2" "annual_all_2010.csv.bz2"
[22] "annual_all_2011.csv.bz2" "annual_all_2012.csv.bz2" "annual_all_2013.csv.bz2"
[25] "annual_all_2014.csv.bz2" "annual_all_2015.csv.bz2" "annual_all_2016.csv.bz2"

Drill can directly read plain or compressed JSON, CSV and Apache web server log files, and it can treat a directory tree of them as a single data source. It can also read parquet & avro files (both are used frequently in distributed “big data” setups) and access MySQL, MongoDB and other JDBC resources as well as query data stored in Amazon S3 and HDFS (I’ve already mentioned it works fine in plain ol’ filesystems, too).

I’ve tweaked my Drill configuration to support reading column header info from .csv files (which I’ll show in the next post). In environments like Drill or even Spark, CSV columns are usually queried with some type of column index (e.g. COLUMN[0]) so having named columns makes for less verbose query code.

I turned those individual bzip2 files into parquet format with one Drill query:

CREATE TABLE dfs.pq.`/epa/annual.parquet` AS 
  SELECT * FROM dfs.csv.`/epa/annual/*.csv.bz2`

Future posts will explain the dfs... component, but these are likely familiar path specifications for folks used to Spark and are pretty straightforward. The first bit (up to the back-tick) is an internal Drill shortcut to the actual storage path (which is a plain directory in this test) followed by the tail end path spec to the subdirectories and/or target files. That one statement said “take all the CSV files in that directory and make one big table out of them”.
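
To make the two halves of that path spec concrete, here is a tiny sketch that glues a workspace shortcut and a back-ticked path together. drill_path() is a made-up helper for illustration only; it is not part of sergeant.

```r
# Hypothetical helper: <workspace shortcut>.`<path within that workspace>`
drill_path <- function(workspace, path) {
  sprintf("%s.`%s`", workspace, path)
}

csv_source <- drill_path("dfs.csv", "/epa/annual/*.csv.bz2")
pq_target  <- drill_path("dfs.pq", "/epa/annual.parquet")

csv_source
pq_target
```

The resulting strings are the same table references used in the CREATE TABLE statement above.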

The nice thing about parquet files is that they work much like R data frames in that they can be processed on the column level. We’ll see how that speeds up things in a bit.

Benchmark Setup

The tests were performed on a maxed out 2016 13″ MacBook Pro.

There are 55 columns of data in the EPA annual summary files.

To give both read_csv and fread some benchmark boosts, we’ll define the columns up-front and pass those in to each function on data ingestion. I’ll leave them out of this post for brevity (they’re just a cols() specification and colClasses vector). Drill gets no similar help for this, at least when it comes to CSV processing.

I’m also disabling progress & verbose reporting in both fread and read_csv despite not stopping Drill from writing out log messages.

Now, we need some setup code to connect to Drill and read in the list of files, plus we’ll set up the five benchmark functions to read in all the files and get the list of distinct years from each.

library(sergeant)
library(data.table)
library(tidyverse)
library(microbenchmark) # needed for the microbenchmark() call below

(epa_annual_fils <- dir("~/Data/csv/epa/annual", "*.csv.bz2", full.names = TRUE))

db <- src_drill("localhost")

# Remember, defining ct & ct_dt - the column types specifications - have been left out for brevity

mb_drill_csv <- function() {
  epa_annual <- tbl(db, "dfs.csv.`/epa/annual/*.csv.bz2`")
  select(epa_annual, Year) %>% 
    distinct(Year) %>% 
    collect()
}

mb_drill_parquet <- function() {
  epa_annual_pq <- tbl(db, "dfs.pq.`/epa/annual.parquet`")
  select(epa_annual_pq, Year) %>% 
    distinct(Year) %>% 
    collect()
}

mb_tidyverse <- function() {
  map_df(epa_annual_fils, read_csv, col_types = ct, progress = FALSE) -> tmp
  unique(tmp$Year)
}

mb_datatable <- function() {
  rbindlist(
    lapply(
      epa_annual_fils, function(x) { 
        fread(sprintf("bzip2 -c -d %s", x), 
              colClasses = ct_dt, showProgress = FALSE, 
              verbose = FALSE) })) -> tmp
  unique(tmp$Year)
}

mb_rda <- function() {
  read_rds("~/Data/rds/epa/annual.rds") -> tmp
  unique(tmp$Year)
}

microbenchmark(
  csv = { mb_drill_csv()     },
   pq = { mb_drill_parquet() },
   df = { mb_tidyverse()     },
   dt = { mb_datatable()     },
  rda = { mb_rda()           },
  times = 5
) -> mb

Yep, it’s really as simple as:

tbl(db, "dfs.csv.`/epa/annual/*.csv.bz2`")

to have Drill treat a directory tree as a single table. It’s also not necessary for all the columns to be in all the files (i.e. you get the bind_rows/map_df/rbindlist behaviour for “free”).

I’m only doing 5 evaluations here since I don’t want to make you wait if you’re going to try this at home now or after the Drill series. I’ve run it with a more robust benchmark configuration and the results are aligned with this one.

Unit: milliseconds
 expr        min         lq       mean     median         uq        max neval
  csv 15473.5576 16851.0985 18445.3905 19586.1893 20087.1620 20228.9450     5
   pq   493.7779   513.3704   616.2634   550.5374   732.6553   790.9759     5
   df 41666.1929 42361.1423 42701.2682 42661.9521 43110.3041 43706.7498     5
   dt 37500.9351 40286.2837 41509.0078 42600.9916 43105.3040 44051.5247     5
  rda  9466.6506  9551.7312 10012.8560  9562.9114  9881.8351 11601.1517     5
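
To put those timings on a common scale, here is a small sketch that turns the median column into relative slowdowns. The numbers are copied straight from the table above; nothing is re-measured.

```r
# Median run times in milliseconds, copied from the benchmark table above
median_ms <- c(csv = 19586.1893, pq = 550.5374, df = 42661.9521,
               dt = 42600.9916, rda = 9562.9114)

# How many times slower each approach is than the Drill parquet query
vs_parquet <- median_ms / median_ms[["pq"]]
round(vs_parquet, 1)

# How many times slower the two R CSV readers are than Drill's CSV read
vs_drill_csv <- median_ms[c("df", "dt")] / median_ms[["csv"]]
round(vs_drill_csv, 1)
```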

The R data route, which is the closest to the parquet route, is definitely better than slurping up CSVs all the time. Both parquet and R data files require pre-processing, so they’re not as flexible as having individual CSVs (that may get added hourly or daily to a directory).

Drill’s CSV slurping handily beats the other R methods even with some handicaps the others did not have.

This particular example is gamed a bit, which helped parquet to ultimately “win”. Since Drill can target the singular column (Year) that was asked for, it doesn’t need to read all the extra columns just to compute the final product (the distinct list of years).

IMO both the Drill CSV ingestion and Drill parquet access provide compelling enough use-cases to use them over the other three methods, especially since they are easily transferable to remote Drill servers or clusters with virtually no code changes. A single node Drillbit (like R) is constrained by the memory on that individual system, so it’s not going to get you out of a memory jam, but it may make it easier to organize and streamline your core data operations before other analysis and visualization tasks.

FIN

I’m sure some member of some other tribe will come up with an example that proves superiority of their particular tribal computations. I’m hoping one of those tribes is the R/Spark tribe so that can get added into the mix (using Spark standalone is much like using Drill, but with more stats/ML functions directly available).

I’m hopeful that this post has showcased enough of Drill’s utility to general R users that you’ll give it a go and consider adding it to your R data analysis toolbox. It can be beneficial to have both precision tools and a Swiss Army knife (which is what Drill really is) handy.

You can find the sergeant package on GitHub.

2021-11-04 UPDATE: Just use {arrow}.


Apache Drill is a nice tool to have in the toolbox as it provides a SQL front-end to a wide array of database and file back-ends and runs in standalone/embedded mode on every modern operating system (i.e. you can get started with or play locally with Drill w/o needing a Hadoop cluster but scale up almost effortlessly). It’s also a bit more lightweight than Spark and a nice alternative to Spark if you only need data wrangling and not the functionality in Spark’s MLlib.

When you’re in this larger-data world, parquet files are one of the core data storage formats. They’re designed to be compact and are optimized for columnar operations. Unlike CSV, JSON files or even R Data files, it’s not necessary to read or scan an entire parquet file to filter, select, aggregate, etc across columns. Unfortunately, parquet files aren’t first-class citizens in R. Well, they aren’t now, but thanks to this project it might not be too difficult to make an R interface to them. But, for now, you have to use some other means to convert or read parquet files.

Spark and sparklyr can help you write parquet files but I don’t need to run Spark all the time.

If you’re already a Drill user, you already know how easy it is to make parquet files with Drill:

CREATE TABLE dfs.tmp.sampleparquet AS 
  (SELECT trans_id, 
   cast(`date` AS date) transdate, 
   cast(`time` AS time) transtime, 
   cast(amount AS double) amountm,
   user_info, marketing_info, trans_info 
   FROM dfs.`/Users/drilluser/sample.json`);

If you’re not used to SQL, that may seem very ugly/foreign/verbose to you and you can thank Hadley for designing a better grammar of tidyness that seamlessly builds SQL queries like that behind the scenes for you. That SQL statement uses a JSON file as a data source (which you can do with Drill), makes sure the field data types are correct by explicitly casting them to SQL data types (which is a good habit to get into even if it is verbose) and then tells Drill to make a parquet file (it’s actually a directory of parquet files) from it.

I’ve been working for a while now on an R package, sergeant, that provides RJDBC, direct REST and dplyr interfaces to Apache Drill. There are a number of complexities associated with creating a function to help users make parquet files from R data frames in Drill (which is why said function still does not exist in sergeant):

  • Is Drill installed or does there need to be a helper set of functions for installing and running Drill in embedded mode?
  • Even if there’s a Drill cluster running, does the user perhaps want to do the conversion locally in embedded mode? Embedded is way easier since all the files are local. The only real way to convert a data frame to Drill is to save a data frame to a temporary, interim file and then have Drill read it in. In a cluster mode where your local filesystem is not part of the cluster, that would mean finding the right way to get the file to the cluster. Which leads to the next item…
  • Where does the user want the necessary temporary files stored? Local dfs. file system? HDFS?
  • Do we need two different methods? One for quick conversion and one that forces explicit column data type casting?
  • Do we need to support giving the user explicit casting control and column selection capability?
  • Who put the bomp in the bomp, bomp, bomp?

OK, perhaps not that last one (but I think it still remains a mystery despite claims by Jan and Dean).

It’s difficult to wrap something like that up in a simple package that will make 80% of the possible user-base happy (having Drill and Spark operate behind the scenes like “magic” seems like a bad idea to me despite how well sparklyr masks the complexity).

As I continue to work that out (you are encouraged to file an issue with your opines on it at the gh repo) here’s a small R script that you can use to turn R data frames into parquet files:

library(sergeant)
library(tidyverse)

# make a place to hold our temp files
# this is kinda super destructive. make sure you have the path right
unlink("/tmp/pqtrans", recursive=TRUE, force=TRUE)
dir.create("/tmp/pqtrans", showWarnings=FALSE)

# save off a large-ish tibble
write_csv(nycflights13::flights, "/tmp/pqtrans/flights.csvh")

# connect to drill
db <- src_drill("localhost")

# make the parquet file
dbGetQuery(db$con, "
CREATE TABLE dfs.tmp.`/pqtrans/flights.parquet` AS SELECT * FROM dfs.tmp.`/pqtrans/flights.csvh`
")
## # A tibble: 1 × 2
##   `Number of records written` Fragment
## *                       <int>    <chr>
## 1                      336776      0_0

# prove we did it
list.files("/tmp/pqtrans", recursive=TRUE, include.dirs=TRUE)
## [1] "flights.csvh"                  "flights.parquet"              
## [3] "flights.parquet/0_0_0.parquet"

# prove it again
flights <- tbl(db, "dfs.tmp.`/pqtrans/flights.parquet`")

flights
## Source:   query [?? x 19]
## Database: Drill 1.9.0 [localhost:8047] [8GB direct memory]
## 
##    flight arr_delay distance  year tailnum dep_time sched_dep_time origin
##     <int>     <dbl>    <dbl> <int>   <chr>    <int>          <int>  <chr>
## 1    1545        11     1400  2013  N14228      517            515    EWR
## 2    1714        20     1416  2013  N24211      533            529    LGA
## 3    1141        33     1089  2013  N619AA      542            540    JFK
## 4     725       -18     1576  2013  N804JB      544            545    JFK
## 5     461       -25      762  2013  N668DN      554            600    LGA
## 6    1696        12      719  2013  N39463      554            558    EWR
## 7     507        19     1065  2013  N516JB      555            600    EWR
## 8    5708       -14      229  2013  N829AS      557            600    LGA
## 9      79        -8      944  2013  N593JB      557            600    JFK
## 10    301         8      733  2013  N3ALAA      558            600    LGA
## # ... with more rows, and 11 more variables: sched_arr_time <int>,
## #   dep_delay <dbl>, dest <chr>, minute <dbl>, carrier <chr>, month <int>,
## #   hour <dbl>, arr_time <int>, air_time <dbl>, time_hour <dttm>,
## #   day <int>

# work with the drill parquet file
count(flights, year, origin) %>%
  collect()
## Source: local data frame [3 x 3]
## Groups: year [1]
## 
##    year origin      n
## * <int>  <chr>  <int>
## 1  2013    EWR 120835
## 2  2013    LGA 104662
## 3  2013    JFK 111279

That snippet:

  • assumes Drill is running, which is really as easy as entering drill-embedded at a shell prompt, but try out Drill in 10 Minutes if you don’t believe me
  • assumes dfs.tmp points to /tmp (i.e. you need to modify that if yours doesn’t…see, I told you this wasn’t simple)
  • assumes we’re OK with letting Drill figure out column types
  • assumes we want ALL THE COLUMNS
  • uses the .csvh extension which tells Drill to read the column names from the first line so we don’t have to create the schema from scratch
  • is slow because of ↑ due to the need to create the csvh file first
  • exploits the fact that we can give dplyr the cold shoulder and talk directly to Drill anytime we feel like it with DBI calls by using the $con list field (the dbGetQuery(db$con, …) line).

It’s a naive and destructive snippet, but does provide a means to get your data frames into parquet and into Drill.
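
If letting Drill guess the column types or keeping every column isn't acceptable, one option is to generate a CTAS statement with explicit casts and an explicit column list. This is only a sketch: build_ctas() is a hypothetical helper (it does not exist in sergeant), the flights_typed.parquet target name is made up, and the three columns were picked because they have no missing values in nycflights13::flights. Columns where write_csv() has written NA would need extra handling before a numeric cast.

```r
# Hypothetical helper: build a CTAS statement that keeps only the named
# columns and casts each one to an explicit Drill SQL type
build_ctas <- function(target, source, casts) {
  cols <- sprintf("CAST(`%s` AS %s) AS `%s`", names(casts), casts, names(casts))
  sprintf("CREATE TABLE %s AS SELECT %s FROM %s",
          target, paste(cols, collapse = ", "), source)
}

sql <- build_ctas(
  target = "dfs.tmp.`/pqtrans/flights_typed.parquet`",  # made-up output name
  source = "dfs.tmp.`/pqtrans/flights.csvh`",
  casts  = c(year = "INTEGER", carrier = "VARCHAR", distance = "DOUBLE")
)

cat(sql, "\n")
```

The resulting string could be handed to dbGetQuery(db$con, sql) in place of the SELECT * statement in the script above.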

Most of my Drill parquet needs are converting ~20-100K JSON files a day into parquet, which is why I haven’t focused on making a nice interface for this particular use case (data frame to parquet) in R. Ultimately, I’ll likely go the “wrap parquet-cpp route” (unless you’re working on that, which — if you are — you should @-ref me in that gh-repo of yours so I can help out). But, if having a sergeant function to do this conversion would help you, drop an issue in the repo.

The rest of the month is going to be super-hectic and it’s unlikely I’ll be able to do any more to help the push to CRAN 10K, so here’s a breakdown of CRAN and GitHub new packages & package updates that I felt were worth raising awareness on:

epidata

I mentioned this one last week but it wasn’t really a package announcement post. epidata is now on CRAN and is a package to pull data from the Economic Policy Institute (U.S. gov economic data, mostly). Their “hidden” API is well thought out and the data has been nicely curated (and seems to update monthly). It makes it super easy to do things like the following:

library(epidata)
library(tidyverse)
library(lubridate) # for as_date(), used in the second plot
library(stringi)
library(hrbrmisc) # devtools::install_github("hrbrmstr/hrbrmisc")

us_unemp <- get_unemployment("e")

glimpse(us_unemp)
## Observations: 456
## Variables: 7
## $ date            <date> 1978-12-01, 1979-01-01, 1979-02-01, 1979-03-0...
## $ all             <dbl> 0.061, 0.061, 0.060, 0.060, 0.059, 0.059, 0.05...
## $ less_than_hs    <dbl> 0.100, 0.100, 0.099, 0.099, 0.099, 0.099, 0.09...
## $ high_school     <dbl> 0.055, 0.055, 0.054, 0.054, 0.054, 0.053, 0.05...
## $ some_college    <dbl> 0.050, 0.050, 0.050, 0.049, 0.049, 0.049, 0.04...
## $ college         <dbl> 0.032, 0.031, 0.031, 0.030, 0.030, 0.029, 0.03...
## $ advanced_degree <dbl> 0.021, 0.020, 0.020, 0.020, 0.020, 0.020, 0.02...

us_unemp %>%
  gather(level, rate, -date) %>%
  mutate(level=stri_replace_all_fixed(level, "_", " ") %>%
           stri_trans_totitle() %>%
           stri_replace_all_regex(c("Hs$"), c("High School")),
         level=factor(level, levels=unique(level))) -> unemp_by_edu

col <- ggthemes::tableau_color_pal()(10)

ggplot(unemp_by_edu, aes(date, rate, group=level)) +
  geom_line(color=col[1]) +
  scale_y_continuous(labels=scales::percent, limits =c(0, 0.2)) +
  facet_wrap(~level, scales="free") +
  labs(x=NULL, y="Unemployment rate",
       title=sprintf("U.S. Monthly Unemployment Rate by Education Level (%s)", paste0(range(format(us_unemp$date, "%Y")), collapse=":")),
       caption="Source: EPI analysis of basic monthly Current Population Survey microdata.") +
  theme_hrbrmstr(grid="XY")

us_unemp %>%
  select(date, high_school, college) %>%
  mutate(date_num=as.numeric(date)) %>%
  ggplot(aes(x=high_school, xend=college, y=date_num, yend=date_num)) +
  geom_segment(size=0.125, color=col[1]) +
  scale_x_continuous(expand=c(0,0), label=scales::percent, breaks=seq(0, 0.12, 0.02), limits=c(0, 0.125)) +
  scale_y_reverse(expand=c(0,100), label=function(x) format(as_date(x), "%Y")) +
  labs(x="Unemployment rate", y="Year ↓",
       title=sprintf("U.S. monthly unemployment rate gap (%s)", paste0(range(format(us_unemp$date, "%Y")), collapse=":")),
       subtitle="Segment width shows the gap between those with a high school\ndegree and those with a college degree",
       caption="Source: EPI analysis of basic monthly Current Population Survey microdata.") +
  theme_hrbrmstr(grid="X") +
  theme(panel.ontop=FALSE) +
  theme(panel.grid.major.x=element_line(size=0.2, color="#2b2b2b25")) +
  theme(axis.title.x=element_text(family="Arial", face="bold")) +
  theme(axis.title.y=element_text(family="Arial", face="bold", angle=0, hjust=1, margin=margin(r=-14)))

(The right edge of each segment is the high school rate and the left edge is the college rate; I’ll annotate it better next time.)
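If you want to poke at the wide-to-long reshaping step without hitting the EPI API, here is a minimal sketch of the same gather() + stringi label clean-up on a tiny hand-made data frame. The `toy` values are made up purely for illustration (they are not EPI data), and only three of the columns are mimicked:

```r
library(dplyr)
library(tidyr)
library(stringi)

# Made-up numbers for illustration only; NOT real EPI data
toy <- data.frame(
  date = as.Date(c("2016-01-01", "2016-02-01")),
  all = c(0.050, 0.049),
  less_than_hs = c(0.080, 0.079),
  high_school = c(0.055, 0.054)
)

toy %>%
  gather(level, rate, -date) %>%                      # one row per date/level pair
  mutate(level = stri_replace_all_fixed(level, "_", " ") %>%
           stri_trans_totitle() %>%                   # "less than hs" -> "Less Than Hs"
           stri_replace_all_regex("Hs$", "High School"),
         # keep the original column order for the facets
         level = factor(level, levels = unique(level))) -> toy_long

levels(toy_long$level)
## [1] "All"                   "Less Than High School" "High School"
```

Turning `level` into a factor in first-seen order is what keeps the facets in the same order as the original columns rather than alphabetical.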

censys

Censys is a search engine run by one of the cybersecurity research partners we publish data to at work (free for use by all). The API is moderately decent (it’s mostly a thin authentication shim that passes Google BigQuery query strings on to the back-end), and censys, the R package that interfaces with it, is now on CRAN.

waffle

The seminal square pie chart package waffle has been updated on CRAN to work better with recent ggplot2 2.x changes and has some additional parameters you may want to check out.

cdcfluview

The viral package cdcfluview has had some updates on the GitHub version that add saner behaviour when specifying dates. It also had to be updated because the CDC hidden API switched to all-https URLs (there’s a major push in .gov-land to do that to get better scores on their cyber report cards). I’ll be adding some features before the next CRAN push to enable retrieval of additional mortality data.

sergeant

If you work with Apache Drill (if you don’t, you should), the sergeant package (GitHub) will help you whip it into shape. I’ve mentioned it before on the blog but it has a nigh-complete dplyr interface now that works pretty well. It also has a direct REST API interface and RJDBC interface plus many helper utilities that help you avoid typing SQL strings to get cluster status info. Once I add the ability to create parquet files with it I’ll push it up to CRAN.

The one thing I’d like to do with this package is support any user-defined functions (UDFs in Drill-speak) folks have written. So, if you have a UDF you’ve written or use and you want it wrapped in the package, just drop an issue and I’ll layer it in. I’ll be releasing some open source cybersecurity-related UDFs via the work GitHub in a few weeks.

zkcmd

Drill (in non-standalone mode) relies on Apache ZooKeeper to keep everything in sync, and it’s sometimes necessary to peek at what’s happening inside the ZooKeeper cluster. So, sergeant has a sister package, zkcmd, that provides an R interface to ZooKeeper instances.

ggalt

Some helpful folks tweaked ggalt for better ggplot2 2.x compatibility (#ty!) and I added a new geom_cartogram() (before you ask if it makes warped shapefiles: it doesn’t) that restores the old (and what I believe to be the correct/sane/proper) behaviour of geom_map(). I need to get this on CRAN soon as it has both fixes and many new geoms folks will want to play with in a non-GitHub context.
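For anyone who hasn’t used it, here’s a rough sketch of what “the old geom_map() behaviour” looks like in practice: you hand geom_cartogram() the polygons via map= and still get to use x & y aesthetics, so the plot limits come from the data without an extra expand_limits() call. This assumes the GitHub/dev version of ggalt plus the maps package are installed, and that the argument names mirror geom_map() (check ?geom_cartogram if your version differs):

```r
library(ggplot2)
library(ggalt) # provides geom_cartogram(); assumed to be installed

# Lower-48 state outlines; map_data() needs the maps package
us <- map_data("state")

gg <- ggplot() +
  geom_cartogram(data = us, map = us,
                 aes(x = long, y = lat, map_id = region),
                 color = "#2b2b2b", fill = "white") +
  coord_quickmap() +
  theme_void()

gg
```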

FIN

There have been some awesome packages released by others in the past month+ and you should add R Weekly to your RSS feeds if you aren’t following it already (there are other things you should have there for R updates as well, but that’s for another blog). I’m definitely looking forward to new packages, visualizations, services and utilities that will be coming this year to the R community.