Как мне обрабатывать несколько потоков в Java?

Я пытаюсь запустить процесс и делать что-то с его потоками ввода, вывода и ошибок. Очевидный способ сделать это — использовать что-то вроде select(), но единственное, что я могу найти в Java, — это Selector.select(), которое принимает Channel. Похоже, невозможно получить Channel из InputStream или OutputStreamFileStream есть метод getChannel(), но он здесь не помогает)

Итак, вместо этого я написал код для опроса всех потоков:

while( !out_eof || !err_eof )
{
    while( out_str.available() )
    {
        if( (bytes = out_str.read(buf)) != -1 )
        {
            // Do something with output stream
        }
        else
            out_eof = true;
    }
    while( err_str.available() )
    {
        if( (bytes = err_str.read(buf)) != -1 )
        {
            // Do something with error stream
        }
        else
            err_eof = true;
    }
    sleep(100);
}

который работает, за исключением того, что он никогда не заканчивается. Когда один из потоков достигает конца файла, available() возвращает ноль, поэтому read() не вызывается, и мы никогда не получаем возврат -1, который указывал бы на EOF.

Одним из решений может быть неблокирующий способ обнаружения EOF. Нигде в документах не вижу. В качестве альтернативы есть лучший способ сделать то, что я хочу сделать?

Я вижу этот вопрос здесь: текст ссылки, и хотя это не совсем то, что я хочу, я, вероятно, могу использовать эту идею создания отдельных потоков для каждого потока для конкретной проблемы, с которой я столкнулся сейчас. Но ведь это не единственный способ сделать это? Наверняка должен быть способ чтения из нескольких потоков без использования потока для каждого?


person Mark Baker    schedule 24.09.2008    source источник


Ответы (2)


Как вы сказали, решение описано в этом ответе — это традиционный способ чтения stdout и stderr из процесса. Поток на поток — это то, что нужно, хотя это немного раздражает.

person Matt Quail    schedule 24.09.2008

Вам действительно придется пойти по пути создания потока для каждого потока, который вы хотите отслеживать. Если ваш вариант использования позволяет комбинировать как stdout, так и stderr рассматриваемого процесса, вам нужен только один поток, в противном случае нужны два.

Мне потребовалось довольно много времени, чтобы сделать это правильно в одном из наших проектов, где я должен запустить внешний процесс, получить его вывод и что-то с ним сделать, в то же время ища ошибки и завершение процесса, а также имея возможность его завершить. когда пользователь приложения Java отменяет операцию.

Я создал довольно простой класс для инкапсуляции наблюдающей части, чей метод run() выглядит примерно так:

public void run() {
    BufferedReader tStreamReader = null;
    try {
        while (externalCommand == null && !shouldHalt) {
            logger.warning("ExtProcMonitor("
                           + (watchStdErr ? "err" : "out")
                           + ") Sleeping until external command is found");
            Thread.sleep(500);
        }
        if (externalCommand == null) {
            return;
        }
        tStreamReader =
                new BufferedReader(new InputStreamReader(watchStdErr ? externalCommand.getErrorStream()
                        : externalCommand.getInputStream()));
        String tLine;
        while ((tLine = tStreamReader.readLine()) != null) {
            logger.severe(tLine);
            if (filter != null) {
                if (filter.matches(tLine)) {
                    informFilterListeners(tLine);
                    return;
                }
            }
        }
    } catch (IOException e) {
        logger.logExceptionMessage(e, "IOException stderr");
    } catch (InterruptedException e) {
        logger.logExceptionMessage(e, "InterruptedException waiting for external process");
    } finally {
        if (tStreamReader != null) {
            try {
                tStreamReader.close();
            } catch (IOException e) {
                // ignore
            }
        }
    }
}

На вызывающей стороне это выглядит так:

    Thread tExtMonitorThread = new Thread(new Runnable() {

        public void run() {
            try {
                while (externalCommand == null) {
                    getLogger().warning("Monitor: Sleeping until external command is found");
                    Thread.sleep(500);
                    if (isStopRequested()) {
                        getLogger()
                                .warning("Terminating external process on user request");
                        if (externalCommand != null) {
                            externalCommand.destroy();
                        }
                        return;
                    }
                }
                int tReturnCode = externalCommand.waitFor();
                getLogger().warning("External command exited with code " + tReturnCode);
            } catch (InterruptedException e) {
                getLogger().logExceptionMessage(e, "Interrupted while waiting for external command to exit");
            }
        }
    }, "ExtCommandWaiter");

    ExternalProcessOutputHandlerThread tExtErrThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdErr", getLogger(), true);
    ExternalProcessOutputHandlerThread tExtOutThread =
            new ExternalProcessOutputHandlerThread("ExtCommandStdOut", getLogger(), true);
    tExtMonitorThread.start();
    tExtOutThread.start();
    tExtErrThread.start();
    tExtErrThread.setFilter(new FilterFunctor() {

        public boolean matches(Object o) {
            String tLine = (String)o;
            return tLine.indexOf("Error") > -1;
        }
    });

    FilterListener tListener = new FilterListener() {
        private boolean abortFlag = false;

        public boolean shouldAbort() {
            return abortFlag;
        }

        public void matched(String aLine) {
            abortFlag = abortFlag || (aLine.indexOf("Error") > -1);
        }

    };

    tExtErrThread.addFilterListener(tListener);
    externalCommand = new ProcessBuilder(aCommand).start();
    tExtErrThread.setProcess(externalCommand);
    try {
        tExtMonitorThread.join();
        tExtErrThread.join();
        tExtOutThread.join();
    } catch (InterruptedException e) {
        // when this happens try to bring the external process down 
        getLogger().severe("Aborted because auf InterruptedException.");
        getLogger().severe("Killing external command...");
        externalCommand.destroy();
        getLogger().severe("External command killed.");
        externalCommand = null;
        return -42;
    }
    int tRetVal = tListener.shouldAbort() ? -44 : externalCommand.exitValue();

    externalCommand = null;
    try {
        getLogger().warning("command exit code: " + tRetVal);
    } catch (IllegalThreadStateException ex) {
        getLogger().warning("command exit code: unknown");
    }
    return tRetVal;

К сожалению, мне не нужно для автономного исполняемого примера, но, возможно, это поможет. Если бы мне пришлось сделать это снова, я бы еще раз взглянул на использование метода Thread.interrupt() вместо самодельного флага остановки (не забудьте объявить его volatile!), но я оставлю это на другой раз. :)

person Daniel Schneller    schedule 24.09.2008