Ошибка сегментации многопоточности C++ 11

Введение

У меня есть вектор entities, содержащий 44 миллиона имен. Я хочу разделить его на 4 части и обрабатывать каждую часть параллельно. Класс Freebase содержит функцию loadData(), которая используется для разделения вектора и вызова функции multiThread для выполнения обработки.

  • loadEntities() читает текстовый файл, содержащий имена. Я не помещал реализацию в класс, потому что это не важно
  • loadData() разбивает вектор entities, который был инициализирован в конструкторе, на 4 части и добавляет к каждой части vector<thread> threads следующим образом:

threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
  • multiThread — это функция, в которой я обрабатываю файлы
  • i и i+right — индексы, используемые в цикле for многопоточности для перебора объектов.
  • returnValues является подфункцией multiThread и используется для вызова внешней функции.

Проблема

cout <<"Entity " << entities[i] << endl; показывает следующие результаты:

  • Сущность m.0rzf6wv (хорошо)
  • Сущность m.0rzf70 (хорошо)
  • Сущность m.068s4h9 m.0n_k8bz (НЕПРАВИЛЬНО)
  • Объект Объект m.068s5_1 (НЕВЕРНО)

Последние 2 вывода неверны. Вывод должен быть:

  • Entity name нет entity entity name и entity name name

Это вызывает ошибку сегментации, когда входные данные отправляются в функцию returnValues. Как я могу это решить?


Исходный код

#ifndef FREEBASE_H
#define FREEBASE_H

class Freebase
{
 public:
    Freebase(const std::string &, const std::string &, const std::string &, const std::string &);
    void loadData();
 private:
   std::string _serverURL;
   std::string _entities;
   std::string _xmlFile;
   void multiThread(int,int, std::vector<std::pair<std::string, std::string>> &);
   //private data members
   std::vector<std::string> entities;
};

#endif

#include "Freebase.h"
#include "queries/SparqlQuery.h"

Freebase::Freebase(const string & url, const string & e, const string & xmlFile, const string & tfidfDatabase):_serverURL(url), _entities(e), _xmlFile(xmlFile), _tfidfDatabase(tfidfDatabase)
{
  entities = loadEntities();
}

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     data.push_back(make_pair(entities[i], desc));
  }
}


void Freebase::loadData()
{
  vector<pair<string, string>> data;
  vector<thread> threads;
  int Size = entities.size();
  //split database into 4 parts
  int p = 4;
  int right = round((double)Size / (double)p);
  int left = Size % p;
  float totalduration = 0;
  
  vector<pair<int, int>> coordinates;
  int counter = 0;
  for(int i = 0; i < Size; i += right)
  {

      if(i < Size - right)
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, i + right, ref(data)));
      }
      else
      {
      threads.push_back(thread(&Freebase::multiThread, this, i, Size, ref(data)));
      }
      
  }//end outer for
  
   for(auto &t : threads)
   {
      t.join();
   }
   
}


vector<pair<string, string>>  Freebase::returnValues(const string & query)
{
  vector<pair<string, string>> data;
  SparqlQuery sparql(query, _serverURL);
  string result = sparql.retrieveInformations();
  istringstream str(result);
  string line;
  //skip first line
  getline(str,line);
  while(getline(str, line))
  {
    vector<string> values;
    line.erase(remove( line.begin(), line.end(), '\"' ), line.end());
    
    boost::split(values, line, boost::is_any_of("\t"));
    if(values.size() == 2)
    {
      pair<string,string> fact = make_pair(values[0], values[1]);
      data.push_back(fact);
    }
    else
    {
      data.push_back(make_pair(line, ""));
    }
  }
  
  return data;
}//end function

person Hani Goc    schedule 24.06.2015    source источник
comment
Я проверю эту ссылку @ArnonZilca. Я все еще задаюсь вопросом, передаю ли я неправильный ввод в returnValues   -  person Hani Goc    schedule 24.06.2015
comment
Похоже, вы вызываете data.push_back из своих потоков и небезопасно изменяете вектор из нескольких потоков - поправьте меня, если я ошибаюсь. Я думаю, что это причина вашей проблемы с segfault.   -  person Arnon Zilca    schedule 24.06.2015
comment
да исправить. Вот что я делаю. Я хочу иметь возможность передавать имена и результаты, которые я получаю из returnValues, в data   -  person Hani Goc    schedule 24.06.2015
comment
Как вы думаете, почему вывод на cout влияет на работу returnValues? Отсутствие синхронизации вывода приведет к искажению вывода, но это единственный эффект, который должен быть. Ваша проблема, скорее всего, связана с другой проблемой синхронизации (для другого общего ресурса).   -  person Sander De Dycker    schedule 24.06.2015
comment
Тогда я думаю, что вам лучше либо создать 4 результирующих вектора и объединить их после объединения, либо безопасно записать (используя мьютекс) результирующий вектор из каждого потока - я предполагаю, что использование нескольких векторов будет быть быстрее для небольшого количества потоков.   -  person Arnon Zilca    schedule 24.06.2015
comment
@ArnonZilca: Помимо количества потоков, здесь также будет играть роль количество элементов в результирующих векторах, верно?   -  person Timo    schedule 24.06.2015
comment
верно. Чем больше элементов, тем больше вы будете платить за накладные расходы мьютекса. Прочитав немного больше о безопасности потоков и векторах, я обнаружил, что вы можете безопасно писать в один и тот же вектор из нескольких потоков, если вы пишете в разные индексы (проверьте this out). Это решение избавит вас как от мьютексов (решения 1), так и от слияния векторов в конце (решения 2).   -  person Arnon Zilca    schedule 24.06.2015
comment
@ArnonZilca упоминается @Valdo Единственная ситуация, о которой вам нужно беспокоиться, это когда добавляются новые элементы, что невозможно в вашем случае. Таким образом, если вектор уже инициализирован, проблем не должно быть, поскольку в случае entities проблема в том, что когда я добавляю элемент в data. это правильно?   -  person Hani Goc    schedule 24.06.2015
comment
Вы вызываете push_back, который все равно добавляет элементы. Если вы их обновили, я думаю, все будет в порядке.   -  person Arnon Zilca    schedule 24.06.2015
comment
Но обновление элементов потребует от вас заранее знать, сколько элементов точно, всегда.   -  person Timo    schedule 24.06.2015


