Пул потоков PHP

РЕДАКТИРОВАТЬ: Чтобы уточнить и упростить: я ищу «хороший» способ отправлять больше объектов Stackable в пул всякий раз, когда Stackable заканчивается (используя данные из этого первого Stackable для добавления второго). У меня есть идеи опроса объектов до тех пор, пока один из них не закончится (неэффективный и уродливый), и передачи ссылок на объект Pool (я не смог заставить его работать). Базовый код следующий: https://github.com/krakjoe/pthreads/blob/master/examples/Pooling.php

А теперь полное описание:

Я работаю над приложением на PHP, которое сильно разрослось и требует много времени. Из-за этого я пытаюсь выполнить многопоточность этого приложения, используя пул потоков (я знаю, что PHP - не лучший вариант, но я не хочу и не могу менять язык на данном этапе).

Проблема в том, что есть 2 этапа приложения, которые должны выполняться по порядку, и на каждом из них есть множество подзадач, которые могут выполняться одновременно. Итак, это процесс в моей голове:

  • На этапе 1 будет N подзадач, эти подзадачи будут объектами Stackable.
  • Когда подзадача i завершается, «основная» (та, которая создает пул, Stackables и т. Д.) Должна быть уведомлена и выполнить этап 2 для подзадачи i с некоторыми данными из подзадачи i (другой объект Stackable). На этом этапе будет M подзадач для каждой из подзадач этапа 1.

Я хотел бы использовать один и тот же пул потоков для потоков на этапе 1 и этапе 2, и единственное решение, которое я могу придумать для перехода от этапа 1 к этапу 2, - это опрос каждой из N подзадач, пока одна из них не завершится, а затем вызвать этап 2 для той, которая закончилась, и повторять, пока не закончатся все N подзадач.

Я использую в качестве базового кода пример пула потоков, включенного в исходный код pthreads, Джо Уоткинс.


person markmb    schedule 13.01.2014    source источник
comment
PHP - не лучший вариант - это мягко сказано.   -  person Niels Keurentjes    schedule 13.01.2014
comment
Я не понимал, что PHP стал потокобезопасным.   -  person Markus Malkusch    schedule 13.01.2014
comment
Я ищу способ перейти от этапа 1 ко этапу 2   -  person markmb    schedule 13.01.2014
comment
Этот PHP не является потокобезопасным, но сообщение в блоге было написано в 2008   -  person Joe Watkins    schedule 15.01.2014
comment
@JoeWatkins. Итак, PHP стал официально потокобезопасным? Вы можете добавить этот факт в качестве ответа на этот вопрос.   -  person Markus Malkusch    schedule 16.01.2014
comment
@MarkusMalkusch gist.github.com/krakjoe/6437782   -  person Joe Watkins    schedule 20.01.2014


Ответы (2)


Вам следует начать с чтения: https://gist.github.com/krakjoe/6437782

<?php
/**
* Normal worker
*/
class PooledWorker extends Worker {
    public function run(){}
}


/**
* Don't descend from pthreads, normal objects should be used for pools
*/
class Pool {
    protected $size;
    protected $workers;

    /**
    * Construct a worker pool of the given size
    * @param integer $size
    */  
    public function __construct($size) {
        $this->size = $size;
    }

    /**
    * Start worker threads
    */
    public function start() {
        while (@$worker++ < $this->size) {
            $this->workers[$worker] = new PooledWorker();
            $this->workers[$worker]->start();
        }
        return count($this->workers);
    }

    /**
    * Submit a task to pool
    */
    public function submit(Stackable $task) {
        $this->workers[array_rand($this->workers)]
            ->stack($task);
        return $task;
    }

    /**
    * Shutdown worker threads
    */
    public function shutdown() {
        foreach ($this->workers as $worker)
            $worker->shutdown();
    }
}

class StageTwo extends Stackable {
    /**
    * Construct StageTwo from a part of StageOne data
    * @param int $data
    */
    public function __construct($data) {
        $this->data = $data;
    }

    public function run(){
        printf(
            "Thread %lu got data: %d\n", 
            $this->worker->getThreadId(), $this->data);
    }
}

class StageOne extends Stackable {
    protected $done;

    /**
    * Construct StageOne with suitable storage for data
    * @param StagingData $data
    */
    public function __construct(StagingData $data) {
        $this->data = $data;
    }

    public function run() {
        /* create dummy data array */
        while (@$i++ < 100) {
            $this->data[] = mt_rand(
                20, 1000);
        }
        $this->done = true;
    }
}

/**
* StagingData to hold data from StageOne
*/
class StagingData extends Stackable {
    public function run() {}
}

/* stage and data reference arrays */
$one = [];
$two = [];
$data = [];

$pool = new Pool(8);
$pool->start();

