Нахождение среднего с помощью сокращения и сбора

Я пытаюсь понять новые API-интерфейсы Java 8 Stream.

http://docs.oracle.com/javase/tutorial/collections/streams/reduction.html

Я нашел пример нахождения среднего числа с помощью API сбора. Но я чувствовал, что то же самое можно сделать и с помощью функции reduce().

public class Test {

    public static void main(String[] args) {
        // Using collect
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .collect(Averager::new, Averager::accept, Averager::combine)
            .average());

        // Using reduce
        System.out.println(Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .reduce(new Averager(), (t, u) -> {
                t.accept(u);
                return t;
            }, (t, u) -> {
                t.combine(u);
                return t;
            }).average());
    }

    private static class Averager {
        private int total = 0;
        private int count = 0;

        public Averager() {
            // System.out.println("Creating averager");
        }

        public double average() {
            // System.out.println("Finding average");
            return count > 0 ? ((double) total) / count : 0;
        }

        public void accept(int i) {
            // System.out.println("Accepting " + i);
            total += i;
            count++;
        }

        public void combine(Averager other) {
            // System.out.println("Combining the averager : " + other);
            total += other.total;
            count += other.count;
        }

        @Override
        public String toString() {
            return "[total : " + total + ", count: " + count + "]";
        }
    }
}

1) Есть ли какая-то причина, по которой я должен использовать здесь сбор вместо уменьшения?
2) Если я включу все системные выходы отладки, я увижу, что операции, выполняемые между сбором и уменьшением, абсолютно одинаковы. И объединитель вообще не использовался в обоих случаях.
3) Если я делаю потоки параллельными, сбор всегда возвращает мне правильный результат. Метод reduce() каждый раз дает разные результаты.
4) Не следует ли мне использовать метод reduce в параллельных потоках?

Спасибо,
Пол


person Paul Nibin    schedule 14.05.2014    source источник


Ответы (1)


Разница между reduce и collect заключается в том, что collect — это расширенная форма сокращения, которая может работать с изменяемыми объектами параллельно. Алгоритм collect ограничивает потоки различных объектов результатов, чтобы их можно было безопасно изменять, даже если они не являются потокобезопасными. Вот почему Averager работает с collect. Для последовательных вычислений с использованием reduce это обычно не имеет значения, но для параллельных вычислений, как вы заметили, это даст неправильные результаты.

Ключевым моментом является то, что reduce работает до тех пор, пока имеет дело с значениями, но не с изменяемыми объектами. Вы можете увидеть это, взглянув на первый аргумент reduce. Код примера передает new Averager(), который представляет собой один объект, который используется в качестве значения идентификатора несколькими потоками при параллельном сокращении. Принцип работы параллельных потоков заключается в том, что рабочая нагрузка разбивается на сегменты, которые обрабатываются отдельными потоками. Если несколько потоков изменяют один и тот же (не потокобезопасный) объект, должно быть ясно, почему это приведет к неправильным результатам.

Можно использовать reduce для вычисления среднего, но вам нужно сделать ваш объект накопления неизменяемым. Рассмотрим объект ImmutableAverager:

static class ImmutableAverager {
    private final int total;
    private final int count;

    public ImmutableAverager() {
        this.total = 0;
        this.count = 0;
    }
    
    public ImmutableAverager(int total, int count) {
        this.total = total;
        this.count = count;
    }

    public double average() {
        return count > 0 ? ((double) total) / count : 0;
    }

    public ImmutableAverager accept(int i) {
        return new ImmutableAverager(total + i, count + 1);
    }

    public ImmutableAverager combine(ImmutableAverager other) {
        return new ImmutableAverager(total + other.total, count + other.count);
    }
}

Обратите внимание, что я изменил сигнатуры accept и combine, чтобы вернуть новый ImmutableAverager вместо мутации this. (Эти изменения также заставляют методы сопоставлять аргументы функций с reduce, поэтому мы можем использовать ссылки на методы.) Вы должны использовать ImmutableAverager следующим образом:

    double average = Stream.of(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
            .parallel()
            .reduce(new ImmutableAverager(), 
                    ImmutableAverager::accept,
                    ImmutableAverager::combine)
            .average();
    System.out.println("Average: "+average);

Использование неизменяемых объектов значений с reduce должно давать правильные результаты параллельно.

Наконец, обратите внимание, что IntStream и DoubleStream имеют summaryStatistics() методов, а Collectors имеет averagingDouble, averagingInt и averagingLong методы, которые могут выполнять эти вычисления за вас. Однако я думаю, что вопрос больше о механике сбора и редукции, чем о том, как сделать усреднение наиболее лаконичным.

person Stuart Marks    schedule 14.05.2014
comment
Спасибо Вам за такой подробный ответ. - person Paul Nibin; 15.05.2014
comment
Одно маленькое уточнение: сбор на самом деле не является специализацией редукции, а наоборот. Любая редукция может быть выражена как коллекция, в то время как не существует общего способа выразить коллекцию как редакцию (или, по крайней мере, нет способа без принуждения клиентского кода к управлению параллелизмом). Так что фактически редукция является специализированной формой сбора. - person Maurice Naftalin; 22.05.2014