Ускорение группы data.table за счет использования нескольких ядер и параллельного программирования

У меня большой код, и этап агрегации - это текущее узкое место с точки зрения скорости.

В моем коде я хотел бы ускорить этап группировки данных, чтобы он был быстрее. SNOTE (простой нетривиальный пример) моих данных выглядит так:

library(data.table)
a = sample(1:10000000, 50000000, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), 50000000, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e)
system.time(c.dt <- dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)])
   user  system elapsed 
 60.107   3.143  63.534

Это довольно быстро для такого большого примера данных, но в моем случае я все еще ищу дальнейшее ускорение. В моем случае у меня несколько ядер, поэтому я почти уверен, что должен быть способ использовать такие вычислительные возможности.

Я готов изменить свой тип данных на объекты data.frame или idata.frame (теоретически idata.frame предположительно быстрее, чем data.frames).

Я провел небольшое исследование и, похоже, у пакета plyr есть некоторые параллельные возможности, которые могут быть полезны, но я все еще не понимаю, как это сделать для группы, которую я пытаюсь сделать. В другом сообщении SO они обсуждают некоторые из этих идей. Я до сих пор не уверен, насколько большего я смогу достичь с помощью этого распараллеливания, поскольку он использует функцию foreach. По моему опыту, функция foreach не подходит Идея для миллионов быстрых операций, потому что коммуникационные усилия между ядрами в конечном итоге замедляют усилия по распараллеливанию.


