Любая причина, по которой CachedThreadPool будет работать на полную мощность, но не инициирует второй поток?

Представьте себе ресурсоемкое программное обеспечение, которое берет кучу текстовых файлов (по 100+ МБ каждый), обрабатывает их и помещает в БД. Я пытаюсь немного оптимизировать его, используя больше ядер (точно 8 для этой машины, четырехъядерный i7 с гиперпоточностью).

Рассмотрим следующий фрагмент кода:

    ExecutorService es = Executors.newCachedThreadPool(
            new ThreadFactory() {
                private final AtomicInteger threadNumber = new AtomicInteger(1);
                private final String namePrefix = "awesome-thread-";

                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, namePrefix + threadNumber.getAndIncrement());
                    if (t.isDaemon()) 
                        t.setDaemon(false);
                    return t;
        }
    });

    while((e = upp.getNextEntry()) != null){

        // start time-consuming process in a separate thread to speed up
        Future<Set<Fragment>> fut = es.submit(new FragmentTask(e.getSomeProperty()));       

        /* do other stuff #sequentially# with entry e
         * it may or may not take as long as previous step
         * depending on e 
         */

        Set<Fragment> set = fut.get(); 
        for(Fragment frag : set){
            // do stuff with frag
        }                       
    }

Здесь FragmentTask содержит рекурсивный алгоритм, выполнение которого занимает от пары до нескольких тысяч миллисекунд, в зависимости от e.

Первоначально я реализовал пул потоков как FixedThreadPool, но когда я визуально проверил, как работают потоки (через JVisualVM), я понял, что чаще всего потоки простаивали. Я решил попробовать CachedThreadPool в качестве альтернативы, но похоже, что пул представляет собой один поток, который работает почти на 100% на протяжении всего цикла while. В ходе этого процесса вторичный поток для пула не создается, и другие ядра также в значительной степени бездействуют. Что действительно интересно, так это то, что «основной» рабочий поток, который выполняет остальную часть цикла while, «ожидает» практически все время.

Это я нахожу немного странным, так как я ожидаю, что по крайней мере два потока должны работать с более высокой эффективностью, один из которых выполняет FragmentTask, а другой выполняет остальную часть цикла while, вплоть до fut.get().

Есть идеи, что может происходить за кулисами? Является ли код «слишком последовательным» для использования пула потоков?


person posdef    schedule 20.02.2014    source источник
comment
Вы запускаете только одну задачу за раз, поэтому она должна создавать только один поток.   -  person Peter Lawrey    schedule 20.02.2014
comment
@PeterLawrey уверен, но тогда не будет ли у потока, выполняющего задачи, некоторое время простоя? Я имею в виду почти 100% загрузку в этом потоке и почти 0% в другом заставляет меня думать, что есть что-то подозрительное.   -  person posdef    schedule 20.02.2014
comment
Это может зависеть от того, что делает ваша задача. Я предлагаю вам профилировать приложение, чтобы увидеть, делает ли оно то, что вы ожидаете.   -  person Peter Lawrey    schedule 20.02.2014
comment
На основе ваших данных должно быть довольно ясно, что то, что вы делаете в основном потоке, требует гораздо меньше работы, чем то, что вы отправляете в пул.   -  person Marko Topolnik    schedule 20.02.2014


Ответы (2)


Проблема не в реализации пула потоков. Вы пытаетесь получить по одному Future за раз, так что ваша программа по существу является однопоточной.

Что вам нужно сделать, так это создать Collection ваших Callable и использовать:

final List<Future<Set<Fragment>>> results
    = executor.invokeAll(yourCollectionOfCallables);

Затем зациклите свой results. Пул потоков сделает все возможное, чтобы запустить потоки с новыми задачами после завершения одной задачи; более того, вам гарантируется, что все задачи завершены (успешно или нет), когда вы перебираете весь список.

person fge    schedule 20.02.2014
comment
Интересный подход... Однако он привел меня к серьезным проблемам с памятью; практически куча переполняется, и JVM резко отстает из-за всплесков GC. :/ - person posdef; 20.02.2014
comment
Это еще одна проблема, да... Вы читаете свои файлы в память или вы их сопоставляете (см. FileChannel.map())? - person fge; 20.02.2014

Вы неправильно используете фьючерсы для параллельного выполнения. Вам нужно сначала отправить все задачи и сохранить их фьючерсы, прежде чем звонить в любой фьючерс. Вызов get ожидает завершения задачи.

Сейчас вы отправляете задачу, которая выполняется в собственном потоке, а затем основной поток ожидает завершения задачи. Промыть и повторить.

Вы говорите, что ожидаете два потока. Это действительно то, что у вас есть - основной поток и один поток-исполнитель.

person K Erlandsson    schedule 20.02.2014