Как реализовать PriorityBlockingQueue с ThreadPoolExecutor и настраиваемыми задачами

Я много искал, но не нашел решения своей проблемы.

У меня есть собственный класс BaseTask, который использует ThreadPoolExecutor для обработки задач. Я хочу установить приоритеты задач, но когда я пытаюсь использовать PriorityBlockingQueue, я получаю ClassCastException, потому что ThreadPoolExecutor превращает мои Задачи в объект FutureTask.

Это, очевидно, имеет смысл, потому что FutureTask не реализует Comparable, но как мне решить проблему приоритета? Я читал, что вы можете переопределить newTaskFor() в ThreadPoolExecutor, но я вообще не могу найти этот метод ...?

Любые предложения будут высоко ценится!

Код в помощь:

В моем BaseTask классе у меня есть

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}

В BaseFutureTask классе

@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}

В BaseThreadPoolExecutor классе я переопределяю 3 submit методов ... Конструктор в этом классе вызывается, но ни один из submit методов


person greve    schedule 23.08.2010    source источник
comment
См. Также stackoverflow.com/questions/807223/   -  person John McCarthy    schedule 25.01.2012
comment
Поток, на который ссылается OP, - это stackoverflow.com/questions/11430574/   -  person Engineer    schedule 30.09.2012


Ответы (7)


public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}

}

как вы можете догадаться, RunWithPriority - это абстрактный класс, который является Runnable и имеет поле приоритета Integer

person dupdup    schedule 30.03.2011
comment
Я действительно не понимаю твой пример! Я тупой или первый Executor ex никогда не использовался? ржу не могу - person mat_boy; 16.04.2013
comment
Я не понимаю, почему это самый популярный ответ. new ComparePriority() не указывает параметр универсального типа, поэтому я считаю это быстрым и грязным решением. - person Timmos; 24.02.2016

Вы можете использовать эти вспомогательные классы:

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

И

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

И этот вспомогательный метод:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

И затем используйте его так:

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
person Stanislav Vitvitskyy    schedule 16.05.2013
comment
Это должен быть принятый ответ. Спасибо @Stanislav. - person Gray; 19.07.2019

Я постараюсь объяснить эту проблему полнофункциональным кодом. Но прежде чем погрузиться в код, я хотел бы рассказать о PriorityBlockingQueue.

PriorityBlockingQueue: PriorityBlockingQueue - это реализация BlockingQueue. Он принимает задачи вместе с их приоритетом и сначала отправляет на выполнение задачу с наивысшим приоритетом. Если любые две задачи имеют одинаковый приоритет, нам нужно предоставить некоторую настраиваемую логику, чтобы решить, какая задача выполняется первой.

Теперь сразу перейдем к коду.

Класс драйвера: этот класс создает исполнителя, который принимает задачи, а затем отправляет их на выполнение. Здесь мы создаем две задачи: одну с НИЗКИМ приоритетом, а другую с ВЫСОКИМ приоритетом. Здесь мы говорим исполнителю запустить MAX из 1 потока и использовать PriorityBlockingQueue.

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}

Класс MyTask: MyTask реализует Runnable и принимает приоритет в качестве аргумента в конструкторе. Когда эта задача запускается, она печатает сообщение, а затем переводит поток в спящий режим на 1 секунду.

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

Класс MyFutureTask: поскольку мы используем PriorityBlocingQueue для хранения наших задач, наши задачи должны быть заключены в FutureTask, а наша реализация FutureTask должна реализовывать интерфейс Comparable. Интерфейс Comparable сравнивает приоритет двух разных задач и отправляет на выполнение задачу с наивысшим приоритетом.

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

Приоритетный класс: Понятный приоритетный класс.

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

Теперь, когда мы запускаем этот пример, мы получаем следующий результат

The following Runnable is getting executed High
The following Runnable is getting executed Low

Несмотря на то, что мы отправили сначала НИЗКИЙ приоритет, а задачу с ВЫСОКИМ приоритетом позже, но поскольку мы используем PriorityBlockingQueue, задача с более высоким приоритетом будет выполняться первой.