person Dnaiel    schedule 29.09.2013    source источник
comment
Пожалуйста, уточните, что означают слова «объединять» и «агрегировать». При этом на ум приходят 3 функции: list, c и paste. И какова функция этого кода. Извлекаем ли мы столбцы из фреймов данных или работаем над data.tables? Каковы структуры 'block.read.parent.cigar' и других входных переменных ..... объясните эту проблему лучше! (Очевидно, кто-то еще согласен. Это не мой голос против.)   -  person IRTFM    schedule 30.09.2013
comment
@Dwin, спасибо! Я не уверен, достаточно ли я разъяснил в вопросе Q, но основной вопрос заключается в том, как ускорить операцию агрегирования для большой таблицы данных, подобной той, что в приведенном выше примере. Также нужно иметь в виду, что я могу использовать несколько ядер, поэтому могут быть некоторые умные идеи параллелизации, которые могут значительно ускорить такую ​​операцию. Надеюсь, это поможет, я добавил пример   -  person Dnaiel    schedule 30.09.2013
comment
Я тоже не голосую против, но похоже, что хранение ваших данных таким образом (с символьными векторами), как правило, будет медленным, а их объединение только замедлит вас (если вы не экспортируете для использования в другом программном обеспечении), поскольку вам нужно будет снова и снова разбивать строки для анализа. Вероятно, вам следует использовать специализированный пакет для сигар ... Я ничего об этом не знаю, но вы уже были перенаправлены на один из предыдущих вопросов ... stackoverflow.com/q/18969698/1191259   -  person Frank    schedule 30.09.2013
comment
Я не голосовал против. Но я бы сделал это потому, что вы не предоставили никакой информации о данных. Если read.index является индексом строки, то, конечно, группировка каждой строки в строку сама по себе будет медленной. Вы будете звонить paste миллионы раз. Вы использовали Rprof? Вы использовали verbose=TRUE? И вы используете такие слова, как "слишком медленно", не называя цифр. Фактически, я уговорил себя проголосовать против него сейчас. Это можно изменить, если вы улучшите вопрос.   -  person Matt Dowle    schedule 30.09.2013
comment
@MatthewDowle, спасибо, вы видели игрушечный пример? Из игрушечного примера видно, что индекс чтения - это идентификатор, а не индекс строки. Это правда, что в среднем данный индекс чтения может появляться несколько раз, поэтому на практике я вызываю пасту миллионы раз, и в этом суть: как я могу использовать НЕСКОЛЬКО ЯДЕР для Ускорения такой операции? Как я могу использовать возможности параллельных вычислений, чтобы такая линия работала намного быстрее? Может быть, есть способ сделать это, или, может быть, для этого нужно кардинально изменить код. Не уверен в ответе, поэтому я задал этот вопрос.   -  person Dnaiel    schedule 30.09.2013
comment
@MatthewDowle относительно формулировки, я думаю, вы правы, не нужно говорить супер медленно или что-то в этом роде, поскольку все это связано с используемой структурой data.table, я не хотел критиковать data.table, на самом деле Мне нравится этот пакет, и я использую его регулярно, я просто искал способы ускорить эту строку в моем коде.   -  person Dnaiel    schedule 30.09.2013
comment
Без проблем. Вопрос улучшается. Если вы сделаете воспроизводимый большой пример и укажете время, которое вы получите, тогда я посмотрю (и, возможно, другие тоже будут привлечены). Покажите небольшой пример n = 3, а затем скажите, что установите n = 1e6, и у меня это займет 2 часа, есть ли более быстрый способ ?. Что-то такое.   -  person Matt Dowle    schedule 30.09.2013
comment
@MatthewDowle, спасибо, я подготовлю пример и опубликую его сегодня же, хотя я понял серьезную ошибку в моем коде. Я загружал слишком большие таблицы данных в свой код и использовал 300 ГБ ОЗУ ... изменяя код и получая вернемся к этому вопросу позже сегодня ... большое спасибо!   -  person Dnaiel    schedule 30.09.2013
comment
Ха, мы все там были. Если это так, просто удалите вопрос.   -  person Matt Dowle    schedule 30.09.2013
comment
@MattDowle Я только что обновил вопрос примером. Теперь я читаю свои огромные данные кусками меньшего размера, поэтому больше не нужно злоупотреблять ОЗУ. Это все равно поможет мне ускорить эту часть, но я не знаю, как это сделать.   -  person Dnaiel    schedule 01.10.2013
comment
@Dnaiel Это отличный вопрос. +1. Я попробую посмотреть. Думаю, у некоторых респондентов просто есть новые каналы вопросов, поэтому, чтобы привлечь больше внимания, можно было бы предложить вознаграждение.   -  person Matt Dowle    schedule 02.10.2013
comment
@MattDowle большое спасибо, я рад, что улучшил такой запутанный вопрос :-) Не уверен, насколько это здорово, но это проблема, с которой я имею дело. Я все больше учусь, как задавать вопросы получше, так что это полезно для меня.   -  person Dnaiel    schedule 02.10.2013
comment
@Dnaiel, вы уверены, что предоставленная вами ЗАПИСКА дает вам ожидаемый результат. Вы это проверяли? Кроме того, вы уверены, что S в SNOTE;)   -  person Ricardo Saporta    schedule 06.10.2013


Ответы (2)


Если вам доступно несколько ядер, почему бы не воспользоваться тем фактом, что вы можете быстро фильтровать и группировать строки в таблице data.table с помощью ее ключа:

library(doMC)
registerDoMC(cores=4)


setkey(dt, "a")

finalRowOrderMatters = FALSE # FALSE can be faster
foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=finalRowOrderMatters) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]

Обратите внимание, что если количество уникальных групп (например, length(unique(a))) относительно невелико, будет быстрее отбросить аргумент .combine, вернуть результаты в список, а затем вызвать rbindlist для результатов. В моем тестировании на двух ядрах и 8 ГБ ОЗУ порог составлял около 9000 уникальных значений. Вот что я использовал для тестирования:

# (otion a)
round(rowMeans(replicate(3, system.time({
# ------- #
  foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar% 
     dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
# ------- #
}))), 3) 
# [1]  1.243 elapsed for N ==  1,000
# [1] 11.540 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 57.404 elapsed for N == 50,000



# (otion b)
round(rowMeans(replicate(3, system.time({
# ------- #
    results <- 
      foreach(x=unique(dt[["a"]])) %dopar% 
         dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
# ------- #
}))), 3)
# [1]  1.117 elapsed for N ==  1,000
# [1] 10.567 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617
# [1] 76.613 elapsed for N == 50,000