Ответы (1)


РЕДАКТИРОВАТЬ: Арнон Зилка прав в своих комментариях. Вы пишете в один вектор из нескольких потоков (в Freebase::multiThread()), рецепт катастрофы. Вы можете использовать мьютекс, как описано ниже, для защиты операции push_back.

Для получения дополнительной информации о безопасности потоков в контейнерах см. безопасно?.

So:

mtx.lock();
data.push_back(make_pair(entities[i], desc));
mtx.unlock();

Другой вариант — использовать ту же стратегию, что и в returnValues, создавая локальный вектор в multiThread и помещая содержимое в вектор данных только после завершения обработки потока.

So:

void Freebase::multiThread(int start, int end, vector<pair<string,string>> & data)
{
  vector<pair<string,string>> threadResults;
  string basekb = "PREFIX basekb:<http://rdf.basekb.com/ns/> ";
  for(int i = start; i < end; i++)
  {
     cout <<"Entity " << entities[i] << endl;
     vector<pair<string, string>> description = returnValues(basekb + "select ?description where {"+ entities[i] +" basekb:common.topic.description ?description. FILTER (lang(?description) = 'en') }");
     string desc = "";
     for(auto &d: description)
     {
       desc += d.first + " ";
     }
     threadResults.push_back(make_pair(entities[i], desc));
  }
  mtx.lock()
  data.insert(data.end(), threadResults.begin(), threadResults.end());
  mtx.unlock()
}

Примечание. Я бы предложил использовать мьютекс, отличный от того, который вы используете для cout. Общий результирующий вектор data отличается от ресурса cout. Таким образом, потоки, которые хотят использовать cout, не должны ждать, пока другой поток завершит работу с data.

/РЕДАКТИРОВАТЬ

Вы можете использовать мьютекс вокруг

cout <<"Entity " << entities[i] << endl;

Это предотвратило бы одновременное использование cout несколькими потоками. Таким образом, вы можете быть уверены, что все сообщение будет напечатано потоком до того, как другой поток напечатает сообщение. Обратите внимание, что это повлияет на вашу производительность, поскольку потокам придется ждать, пока мьютекс станет доступным, прежде чем им будет разрешено печатать.

Примечание. Защита cout очистит только ваш вывод в потоке, но не повлияет на поведение остального кода, см. выше.

См. http://www.cplusplus.com/reference/mutex/mutex/lock/ для примера.

// mutex::lock/unlock
#include <iostream>       // std::cout
#include <thread>         // std::thread
#include <mutex>          // std::mutex

std::mutex mtx;           // mutex for critical section

void print_thread_id (int id) {
  // critical section (exclusive access to std::cout signaled by locking mtx):
  mtx.lock();
  std::cout << "thread #" << id << '\n';
  mtx.unlock();
}

int main ()
{
  std::thread threads[10];
  // spawn 10 threads:
  for (int i=0; i<10; ++i)
    threads[i] = std::thread(print_thread_id,i+1);

  for (auto& th : threads) th.join();

  return 0;
}
person Timo    schedule 24.06.2015
comment
относительно функции returnValues, которую я вызываю в многопоточности. Должен ли он также быть окружен блокировкой и разблокировкой мьютекса? - person Hani Goc; 24.06.2015
comment
Нет, returnValues ​​использует только локальные переменные, которые должны быть потокобезопасными. Что касается ввода, это константная локальная переменная, поэтому функция имеет доступ только для чтения, что также не должно быть проблемой. - person Timo; 24.06.2015
comment
У меня есть вопрос относительно std::mutex mtx; должен ли он быть частным членом класса? - person Hani Goc; 24.06.2015
comment
Для cout это должна быть глобальная переменная, так как cout не является членом класса. Для вектора data он может быть частным членом класса FreeBase, поскольку каждый экземпляр FreeBase будет иметь свой собственный data. - person Timo; 24.06.2015
comment
Я к этому шел, просто слишком рано нажал Enter ;) - person Timo; 24.06.2015
comment
у меня есть вопрос относительно передачи shared_ptr в качестве параметров. я выложу, если все в порядке - person Hani Goc; 24.06.2015
comment
Я думаю, вам лучше создать новый вопрос для этого. - person Timo; 24.06.2015