I’ve been drafting a new R package, [`sergeant`](https://github.com/hrbrmstr/sergeant), to work with [Apache Drill](http://drill.apache.org/) and have found it much easier to run Drill as a single-node cluster than in `drill-embedded` mode (especially when I need to add a couple more nodes for additional capacity). That means running [Apache Zookeeper](https://zookeeper.apache.org/), and I’ve had the occasional need to ping the Zookeeper process to see various stats (especially when I add some systems to the cluster).

Yes, it’s very easy and straightforward to `nc hostname 2181` from the command line to issue commands (or use the Zookeeper CLIs), but when I’m in R I like to stay in R. To that end, I made a small reference class that makes it super easy to query Zookeeper from within R. Said class will eventually be in a sister package to `sergeant` (and a more detailed post on `sergeant` is forthcoming), but there may be others who want/need to poke at Zookeeper processes with [4-letter words](https://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html#sc_zkCommands), and this will help you stay within R. The only command not available is `stmk`, since I don’t have a pressing need to set the trace mask.

After you source the self-contained reference class, you connect and then issue commands like so:

```r
zk <- zookeeper$new(host="drill.local")

zk$ruok()

zk$conf()
zk$stat()
```
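When scripting against a node, I find it handy to gate the heavier calls on `ruok()`. Here is a minimal sketch, assuming an object that exposes `ruok()` and `mntr()` the way the class further down does. `zk_health` is a hypothetical helper name (it is not part of the class), and the stand-in object with its made-up value is only there so the example runs without a live server:

```r
# Hypothetical helper (not part of the class): only ask for `mntr` stats
# when the node answers `ruok` with "imok".
zk_health <- function(zk) {
  ok <- isTRUE(zk$ruok())
  list(ok = ok, stats = if (ok) zk$mntr() else NULL)
}

# Stand-in object with the same two methods and a made-up value, so the
# example runs without Zookeeper; in practice pass zookeeper$new(host = "drill.local").
fake_zk <- list(
  ruok = function() TRUE,
  mntr = function() list(zk_server_state = "standalone")
)

str(zk_health(fake_zk))
```

One caveat, as far as I know: newer Zookeeper releases restrict four-letter words through the `4lw.commands.whitelist` setting, so a `FALSE` from `ruok()` may mean the command is not enabled rather than that the node is down.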

Drop a note in the comments (since this isn’t on GitHub yet) with any questions/issues.

```r
# Attach purrr and stringi up front: wchc(), wchp() and cons() use `%>%`,
# discard() and stri_trim() before send_cmd() gets a chance to load them.
library(purrr)
library(stringi)

zookeeper <- setRefClass(

  Class="zookeeper",

  fields=list(
    host="character",
    port="integer",
    timeout="integer"
  ),

  methods=list(

    initialize=function(..., host="localhost", port=2181L, timeout=30L) {
      host <<- host ; port <<- port ; timeout <<- timeout; callSuper(...)
    },

    # four-letter words this class knows how to send
    available_commands=function() {
      return(sort(c("conf", "envi", "stat", "srvr", "wchs", "wchc", "wchp", "mntr",
                    "cons", "crst", "srst", "dump", "ruok", "dirs", "isro", "gtmk")))
    },

    # point an existing object at a different server
    connect=function(host="localhost", port=2181L, timeout=30L) {
      .self$host <- host ; .self$port <- port ; .self$timeout <- timeout
    },

    # key=value lines -> named list
    conf=function() {
      res <- .self$send_cmd("conf")
      res <- stringi::stri_split_fixed(res, "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    # same as conf(), minus the header line
    envi=function() {
      res <- .self$send_cmd("envi")
      res <- stringi::stri_split_fixed(res[-1], "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    # version line, then a client list, a blank line, and "key: value" stats
    stat=function() {
      res <- .self$send_cmd("stat")
      version <- stri_replace_first_regex(res[1], "^Zoo.*: ", "")
      res <- res[-(1:2)]
      sep <- which(res=="")
      clients <- stri_trim(res[1:(sep-1)])
      zstats <- stringi::stri_split_fixed(res[(sep+1):length(res)], ": ", 2, simplify=TRUE)
      zstats <- as.list(setNames(zstats[,2], zstats[,1]))
      list(version=version, clients=clients, stats=zstats)
    },

    srvr=function() {
      res <- .self$send_cmd("srvr")
      zstats <- stringi::stri_split_fixed(res, ": ", 2, simplify=TRUE)
      as.list(setNames(zstats[,2], zstats[,1]))
    },

    wchs=function() {
      res <- .self$send_cmd("wchs")
      conn_path <- stri_match_first_regex(res[1], "^([[:digit:]]+) connections watching ([[:digit:]]+) paths")
      tot_watch <- stri_match_first_regex(res[2], "Total watches:([[:digit:]]+)")
      list(connections_watching=conn_path[,2], paths=conn_path[,3], total_watches=tot_watch[,2])
    },

    wchc=function() {
      stri_trim(.self$send_cmd("wchc")) %>% discard(`==`, "") -> res
      setNames(list(res[2:length(res)]), res[1])
    },

    # non-blank lines alternate between a path and an address
    wchp=function() {
      .self$send_cmd("wchp") %>% stri_trim() %>% discard(`==`, "") -> res
      data.frame(
        path=res[seq(1, length(res), 2)],
        address=res[seq(2, length(res), 2)],
        stringsAsFactors=FALSE
      )
    },

    # tab-separated key/value lines -> named list
    mntr=function() {
      res <- .self$send_cmd("mntr")
      res <- stringi::stri_split_fixed(res, "\t", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    cons=function() { list(clients=stri_trim(.self$send_cmd("cons") %>% discard(`==`, ""))) },
    crst=function() { message(.self$send_cmd("crst")) ; invisible() },
    srst=function() { message(.self$send_cmd("srst")) ; invisible() },
    dump=function() { paste0(.self$send_cmd("dump"), collapse="\n") },
    ruok=function() { .self$send_cmd("ruok") == "imok" },
    dirs=function() { .self$send_cmd("dirs") },
    isro=function() { .self$send_cmd("isro") },
    gtmk=function() { R.utils::intToBin(as.integer(.self$send_cmd("gtmk"))) },

    # open a fresh socket per command, write the four-letter word,
    # read the whole reply, then close
    send_cmd=function(cmd) {
      sock <- purrr::safely(socketConnection)
      con <- sock(host=.self$host, port=.self$port, blocking=TRUE, open="r+", timeout=.self$timeout)
      if (!is.null(con$result)) {
        con <- con$result
        cat(cmd, file=con)
        response <- readLines(con, warn=FALSE)
        a <- try(close(con))
        purrr::flatten_chr(stringi::stri_split_lines(response))
      } else {
        warning(sprintf("Error connecting to [%s:%s]", host, port))
      }
    }

  )

)
```
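Several of the methods above (`conf()`, `envi()`, `mntr()`) boil down to splitting each response line on its first separator and building a named list. Below is a dependency-free sketch of that one step in base R, which can be handy for poking at saved output offline. `parse_kv` is a hypothetical name, and the sample lines are made up to mimic the response shape rather than captured from a real server:

```r
# Hypothetical base-R helper: turn "key<sep>value" lines into a named list,
# splitting on the FIRST separator only (values may contain the separator).
parse_kv <- function(lines, sep = "=") {
  lines <- lines[nzchar(lines)]              # drop blank lines
  pos <- regexpr(sep, lines, fixed = TRUE)   # position of first separator
  keep <- pos > 0                            # skip lines without a separator
  lines <- lines[keep]
  pos <- pos[keep]
  keys <- substr(lines, 1, pos - 1)
  vals <- substr(lines, pos + nchar(sep), nchar(lines))
  as.list(setNames(vals, keys))
}

# Made-up lines in the shape of a `conf` reply
sample_conf <- c("clientPort=2181", "dataDir=/tmp/zk=data", "", "tickTime=2000")
cfg <- parse_kv(sample_conf)
str(cfg)

# Same idea for tab-separated `mntr`-style lines (value is illustrative)
parse_kv("zk_version\t3.4.9", sep = "\t")
```

Everything comes back as character, just as it does from the class, so wrap fields in `as.integer()` or similar when you need numbers.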