## And used the following to create the dt
N <- 5e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
person Ricardo Saporta    schedule 06.10.2013
comment
Нужно ли каждому подпроцессу копировать всю таблицу data.table или все они обращаются к основному объекту data.table? - person Zach; 02.09.2014

Можете ли вы распараллелить агрегирование с data.table? да.

Стоит ли оно того? НЕТ. Это ключевой момент, который не удалось выделить в предыдущем ответе.

Как объясняет Мэтт Доул в data.table и parallel computing, копии (" фрагменты ") должны быть созданы перед распределением при параллельном выполнении операций. Это замедляет работу. В некоторых случаях, когда вы не можете использовать data.table (например, выполняете множество линейных регрессий), стоит разделить задачи между ядрами. Но не агрегация - по крайней мере, когда задействовано data.table.

Короче (и пока не будет доказано обратное) агрегируйте с помощью data.table и не беспокойтесь о потенциальном увеличении скорости с помощью doMC. data.table уже работает быстро по сравнению со всем остальным, когда дело доходит до агрегирования, даже если оно не многоядерное!


Вот несколько тестов, которые вы можете запустить для себя, сравнивая data.table внутреннее агрегирование с использованием by с foreach и mclapply. Результаты указаны первыми.

#-----------------------------------------------

# TL;DR FINAL RESULTS (Best to Worst)
# 3 replications, N = 10000:
# (1)  0.007 -- data.table using `by`
# (2)  3.548 -- mclapply with rbindlist
# (3)  5.557 -- foreach with rbindlist
# (4)  5.959 -- foreach with .combine = "rbind"
# (5) 14.029 -- lapply

# ----------------------------------------------

library(data.table)

## And used the following to create the dt
N <- 1e4
set.seed(1)
a = sample(1:N, N*2, replace = TRUE)
b = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
d = sample(c("3m","2m2d2m","3m2d1i3s2d","5m","4m","9m","1m"), N*2, replace = TRUE)
e = a
dt = data.table(a = a, b = b, d = d, e = e, key="a")
setkey(dt, "a")

# TEST AGGREGATION WITHOUT PARALLELIZATION ---------------------------
## using data.tables `by` to aggregate
round(rowMeans(replicate(3, system.time({
    dt[,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1], by=a)]
}))), 3)
# [1] 0.007 elapsed for N == 10,000, length(unique(dt[["a"]])) == 8617

## using `lapply`
round(rowMeans(replicate(3, system.time({
    results <- lapply(unique(dt[["a"]]), function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[1])]
    })
    rbindlist(results)
}))), 3)
# [1] 14.029 elapsed for N == 10,000

# USING `mclapply` FORKING ---------------------------------
## use mclapply
round(rowMeans(replicate(3, system.time({
    results <- mclapply(unique(dt[["a"]]),
    function(x) {
        dt[.(x), list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    }, mc.cores=4)
    rbindlist(results)
}))), 3)
# [1] 3.548 elapsed for N == 10,000


# PARALLELIZATION USING `doMC` PACKAGE ---------------------------------
library(doMC)
mc = 4
registerDoMC(cores=mc)
getDoParWorkers()
# [1] 4

## (option a) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    foreach(x=unique(dt[["a"]]), .combine="rbind", .inorder=FALSE) %dopar%
    dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
}))), 3)
# [1] 5.959 elapsed for N == 10,000

## (option b) by Ricardo Saporta
round(rowMeans(replicate(3, system.time({
    results <-
      foreach(x=unique(dt[["a"]])) %dopar%
        dt[.(x) ,list(b = paste(b, collapse=""), d = paste(d, collapse=""), e = e[[1]])]
    rbindlist(results)
}))), 3)
# [1] 5.557 elapsed for N == 10,000

registerDoSEQ()
getDoParWorkers()
# [1] 1
person Danton Noriega    schedule 23.06.2015