Inter-operate with ‘MQTT’ Message Brokers With R (a.k.a. Live! BBC! Subtitles!)

Most of us see the internet through the lens of browsers and apps on our laptops, desktops, watches, TVs and mobile devices. These displays are showing us (for the most part) content designed for human consumption. Sure, apps handle API interactions, but even most of that communication happens over ports 80 or 443. But, there are lots of ports out there; 0:65535, in fact (at least TCP-wise). And, any of them can carry some kind of data, and most of that is still ultimately targeted at us.

What if I told you the machines are also talking to each other using a thin/efficient protocol that allows one tiny sensor to talk to hundreds (if not thousands) of systems without even a drop of silicon-laced sweat? How can a mere, constrained sensor do that? Well, it doesn’t do it alone. Many of them share their data over a protocol dubbed MQTT (Message Queuing Telemetry Transport), which has been around since 1999 but is fairly new to most of us.

An MQTT broker watches for devices to publish data under various topics and then also watches for other systems to subscribe to said topics and handles the rest of the interchange. The protocol is lightweight enough that fairly low-powered (CPU- and literal electric-use-wise) devices can easily send their data chunks up to a broker, and the entire protocol is robust enough to support a plethora of connections and an equal plethora of types of data.
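
To make the publish/subscribe idea concrete, here is a toy, in-memory "broker" in base R. It is only a sketch of the fan-out pattern described above: `new_broker()`, `subscribe()` and `publish()` are hypothetical names invented for illustration, nothing here speaks the real MQTT wire protocol, and none of it comes from the mqtt package.

```r
# Toy in-memory "broker": illustrates publish/subscribe fan-out only.
# Hypothetical code, not the MQTT protocol and not part of the mqtt package.
new_broker <- function() {
  subscribers <- list()  # topic name -> list of callback functions

  subscribe <- function(topic, callback) {
    subscribers[[topic]] <<- c(subscribers[[topic]], list(callback))
    invisible(NULL)
  }

  publish <- function(topic, payload) {
    callbacks <- subscribers[[topic]]
    # Fan the message out to every subscriber of this topic
    for (cb in callbacks) cb(topic, payload)
    invisible(length(callbacks))  # number of deliveries made
  }

  list(subscribe = subscribe, publish = publish)
}

broker <- new_broker()
log_a <- character(0)
log_b <- character(0)

# Two independent listeners on the same topic
broker$subscribe("sensors/temp", function(topic, payload) log_a <<- c(log_a, payload))
broker$subscribe("sensors/temp", function(topic, payload) log_b <<- c(log_b, payload))

# The "sensor" publishes once; the broker handles delivery to both listeners
delivered <- broker$publish("sensors/temp", "68")
```

The sensor only ever talks to the broker, which is why a constrained device can reach many consumers without knowing anything about them.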

Why am I telling you all this?

Devices that publish to MQTT brokers tend to be in the spectrum of what folks sadly call the “Internet of Things”. It’s a terrible, ambiguous name, but it’s all over the media and most folks have some idea what it means. In the context of MQTT, you can think of it as, say, a single temperature sensor publishing its data to an MQTT broker so many other things (including programs written by humans to capture, log and analyze that data) can receive it. This is starting to sound like something that might be right up R’s alley.

There are also potential use-cases where an online data processing system might want to publish data to many clients without said clients having to poll a poor, single-threaded R server constantly.

Having MQTT connectivity for R could be really interesting.

And, now we have the beginnings of said connectivity with the mqtt package.

Another Package? Really?

Yes, really.

Besides the huge potential for having a direct R-bridge to the MQTT world, I’m work-interested in MQTT since we’ve found over 35,000 MQTT servers on the default, plaintext port for MQTT (1883) alone:

There are enough of them that I don’t even need to show a base map.

Some of these servers require authentication and others aren’t doing much of anything. But, there are a number of them hosted by corporations and individuals that are exposing real data. OwnTracks seems to be one of the more popular self-/badly-hosted ones.

Then, there are others — like test.mosquitto.org — which deliberately run open MQTT servers for “testing”. There definitely is testing going on there, but there are also real services using it as a production broker. The mqtt package is based on the mosquitto C library, so it’s only fitting that we show a few examples from its own test site here.

For now, there’s really one function: topic_subscribe(). Eventually, R will be able to publish to a broker and do more robust data collection operations (say, to make a live MQTT dashboard in Shiny). The topic_subscribe() function is an all-in-one tool that enables you to:

  • connect to a broker
  • subscribe to a topic
  • pass in R callback functions which will be executed on connect, disconnect and when new messages come in

That’s plenty of functionality to do some fun things.

(Update: if you had tried this during the ~24 hours after the blog post was up, you may have run into an issue where the client id was not unique and got wonky results. That’s fixed, now, but it’s a good idea to use your own, unique client id when making MQTT requests).
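
One way to come up with a client id that is unlikely to collide is sketched below. `make_client_id()` is a hypothetical helper, not something the mqtt package provides, and this post does not show which topic_subscribe() argument takes the id, so check the package documentation for that. The sketch sticks to 23 alphanumeric characters because that is the range the MQTT 3.1.1 specification requires every broker to accept.

```r
# Hypothetical helper: build a 23-character alphanumeric client id from a
# prefix, the current process id, and random filler characters.
make_client_id <- function(prefix = "r") {
  stopifnot(grepl("^[A-Za-z0-9]+$", prefix))
  pid <- as.character(Sys.getpid())
  n_random <- 23L - nchar(prefix) - nchar(pid)
  stopifnot(n_random >= 4L)  # leave room for some randomness
  random_part <- paste(
    sample(c(letters, LETTERS, 0:9), n_random, replace = TRUE),
    collapse = ""
  )
  paste0(prefix, pid, random_part)
}

client_id <- make_client_id("rmqtt")
```

Mixing the process id with random characters keeps two R sessions on the same machine from picking the same id, even if both happen to use the same random seed.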

What’s the temperature, Kenneth?

The mosquitto test server has one topic — /outbox/crouton-demo/temperature — which is a fake temperature sensor that just sends data periodically so you have something to test with. Let’s capture 50 samples and plot them.

Since we’re using a callback we have to use the tricksy <<- operator to store/update variables outside the callback function. And, we should pre-allocate space for said data to avoid needlessly growing objects. Here’s a complete code-block:

library(mqtt) # devtools::install_github("hrbrmstr/mqtt")
library(jsonlite)
library(hrbrthemes)
library(tidyverse)

i <- 0 # initialize our counter
max_recs <- 50 # max number of readings to get

readings <- vector("character", max_recs)

# our callback function
temp_cb <- function(id, topic, payload, qos, retain) {

  i <<- i + 1 # update the counter
  readings[i] <<- readBin(payload, "character") # update our larger-scoped vector

  return(if (i==max_recs) "quit" else "go") # must return a string (at least ""); "quit" ends the subscription

}

topic_subscribe(
  topic = "/outbox/crouton-demo/temperature",
  message_callback=temp_cb
)

# each reading looks like this:
# {"update": {"labels":[4631],"series":[[68]]}}
map(readings, fromJSON) %>%
  map(unlist) %>%
  map_df(as.list) %>%
  ggplot(aes(update.labels, update.series)) +
  geom_line() +
  geom_point() +
  labs(x="Reading", y="Temp (F)", title="Temperature via MQTT") +
  theme_ipsum_rc(grid="XY")

We set up temp_cb() to be our callback and topic_subscribe() ensures that the underlying mosquitto library will call it every time a new message is published to that topic. The chart really shows how synthetic the data is.
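
If the global `<<-` bookkeeping in the example above feels too tricksy, the counter and the pre-allocated vector can live inside a closure instead. What follows is a sketch of that alternative, not how the package expects things to be done: `make_collector()` is a hypothetical helper, `rawToChar()` is swapped in for `readBin()`, and the loop fakes incoming messages (with placeholder values for `id`, `qos` and `retain`) so it runs without a broker.

```r
# Hypothetical factory: returns a message callback plus an accessor
# for whatever the callback has collected so far.
make_collector <- function(max_recs) {
  i <- 0L
  readings <- vector("character", max_recs)  # pre-allocated, as in the post

  callback <- function(id, topic, payload, qos, retain) {
    i <<- i + 1L                        # updates the closure's counter, not a global
    readings[i] <<- rawToChar(payload)  # assumes the payload arrives as a raw vector
    if (i == max_recs) "quit" else "go"
  }

  list(callback = callback, readings = function() readings[seq_len(i)])
}

collector <- make_collector(3)

# Simulate three incoming messages instead of connecting to a broker
for (temp in 68:70) {
  payload <- charToRaw(sprintf('{"update": {"labels":[1],"series":[[%d]]}}', temp))
  status <- collector$callback(1L, "/outbox/crouton-demo/temperature", payload, 0L, FALSE)
}
```

With a live broker, `collector$callback` would be passed as `message_callback` to topic_subscribe(), and `collector$readings()` would be read afterwards.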

Subtitles from the Edge

Temperature sensors are just the sort of thing that MQTT was designed for. But, we don’t need to be stodgy about our use of MQTT.

Just about a year ago from this post, the BBC launched live subtitles for iPlayer. Residents of the Colonies may not know what iPlayer is, but it’s the “app” that lets UK citizens watch BBC programmes on glowing rectangles that aren’t proper tellys. Live subtitles are hard to produce well (and get right) and the BBC making the effort to do so also on their digital platform is quite commendable. We U.S. folks will likely be charged $0.99 for each set of digital subtitles now that net neutrality is gone.

Now, some clever person(s) wired up some of these live subtitles to MQTT topics. We can wire up our own code in R to read them live:

bbc_callback <- function(id, topic, payload, qos, retain) {
  cat(crayon::green(readBin(payload, "character")), "\n", sep="")
  return("") # ctrl-c will terminate
}

mqtt::topic_subscribe(topic = "bbc/subtitles/bbc_news24/raw",
                      connection_callback=mqtt::mqtt_silent_connection_callback,
                      message_callback=bbc_callback)

In this case, control-c terminates things (cleanly).

You could easily modify the above code to have a bot that monitors for certain keywords then sends windowed chunks of subtitled text to some other system (Slack, database, etc). Or, create an online tidy text analysis workflow from them.
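
A keyword-watching bot along those lines might start out like the sketch below. Everything in it is an assumption made for illustration: `make_keyword_watcher()` is a hypothetical helper, `on_match` stands in for whatever would post to Slack or write to a database, the keywords are treated as regular expressions, and the loop feeds in made-up subtitle lines rather than connecting to the BBC topic.

```r
# Hypothetical factory for a keyword-watching message callback.
make_keyword_watcher <- function(keywords, window_size = 5, on_match = print) {
  recent <- character(0)  # rolling window of the latest subtitle lines
  pattern <- paste(keywords, collapse = "|")

  function(id, topic, payload, qos, retain) {
    line <- rawToChar(payload)  # assumes the payload arrives as a raw vector
    recent <<- utils::tail(c(recent, line), window_size)
    # Hand the whole window to the handler when the newest line matches
    if (grepl(pattern, line, ignore.case = TRUE)) on_match(recent)
    ""  # keep listening
  }
}

hits <- list()
watcher <- make_keyword_watcher(
  "weather",
  window_size = 2,
  on_match = function(chunk) hits[[length(hits) + 1]] <<- chunk
)

# Made-up subtitle lines standing in for live messages
for (txt in c("Good evening.", "Now the weather.", "Rain is expected.")) {
  watcher(1L, "bbc/subtitles/bbc_news24/raw", charToRaw(txt), 0L, FALSE)
}
```

Only the second line matches, so the handler receives a single window holding that line and the one before it.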

Shiny MQTT

This is an update to the post. I had posited (below) the potential for future use in a Shiny context. If some missed points are acceptable, you can use this in Shiny now. Here’s code for a live viewer of the first temperature data example (above). (NOTE: I heavily borrowed/stole from Miles/Alicia’s cool webrockets example for this):

library(shiny)
library(mqtt)
library(hrbrthemes)
library(tidyverse)

ui <- fluidPage(
  plotOutput('plot')
)

server <- function(input, output) {

  get_temps <- function(n) {

    i <- 0
    max_recs <- n
    readings <- vector("character", max_recs)

    temp_cb <- function(id, topic, payload, qos, retain) {
      i <<- i + 1
      readings[i] <<- readBin(payload, "character")
      return(if (i==max_recs) "quit" else "go")
    }

    mqtt::topic_subscribe(topic = "/outbox/crouton-demo/temperature",
                    connection_callback = mqtt::mqtt_silent_connection_callback,
                    message_callback = temp_cb)

    purrr::map(readings, jsonlite::fromJSON) %>%
      purrr::map(unlist) %>%
      purrr::map_df(as.list)

  }

  values <- reactiveValues(x = NULL, y = NULL)

  observeEvent(invalidateLater(450), {
    new_response <- get_temps(1)
    if (length(new_response) != 0) {
      values$x <- c(values$x, new_response$update.labels)
      values$y <- c(values$y, new_response$update.series)
    }
  }, ignoreNULL = FALSE)

  output$plot <- renderPlot({
    xdf <- data.frame(xval = values$x, yval = values$y)
    ggplot(xdf, aes(x = xval, y=yval)) + 
      geom_line() +
      geom_point() +
      theme_ipsum_rc(grid="XY")
  })

}

shinyApp(ui = ui, server = server)

FIN

The code is on GitHub and all input/contributions are welcome and encouraged. Some necessary TBDs are authentication & encryption. But, how would you like the API to look for using it, say, in Shiny apps? What should publishing look like? What helper functions would be useful (ones to slice & dice topic names or another to convert raw message text more safely)? Should there be an R MQTT “DSL”? Lots of things to ponder and so many sites to “test”!
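
To give the helper-function question something concrete to react to, here is a rough sketch of two such helpers in base R. Both `split_topic()` and `payload_to_text()` are hypothetical names that do not exist in the mqtt package, and the choices they make (keeping empty topic levels, dropping embedded nuls, returning NA for invalid UTF-8) are assumptions, not settled design.

```r
# Hypothetical helper: split a topic name into its levels.
# A leading "/" yields an empty first level, which is kept on purpose.
split_topic <- function(topic) {
  strsplit(topic, "/", fixed = TRUE)[[1]]
}

# Hypothetical helper: convert a raw payload to text a bit more defensively.
payload_to_text <- function(payload) {
  stopifnot(is.raw(payload))
  payload <- payload[payload != as.raw(0)]  # rawToChar() errors on embedded nuls
  txt <- rawToChar(payload)
  if (!validUTF8(txt)) return(NA_character_)  # refuse bytes that are not valid UTF-8
  txt
}

levels_bbc <- split_topic("bbc/subtitles/bbc_news24/raw")
levels_temp <- split_topic("/outbox/crouton-demo/temperature")
clean_text <- payload_to_text(as.raw(c(0x68, 0x00, 0x69)))  # "h", nul, "i"
```

Returning NA instead of erroring means a callback can skip a bad message without tearing down the subscription.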

P.S.

In case you are concerned about the unusually boring R package name, I wanted to use RIoT (lower-cased, of course) but riot is, alas, already taken.
