computación data.table y paralela

Después de esta publicación: multinúcleo y data.table en R , me preguntaba si había una forma de usar todos los núcleos cuando se usa data.table, por lo general, hacer cálculos por grupos podría ser paralelizado. Parece que plyr permite tales operaciones por diseño.

Lo primero que debe verificar es que data.table FAQ 3.1 punto 2 se ha hundido en:

Una asignación de memoria se realiza solo para el grupo más grande, luego esa memoria se reutiliza para los otros grupos. Hay muy poca basura para recolectar.

Esa es una razón por la cual la agrupación data.table es rápida. Pero este enfoque no se presta a la paralelización. Paralelizar significa copiar los datos a los otros hilos, en cambio, costando tiempo. Pero, según tengo entendido, la agrupación data.table suele ser más rápida que plyr con .parallel en cualquier caso. Depende del tiempo de cálculo de la tarea para cada grupo, y si ese tiempo de cómputo puede reducirse fácilmente o no. Mover los datos a menudo domina (cuando se comparan 1 o 3 series de grandes tareas de datos).

Más a menudo, hasta ahora, es en realidad algo que está mordiendo en la expresión j de [.data.table . Por ejemplo, recientemente vimos un bajo rendimiento de la agrupación de data.table pero el culpable resultó ser min(POSIXct) ( min(POSIXct) en R más de 80K ID únicos ). Evitar eso obtuvo más de 50 veces la aceleración.

Entonces el mantra es: Rprof , Rprof , Rprof .

Además, el punto 1 de las mismas preguntas frecuentes podría ser significativo:

Solo esa columna está agrupada, las otras 19 se ignoran porque data.table inspecciona la expresión j y se da cuenta de que no utiliza las otras columnas.

Entonces, data.table realmente no sigue el paradigma split-apply-combine en absoluto. Funciona de manera diferente. split-apply-combine se presta a la paralelización, pero en realidad no se adapta a datos de gran tamaño.

También vea la nota al pie 3 en la viñeta de introducción de data.table:

Nos preguntamos cuántas personas están implementando técnicas paralelas para codificar el escaneo vectorial.

Eso está tratando de decir “seguro, el paralelo es significativamente más rápido, pero ¿cuánto tiempo debería llevar realmente con un algoritmo eficiente?”.

PERO si tiene un perfil (usando Rprof ), y la tarea por grupo es realmente de cálculo intensivo, entonces las 3 publicaciones en datatable-help, incluida la palabra “multinúcleo”, pueden ayudar:

publicaciones multinúcleo en datatable-help

Por supuesto, hay muchas tareas en las que la paralelización sería agradable en data.table, y hay una forma de hacerlo. Pero aún no se ha hecho, ya que generalmente otros factores muerden, por lo que ha sido de baja prioridad. Si puede publicar datos ficticios reproducibles con puntos de referencia y resultados de Rprof, eso ayudaría a boost la prioridad.

He hecho algunas pruebas por el mantra previo de @matt dowle de Rprof, Rprof, Rprof.

Lo que encuentro es que la decisión de paralelizar depende del contexto; pero es probable que sea significativo. Dependiendo de las operaciones de prueba (por ejemplo, foo continuación, que se puede personalizar) y la cantidad de núcleos utilizados (lo bash con 8 y 24), obtengo resultados diferentes.

A continuación los resultados:

  1. usando 8 núcleos, veo una mejora del 21% en este ejemplo para la paralelización
  2. usando 24 núcleos, veo un 14% de mejora .

También miro algunos datos / operaciones del mundo real (no compartibles) que muestran una paralelización de mejora mayor ( 33% o 25% , dos pruebas diferentes) con 24 núcleos. Edición de mayo de 2018 Un nuevo conjunto de casos de ejemplo del mundo real muestra mejoras más cercanas al 85% de las operaciones paralelas con 1000 grupos.

 R> sessionInfo() # 24 core machine: R version 3.3.2 (2016-10-31) Platform: x86_64-pc-linux-gnu (64-bit) Running under: CentOS Linux 7 (Core) attached base packages: [1] parallel stats graphics grDevices utils datasets methods [8] base other attached packages: [1] microbenchmark_1.4-2.1 stringi_1.1.2 data.table_1.10.4 R> sessionInfo() # 8 core machine: R version 3.3.2 (2016-10-31) Platform: x86_64-apple-darwin13.4.0 (64-bit) Running under: macOS Sierra 10.12.4 attached base packages: [1] parallel stats graphics grDevices utils datasets methods base other attached packages: [1] microbenchmark_1.4-2.1 stringi_1.1.5 data.table_1.10.4 

Ejemplo a continuación:

 library(data.table) library(stringi) library(microbenchmark) set.seed(7623452L) my_grps <- stringi::stri_rand_strings(n= 5000, length= 10) my_mat <- matrix(rnorm(1e5), ncol= 20) dt <- data.table(grps= rep(my_grps, each= 20), my_mat) foo <- function(dt) { dt2 <- dt ## needed for .SD lock nr <- nrow(dt2) idx <- sample.int(nr, 1, replace=FALSE) dt2[idx,][, `:=` ( new_var1= V1 / V2, new_var2= V4 * V3 / V10, new_var3= sum(V12), new_var4= ifelse(V10 > 0, V11 / V13, 1), new_var5= ifelse(V9 < 0, V8 / V18, 1) )] return(dt2[idx,]) } split_df <- function(d, var) { base::split(d, get(var, as.environment(d))) } foo2 <- function(dt) { dt2 <- split_df(dt, "grps") require(parallel) cl <- parallel::makeCluster(min(nrow(dt), parallel::detectCores())) clusterExport(cl, varlist= "foo") clusterExport(cl, varlist= "dt2", envir = environment()) clusterEvalQ(cl, library("data.table")) dt2 <- parallel::parLapply(cl, X= dt2, fun= foo) parallel::stopCluster(cl) return(rbindlist(dt2)) } print(parallel::detectCores()) # 8 microbenchmark( serial= dt[,foo(.SD), by= "grps"], parallel= foo2(dt), times= 10L ) Unit: seconds expr min lq mean median uq max neval cld serial 6.962188 7.312666 8.433159 8.758493 9.287294 9.605387 10 b parallel 6.563674 6.648749 6.976669 6.937556 7.102689 7.654257 10 a print(parallel::detectCores()) # 24 Unit: seconds expr min lq mean median uq max neval cld serial 9.014247 9.804112 12.17843 13.17508 13.56914 14.13133 10 a parallel 10.732106 10.957608 11.17652 11.06654 11.30386 12.28353 10 a 

Perfilando:

Podemos usar esta respuesta para proporcionar una respuesta más directa a los comentarios originales de @matt dowle a los perfiles.

Como resultado, vemos que la mayoría del tiempo de cómputo es manejado por base y no data.table . data.table operaciones de data.table son, como se esperaba, excepcionalmente rápidas. Si bien algunos podrían argumentar que esto es evidencia de que no hay necesidad de paralelismo dentro de data.table , data.table que este workflow / operation-set no es atípico. Es decir, tengo la firme sospecha de que la mayoría de la agregación de grandes data.table datos implica una cantidad sustancial de código no data.table ; y que esto está correlacionado con el uso interactivo versus el uso de desarrollo / producción. Por lo tanto, concluyo que el paralelismo sería valioso dentro de los data.table para grandes agregaciones.

 library(profr) prof_list <- replicate(100, profr::profr(dt[,foo(.SD), by= "grps"], interval = 0.002), simplify = FALSE) pkg_timing <- fun_timing <- vector("list", length= 100) for (i in 1:100) { fun_timing[[i]] <- tapply(prof_list[[i]]$time, paste(prof_list[[i]]$source, prof_list[[i]]$f, sep= "::"), sum) pkg_timing[[i]] <- tapply(prof_list[[i]]$time, prof_list[[i]]$source, sum) } sort(sapply(fun_timing, sum)) # no large outliers fun_timing2 <- rbindlist(lapply(fun_timing, function(x) { ret <- data.table(fun= names(x), time= x) ret[, pct_time := time / sum(time)] return(ret) })) pkg_timing2 <- rbindlist(lapply(pkg_timing, function(x) { ret <- data.table(pkg= names(x), time= x) ret[, pct_time := time / sum(time)] return(ret) })) fun_timing2[, .(total_time= sum(time), avg_time= mean(time), avg_pct= round(mean(pct_time), 4)), by= "fun"][ order(avg_time, decreasing = TRUE),][1:10,] pkg_timing2[, .(total_time= sum(time), avg_time= mean(time), avg_pct= round(mean(pct_time), 4)), by= "pkg"][ order(avg_time, decreasing = TRUE),] 

Resultados:

  fun total_time avg_time avg_pct 1: base::[ 670.362 6.70362 0.2694 2: NA::[.data.table 667.350 6.67350 0.2682 3: .GlobalEnv::foo 335.784 3.35784 0.1349 4: base::[[ 163.044 1.63044 0.0655 5: base::[[.data.frame 133.790 1.33790 0.0537 6: base::%in% 120.512 1.20512 0.0484 7: base::sys.call 86.846 0.86846 0.0348 8: NA::replace_dot_alias 27.824 0.27824 0.0112 9: base::which 23.536 0.23536 0.0095 10: base::sapply 22.080 0.22080 0.0089 pkg total_time avg_time avg_pct 1: base 1397.770 13.97770 0.7938 2: .GlobalEnv 335.784 3.35784 0.1908 3: data.table 27.262 0.27262 0.0155 

crossposted en github / data.table

Intereting Posts