/* construct stage one */
while (count($one) < 10) {
    $staging = new StagingData();
    /* maintain reference counts by storing return value in normal array in local scope */
    $one[] = $pool
        ->submit(new StageOne($staging));
    /* maintain reference counts */
    $data[] = $staging;
}

/* construct stage two */
while (count($one)) {

    /* find completed StageOne objects */
    foreach ($one as $id => $job) {
        /* if done is set, the data from this StageOne can be used */
        if ($job->done) {
            /* use each element of data to create new tasks for StageTwo */
            foreach ($job->data as $chunk) {
                /* submit stage two */
                $two[] = $pool
                    ->submit(new StageTwo($chunk));
            }

            /* no longer required */
            unset($one[$id]);
        }
    }

    /* in the real world, it is unecessary to keep polling the array */
    /* you probably have some work you want to do ... do it :) */
    if (count($one)) {
        /* everyone likes sleep ... */
        usleep(1000000);
    }
}

/* all tasks stacked, the pool can be shutdown */
$pool->shutdown();
?>

Выведет:

Thread 140012266239744 got data: 612
Thread 140012275222272 got data: 267
Thread 140012257257216 got data: 971
Thread 140012033140480 got data: 881
Thread 140012257257216 got data: 1000
Thread 140012016355072 got data: 261
Thread 140012257257216 got data: 510
Thread 140012016355072 got data: 148
Thread 140012016355072 got data: 501
Thread 140012257257216 got data: 767
Thread 140012024747776 got data: 504
Thread 140012033140480 got data: 401
Thread 140012275222272 got data: 20
<-- trimmed from 1000 lines -->
Thread 140012041533184 got data: 285
Thread 140012275222272 got data: 811
Thread 140012041533184 got data: 436
Thread 140012257257216 got data: 977
Thread 140012033140480 got data: 830
Thread 140012275222272 got data: 554
Thread 140012024747776 got data: 704
Thread 140012033140480 got data: 50
Thread 140012257257216 got data: 794
Thread 140012024747776 got data: 724
Thread 140012033140480 got data: 624
Thread 140012266239744 got data: 756
Thread 140012284204800 got data: 997
Thread 140012266239744 got data: 708
Thread 140012266239744 got data: 981

Поскольку вы хотите использовать один пул, у вас нет другого выбора, кроме как создавать задачи в основном контексте, в котором создается пул. Я могу представить себе другие решения, но вы специально просили о таком решении.

В зависимости от оборудования, которое у меня было в моем распоряжении, и характера задач и данных, которые должны быть обработаны, у меня вполне может быть несколько [небольших] пулов потоков, по одному для каждого рабочего, это позволит StageOne создавать объекты StageTwo в Worker контекст, который их выполняет, это может быть что-то для рассмотрения.

person Joe Watkins    schedule 15.01.2014
comment
Спасибо за ответ, очень интересное чтение. Но это не совсем то, что мне нужно. Когда Stackables из этапа 1 заканчивается, данные должны быть обработаны и должны быть созданы новые Stackables. Для каждого Stackable из этапа 1 может быть около 1000 Stackable на этапе 2. - person markmb; 15.01.2014
comment
Сегодня я второй по загруженности парень на земле, позвольте мне вернуться к этому завтра и подать вам лучший пример ... - person Joe Watkins; 15.01.2014
comment
Похоже на то, что я пытаюсь сделать: я хочу, чтобы задача или работник решали, какая следующая задача будет выполнена для этого бита данных. В конце концов, я выбрал механизм очередей, который определяет, какую задачу нужно выполнять с каким битом данных, - основной цикл программы, выбирающий задачи из очереди и отправляющий их в пул. Однако до сих пор не устранены все ошибки - самая важная из них: это не работает. - person Gralgrathor; 25.03.2016

Кажется, это тоже работает:

//Simple example with Collectable (basically Thread meant for Pool) and Pool

<?php

class job extends Collectable {
  public $val;

  public function __construct($val){
    // init some properties
    $this->val = $val;
  }
  public function run(){
    // do some work
    $this->val = $this->val . file_get_contents('http://www.example.com/', null, null, 3, 20);
    $this->setGarbage();
  }
}

// At most 3 threads will work at once
$p = new Pool(3);

$tasks = array(
  new job('0'),
  new job('1'),
  new job('2'),
  new job('3'),
  new job('4'),
  new job('5'),
  new job('6'),
  new job('7'),
  new job('8'),
  new job('9'),
  new job('10'),
);
// Add tasks to pool queue
foreach ($tasks as $task) {
  $p->submit($task);
}

// shutdown will wait for current queue to be completed
$p->shutdown();
// garbage collection check / read results
$p->collect(function($checkingTask){
  echo $checkingTask->val;
  return $checkingTask->isGarbage();
});

?>
person MSS    schedule 23.06.2015