Ошибка Broken Pipe приводит к сбою потокового задания Elastic MapReduce на AWS

Все работает нормально локально, когда я делаю следующее:

cat input | python mapper.py | sort | python reducer.py

Однако когда я запускаю потоковое задание MapReduce на AWS Elastic Mapreduce, задание не завершается успешно. mapper.py проходит часть пути (я знаю это, потому что писал stderr по пути). Mapper прерывается ошибкой «Broken Pipe», которую я могу получить из системного журнала попытки задачи после ее сбоя:

java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)


2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:124)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,400 WARN org.apache.hadoop.streaming.PipeMapRed (main): java.io.IOException: Bad file descriptor
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.flush(BufferedOutputStream.java:123)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.hadoop.streaming.PipeMapRed.mapRedFinished(PipeMapRed.java:579)
    at org.apache.hadoop.streaming.PipeMapper.close(PipeMapper.java:135)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:57)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)

2012-03-26 07:19:05,400 INFO org.apache.hadoop.streaming.PipeMapRed (main): mapRedFinished
2012-03-26 07:19:05,405 INFO org.apache.hadoop.streaming.PipeMapRed (Thread-13): MRErrorThread done
2012-03-26 07:19:05,408 INFO org.apache.hadoop.mapred.TaskLogsTruncater (main): Initializing logs' truncater with mapRetainSize=-1 and reduceRetainSize=-1
2012-03-26 07:19:05,519 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Initialized cache for UID to User mapping with a cache timeout of 14400 seconds.
2012-03-26 07:19:05,520 INFO org.apache.hadoop.io.nativeio.NativeIO (main): Got UserName hadoop for UID 106 from the native implementation
2012-03-26 07:19:05,522 WARN org.apache.hadoop.mapred.Child (main): Error running child
java.io.IOException: log:null
R/W/S=7018/3/0 in:NA [rec/s] out:NA [rec/s]
minRecWrittenToEnableSkip_=9223372036854775807 LOGNAME=null
HOST=null
USER=hadoop
HADOOP_USER=null
last Hadoop input: |null|
last tool output: |text/html    1|
Date: Mon Mar 26 07:19:05 UTC 2012
java.io.IOException: Broken pipe
    at java.io.FileOutputStream.writeBytes(Native Method)
    at java.io.FileOutputStream.write(FileOutputStream.java:282)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:105)
    at java.io.BufferedOutputStream.flushBuffer(BufferedOutputStream.java:65)
    at java.io.BufferedOutputStream.write(BufferedOutputStream.java:109)
    at java.io.DataOutputStream.write(DataOutputStream.java:90)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeUTF8(TextInputWriter.java:72)
    at org.apache.hadoop.streaming.io.TextInputWriter.writeValue(TextInputWriter.java:51)
    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:109)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)


    at org.apache.hadoop.streaming.PipeMapper.map(PipeMapper.java:125)
    at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:50)
    at org.apache.hadoop.streaming.PipeMapRunner.run(PipeMapRunner.java:36)
    at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:441)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:377)
    at org.apache.hadoop.mapred.Child$4.run(Child.java:255)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:396)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1059)
    at org.apache.hadoop.mapred.Child.main(Child.java:249)
2012-03-26 07:19:05,525 INFO org.apache.hadoop.mapred.Task (main): Runnning cleanup for the task
2012-03-26 07:19:05,526 INFO org.apache.hadoop.mapred.DirectFileOutputCommitter (main): Nothing to clean up on abort since there are no temporary files written

Вот mapper.py. Обратите внимание, что я пишу в stderr, чтобы предоставить себе отладочную информацию:

#!/usr/bin/env python

import sys
from warc import ARCFile

def main():
    warc_file = ARCFile(fileobj=sys.stdin)
    for web_page in warc_file:
        print >> sys.stderr, '%s\t%s' % (web_page.header.content_type, 1) #For debugging
        print '%s\t%s' % (web_page.header.content_type, 1)
    print >> sys.stderr, 'done' #For debugging
if __name__ == "__main__":
    main()

Вот что я получаю в stderr для попытки задачи при запуске mapper.py:

text/html   1
text/html   1
text/html   1

По сути, цикл выполняется 3 раза, а затем резко останавливается, и Python не выдает никаких ошибок. (Примечание: он должен выводить тысячи строк). Даже неперехваченное исключение должно появиться в stderr.

Поскольку MapReduce отлично работает на моем локальном компьютере, я предполагаю, что это проблема того, как Hadoop обрабатывает вывод, который я печатаю из mapper.py. Но я понятия не имею, в чем может быть проблема.


person bgcode    schedule 26.03.2012    source источник


Ответы (3)


Ваш процесс потоковой передачи (ваш скрипт Python) завершается преждевременно. Это может быть связано с тем, что ввод завершен (например, интерпретация EOF) или проглочено исключение. В любом случае Hadoop пытается подключиться через STDIN к вашему сценарию, но, поскольку приложение завершило работу (и, следовательно, STDIN больше не является допустимым файловым дескриптором), вы получаете ошибку BrokenPipe. Я бы предложил добавить трассировку stderr в ваш скрипт, чтобы увидеть, какая строка ввода вызывает проблему. Удачного кодирования,

-Джефф

person ghayes    schedule 29.03.2012
comment
babonk, можете ли вы предоставить подробную информацию о том, как вы решили свою проблему, используя этот совет? - person Eric Conner; 26.07.2013
comment
Такой же. У меня, по-видимому, есть аналогичная ошибка здесь: " title="aws elastic mapreduce, похоже, неправильно преобразует потоковую передачу в j">stackoverflow.com/questions/18556270/, и, учитывая, что она работает при передаче по каналу, я не знаю, как исправить это для потоковой передачи. - person Mittenchops; 09.09.2013

Об этом говорится в принятой ошибке, но позвольте мне попытаться уточнить - вы должны блокировать на стандартном вводе, даже если вам это не нужно! Это не то же самое, что и каналы Linux, так что не позволяйте этому обмануть вас. Интуитивно происходит то, что Streaming запускает ваш исполняемый файл, а затем говорит: «Подожди здесь, пока я поищу информацию для тебя». Если ваш исполняемый файл останавливается по какой-либо причине до того, как Streaming отправит вам 100% входных данных, Streaming говорит: «Эй, куда делся тот исполняемый файл, который я встал?» ... Хмммм ... канал сломан, позвольте мне вызвать это исключение! Итак, вот некоторый код Python, все, что он делает, это то, что делает cat, но вы заметите, что этот код не завершится, пока не будет обработан весь ввод, и это ключевой момент:

#!/usr/bin/python
import sys

while True:
    s = sys.stdin.readline()
    if not s:
        break
    sys.stdout.write(s)
person James Madison    schedule 14.08.2014
comment
Я получал эту ошибку, потому что ничего не делал с вводом. Я добавил этот код (хотя он мне ничего не дает) и ошибка пошла. - person schoon; 20.08.2014

У меня нет опыта работы с Hadoop на AWS, но у меня была такая же ошибка в обычном кластере Hadoop — и в моем случае проблема заключалась в том, как я начал, python -mapper ./mapper.py -reducer ./reducer.py работал, а -mapper python mapper.py — нет.

Вы также, кажется, используете нестандартный пакет Python warc, вы отправляете необходимые файлы в потоковую работу? -cacheFiles или -cacheArchive могут быть полезны.

person kei1aeh5quahQu4U    schedule 29.03.2012
comment
Как вы включаете нестандартные пакеты Python? Эластичная карта AWS, в частности, похоже, не делает доступными такие параметры, как файлы кеша. - person Mittenchops; 09.09.2013