The development version of splashr now supports authenticated connections to Splash API instances. Just specify user and pass on the initial splashr::splash() call to use your scraping setup a bit more safely. For those not familiar with splashr and/or Splash: the latter is a lightweight alternative to tools like Selenium and the former is an R interface to it. Unlike xml2::read_html(), splashr renders a URL exactly as a browser does (because it uses a virtual browser) and can return far more than just the HTML from a web page. Splash does need to be running and it’s best to run it in a Docker container.
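
A minimal sketch of an authenticated connection (the host and credentials below are placeholders for your own setup, and splash_active() is just a quick reachability check):

library(splashr)

splash(
  host = "splash.example.com", # wherever your Splash instance lives
  user = "your splash api username",
  pass = "your splash api password"
) -> sp

splash_active(sp) # TRUE if the instance answered with those credentials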

If you have a large number of sites to scrape, working with splashr and Splash “as-is” can be a bit frustrating since there’s a limit to what a single instance can handle. Sure, it’s possible to set up your own highly available, multi-instance Splash cluster and use it, but that’s work. Thankfully, the folks behind TeamHG-Memex created Aquarium, which uses docker and docker-compose to stand up multiple Splash instances behind a pre-configured HAProxy instance so you can take advantage of parallel requests to the Splash API. As long as you have docker and docker-compose handy (and Python), following the steps on the aforelinked GitHub page should have you up and running with Aquarium in minutes. You use the same default port (8050) to access the Splash API and you get a bonus port of 8036 to watch in your browser (the HAProxy stats page).

This works well when combined with furrr, an R package that makes parallel operations very tidy.

One way to use furrr, splashr and Aquarium together might look like this:

library(splashr)
library(HARtools)
library(urltools)
library(furrr)
library(tidyverse)

sites <- c("http://...", "http://...", ...) # character vector of unique URLs to scrape

make_a_splash <- function(org_url) {
  splash(
    host = "ip/name of system you started aquarium on", 
    user = "your splash api username", 
    pass = "your splash api password"
  ) %>% 
    splash_response_body(TRUE) %>% # we want to get all the content 
    splash_user_agent(ua_win10_ie11) %>% # splashr has many pre-configured user agents to choose from 
    splash_go(org_url) %>% 
    splash_wait(5) %>% # pick a reasonable timeout; modern web sites with javascript are bloated
    splash_har()
}

safe_splash <- safely(make_a_splash) # splashr/Splash work well but can throw errors. Let's be safe

plan(multiprocess, workers=5) # don't overwhelm the default setup or your internet connection

future_map(sites, ~{
  
  org <- safe_splash(.x) # go get it!
  
  if (is.null(org$result)) {
    sprintf("Error retrieving %s (%s)", .x, org$error$message) # this gives us good error messages
  } else {
    
    HARtools::writeHAR( # HAR format saves *everything*. the files are YUGE
      har = org$result, 
      file = file.path("/place/to/store/stuff", sprintf("%s.har", domain(.x))) # saved with the base domain; you may want to use a UUID via uuid::UUIDgenerate()
    )
    
    sprintf("Successfully retrieved %s", .x)
    
  }
  
}) -> results

(Those with a keen eye will now grok why splashr supports Splash API basic authentication.)

The parallel iterator will return a list we can flatten to a character vector to check for errors. (I don’t flatten by default since it’s safer to get a list back: a list can hold anything and map_chr() likes to check for proper objects.) Something like:

flatten_chr(results) %>% 
  keep(str_detect, "Error")
## [1] "Error retrieving www.1.example.com (Service Unavailable (HTTP 503).)"
## [2] "Error retrieving www.100.example.com (Gateway Timeout (HTTP 504).)"
## [3] "Error retrieving www.3000.example.com (Bad Gateway (HTTP 502).)"
## [4] "Error retrieving www.a.example.com (Bad Gateway (HTTP 502).)"
## [5] "Error retrieving www.z.examples.com (Gateway Timeout (HTTP 504).)"

Timeouts suggest you may need to up the wait time (or timeout) in your Splash call. Service unavailable or bad gateway errors may suggest you need to tweak the Aquarium configuration to add more workers or reduce the number of workers in your plan(…) call. It’s not unusual to have to create a scraping process that accounts for errors and retries failed requests a certain number of times.
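
If you do need retries, a small wrapper around the safe_splash() helper from above will do; here is a rough sketch (the retry count and sleep interval are arbitrary, and you may want exponential backoff):

splash_with_retry <- function(org_url, tries = 3, sleep_secs = 5) {
  for (i in seq_len(tries)) {
    res <- safe_splash(org_url)
    if (!is.null(res$result)) return(res) # success, hand the HAR back
    Sys.sleep(sleep_secs) # give the cluster (and the target site) a breather
  }
  res # the last failed attempt, so the error message is still available
}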

If you were stuck in the splashr/Splash slow-lane before, give this a try to help save you some time and frustration.

A previous post showed how to use a different authentication provider to wire up Apache Zeppelin and Amazon Athena. As noted in that post, Zeppelin is a “notebook” alternative to Jupyter (and other) notebooks. Unlike Jupyter, I can tolerate Zeppelin, and it has some nifty features like plug-and-play JDBC access. Plus, it can do some things automagically, like turning the output of a simple aggregation query into a visualization like the one shown at the top of the post.

Drill + Zeppelin (normally that’d be a deadly combination)

The steps to wire up Apache Drill are similar to those for Athena:

  • Go to the Interpreter menu (it’s a drop-down at the top right of the main Zeppelin menu bar)
  • Create a new one (I named mine — unimaginatively — drill)
  • Set the default.driver to org.apache.drill.jdbc.Driver
  • Fill in the necessary JDBC default.url. I use jdbc:drill:zk=localhost:2181 and you can have multiple URLs if you need to connect to more than one Drill cluster.
  • Set up authentication parameters if you need to
  • Under Dependencies, add an artifact and use the path to the JAR in $DRILL_HOME/jars/jdbc-driver/. In my case that’s /usr/local/drill/jars/jdbc-driver/drill-jdbc-all-1.14.0.jar

We can use one of Drill’s built-in datasets to test out our connection.

You can do the same thing in the Query box in the Drill web interface, but — even with the ACE editor now being embedded on the page — I think the Zeppelin interface is better, and it’s possible to sequence a number of steps in the same notebook (e.g. use a %shell paragraph to grab some JSON data and copy it to a Drill-accessible folder, then have a %drill paragraph right below it convert it to parquet, a %spark paragraph below that do some ML on the data, and a %knitr block make a proper visualization with R).

Drill + PostgreSQL/etc.

By now, you’ve likely figured out it’s the same, heh, drill for other databases with JDBC support.

For PostgreSQL (i.e. a %postgres interpreter) you need to obtain the JDBC driver and wire its location up as an artifact; use org.postgresql.Driver for the default.driver; enter an appropriate default.url for your setup, which is more than likely jdbc:postgresql://localhost:5432/… (if not, i.e. the Zeppelin node and PostgreSQL node are on different systems, you’ll need to ensure PostgreSQL is configured to listen on an interface that is accessible to the Zeppelin node); then enter authentication info and fire up a test query against data that you have.

FIN

Fear not! There shall be no more simple “wiring-up” posts unless a commenter asks for one or there’s some weird complexity with a new one that deserves some exposition, especially since there are plenty of examples over at the Zeppelin main site. Hopefully these posts have encouraged you to give Zeppelin a try if you haven’t already. I tried working with very early versions of Zeppelin a while back and let it sit for a while, so give it a chance if you did the same. Version 0.9.0 is about to come out and it’s looking even better than 0.8.0, plus it dropped in perfectly on Ubuntu and macOS (even 10.14 beta), too.

Drop a note in the comments if you have any tips/tricks for connecting to other JDBC sources.

Apache Zeppelin is a “notebook” alternative to Jupyter (and other) notebooks. It supports a plethora of kernels/interpreters and can do a ton of things that this post isn’t going to discuss (perhaps future ones will, especially since it’s the first “notebook” environment I’ve been able to tolerate for longer than a week).

One really cool feature of Zeppelin is the ability to wire it up to databases via JDBC and use it for interactive queries. A future post will provide instructions for Apache Drill, but this one’s about wiring up Amazon Athena and Apache Zeppelin. A big reason to do this is the image at the top of the post. The query interface is far nicer than the Amazon console and — while RStudio is going to have similar features in the 1.2 release — Zeppelin has some advantages over it, especially as 0.9.0 moves to final release.

If you use basic credentials in Athena, this post can help you connect up.

At $DAYJOB we use Awsaml, an open source application that we developed, which provides automagically rotated temporary AWS credentials every hour after a successful initial multi-factor authentication (you should think about doing this, too).

Because it uses a non-default profile name, we need to use a different authentication class when using the Athena JDBC interface.

To somewhat dup the aforelinked post, you’ll need to download the driver that matches your version of the JDK and the JDBC data standards.

I like to put JARs like this in /usr/local/jars (just remember where you put it).

Now, just create a Zeppelin interpreter named athena (or whatever you like). Set the default.driver to com.simba.athena.jdbc.Driver and the JDBC string to this horribly long entity:


jdbc:awsathena://athena.us-east-1.amazonaws.com:443;S3OutputLocation=s3://aws-athena-query-results-something-us-east-1;Schema=default;AwsCredentialsProviderClass=com.simba.athena.amazonaws.auth.profile.ProfileCredentialsProvider;AwsCredentialsProviderArguments="your-profile-name"

I intentionally left it un-wrapped so it’s easier to copy. Here are the individual parts (separating the bullets at the semicolons):

  • jdbc:awsathena://athena.us-east-1.amazonaws.com:443 (use what you need to here)
  • S3OutputLocation=s3://aws-athena-query-results-something-us-east-1 (wherever Athena can write to)
  • Schema=default (the schema you’ll use)
  • AwsCredentialsProviderClass=com.simba.athena.amazonaws.auth.profile.ProfileCredentialsProvider (this is the proper class to select a profile by name)
  • AwsCredentialsProviderArguments="your-profile-name" (this is the profile name you want to use)

NOTE: You can use other JDBC driver parameters as well. I just focused on the minimum ones to keep it simple.

Blank out any username/password fields (which, in theory, won’t be referenced anyway) and then scroll down and add the JAR you’re using as an artifact. In my case that’s /usr/local/jars/AthenaJDBC42_2.0.2.jar.

Now, you can use a stored profile and hopefully rotating creds to work with %athena interpreter blocks in Zeppelin.

I had to process a bunch of emails for a $DAYJOB task this week and my “default setting” is to use R for pretty much everything (this should come as no surprise). Treating mail as data is not an uncommon task and many R packages exist that can reach out and grab mail from servers or work directly with local mail archives.

Mbox’in off the rails on a crazy tm1

This particular mail corpus is in mbox format since it was saved via Apple Mail. It’s one big text file with each message appearing one after the other. The format has been around for decades, and R’s tm package — via the tm.plugin.mail plugin package — can process these mbox files.

To demonstrate, we’ll use an Apple Mail archive excerpt from a set of R mailing list messages as they are not private/sensitive:

library(tm)
library(tm.plugin.mail)

# point the tm corpus machinery to the mbox file and let it know the timestamp format since it varies
VCorpus(
  MBoxSource("~/Data/test.mbox/mbox"),
  readerControl = list(
    reader = readMail(DateFormat = "%a, %e %b %Y %H:%M:%S %z")
  )
) -> mbox

str(unclass(mbox), 1)
## List of 3
##  $ content:List of 198
##  $ meta   : list()
##   ..- attr(*, "class")= chr "CorpusMeta"
##  $ dmeta  :'data.frame': 198 obs. of  0 variables

str(unclass(mbox[[1]]), 1)
## List of 2
##  $ content: chr [1:476] "Try this:" "" "> library(lubridate)" "> library(tidyverse)" ...
##  $ meta   :List of 9
##   ..- attr(*, "class")= chr "TextDocumentMeta"

str(unclass(mbox[[1]]$meta), 1)
## List of 9
##  $ author       : chr "jim holtman "
##  $ datetimestamp: POSIXlt[1:1], format: "2018-08-01 15:01:17"
##  $ description  : chr(0) 
##  $ heading      : chr "Re: [R] read txt file - date - no space"
##  $ id           : chr ""
##  $ language     : chr "en"
##  $ origin       : chr(0) 
##  $ header       : chr [1:145] "Delivered-To: bob@rud.is" "Received: by 2002:ac0:e681:0:0:0:0:0 with SMTP id b1-v6csp950182imq;" "        Wed, 1 Aug 2018 08:02:23 -0700 (PDT)" "X-Google-Smtp-Source: AAOMgpcdgBD4sDApBiF2DpKRfFZ9zi/4Ao32Igz9n8vT7EgE6InRoa7VZelMIik7OVmrFCRPDBde" ...
##  $              : NULL

We’re using unclass() since the str() output gets a bit crowded with all of the tm class attributes stuck in the output display.

The tm suite is designed for text mining. My task had nothing to do with text mining and I really just needed some header fields and body content in a data frame. If you’ve been working with R for a while, some things in the str() output will no doubt cause a bit of angst. For instance:

  • datetimestamp: POSIXlt[1:1]: POSIXlt and data frames really don’t mix well
  • description : chr(0) / origin : chr(0): zero-length character vectors ☹️
  • $ : NULL : a blank element name with a NULL value… I Don’t Even 🤷‍♀️2

The tm suite is also super opinionated and “helpfully” left out a ton of headers (though it did keep the source for the complete headers around). Still, we can roll up our sleeves and turn that into a data frame:

# helper function for cleaner/shorter code
`%|0|%` <- function(x, y) { if (length(x) == 0) y else x }
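
To make that helper concrete:

character(0) %|0|% NA_character_  # zero-length input, so we get NA back
"bob@rud.is" %|0|% NA_character_  # non-empty input passes through untouched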

# might as well stay old-school since we're using tm
do.call(
  rbind.data.frame,
  lapply(mbox, function(.x) {

    # we have a few choices, but this one is pretty explicit abt what it does
    # so we'll likely be able to decipher it quickly in 2 years when/if we come
    # back to it

    data.frame(
      author = .x$meta$author %|0|% NA_character_,
      datetimestamp = as.POSIXct(.x$meta$datetimestamp %|0|% NA),
      description = .x$meta$description %|0|% NA_character_,
      heading = .x$meta$heading %|0|% NA_character_,
      id = .x$meta$id %|0|% NA_character_,
      language = .x$meta$language %|0|% NA_character_,
      origin = .x$meta$origin %|0|% NA_character_,
      header = I(list(.x$meta$header %|0|% NA_character_)),
      body = I(list(.x$content %|0|% NA_character_)),
      stringsAsFactors = FALSE
    )

  })
) %>%
  glimpse()
## Observations: 198
## Variables: 9
## $ author         "jim holtman ", "PIKAL Petr ...
## $ datetimestamp  2018-08-01 15:01:17, 2018-08-01 13:09:18, 2018-...
## $ description    NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, ...
## $ heading        "Re: [R] read txt file - date - no space", "Re: ...
## $ id             ...
## $ language       "en", "en", "en", "en", "en", "en", "en", "en", ...
## $ origin         NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, NA, ...
## $ header         Delivere...., Delivere...., Delivere...., De...
## $ body           Try this...., SGkNCg0K...., Dear Pik...., De... 

That wasn’t a huge effort, but we would now have to re-process the headers and/or write a custom version of tm.plugin.mail::readMail() (the function source is very readable and extendable) to get any extra data out. Here’s what that might look like:

# Custom msg reader
read_mail <- function(elem, language, id) {

  # extract header val
  hdr_val <- function(src, pat) {
    gsub(
      sprintf("%s: ", pat), "",
      grep(sprintf("^%s:", pat), src, value = TRUE, useBytes = TRUE)
    ) %|0|% NA
  }

  mail <- elem$content

  index <- which(mail == "")[1]
  header <- mail[1:index]
  mid <- hdr_val(header, "Message-ID")

  PlainTextDocument(
    x = mail[(index + 1):length(mail)],
    author = hdr_val(header, "From"),

    spam_score = hdr_val(header, "X-Spam-Score"), ### <<==== an extra header!

    datetimestamp = as.POSIXct(hdr_val(header, "Date"), format = "%a, %e %b %Y %H:%M:%S %z", tz = "GMT"),
    description = NA_character_,
    header = header,
    heading = hdr_val(header, "Subject"),
    id = if (length(mid)) mid[1] else id,
    language = language,
    origin = hdr_val(header, "Newsgroups"),
    class = "MailDocument"
  )

}

VCorpus(
  MBoxSource("~/Data/test.mbox/mbox"),
  readerControl = list(reader = read_mail)
) -> mbox

str(unclass(mbox[[1]]$meta), 1)
## List of 9
##  $ author       : chr "jim holtman "
##  $ datetimestamp: POSIXct[1:1], format: "2018-08-01 15:01:17"
##  $ description  : chr NA
##  $ heading      : chr "Re: [R] read txt file - date - no space"
##  $ id           : chr ""
##  $ language     : chr "en"
##  $ origin       : chr NA
##  $ spam_score   : chr "-3.631"
##  $ header       : chr [1:145] "Delivered-To: bob@rud.is" "Received: by 2002:ac0:e681:0:0:0:0:0 with SMTP id b1-v6csp950182imq;" "        Wed, 1 Aug 2018 08:02:23 -0700 (PDT)" "X-Google-Smtp-Source: AAOMgpcdgBD4sDApBiF2DpKRfFZ9zi/4Ao32Igz9n8vT7EgE6InRoa7VZelMIik7OVmrFCRPDBde" ...

If we wanted all the headers, there are even more succinct ways to solve for that use case.
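
For example, since the complete raw headers are kept in $meta$header, a rough sketch like the parse_headers() helper below (purely illustrative, and it ignores headers that wrap across multiple lines) turns them into a named character vector without touching the reader at all:

# rough sketch: split "Name: value" header lines into a named character vector
# (real headers can wrap across lines; handling that is left as an exercise)
parse_headers <- function(header_lines) {
  hdrs <- grep("^[A-Za-z][A-Za-z0-9-]*:", header_lines, value = TRUE)
  setNames(
    sub("^[^:]+:[[:space:]]*", "", hdrs), # everything after the first colon
    sub(":.*$", "", hdrs)                 # the header name itself
  )
}

parse_headers(mbox[[1]]$meta$header)[c("From", "Subject", "Date")]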

Packaging up emails with a reticulated message.mbox

Since the default functionality of tm.plugin.mail::readMail() forced us to work a bit to get what we needed, there’s some justification in seeking out an alternative path. I’ve written about reticulate before and am including it in this post as the Python standard library module mailbox can also make quick work of mbox files.

Two pieces of advice I generally reiterate when I talk about reticulate are that I highly recommend using Python 3 (remember, it’s a fragmented ecosystem) and that I prefer specifying the specific target Python to use via the RETICULATE_PYTHON environment variable, which I have in ~/.Renviron as RETICULATE_PYTHON=/usr/local/bin/python3.
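
Once that is set, reticulate::py_config() is a quick sanity check that the intended interpreter was picked up:

reticulate::py_config() # shows the python binary, version and related config in use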

Let’s bring the mailbox module into R:

library(reticulate)
library(tidyverse)

mailbox <- import("mailbox")

# point the Python mailbox reader at the same mbox file we used with tm
mbox <- mailbox$mbox(path.expand("~/Data/test.mbox/mbox"))

If you're unfamiliar with a Python module or object, you can get help right in R via reticulate::py_help(). Et sequitur3: py_help(mailbox) will bring up the text help for that module and py_help(mailbox$mbox) (remember, we swap out dots for dollars when referencing Python object components in R) will do the same for the mailbox.mbox class.

Text help is great and all, but we can also render it to HTML with this helper function:

py_doc <- function(x) {
  require("htmltools")
  require("reticulate")
  pydoc <- reticulate::import("pydoc")
  htmltools::html_print(
    htmltools::HTML(
      pydoc$render_doc(x, renderer=pydoc$HTMLDoc())
    )
  )
}

Here's what the text and HTML help for mailbox.mbox look like side-by-side:

We can also use a helper function to view the online documentation:

readthedocs <- function(obj, py_ver=3, check_keywords = "yes") {
  require("glue")
  query <- obj$`__name__`
  browseURL(
    glue::glue(
      "https://docs.python.org/{py_ver}/search.html?q={query}&check_keywords={check_keywords}"
    )
  )
}

Et sequitur: readthedocs(mailbox$mbox) will take us to this results page

Going back to the task at hand, we need to cycle through the messages and make a data frame for the bits we (well, I) care about. The reticulate package does an amazing job making Python objects first-class citizens in R, but Python objects may feel "opaque" to R users since we have to use the $ syntax to get to methods and values and — very often — familiar helpers such as str() are less than helpful on these objects. Let's try to look at the first message (remember, Python is 0-indexed):

msg1 <- mbox$get(0)

str(msg1)

msg1

The output for those last two calls is not shown because they both are just a large text dump of the message source. #unhelpful

We can get more details, and we'll wrap some punctuation-filled calls in two small helper functions with names that will sound familiar:

pstr <- function(obj, ...) { str(obj$`__dict__`, ...) } # like 'str()`

pnames <- function(obj) { import_builtins()$dir(obj) } # like 'names()' but more complete

Let's see them in action:

pstr(msg1, 1) # we can pass any params str() will take
## List of 10
##  $ _from        : chr "jholtman@gmail.com Wed Aug 01 15:02:23 2018"
##  $ policy       :Compat32()
##  $ _headers     :List of 56
##  $ _unixfrom    : NULL
##  $ _payload     : chr "Try this:\n\n> library(lubridate)\n> library(tidyverse)\n> input <- read.csv(text =3D \"date,str1,str2,str3\n+ "| __truncated__
##  $ _charset     : NULL
##  $ preamble     : NULL
##  $ epilogue     : NULL
##  $ defects      : list()
##  $ _default_type: chr "text/plain"

pnames(msg1)
##  [1] "__bytes__"                 "__class__"                
##  [3] "__contains__"              "__delattr__"              
##  [5] "__delitem__"               "__dict__"                 
##  [7] "__dir__"                   "__doc__"                  
##  [9] "__eq__"                    "__format__"               
## [11] "__ge__"                    "__getattribute__"         
## [13] "__getitem__"               "__gt__"                   
## [15] "__hash__"                  "__init__"                 
## [17] "__init_subclass__"         "__iter__"                 
## [19] "__le__"                    "__len__"                  
## [21] "__lt__"                    "__module__"               
## [23] "__ne__"                    "__new__"                  
## [25] "__reduce__"                "__reduce_ex__"            
## [27] "__repr__"                  "__setattr__"              
## [29] "__setitem__"               "__sizeof__"               
## [31] "__str__"                   "__subclasshook__"         
## [33] "__weakref__"               "_become_message"          
## [35] "_charset"                  "_default_type"            
## [37] "_explain_to"               "_from"                    
## [39] "_get_params_preserve"      "_headers"                 
## [41] "_payload"                  "_type_specific_attributes"
## [43] "_unixfrom"                 "add_flag"                 
## [45] "add_header"                "as_bytes"                 
## [47] "as_string"                 "attach"                   
## [49] "defects"                   "del_param"                
## [51] "epilogue"                  "get"                      
## [53] "get_all"                   "get_boundary"             
## [55] "get_charset"               "get_charsets"             
## [57] "get_content_charset"       "get_content_disposition"  
## [59] "get_content_maintype"      "get_content_subtype"      
## [61] "get_content_type"          "get_default_type"         
## [63] "get_filename"              "get_flags"                
## [65] "get_from"                  "get_param"                
## [67] "get_params"                "get_payload"              
## [69] "get_unixfrom"              "is_multipart"             
## [71] "items"                     "keys"                     
## [73] "policy"                    "preamble"                 
## [75] "raw_items"                 "remove_flag"              
## [77] "replace_header"            "set_boundary"             
## [79] "set_charset"               "set_default_type"         
## [81] "set_flags"                 "set_from"                 
## [83] "set_param"                 "set_payload"              
## [85] "set_raw"                   "set_type"                 
## [87] "set_unixfrom"              "values"                   
## [89] "walk"

names(msg1)
##  [1] "add_flag"                "add_header"             
##  [3] "as_bytes"                "as_string"              
##  [5] "attach"                  "defects"                
##  [7] "del_param"               "epilogue"               
##  [9] "get"                     "get_all"                
## [11] "get_boundary"            "get_charset"            
## [13] "get_charsets"            "get_content_charset"    
## [15] "get_content_disposition" "get_content_maintype"   
## [17] "get_content_subtype"     "get_content_type"       
## [19] "get_default_type"        "get_filename"           
## [21] "get_flags"               "get_from"               
## [23] "get_param"               "get_params"             
## [25] "get_payload"             "get_unixfrom"           
## [27] "is_multipart"            "items"                  
## [29] "keys"                    "policy"                 
## [31] "preamble"                "raw_items"              
## [33] "remove_flag"             "replace_header"         
## [35] "set_boundary"            "set_charset"            
## [37] "set_default_type"        "set_flags"              
## [39] "set_from"                "set_param"              
## [41] "set_payload"             "set_raw"                
## [43] "set_type"                "set_unixfrom"           
## [45] "values"                  "walk"

# See the difference between pnames() and names()

setdiff(pnames(msg1), names(msg1))
##  [1] "__bytes__"                 "__class__"                
##  [3] "__contains__"              "__delattr__"              
##  [5] "__delitem__"               "__dict__"                 
##  [7] "__dir__"                   "__doc__"                  
##  [9] "__eq__"                    "__format__"               
## [11] "__ge__"                    "__getattribute__"         
## [13] "__getitem__"               "__gt__"                   
## [15] "__hash__"                  "__init__"                 
## [17] "__init_subclass__"         "__iter__"                 
## [19] "__le__"                    "__len__"                  
## [21] "__lt__"                    "__module__"               
## [23] "__ne__"                    "__new__"                  
## [25] "__reduce__"                "__reduce_ex__"            
## [27] "__repr__"                  "__setattr__"              
## [29] "__setitem__"               "__sizeof__"               
## [31] "__str__"                   "__subclasshook__"         
## [33] "__weakref__"               "_become_message"          
## [35] "_charset"                  "_default_type"            
## [37] "_explain_to"               "_from"                    
## [39] "_get_params_preserve"      "_headers"                 
## [41] "_payload"                  "_type_specific_attributes"
## [43] "_unixfrom"

Using just names() excludes the "hidden" builtins for Python objects, but knowing they are there and what they are can be helpful, depending on the program context.

Let's continue on the path to our messaging goal and see what headers are available. We'll use some domain knowledge about the _headers component, though we won't end up going that route to build a data frame:

map_chr(msg1$`_headers`, ~.x[[1]])
##  [1] "Delivered-To"               "Received"                  
##  [3] "X-Google-Smtp-Source"       "X-Received"                
##  [5] "ARC-Seal"                   "ARC-Message-Signature"     
##  [7] "ARC-Authentication-Results" "Return-Path"               
##  [9] "Received"                   "Received-SPF"              
## [11] "Authentication-Results"     "Received"                  
## [13] "X-Virus-Scanned"            "Received"                  
## [15] "Received"                   "Received"                  
## [17] "X-Virus-Scanned"            "X-Spam-Flag"               
## [19] "X-Spam-Score"               "X-Spam-Level"              
## [21] "X-Spam-Status"              "Received"                  
## [23] "Received"                   "Received"                  
## [25] "Received"                   "DKIM-Signature"            
## [27] "X-Google-DKIM-Signature"    "X-Gm-Message-State"        
## [29] "X-Received"                 "MIME-Version"              
## [31] "References"                 "In-Reply-To"               
## [33] "From"                       "Date"                      
## [35] "Message-ID"                 "To"                        
## [37] "X-Tag-Only"                 "X-Filter-Node"             
## [39] "X-Spam-Level"               "X-Spam-Status"             
## [41] "X-Spam-Flag"                "Content-Disposition"       
## [43] "Subject"                    "X-BeenThere"               
## [45] "X-Mailman-Version"          "Precedence"                
## [47] "List-Id"                    "List-Unsubscribe"          
## [49] "List-Archive"               "List-Post"                 
## [51] "List-Help"                  "List-Subscribe"            
## [53] "Content-Type"               "Content-Transfer-Encoding" 
## [55] "Errors-To"                  "Sender"

The mbox object does provide a get() method to retrieve header values, so we'll go that route to build our data frame, but we'll make yet another helper since doing something like msg1$get("this header does not exist") will return NULL just like list(a=1)$b would. We'll actually make two new helpers since we also want to be able to safely work with the payload content, and that means ensuring it's in UTF-8 encoding (mail systems are horribly diverse beasts and the R community is international and, remember, we're using R mailing list messages):

# execute an object's get() method and return a character string or NA if no value was present for the key
get_chr <- function(.x, .y) { as.character(.x[["get"]](.y)) %|0|% NA_character_ }

# get the object's value as a valid UTF-8 string
utf8_decode <- function(.x) { .x[["decode"]]("utf-8", "ignore") %|0|% NA_character_ }

We're also doing this because I get really tired of using the $ syntax.

We also want the message content or payload. Modern mail messages can be really complex structures with many multiple part entities. To put it a different way, there may be HTML, RTF and plaintext versions of a message all in the same envelope. We want the plaintext ones so we'll have to iterate through any multipart messages to (hopefully) get to a plaintext version. Since this post is already pretty long and we ignored errors in the tm portion, I'll refrain from including any error handling code here as well.

map_df(1:py_len(mbox), ~{

  m <- mbox$get(.x-1) # python uses 0-index lists

  list(
    date = as.POSIXct(get_chr(m, "date"), format = "%a, %e %b %Y %H:%M:%S %z"),
    from = get_chr(m, "from"),
    to = get_chr(m, "to"),
    subj = get_chr(m, "subject"),
    spam_score = get_chr(m, "X-Spam-Score")
  ) -> mdf

  content_type <- m$get_content_maintype() %|0|% NA_character_
  mdf$content_type <- content_type # keep the MIME main type around (it shows up in the glimpse below)

  if (content_type[1] == "text") { # we don't want images
    while (m$is_multipart()) m <- m$get_payload()[[1]] # cycle through until we get to something we can use
    mtmp <- m$get_payload(decode = TRUE) # get the message text
    mdf$body <- utf8_decode(mtmp) # make it safe to use
  }

  mdf

}) -> mbox_df

glimpse(mbox_df)
## Observations: 198
## Variables: 7
## $ date          2018-08-01 11:01:17, 2018-08-01 09:09:18, 20...
## $ from          "jim holtman ", "PIKAL Pe...
## $ to            "diego.avesani@gmail.com, R mailing list ...
## $ subj          "Re: [R] read txt file - date - no space", "R...
## $ spam_score    "-3.631", "-3.533", "-3.631", "-3.631", "-3.5...
## $ content_type  "text", "text", "text", "text", "text", "text...
## $ body          "Try this:\n\n library(lubridate)\n library...

FIN

By now, you've likely figured out this post really had nothing to do with reading mbox files. I mean, it did — and this was a task I had to do this week — but the real goal was to use a fairly basic task to help R folks edge a bit closer to becoming more friendly with Python in R. There are hundreds of thousands of Python packages out there and, while I'm one to wax poetic about having R or C[++]-backed R-native packages — and am wont to point out Python's egregiously prolific flaws — sometimes you just need to get something done quickly and wish to avoid reinventing the wheel. The reticulate package makes that eminently possible.

I'll be wrapping up some of the reticulate helper functions into a small package soon, so keep your eyes on RSS.


: You might want to read this even if you're not interested in mbox files. FIN (right above this note) might have some clues as to why.
1: yes, the section title was a stretch
2: am I doing this right, Mara? ;-)
3: Make Latin Great Again

After reading this interesting analysis of “How Often Are Americans’ Accounts Breached?” by Gaurav Sood (which we need more of in cyber-land), I gave in to the impulse to do some gg-doodling with the “Have I Been Pwned” JSON data he used.

It’s just some basic data manipulation with some heavy ggplot2 styling customization, so no real need for exposition beyond noting that there are many other ways to view the data. I just settled on centered segments early on and went from there. If you do a bit of gg-doodling yourself, drop a note in the comments with a link.

You can see a full-size version of the image via this link.

library(hrbrthemes) # use github or gitlab version
library(tidyverse)

# get the data

dat_url <- "https://raw.githubusercontent.com/themains/pwned/master/data/breaches.json"

jsonlite::fromJSON(dat_url) %>% 
  mutate(BreachDate = as.Date(BreachDate)) %>% 
  tbl_df() -> breaches

# selected breach labels df
group_by(breaches, year = lubridate::year(BreachDate)) %>% 
  top_n(1, wt=PwnCount) %>% 
  ungroup() %>% 
  filter(year %in% c(2008, 2015, 2016, 2017)) %>% # pick years where labels will fit nicely
  mutate(
    lab = sprintf("%s\n%sM accounts", Name, as.integer(PwnCount/1000000))
  ) %>% 
  arrange(year) -> labs

# num of known breaches in that year for labels
count(breaches, year = lubridate::year(BreachDate)) %>% 
  mutate(nlab = sprintf("n=%s", n)) %>% 
  mutate(lab_x = as.Date(sprintf("%s-07-02", year))) -> year_cts

mutate(breaches, p_half = PwnCount/2) %>% # for centered segments
  ggplot() +
  geom_segment( # centered segments
    aes(BreachDate, p_half, xend=BreachDate, yend=-p_half), 
    color = ft_cols$yellow, size = 0.3
  ) +
  geom_text( # selected breach labels
    data = labs, aes(BreachDate, PwnCount/2, label = lab),
    lineheight = 0.875, size = 3.25, family = font_rc,
    hjust = c(0, 1, 1, 0), vjust = 1, nudge_x = c(25, -25, -25, 25),
    nudge_y = 0,  color = ft_cols$slate
  ) +
  geom_text( # top year labels
    data = year_cts, aes(lab_x, Inf, label = year), family = font_rc, 
    size = 4, vjust = 1, lineheight = 0.875, color = ft_cols$gray
  ) +
  geom_text( # bottom known breach count totals
    data = year_cts, aes(lab_x, -Inf, label = nlab, size = n), 
    vjust = 0, lineheight = 0.875, color = ft_cols$peach,
    family = font_rc, show.legend = FALSE
  ) +
  scale_x_date( # break on year
    name = NULL, date_breaks = "1 year", date_labels = "%Y"
  ) +
  scale_y_comma(name = NULL, limits = c(-450000000, 450000000)) + # make room for labels
  scale_size_continuous(range = c(3, 4.5)) + # tolerable font sizes 
  labs(
    title = "HIBP (Known) Breach Frequency & Size",
    subtitle = "Segment length is number of accounts; n=# known account breaches that year",
    caption = "Source: HIBP via "
  ) +
  theme_ft_rc(grid="X") +
  theme(axis.text.y = element_blank()) +
  theme(axis.text.x = element_blank())

Continuing the blog’s UDF theme of late, there are two new UDF kids in town: one for slicing & dicing URLs and one for working with internationalized domain names (IDNs).

Now, if you’re an Apache Drill fanatic, you’re likely thinking “Hey hrbrmstr: don’t you know that Drill has a parse_url() function already?” My answer is “Sure, but it’s based on java.net.URL, which is fundamentally broken.”

Slicing & dicing URLs and IDNs is a large part of the $DAYJOB and they go together pretty well, hence the joint UDF release.

Rather than just use boring SQL for an example, we’ll start with some SQL and use R for a decent example of working with the two new UDFs.

Counting Lying Lock Icons

SSL/TLS is all the craze these days, so let’s see how many distinct sites in the GDELT Global Front Page (GFG) data set use port 443 vs port 80 (a good indicator of HTTPS vs HTTP use, plus it will help show how the URL tools fill in ports even when they’re not explicitly in the URL).

If you go to the aforementioned URL, you’ll see that the most current GFG data set URL can be retrieved by inspecting the contents of this metadata URL.

There are over a million records in that data set but — as we’ll see — not nearly as many distinct hosts.

Let’s get the data:

library(sergeant)
library(tidyverse)

read_delim(
  file = "http://data.gdeltproject.org/gdeltv3/gfg/alpha/lastupdate.txt", 
  delim = " ", 
  col_names = FALSE,
  col_types = "ccc"
) -> gfg_update

dl_path <- file.path("~/Data/gfg_links.tsv.gz")

if (!file.exists(dl_path)) download.file(gfg_update$X3[1], dl_path)

Those operations have placed the GFG data set in a place where my local Drill instance can get to it. It's a tab-separated file (TSV) which — while not a great data format — is workable with Drill.

Now we'll setup a SQL query that will parse the URLs and domains, giving us a nice rectangular structure for R & dbplyr. We'll use the second column since a significant percentage of the URLs in column 6 are malformed:

db <- src_drill()

tbl(db, "(
SELECT 
  b.host,
  port,
  b.rec.hostname AS hostname,
  b.rec.assigned AS assigned,
  b.rec.tld AS tld,
  b.rec.subdomain AS subdomain
FROM
  (SELECT
    host, port, suffix_extract(host) AS rec             -- break the hostname into components
  FROM
    (SELECT
      a.rec.host AS host, a.rec.port AS port
    FROM
      (SELECT 
        columns[1] AS url, url_parse(columns[1]) AS rec -- break the URL into components
      FROM dfs.d.`/gfg_links.tsv.gz`) a
    WHERE a.rec.port IS NOT NULL                        -- filter out URL parsing failures
    )
  ) b
WHERE b.rec.tld IS NOT NULL                             -- filter out domain parsing failures
)") -> gfg_df

gfg_df
## # Database: DrillConnection
##    hostname  port host              subdomain assigned      tld  
##                                    
##  1 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  2 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  3 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  4 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  5 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  6 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  7 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  8 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
##  9 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
## 10 www         80 www.eestikirik.ee NA        eestikirik.ee ee   
## # ... with more rows

While we could have done it all in SQL, we saved some bits for R:

distinct(gfg_df, assigned, port) %>% 
  count(port) %>% 
  collect() -> port_counts

port_counts
# A tibble: 2 x 2
   port     n
*  
1    80 20648
2   443 22178

You'd think more news-oriented sites would be HTTPS by default given the current global political climate (though those lock icons are no safety panacea by any stretch of the imagination).

FIN

Now, R can do URL & IDN slicing, but Drill can operate at-scale. That is, R's urltools package may be fine for single-node, in-memory ops, but Drill can process billions of URLs when part of a cluster.
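
For a sense of what the single-node R equivalent looks like, urltools covers similar ground (the URLs below are just examples):

library(urltools)

urls <- c("http://www.eestikirik.ee/some/path", "https://www.example.co.uk:8443/x?y=1")

url_parse(urls)              # scheme, domain, port, path, parameter, fragment
suffix_extract(domain(urls)) # host, subdomain, domain, suffix (public-suffix aware)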

I'm not 100% settled on the galimatias library for URL parsing (I need to do some extended testing) and I may add some less-strict IDN slicing & dicing functions as well.

Kick the tyres & file issues & PRs as necessary.

There are many ways to gather Twitter data for analysis and many R and Python (et al.) libraries make full use of the Twitter API when building a corpus, extracting useful metadata for each tweet along with the text of each tweet. However, many corpus archives are minimal and only retain a small portion of the metadata — often just tweet timestamp, the tweet creator and the tweet text — leaving the analyst the tedious work of re-extracting hashtags, mentions, URLs, etc.

Twitter provides a tweet-text processing library for many languages. One of these languages is Java. Since it makes sense to perform at-scale data operations in Apache Drill, it also seemed to make sense that Apache Drill could use a tweet metadata extraction set of user-defined functions (UDFs). Plus, there just aren’t enough examples of Drill UDFs out there. Thus begat drill-twitter-text.

What’s Inside the Tin?

There are five UDF functions in the package:

  • tw_parse_tweet(string): Parses the tweet text and returns a map column with the following named values:
    • weightedLength: (int) the overall length of the tweet with code points weighted per the ranges defined in the configuration file
    • permillage: (int) indicates the proportion (per thousand) of the weighted length in comparison to the max weighted length. A value > 1000 indicates input text that is longer than the allowable maximum.
    • isValid: (boolean) indicates if input text length corresponds to a valid result.
    • display_start / display_end: (int) indices identifying the inclusive start and exclusive end of the displayable content of the Tweet.
    • valid_start / valid_end: (int) indices identifying the inclusive start and exclusive end of the valid content of the Tweet.
  • tw_extract_hashtags(string): Extracts all hashtags in the tweet text into a list which can be FLATTEN()ed.
  • tw_extract_screennames(string): Extracts all screennames in the tweet text into a list which can be FLATTEN()ed.
  • tw_extract_urls(string): Extracts all URLs in the tweet text into a list which can be FLATTEN()ed.
  • tw_extract_reply_screenname(string): Extracts the reply screenname (if any) from the tweet text into a VARCHAR.

The repo has all the necessary bits and info to help you compile and load the necessary JARs, but those in a hurry can just copy all the files in the target directory to your local jars/3rdparty directory and restart Drill.

Usage

Here’s an example of how to call each UDF along with the output:

SELECT 
  tw_extract_screennames(tweetText) AS mentions,
  tw_extract_hashtags(tweetText) AS tags,
  tw_extract_urls(tweetText) AS urls,
  tw_extract_reply_screenname(tweetText) AS reply_to,
  tw_parse_tweet(tweetText) AS tweet_meta
FROM
  (SELECT 
     '@youThere Load data from #Apache Drill to @QlikSense - #Qlik Tuesday Tips and Tricks #ApacheDrill #BigData https://t.co/fkAJokKF5O https://t.co/bxdNCiqdrE' AS tweetText
   FROM (VALUES((1))))

+----------+------+------+----------+------------+
| mentions | tags | urls | reply_to | tweet_meta |
+----------+------+------+----------+------------+
| ["youThere","QlikSense"] | ["Apache","Qlik","ApacheDrill","BigData"] | ["https://t.co/fkAJokKF5O","https://t.co/bxdNCiqdrE"] | youThere | {"weightedLength":154,"permillage":550,"isValid":true,"display_start":0,"display_end":153,"valid_start":0,"valid_end":153} |
+----------+------+------+----------+------------+

FIN

Kick the tyres and file issues and PRs as needed.

A previous post explored how to deal with Amazon Athena queries asynchronously. The function presented there is a beast, though that is on purpose (to provide options for folks).

In reality, nobody really wants to use rJava wrappers much anymore and dealing with icky Python library calls directly just feels wrong, plus Python functions often return truly daft/ugly data structures. R users deserve better than that.

R is not a first-class citizen in Amazon-land and while the cloudyr project does a fine job building native-R packages for various Amazon services, the fact remains that the official Amazon SDKs are in other languages. The reticulate package provides an elegant interface to Python so it seemed to make sense to go ahead and wrap the boto3 Athena client into something more R-like and toss in the collect_async() function for good measure.

Dependencies

I forced a dependency on Python 3.5 because friends don’t let friends rely on dated, fragmented ecosystems. Python versions can gracefully (mostly) coexist so there should be no pain/angst associated with keeping an updated Python 3 environment around. As noted in the package, I highly recommend adding RETICULATE_PYTHON=/usr/local/bin/python3 to your R environment (~/.Renviron is a good place to store it) since it will help reticulate find the proper target.

If boto3 is not installed, you will need to do pip3 install boto3 to ensure you have the necessary Python module available and associated with your Python 3 installation.

It may seem obvious, but an Amazon AWS account is also required and you should be familiar with the Athena service and AWS services in general. Most of the roto.athena functions have a set of optional parameters:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token
  • region_name
  • profile_name

Ideally, these should be set up in the proper configuration files and you should let boto3 handle the details of retrieving them. One parameter you will see used in many of my examples is profile_name = "personal". I have numerous AWS accounts and manage them via the profile ids. By ensuring the AWS configuration files are thoroughly populated, I avoid the need to load and pass around the various keys and/or tokens most AWS SDK API calls require. You can read more about profile management in the official docs: 1, 2.

Usage

The project README and package manual pages are populated and have a smattering of usage examples. It is likely you will really just want to execute a manually prepared SQL query and retrieve the results or do the dplyr dance and collect the results asynchronously. We’ll cover both of those use-cases now, starting with a manual SQL query.

If you have not deleted it, your Athena instance comes with a sampledb that contains an elb_logs table. We’ll use that for our example queries. First, let’s get the packages we’ll be using out of the way:

library(DBI) # for dplyr access later
library(odbc) # for dplyr access later
library(roto.athena) # hrbrmstr/roto.athena on gh or gl
library(tidyverse) # b/c it rocks

Now, we’ll prepare and execute the query. This is a super-simple one:

query <- "SELECT COUNT(requestip) AS ct FROM elb_logs"

start_query_execution(
  query = query,
  database = "sampledb",
  output_location = "s3://aws-athena-query-results-redacted",
  profile = "personal"
) -> qex_id

The qex_id contains the query execution id. We can pass that along to get information on the status of the query:

get_query_execution(qex_id, profile = "personal") %>%
  glimpse()
## Observations: 1
## Variables: 10
## $ query_execution_id   "7f8d8bd6-9fe6-4a26-a021-ee10470c1048"
## $ query                "SELECT COUNT(requestip) AS ct FROM elb_logs"
## $ output_location      "s3://aws-athena-query-results-redacted/7f...
## $ database             "sampledb"
## $ state                "RUNNING"
## $ state_change_reason  NA
## $ submitted            "2018-07-20 11:06:06.468000-04:00"
## $ completed            NA
## $ execution_time_ms    NA
## $ bytes_scanned        NA

If the state is not SUCCEEDED then you’ll need to be patient before trying to retrieve the results.
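
If you'd rather not poke at it by hand, a small polling loop over the same call works (a sketch; tune the sleep and add a max-attempts guard for real use):

res <- get_query_execution(qex_id, profile = "personal")
while (!(res$state %in% c("SUCCEEDED", "FAILED", "CANCELLED"))) { # Athena's terminal states
  Sys.sleep(2) # short queries usually finish in a few seconds
  res <- get_query_execution(qex_id, profile = "personal")
}
res$state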

get_query_results(qex_id, profile = "personal")
## # A tibble: 1 x 1
##   ct             
##   
## 1 4229

Now, we’ll use dplyr via the Athena ODBC driver:

DBI::dbConnect(
  odbc::odbc(),
  driver = "/Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib",
  Schema = "sampledb",
  AwsRegion = "us-east-1",
  AwsProfile = "personal",
  AuthenticationType = "IAM Profile",
  S3OutputLocation = "s3://aws-athena-query-results-redacted"
) -> con

elb_logs <- tbl(con, "elb_logs")

I've got the ODBC DBI fragment in a parameterized RStudio snippet, and others may find that a time-saver if they're not doing something similar already.

Now to build and submit the query:

mutate(elb_logs, tsday = substr(timestamp, 1, 10)) %>%
  filter(tsday == "2014-09-29") %>%
  select(requestip, requestprocessingtime) %>%
  collect_async(
    database = "sampledb",
    output_location = "s3://aws-athena-query-results-redacted",
    profile_name = "personal"
  ) -> qex_id

As noted in the previous blog post, collect_async() turns the dplyr chain into a SQL query, then fires off the whole thing to start_query_execution() for you and returns the query execution id:

get_query_execution(qex_id, profile = "personal") %>%
  glimpse()
## Observations: 1
## Variables: 10
## $ query_execution_id   "95bd158b-7790-42ba-aa83-e7436c3470fe"
## $ query                "SELECT \"requestip\", \"requestprocessing...
## $ output_location      "s3://aws-athena-query-results-redacted/95...
## $ database             "sampledb"
## $ state                "RUNNING"
## $ state_change_reason  NA
## $ submitted            "2018-07-20 11:06:12.817000-04:00"
## $ completed            NA
## $ execution_time_ms    NA
## $ bytes_scanned        NA

Again, you'll need to be patient and wait for the state to be SUCCEEDED to retrieve the results.

get_query_results(qex_id, profile = "personal")
## # A tibble: 774 x 2
##    requestip       requestprocessingtime
##                               
##  1 255.48.150.122              0.0000900
##  2 249.213.227.93              0.0000970
##  3 245.108.120.229             0.0000870
##  4 241.112.203.216             0.0000940
##  5 241.43.107.223              0.0000760
##  6 249.117.98.137              0.0000830
##  7 250.134.112.194             0.0000630
##  8 250.200.171.222             0.0000540
##  9 248.193.76.218              0.0000820
## 10 250.57.61.131               0.0000870
## # ... with 764 more rows

You can also use the query execution id to sync the resultant CSV from S3. Which one is more performant is definitely something you'll need to test since it varies with AWS region, result set size, your network connection and other environmental factors. One benefit of using get_query_results() is that it uses the column types to set the data frame column types appropriately (I still need to set up a full test of all possible types so not all are handled yet).
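
If you want to try the S3 route, here's a sketch using the cloudyr aws.s3 package (an assumption on my part: that you have aws.s3 installed and your credentials visible to it; note that the output_location shown above is the full s3:// path of the result CSV):

library(aws.s3) # cloudyr S3 client (assumed to be installed and configured)

csv_path <- get_query_execution(qex_id, profile = "personal")$output_location

save_object(
  object = csv_path,                         # full s3://bucket/key URI of the result CSV
  file = file.path("~/Data", basename(csv_path))
)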

Kick the tyres

The package is up on both GitLab and GitHub and any and all feedback (i.e. Issues) or tweaks (i.e. PRs) are most welcome.