Как задачи Spark в одном и том же исполнителе совместно используют переменные (NumberFormatException с SimpleDateFormat)?

Документы искры говорят, что

По умолчанию, когда Spark выполняет функцию параллельно как набор задач на разных узлах, он отправляет копию каждой переменной, используемой в функции, для каждой задачи.

Если я создам Java SimpleDateFormat и использую его в операциях RDD, я получу исключение NumberFormatException: multiple points.

Я знаю, что SimpleDateFormat не является потокобезопасным. Но, как сказано в документации spark, этот объект SimpleDateFormat копируется в каждую задачу, поэтому не должно быть нескольких потоков, обращающихся к этому объекту.

Я предполагаю, что все задачи в одном исполнителе используют один и тот же объект SimpleDateFormate, я прав?


Эта программа печатает тот же объект java.text.SimpleDateFormat@f82ede60

object NormalVariable {

  // create dateFormat here doesn't change
  // val dateFormat = new SimpleDateFormat("yyyy.MM.dd")

  def main(args: Array[String]) {

    val dateFormat = new SimpleDateFormat("yyyy.MM.dd")

    val conf = new SparkConf().setAppName("Spark Test").setMaster("local[*]")
    val spark = new SparkContext(conf)

    val dates = Array[String]("1999.09.09", "2000.09.09", "2001.09.09", "2002.09.09", "2003.09.09")

    println(dateFormat)

    val resultes = spark.parallelize(dates).map { i =>
      println(dateFormat)
      dateFormat.parse(i)
    }.collect()

    println(resultes.mkString(" "))
    spark.stop()
  }
}

person moshangcheng    schedule 23.03.2015    source источник
comment
Не могли бы вы добавить код к вопросу?   -  person maasg    schedule 23.03.2015
comment
Я добавил свой код, не могли бы вы проверить мой код, спасибо.   -  person moshangcheng    schedule 23.03.2015
comment
Я запустил ваш код в оболочке Spark без исключений. Где именно вы получаете исключение?   -  person pzecevic    schedule 23.03.2015
comment
@pzecevic, это 'heisenbug', одновременное изменение непоточного безопасного режима SimpleDateFormat   -  person maasg    schedule 23.03.2015
comment
Я предполагаю, что это связано с тем, как Spark сериализует замыкания, ClosureCleaner и одноэлементные объекты Scala в JVM, но то, что происходит, выходит за рамки моего понимания. Вероятно, @joshrosen мог бы дать авторитетный ответ   -  person maasg    schedule 24.03.2015
comment
Тем временем, я думаю, вы хотите создать экземпляр SimpleDateFormat внутри замыкания, чтобы решить эту проблему. SimpleDateFormat использует изменяемый экземпляр Calendar внутри. Это замаскированное «изменчивое состояние».   -  person maasg    schedule 24.03.2015
comment
Спасибо за ответ @maasg. Мы решили проблему таким образом. Мне просто любопытно поведение Spark, которое не соответствует его документу.   -  person moshangcheng    schedule 24.03.2015
comment
Я обнаружил, что этот вопрос + ответ очень полезен для понимания того, что происходит: stackoverflow.com/questions/26369916/   -  person maasg    schedule 24.03.2015


Ответы (1)


Как вы знаете, SimpleDateFormat не является потокобезопасным.

Если Spark использует одно ядро ​​для каждого исполнителя (--executor-cores 1), все должно работать нормально. Но как только вы настраиваете более одного ядра для каждого исполнителя, ваш код теперь работает в многопоточном режиме, SimpleDateFormat совместно используется несколькими задачами Spark одновременно и может повредить данные и вызвать различные исключения.

Чтобы исправить это, вы можете использовать один из тех же подходов, что и для кода, отличного от Spark, а именно ThreadLocal, что гарантирует получение одной копии SimpleDateFormat для каждого потока.

В Java это выглядит так:

public class DateFormatTest {

  private static final ThreadLocal<DateFormat> df = new ThreadLocal<DateFormat>(){
    @Override
    protected DateFormat initialValue() {
        return new SimpleDateFormat("yyyyMMdd");
    }
  };

  public Date convert(String source) throws ParseException{
    Date d = df.get().parse(source);
    return d;
  }
}

и эквивалентный код в Scala работает точно так же, показанный здесь как сеанс spark-shell:

import java.text.SimpleDateFormat

object SafeFormat extends ThreadLocal[SimpleDateFormat] {
  override def initialValue = {
    new SimpleDateFormat("yyyyMMdd HHmmss")
  }
}

sc.parallelize(Seq("20180319 162058")).map(SafeFormat.get.parse(_)).collect

    res6: Array[java.util.Date] = Array(Mon Mar 19 16:20:58 GMT 2018)

Таким образом, вы должны определить ThreadLocal на верхнем уровне вашего задания class или object, а затем вызвать df.get для получения SimpleDateFormat в рамках операций RDD.

Видеть:

person DNA    schedule 19.03.2018