Запустите 100 потоков параллельно и запустите отсутствующие потоки, если некоторые предыдущие завершены.

Например, мне нужно всегда запускать 100 потоков, чтобы выполнить какое-то действие. У меня есть класс, который называется ThreadsWorker, который ищет количество потоков и запускает отсутствующие потоки, если некоторые предыдущие завершены. Итак, это таблица, описывающая ситуацию:

1 second: 100 threads
2 second: 92 threads (ThreadsWorker generates new 8 threads)
3 second: 100 theads
4 second: 72 threads (ThreadsWorker generates 28 threads)

И так далее. Мои потоки являются анонимными вызовами (просто new Thread(new Runnable(...)).start()), потому что я не знаю, как правильно сохранить их в массив Threads[], потому что, хотя ThreadsWorker сохранит threads[i] = new Threads(), некоторые потоки могут быть завершены, и тогда будет некоторая коллизия с индексами массива.

Из-за анонимных вызовов я теперь использую переменную threadsCount и увеличиваю ее в начале тела потока и уменьшаю в конце тела потока (используя synchronized). Хорошо, это работает правильно, и мой единственный способ - использовать цикл while(), который проверяет, threadsCount == 0 ли, когда прогресс завершен.

Я думаю, что это в стиле C, но не в стиле Java :) Итак, вы можете помочь мне сделать это в стиле Java?


person Clark    schedule 01.04.2011    source источник


Ответы (5)


Если ваша цель состоит в том, чтобы просто иметь 100 активных потоков, я предлагаю взглянуть на Java пулы потоковИсполнители в более общем смысле).

Мне неясно, хотите ли вы сохранить все 100 потоков или дождаться их завершения. Ваш вопрос ссылается на оба (ThreadsWorker порождает 28 новых потоков, threadsCount == 0), и они кажутся противоречивыми.

person Michael Brewer-Davis    schedule 01.04.2011
comment
+1 за использование для этого утилит параллелизма, а не необработанных потоков. См. мой ответ здесь для небольшого количества шаблонного кода. stackoverflow.com/questions/5115940/ - person andersoj; 01.04.2011
comment
Итак, если я буду использовать «newFixedThreadPool()», то мне нужно будет использовать некоторый Worker, который будет бесконечно создавать новые потоки в этой очереди для пула? - person Clark; 02.04.2011

Поместите все потоки в массив или коллекцию.

Затем прокрутите коллекцию, вызывая Thread.join() для каждого. Когда этот цикл завершается, все потоки выполняются.

ArrayList threads = new ArrayList();
for (int i = 0; i < 5; i++) {
  Thread t = new AweseomeThread();
  t.start();
  threads.add(t);
}

for (Thread t : threads) {
  t.join();
}

Вам также понадобится обработка исключений (например, InterruptedException). Но я оставлю это как упражнение для читателя... :)

person squawknull    schedule 01.04.2011
comment
Да, я знаю о методе соединения и исключениях :) Но я не знаю, как правильно работать с индексами и массивом потоков в этой ситуации. Я имею в виду, что если я оставлю некоторые индексы пустыми, потому что ThreadsWorker будет думать, что threadsCount = 15, но пока он будет запускать и сохранять дескрипторы потоков, некоторые другие потоки умрут и f.e. threads[71] и threads[75] будут с дескрипторами, но threads[72] до threads[74] будут иметь пустые дескрипторы. Так что мне придется приостановить все потоки, пока ThreadsWorker будет сохранять индексы в массив, чтобы избежать пустых индексов? - person Clark; 01.04.2011
comment
Я бы, вероятно, предложил не пытаться использовать массив, а вместо этого использовать коллекцию. Теперь, когда вы упомянули об этом, и я перечитал ваш вопрос, я неправильно понял. Я думал, вы просто хотите знать, когда все потоки будут завершены, но, похоже, вы хотите сохранить 100 потоков. У меня есть еще одна идея ... Я отправлю ее как отдельный ответ, хотя для ясности и на случай, если я все еще неправильно понимаю ваш вопрос. - person squawknull; 01.04.2011

http://download.oracle.com/javase/6/docs/api/java/util/concurrent/CountDownLatch.html

Вы можете попробовать класс CountDownLatch jdk API

private CountDownLatch latch;
private static class SimpleThread extends Thread {
 public void run() {
  latch.countDown();
 }
}
public static void main(String[] args) {
 int threadcount = 10;
 latch = new CountDownLatch(threadcount);
 for (int i = 0; i < 10; i++) {
  Thread t = new SimpleThread();
  t.start();
 }
 // waiting threads all finished
 latch.await();
}

Получить количество потоков из атрибута latch класса Main.

person store88    schedule 01.04.2011
comment
Я думаю, что это ожидание завершения данной операции в другом потоке, но автор попросил дождаться завершения и возврата потока, верно? Не будет ли Thread.join() более подходящим в его случае? - person squawknull; 01.04.2011
comment
CountDownLatch может помочь Main дождаться завершения всех потоков и кэшировать количество потоков. - person store88; 01.04.2011
comment
Спасибо, я прочитал сегодня о CountDownLatch, но я думаю, что это слишком много для моей задачи (соединение - отличный метод для этого, но у меня есть некоторые проблемы с индексированным массивом). А может и немножко не правильно. Но все равно спасибо! - person Clark; 01.04.2011
comment
› «help Main для ожидания завершения всех потоков и кэширования количества потоков» О! Можете ли вы показать образец? После этого я подумаю, что лучше для моей ситуации. - person Clark; 01.04.2011

Я полагаю, вы пытаетесь заставить ThreadWorker отправлять новые потоки для всех завершенных потоков.

Я бы использовал BlockingQueue, к которому добавляются потоки (ваши Runnable(s)) по завершении. ThreadWorker будет ждать завершения потока, а затем запустит новый поток.

public class YourRunnable implements Runnable {
  private final BlockingQueue<YourRunnable> queue;
  public YourRunnable(BlockingQueue<YourRunnable> queue){
    this.queue = queue;
  }
  public void run{
      // Your Code...
      // Finished Processing
      queue.add(this);
  }
}
public class ThreadWorkder implements Runnable { 
  private final BlockingQueue<YourRunnable> queue;
  ThreadWorker(BlockingQueue<YourRunnable> queue){
    this.queue = queue;
  }
  public void run{
    while(queue.take()){
       (new Thread(new YourRunnable(queue))).start();
    }
  }
  // general main method
  public static void main(String [] args){
    BlockingQueue<YourRunnable> queue = new LinkedBlockingQueue<YourRunnable>();
    ThreadWorker worker = new ThreadWorker(queue);
    Thread(worker).start();
    for (int i = 0; i < 100; i++){
      (new Thread(new YourRunnable(queue))).start();
    }
  }
}
person richs    schedule 01.04.2011

Используйте коллекцию вместо массива. Когда потоки завершатся, пусть они удалятся из массива. Что-то вроде этого:

public class Foo {
  Vector<Thread> threads = new Vector<Thread>(); //Vector is threadsafe

  public ensureThreadCount(int count) {
    while (threads.size() < count) {
      Thread t = new AweseomeThread(threads);
      threads.add(t);
      t.start();
    }
  }
}

public class AwesomeThread {
  Collection threads;
  public AwesomeThread(Collection threads) {
    this.threads = threads;
  }

  public void run() {
    try {
      // do stuff
    } catch (Throwable t) {
    } finally {
      threads.remove(this);
    }
  }
}

Затем пусть ваш воркер просто вызовет Foo.ensureThreadCount().

person squawknull    schedule 01.04.2011