person thedarkpassenger    schedule 24.01.2016

Мое решение:

public class XThreadPoolExecutor extends ThreadPoolExecutor
{
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
    {
        return new ComparableFutureTask<>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
    {
        return new ComparableFutureTask<>(callable);
    }

    protected class ComparableFutureTask<V>
        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
    {
        private Object object;
        public ComparableFutureTask(Callable<V> callable)
        {
            super(callable);
            object = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result)
        {
            super(runnable, result);
            object = runnable;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(ComparableFutureTask<V> o)
        {
            if (this == o)
            {
                return 0;
            }
            if (o == null)
            {
                return -1; // high priority
            }
            if (object != null && o.object != null)
            {
                if (object.getClass().equals(o.object.getClass()))
                {
                    if (object instanceof Comparable)
                    {
                        return ((Comparable) object).compareTo(o.object);
                    }
                }
            }
            return 0;
        }
    }
}
person canghailan    schedule 12.12.2011

Похоже, они оставили это вне гармонии Apache. Существует svn commit log около года назад, исправив отсутствие newTaskFor. Вероятно, вы можете просто переопределить submit функции в расширенном ThreadPoolExecutor, чтобы создать расширенный FutureTask, который равен Comparable. Они не очень длинные.

person Rich Schuler    schedule 23.08.2010
comment
Нет, вы должны продлить его. Он упаковывается в FutureTask в различных методах отправки. - person Rich Schuler; 24.08.2010
comment
Не похоже, чтобы был вызван какой-либо из submit методов ... Добавлен код, чтобы помочь понять - person greve; 25.08.2010
comment
Больше не звоните ThreadPoolExecutor#execute. Все, что вы хотите сделать с помощью пула потоков, нужно будет выполнить через вызов submit. Затем submit методы должны вызывать execute за вас. - person Rich Schuler; 25.08.2010
comment
Но единственное, что делает метод submit, это просто вызывает метод execute ... Он создает новый BaseFutureTask, а затем выполняет его. - person greve; 25.08.2010

Чтобы ответить на ваш вопрос: метод newTaskFor() находится в суперклассе ThreadPoolExecutor, AbstractExecutorService. Однако вы можете просто переопределить его в ThreadPoolExecutor.

person Engineer    schedule 30.09.2012

Этот ответ является упрощенной версией ответа @StanislavVitvitskyy. Спасибо ему.

Я хотел сделать отправленные мной вакансии Comparable. Я создал ExecutorService с PriorityBlockingQueue и расширил его для обработки newTaskFor(...) методов:

ExecutorService pool = new ThreadPoolExecutor(corePoolSize, maximumPoolSize,
    keepAliveTime, timeUnit, new PriorityBlockingQueue<Runnable>()) {

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
        return new ComparableFutureTask<T>(runnable, value);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new ComparableFutureTask<T>(callable);
    };
};

Я определил ComparableFutureTask, который расширяет FutureTask и реализует Comparable, делегируя job.compareTo(...), которые отправлены в пул.

public class ComparableFutureTask<T> extends FutureTask<T>
    implements Comparable<Object> {

    private final Comparable<Object> comparableJob;

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Runnable runnable, T value) {
        super(runnable, value);
        this.comparableJob = (Comparable<Object>) runnable;
    }

    @SuppressWarnings("unchecked")
    public ComparableFutureTask(Callable<T> callable) {
        super(callable);
        this.comparableJob = (Comparable<Object>) callable;
    }

    @Override
    public int compareTo(Object o) {
        return this.comparableJob
            .compareTo(((ComparableFutureTask<?>) o).comparable);
    }
}

Этот ExecutorService затем может обрабатывать Runnable или Callable задания, которые также являются Comparable. Например:

public class MyJob implements Runnable, Comparable<MyJob> {
    private int priority;
    ...
    @Override
    public int compareTo(MyJob other) {
        // we want higher priority to go first
        return other.priority - this.priority;
    }
    ...
}

Важно отметить, что если вы отправите задание, которое не Comparable, в эту очередь, оно выдаст ClassCastException.

person Gray    schedule 19.07.2019