Как правильно обрабатывать исключение spark.sql.AnalysisException

Я использую Spark Dataset API для выполнения операций с JSON для извлечения определенных полей по мере необходимости. Однако, когда спецификация, которую я предоставляю, чтобы искра знала, какое поле извлекать, идет не так, как надо, искра выплевывает

org.apache.spark.sql.AnalysisException

Как можно обрабатывать непроверенные исключения во время выполнения в подобном сценарии распределенной обработки? Я понимаю, что бросок try-catch приведет к сортировке вещей, но каков рекомендуемый способ справиться с таким сценарием?

dataset = dataset.withColumn(current, functions.explode(dataset.col(parent + Constants.PUNCTUATION_PERIOD + child.substring(0, child.length() - 2))));

comment
а если поймают, что будешь делать?   -  person thebluephantom    schedule 02.07.2018
comment
Оберните исключение в одно из моих пользовательских исключений и выбросьте его оттуда, чтобы вызывающий объект использовал его для обработки исключения.   -  person Infamous    schedule 02.07.2018
comment
Но на практике это не означает остановиться, исправить и запустить заново. Это было больше моей точкой зрения.   -  person thebluephantom    schedule 02.07.2018
comment
Хм, да, это один из способов, но обертывание исключений времени выполнения, таких как это, моими собственными исключениями - хорошая идея ??   -  person Infamous    schedule 02.07.2018
comment
Нет, не спорю - это нормально, но в результате нам все равно, вероятно, придется перезапускаться после исправления. По крайней мере, в BI и DWH так было всегда.   -  person thebluephantom    schedule 02.07.2018


Ответы (1)


В scala вы должны просто обернуть вызов в Try и управлять сбоем. Что-то вроде:

val result = Try(executeSparkCode()) match {
    case s: Success(_) => s;
    case Failure(error: AnalysisException) => Failure(new MyException(error));
}

Примечание 1. Если ваш вопрос подразумевает, как управлять исключениями в scala, есть много документов и сообщений на эту тему (т.е. не бросайте). Например, вы можете проверить этот ответ (мой)

Примечание 2: у меня нет scala dev env прямо здесь, поэтому я не тестировал этот код)


Однако в java существует сложная ситуация: компилятор не ожидает исключения AnalysisException, которое не проверяется, поэтому вы не можете специально поймать это исключение. Вероятно, какое-то недопонимание scala/java, потому что scala не отслеживает проверенные исключения. Я сделал следующее:

try{
    return executeSparkCode();
} catch (Exception ex) {
    if(ex instanceOf AnalysisException){
        throw new MyException(ex);
    } else {
        throw ex; // unmanaged exceptions
    }
}

Примечание. В моем случае я также проверил содержимое сообщения об ошибке для определенного исключения, которым я должен управлять (т. е. путь не существует), и в этом случае я возвращаю пустой набор данных вместо того, чтобы создавать другое исключение. Я искал лучшее решение и случайно попал сюда...

person Juh_    schedule 13.11.2020
comment
Привет. Это было совсем недавно, когда я только что поступил на свою первую работу. Я использовал мульти-улов, последний улов был классом Exception. Он уже довольно давно работает в prod, выдает довольно приличную информацию/следы стека, когда что-то идет не так. - person Infamous; 14.11.2020
comment
Я пытаюсь ловить try {/*spark dataframe filtering query*/} catch { case e: Throwable => println(e.getMessage)} и вижу, что он все еще выбрасывает AnalysisException и вылетает :( Разве это не Throwable? - person Rimer; 14.04.2021
comment
Судя по вашим словам, должно работать. Возможно, проблема в другом, но мне нужно увидеть больше кода, чтобы знать. Вы проверили трассировку стека, чтобы убедиться, что исключение вызвано внутри try/catch? Пример: искра ленива, ошибки возникают в методах призыва к действию (запись, сбор), а не в вызове фильтра, даже если проблема в вызове фильтра. - person Juh_; 15.04.2021