I’ve been drafting a new R package, [`sergeant`](https://github.com/hrbrmstr/sergeant), to work with [Apache Drill](http://drill.apache.org/) and have found it much easier to manage having Drill operating in a single-node cluster vs `drill-embedded` mode (especially when I need to add a couple more nodes for additional capacity). That means running [Apache Zookeeper](https://zookeeper.apache.org/), and I’ve had the occasional need to ping the Zookeeper process to see various stats (especially when I add some systems to the cluster).
Yes, it’s very easy and straightforward to `nc hostname 2181` from the command line to issue commands (or use the Zookeeper CLIs), but when I’m in R I like to stay in R. To that end, I made a small reference class that makes it super-easy to query Zookeeper from within R. Said class will eventually be in a sister package to `sergeant` (and a more detailed post on `sergeant` is forthcoming), but there may be others who want/need to poke at Zookeeper processes with [4-letter words](https://zookeeper.apache.org/doc/trunk/zookeeperAdmin.html#sc_zkCommands) and this will help you stay within R. The only command not available is `stmk` since I don’t have a pressing need to set the trace mask.
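If you’re curious what a four-letter word looks like on the wire, here is a rough base R sketch of the `nc hostname 2181` round trip. The name `zk_four_letter` and its 5-second default timeout are my own placeholders for illustration and are not part of the class in this post; it simply writes the command to a socket and reads whatever comes back until the server closes the connection.

```r
# Hypothetical helper (an illustration, not part of the reference class):
# send one four-letter word to a Zookeeper node and return the raw reply lines.
zk_four_letter <- function(cmd, host="localhost", port=2181L, timeout=5L) {
  con <- tryCatch(
    suppressWarnings(
      socketConnection(host=host, port=port, blocking=TRUE, open="r+", timeout=timeout)
    ),
    error=function(e) NULL          # could not connect: nothing listening or host unreachable
  )
  if (is.null(con)) return(NULL)
  on.exit(close(con), add=TRUE)     # always release the socket
  cat(cmd, file=con)                # the whole request is just the four letters
  readLines(con, warn=FALSE)        # read the reply until the server closes the socket
}
```

Against a healthy node, `zk_four_letter("ruok", host="drill.local")` should come back as `"imok"`; a `NULL` means the connection itself could not be opened.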
After you source the self-contained reference class, you connect and then issue commands like so:
```r
zk <- zookeeper$new(host="drill.local")
zk$ruok()
zk$conf()
zk$stat()
```
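
Most of the methods hand back named lists of character strings, so anything you want to do math on needs converting first. Below is a small sketch of one way to do that. `zk_numify` is a hypothetical helper name, and the `example` list is a hand-written stand-in for the kind of thing `zk$mntr()` returns, not real server output.

```r
# Hypothetical helper: turn numeric-looking strings in a named list into numbers
# and leave everything else untouched.
zk_numify <- function(x) {
  lapply(x, function(v) {
    n <- suppressWarnings(as.numeric(v))   # NA for anything that is not a plain number
    if (length(v) == 1 && !is.na(n)) n else v
  })
}

# Hand-written stand-in for a parsed reply (illustrative values only)
example <- list(zk_version="3.4.8", zk_avg_latency="0", zk_server_state="standalone")
str(zk_numify(example))
```

With a live connection the same call would be `zk_numify(zk$mntr())`.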
Drop a note in the comments (since this isn’t on GitHub yet) with any questions/issues.
```r
library(purrr)   # safely(), discard(), flatten_chr() and the %>% pipe
library(stringi) # stri_* helpers used throughout
# R.utils is only needed by gtmk() and is called as R.utils::intToBin()

zookeeper <- setRefClass(

  Class="zookeeper",

  fields=list(
    host="character",
    port="integer",
    timeout="integer"
  ),

  methods=list(

    initialize=function(..., host="localhost", port=2181L, timeout=30L) {
      host <<- host ; port <<- port ; timeout <<- timeout; callSuper(...)
    },

    # every four-letter word wrapped by this class (everything except `stmk`)
    available_commands=function() {
      return(sort(c("conf", "envi", "stat", "srvr", "wchs", "wchc", "wchp", "mntr",
                    "cons", "crst", "srst", "dump", "ruok", "dirs", "isro", "gtmk")))
    },

    # point an existing object at a different server
    connect=function(host="localhost", port=2181L, timeout=30L) {
      .self$host <- host ; .self$port <- port ; .self$timeout <- timeout
    },

    # key=value lines -> named list
    conf=function() {
      res <- .self$send_cmd("conf")
      res <- stringi::stri_split_fixed(res, "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    # same as conf() but the first line is a header, so drop it
    envi=function() {
      res <- .self$send_cmd("envi")
      res <- stringi::stri_split_fixed(res[-1], "=", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    # version line, then a client list, a blank line and "key: value" stats
    stat=function() {
      res <- .self$send_cmd("stat")
      version <- stri_replace_first_regex(res[1], "^Zoo.*: ", "")
      res <- res[-(1:2)]
      sep <- which(res=="")
      clients <- stri_trim(res[1:(sep-1)])
      zstats <- stringi::stri_split_fixed(res[(sep+1):length(res)], ": ", 2, simplify=TRUE)
      zstats <- as.list(setNames(zstats[,2], zstats[,1]))
      list(version=version, clients=clients, stats=zstats)
    },

    srvr=function() {
      res <- .self$send_cmd("srvr")
      zstats <- stringi::stri_split_fixed(res, ": ", 2, simplify=TRUE)
      as.list(setNames(zstats[,2], zstats[,1]))
    },

    wchs=function() {
      res <- .self$send_cmd("wchs")
      conn_path <- stri_match_first_regex(res[1], "^([[:digit:]]+) connections watching ([[:digit:]]+) paths")
      tot_watch <- stri_match_first_regex(res[2], "Total watches:([[:digit:]]+)")
      list(connections_watching=conn_path[,2], paths=conn_path[,3], total_watches=tot_watch[,2])
    },

    wchc=function() {
      stri_trim(.self$send_cmd("wchc")) %>% discard(`==`, "") -> res
      setNames(list(res[2:length(res)]), res[1])
    },

    # non-empty lines are treated as alternating path / address pairs
    wchp=function() {
      .self$send_cmd("wchp") %>% stri_trim() %>% discard(`==`, "") -> res
      data.frame(
        path=res[seq(1, length(res), 2)],
        address=res[seq(2, length(res), 2)],
        stringsAsFactors=FALSE
      )
    },

    # tab-separated key/value lines -> named list
    mntr=function() {
      res <- .self$send_cmd("mntr")
      res <- stringi::stri_split_fixed(res, "\t", 2, simplify=TRUE)
      as.list(setNames(res[,2], res[,1]))
    },

    cons=function() { list(clients=stri_trim(.self$send_cmd("cons") %>% discard(`==`, ""))) },
    crst=function() { message(.self$send_cmd("crst")) ; invisible() },
    srst=function() { message(.self$send_cmd("srst")) ; invisible() },
    dump=function() { paste0(.self$send_cmd("dump"), collapse="\n") },
    ruok=function() { .self$send_cmd("ruok") == "imok" },
    dirs=function() { .self$send_cmd("dirs") },
    isro=function() { .self$send_cmd("isro") },
    gtmk=function() { R.utils::intToBin(as.integer(.self$send_cmd("gtmk"))) },

    # open a socket, write the four-letter word, read the reply, close the socket
    send_cmd=function(cmd) {
      sock <- purrr::safely(socketConnection)
      con <- sock(host=.self$host, port=.self$port, blocking=TRUE, open="r+", timeout=.self$timeout)
      if (!is.null(con$result)) {
        con <- con$result
        cat(cmd, file=con)
        response <- readLines(con, warn=FALSE)
        try(close(con), silent=TRUE)
        purrr::flatten_chr(stringi::stri_split_lines(response))
      } else {
        warning(sprintf("Error connecting to [%s:%s]", .self$host, .self$port))
      }
    }

  )

)
```
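
Since `conf()`, `envi()` and `mntr()` above all boil down to "split each line on the first separator and name the values by the keys", here is a base R sketch of that one idea that you can poke at without a live server. `kv_to_list` is a hypothetical name, it is not how the class does it (the class uses `stringi::stri_split_fixed()`), and the sample lines are hand-written rather than captured from a real node.

```r
# Hypothetical helper: split "key<sep>value" lines on the FIRST separator only
# and return a named list of character values.
kv_to_list <- function(lines, sep="=") {
  pos  <- regexpr(sep, lines, fixed=TRUE)   # position of the first separator in each line
  keep <- pos > 0                           # skip blank lines and lines with no separator
  lines <- lines[keep]
  pos   <- pos[keep]
  keys <- substr(lines, 1, pos - 1)
  vals <- substr(lines, pos + nchar(sep), nchar(lines))
  as.list(setNames(vals, keys))
}

# Hand-written sample lines (illustrative only); note the "=" inside the second value
sample_lines <- c("clientPort=2181", "some.option=a=b", "")
str(kv_to_list(sample_lines))

# The same helper handles tab-separated output by changing the separator
str(kv_to_list("zk_avg_latency\t0", sep="\t"))
```

Splitting only on the first separator matters because values can legitimately contain the separator themselves, which is why the class passes `2` as the maximum number of pieces to `stri_split_fixed()`.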