summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMichael Bien <[email protected]>2011-08-03 16:50:06 +0200
committerMichael Bien <[email protected]>2011-08-03 16:50:06 +0200
commitb709c6156393cdb48106e8db8626cbfc332c0541 (patch)
tree65ecc151ff6de65c2169145f64ce61cbd8505cfb
parentc40dfd633ef0f841851ed64c2817a425148378b9 (diff)
refactoring; extracted CLAbstractExecutorService and CLPoolable.
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java273
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java261
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLPoolable.java59
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTask.java20
-rw-r--r--src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java16
-rw-r--r--test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java2
6 files changed, 375 insertions, 256 deletions
diff --git a/src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java b/src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java
new file mode 100644
index 0000000..843e08e
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLAbstractExecutorService.java
@@ -0,0 +1,273 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 02:20
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+import com.jogamp.opencl.CLResource;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * Common superclass for Executor services supporting OpenCL driven tasks.
+ * @author Michael Bien
+ */
+public abstract class CLAbstractExecutorService implements CLResource {
+
+ protected final ExecutorService excecutor;
+ protected final List<CLCommandQueue> queues;
+
+ private FinishAction finishAction = FinishAction.DO_NOTHING;
+ private boolean released;
+
+ protected CLAbstractExecutorService(ExecutorService executor, List<CLCommandQueue> queues) {
+ this.queues = queues;
+ this.excecutor = executor;
+ }
+
+
+ <R> TaskWrapper<R> wrapTask(CLPoolable<? extends CLQueueContext, R> task) {
+ return new TaskWrapper(task, finishAction);
+ }
+
+ private <R> List<TaskWrapper<R>> wrapTasks(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) {
+ List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
+ for (CLPoolable<? extends CLQueueContext, R> task : tasks) {
+ if(task == null) {
+ throw new NullPointerException("at least one task was null");
+ }
+ wrapper.add(new TaskWrapper<R>((CLPoolable<CLQueueContext, R>)task, finishAction));
+ }
+ return wrapper;
+ }
+
+ /**
+ * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
+ * @see ExecutorService#invokeAll(java.util.Collection)
+ */
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) throws InterruptedException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAll(wrapper);
+ }
+
+ /**
+ * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
+ * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
+ */
+ public <R> List<Future<R>> invokeAll(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAll(wrapper, timeout, unit);
+ }
+
+ /**
+ * Submits all tasks for immediate execution (blocking) until a result can be returned.
+ * All other unfinished but started tasks are cancelled.
+ * @see ExecutorService#invokeAny(java.util.Collection)
+ */
+ public <R> R invokeAny(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) throws InterruptedException, ExecutionException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAny(wrapper);
+ }
+
+ /**
+ * Submits all tasks for immediate execution (blocking) until a result can be returned.
+ * All other unfinished but started tasks are cancelled.
+ * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
+ */
+ public <R> R invokeAny(Collection<? extends CLPoolable<? super CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
+ List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
+ return excecutor.invokeAny(wrapper, timeout, unit);
+ }
+
+ /**
+ * Submits this task to the pool for execution returning its {@link Future}.
+ * @see ExecutorService#submit(java.util.concurrent.Callable)
+ */
+ public <R> Future<R> submit(CLPoolable<? extends CLQueueContext, R> task) {
+ return excecutor.submit(wrapTask(task));
+ }
+
+ /**
+ * Submits all tasks to the pool for execution and returns their {@link Future}.
+ * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLPoolable)} for every task.
+ */
+ public <R> List<Future<R>> submitAll(Collection<? extends CLPoolable<? extends CLQueueContext, R>> tasks) {
+ List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size());
+ for (CLPoolable<? extends CLQueueContext, R> task : tasks) {
+ futures.add(submit(task));
+ }
+ return futures;
+ }
+
+ /**
+ * Calls {@link CLCommandQueue#flush()} on all queues.
+ */
+ public void flushQueues() {
+ for (CLCommandQueue queue : queues) {
+ queue.flush();
+ }
+ }
+
+ /**
+ * Calls {@link CLCommandQueue#finish()} on all queues.
+ */
+ public void finishQueues() {
+ for (CLCommandQueue queue : queues) {
+ queue.finish();
+ }
+ }
+
+ /**
+ * Returns the command queues used in this pool.
+ */
+ public List<CLCommandQueue> getQueues() {
+ return Collections.unmodifiableList(queues);
+ }
+
+ /**
+ * Returns the size of this pool (number of command queues).
+ */
+ public int getPoolSize() {
+ return queues.size();
+ }
+ /**
+ * Returns the action which is executed when a task finishes.
+ */
+ public FinishAction getFinishAction() {
+ return finishAction;
+ }
+
+ ExecutorService getExcecutor() {
+ return excecutor;
+ }
+
+ @Override
+ public boolean isReleased() {
+ return released;
+ }
+
+ /**
+ * Sets the action which is run after every completed task.
+ * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}.
+ */
+ public void setFinishAction(FinishAction action) {
+ this.finishAction = action;
+ }
+
+ /**
+ * Releases the queue context, all queues including a shutdown of the internal threadpool.
+ * The call will block until all currently executing tasks have finished, no new tasks are started.
+ */
+ @Override
+ public void release() {
+ if (released) {
+ throw new RuntimeException(getClass().getSimpleName() + " already released");
+ }
+ released = true;
+ excecutor.shutdownNow(); // threads will cleanup CL resources on exit
+ try {
+ excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
+ } catch (InterruptedException ex) {
+ throw new RuntimeException(ex);
+ }
+ }
+
+ protected static interface CommandQueueThread {
+
+ Map<Object, CLQueueContext> getContextMap();
+
+ CLCommandQueue getQueue();
+ }
+
+ protected static class TaskWrapper<R> implements Callable<R> {
+
+ protected final CLPoolable<CLQueueContext, R> task;
+ private final FinishAction mode;
+
+ private TaskWrapper(CLPoolable<CLQueueContext, R> task, FinishAction mode) {
+ this.task = task;
+ this.mode = mode;
+ }
+
+ @Override
+ public R call() throws Exception {
+
+ CommandQueueThread thread = (CommandQueueThread)Thread.currentThread();
+
+ final Object key = task.getContextKey();
+
+ CLQueueContext context = thread.getContextMap().get(key);
+ if(context == null) {
+ context = task.createQueueContext(thread.getQueue());
+ thread.getContextMap().put(key, context);
+ }
+
+ R result = task.execute(context);
+ if(mode.equals(FinishAction.FLUSH)) {
+ context.queue.flush();
+ }else if(mode.equals(FinishAction.FINISH)) {
+ context.queue.finish();
+ }
+ return result;
+ }
+
+ }
+
+ /**
+ * The action executed after a task completes.
+ */
+ public enum FinishAction {
+
+ /**
+ * Does nothing, the task is responsible to make sure all computations
+ * have finished when the task finishes
+ */
+ DO_NOTHING,
+
+ /**
+ * Flushes the queue on task completion.
+ */
+ FLUSH,
+
+ /**
+ * Finishes the queue on task completion.
+ */
+ FINISH
+ }
+
+}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
index beafcc9..f6fadd6 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLCommandQueuePool.java
@@ -31,26 +31,21 @@ package com.jogamp.opencl.util.concurrent;
import com.jogamp.opencl.CLCommandQueue;
import com.jogamp.opencl.CLDevice;
-import com.jogamp.opencl.CLResource;
import com.jogamp.opencl.util.CLMultiContext;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
/**
* A multithreaded, fixed size pool of OpenCL command queues.
@@ -59,23 +54,11 @@ import java.util.concurrent.TimeoutException;
* instead of {@link Callable}s and provides a per-queue context for resource sharing across all tasks of one queue.
* @author Michael Bien
*/
-public class CLCommandQueuePool implements CLResource {
+public class CLCommandQueuePool extends CLAbstractExecutorService {
- private ThreadPoolExecutor excecutor;
- private FinishAction finishAction = FinishAction.DO_NOTHING;
- private boolean released;
- private final List<CLCommandQueue> queues;
- private CLCommandQueuePool(Collection<CLCommandQueue> queues) {
- this.queues = new ArrayList<CLCommandQueue>(queues);
- initExecutor();
- }
-
- private void initExecutor() {
- BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>();
- QueueThreadFactory factory = new QueueThreadFactory(queues);
- int size = queues.size();
- this.excecutor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory);
+ private CLCommandQueuePool(ExecutorService executor, List<CLCommandQueue> queues) {
+ super(executor, queues);
}
public static CLCommandQueuePool create(CLMultiContext mc, CLCommandQueue.Mode... modes) {
@@ -91,145 +74,19 @@ public class CLCommandQueuePool implements CLResource {
}
public static CLCommandQueuePool create(Collection<CLCommandQueue> queues) {
- return new CLCommandQueuePool(queues);
- }
-
- /**
- * Submits this task to the pool for execution returning its {@link Future}.
- * @see ExecutorService#submit(java.util.concurrent.Callable)
- */
- public <R> Future<R> submit(CLTask<? extends CLQueueContext, R> task) {
- return excecutor.submit(wrapTask(task));
- }
-
- /**
- * Submits all tasks to the pool for execution and returns their {@link Future}.
- * Calls {@link #submit(com.jogamp.opencl.util.concurrent.CLTask)} for every task.
- */
- public <R> List<Future<R>> submitAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) {
- List<Future<R>> futures = new ArrayList<Future<R>>(tasks.size());
- for (CLTask<? extends CLQueueContext, R> task : tasks) {
- futures.add(submit(task));
- }
- return futures;
- }
-
- /**
- * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
- * @see ExecutorService#invokeAll(java.util.Collection)
- */
- public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException {
- List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
- return excecutor.invokeAll(wrapper);
- }
-
- /**
- * Submits all tasks to the pool for immediate execution (blocking) and returns their {@link Future} holding the result.
- * @see ExecutorService#invokeAll(java.util.Collection, long, java.util.concurrent.TimeUnit)
- */
- public <R> List<Future<R>> invokeAll(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException {
- List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
- return excecutor.invokeAll(wrapper, timeout, unit);
- }
-
- /**
- * Submits all tasks for immediate execution (blocking) until a result can be returned.
- * All other unfinished but started tasks are cancelled.
- * @see ExecutorService#invokeAny(java.util.Collection)
- */
- public <R> R invokeAny(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) throws InterruptedException, ExecutionException {
- List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
- return excecutor.invokeAny(wrapper);
- }
-
- /*public*/ CLTask<? extends CLQueueContext, ?> takeCLTask() throws InterruptedException {
- return ((CLFutureTask<?>)excecutor.getQueue().take()).getCLTask();
- }
-
- /**
- * Submits all tasks for immediate execution (blocking) until a result can be returned.
- * All other unfinished but started tasks are cancelled.
- * @see ExecutorService#invokeAny(java.util.Collection, long, java.util.concurrent.TimeUnit)
- */
- public <R> R invokeAny(Collection<? extends CLTask<? super CLQueueContext, R>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
- List<TaskWrapper<R>> wrapper = wrapTasks(tasks);
- return excecutor.invokeAny(wrapper, timeout, unit);
- }
-
- <R> TaskWrapper<R> wrapTask(CLTask<? extends CLQueueContext, R> task) {
- return new TaskWrapper(task, finishAction);
- }
-
- private <R> List<TaskWrapper<R>> wrapTasks(Collection<? extends CLTask<? extends CLQueueContext, R>> tasks) {
- List<TaskWrapper<R>> wrapper = new ArrayList<TaskWrapper<R>>(tasks.size());
- for (CLTask<? extends CLQueueContext, R> task : tasks) {
- if(task == null) {
- throw new NullPointerException("at least one task was null");
- }
- wrapper.add(new TaskWrapper<R>((CLTask<CLQueueContext, R>)task, finishAction));
- }
- return wrapper;
- }
-
- /**
- * Calls {@link CLCommandQueue#flush()} on all queues.
- */
- public void flushQueues() {
- for (CLCommandQueue queue : queues) {
- queue.flush();
- }
- }
-
- /**
- * Calls {@link CLCommandQueue#finish()} on all queues.
- */
- public void finishQueues() {
- for (CLCommandQueue queue : queues) {
- queue.finish();
- }
- }
-
- /**
- * Releases the queue context, all queues including a shutdown of the internal threadpool.
- * The call will block until all currently executing tasks have finished, no new tasks are started.
- */
- @Override
- public void release() {
- if(released) {
- throw new RuntimeException(getClass().getSimpleName()+" already released");
- }
- released = true;
- excecutor.shutdownNow(); // threads will cleanup CL resources on exit
- try {
- excecutor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
- } catch (InterruptedException ex) {
- throw new RuntimeException(ex);
- }
- }
-
- ExecutorService getExcecutor() {
- return excecutor;
- }
+
+ List<CLCommandQueue> list = new ArrayList<CLCommandQueue>(queues);
- /**
- * Returns the command queues used in this pool.
- */
- public List<CLCommandQueue> getQueues() {
- return Collections.unmodifiableList(queues);
- }
+ BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>();
+ CommandQueuePoolThreadFactory factory = new CommandQueuePoolThreadFactory(list);
+ int size = list.size();
- /**
- * Returns the size of this pool (number of command queues).
- */
- public int getPoolSize() {
- return queues.size();
+ ExecutorService executor = new CLThreadPoolExecutor(size, size, 0L, TimeUnit.MILLISECONDS, queue, factory);
+ return new CLCommandQueuePool(executor, list);
}
- /**
- * Returns the action which is executed when a task finishes.
- */
- public FinishAction getFinishAction() {
- return finishAction;
+ /*public*/ CLPoolable<? extends CLQueueContext, ?> takeCLTask() throws InterruptedException {
+ return ((CLFutureTask<?>)getExcecutor().getQueue().take()).getCLPoolable();
}
/**
@@ -238,7 +95,7 @@ public class CLCommandQueuePool implements CLResource {
* value is only an approximation.
*/
public long getTaskCount() {
- return excecutor.getTaskCount();
+ return getExcecutor().getTaskCount();
}
/**
@@ -247,40 +104,32 @@ public class CLCommandQueuePool implements CLResource {
* the returned value is only an approximation, but one that does not ever decrease across successive calls.
*/
public long getCompletedTaskCount() {
- return excecutor.getCompletedTaskCount();
+ return getExcecutor().getCompletedTaskCount();
}
/**
* Returns the approximate number of queues that are actively executing tasks.
*/
public int getActiveCount() {
- return excecutor.getActiveCount();
+ return getExcecutor().getActiveCount();
}
@Override
- public boolean isReleased() {
- return released;
- }
-
- /**
- * Sets the action which is run after every completed task.
- * This is mainly intended for debugging, default value is {@link FinishAction#DO_NOTHING}.
- */
- public void setFinishAction(FinishAction action) {
- this.finishAction = action;
+ ThreadPoolExecutor getExcecutor() {
+ return (ThreadPoolExecutor) excecutor;
}
@Override
public String toString() {
- return getClass().getSimpleName()+" [queues: "+getPoolSize()+" on finish: "+finishAction+"]";
+ return getClass().getSimpleName()+" [queues: "+getPoolSize()+" on finish: "+getFinishAction()+"]";
}
- private static class QueueThreadFactory implements ThreadFactory {
+ private static class CommandQueuePoolThreadFactory implements ThreadFactory {
private final List<CLCommandQueue> queues;
private int index;
- private QueueThreadFactory(List<CLCommandQueue> queues) {
+ public CommandQueuePoolThreadFactory(List<CLCommandQueue> queues) {
this.queues = queues;
this.index = 0;
}
@@ -292,20 +141,20 @@ public class CLCommandQueuePool implements CLResource {
ThreadGroup group = (sm != null) ? sm.getThreadGroup() : Thread.currentThread().getThreadGroup();
CLCommandQueue queue = queues.get(index);
- QueueThread thread = new QueueThread(group, runnable, queue, index++);
+ Thread thread = new CommandQueuePoolThread(group, runnable, queue, index++);
thread.setDaemon(true);
-
+
return thread;
}
}
-
- private static class QueueThread extends Thread {
+
+ private static class CommandQueuePoolThread extends Thread implements CommandQueueThread {
private final CLCommandQueue queue;
private final Map<Object, CLQueueContext> contextMap;
- public QueueThread(ThreadGroup group, Runnable runnable, CLCommandQueue queue, int index) {
+ public CommandQueuePoolThread(ThreadGroup group, Runnable runnable, CLCommandQueue queue, int index) {
super(group, runnable, "queue-worker-thread-"+index+"["+queue+"]");
this.queue = queue;
this.contextMap = new HashMap<Object, CLQueueContext>();
@@ -321,38 +170,14 @@ public class CLCommandQueuePool implements CLResource {
}
}
- }
-
- private static class TaskWrapper<R> implements Callable<R> {
-
- private final CLTask<CLQueueContext, R> task;
- private final FinishAction mode;
-
- private TaskWrapper(CLTask<CLQueueContext, R> task, FinishAction mode) {
- this.task = task;
- this.mode = mode;
+ @Override
+ public CLCommandQueue getQueue() {
+ return queue;
}
@Override
- public R call() throws Exception {
-
- QueueThread thread = (QueueThread)Thread.currentThread();
-
- final Object key = task.getContextKey();
-
- CLQueueContext context = thread.contextMap.get(key);
- if(context == null) {
- context = task.createQueueContext(thread.queue);
- thread.contextMap.put(key, context);
- }
-
- R result = task.execute(context);
- if(mode.equals(FinishAction.FLUSH)) {
- context.queue.flush();
- }else if(mode.equals(FinishAction.FINISH)) {
- context.queue.finish();
- }
- return result;
+ public Map<Object, CLQueueContext> getContextMap() {
+ return contextMap;
}
}
@@ -366,12 +191,12 @@ public class CLCommandQueuePool implements CLResource {
this.wrapper = wrapper;
}
- public CLTask<? extends CLQueueContext, R> getCLTask() {
+ public CLPoolable<? extends CLQueueContext, R> getCLPoolable() {
return wrapper.task;
}
}
-
+
private static class CLThreadPoolExecutor extends ThreadPoolExecutor {
public CLThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
@@ -386,26 +211,4 @@ public class CLCommandQueuePool implements CLResource {
}
- /**
- * The action executed after a task completes.
- */
- public enum FinishAction {
-
- /**
- * Does nothing, the task is responsible to make sure all computations
- * have finished when the task finishes
- */
- DO_NOTHING,
-
- /**
- * Flushes the queue on task completion.
- */
- FLUSH,
-
- /**
- * Finishes the queue on task completion.
- */
- FINISH
- }
-
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLPoolable.java b/src/com/jogamp/opencl/util/concurrent/CLPoolable.java
new file mode 100644
index 0000000..6efd8de
--- /dev/null
+++ b/src/com/jogamp/opencl/util/concurrent/CLPoolable.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (c) 2011, Michael Bien
+ * All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without modification, are
+ * permitted provided that the following conditions are met:
+ *
+ * 1. Redistributions of source code must retain the above copyright notice, this list of
+ * conditions and the following disclaimer.
+ *
+ * 2. Redistributions in binary form must reproduce the above copyright notice, this list
+ * of conditions and the following disclaimer in the documentation and/or other materials
+ * provided with the distribution.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
+ * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND
+ * FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR
+ * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+ * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
+ * SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON
+ * ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
+ * NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
+ * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ *
+ */
+
+/*
+ * Created on Tuesday, August 02 2011 21:51
+ */
+package com.jogamp.opencl.util.concurrent;
+
+import com.jogamp.opencl.CLCommandQueue;
+
+/**
+ * A OpenCL task designed to be executed in a managed environment.
+ * @author Michael Bien
+ */
+public interface CLPoolable<C extends CLQueueContext, R> {
+
+ /**
+ * Creates a CLQueueContext for this task. A context may contain static resources
+ * like OpenCL program binaries or pre allocated buffers. A context can be used by an group
+ * of tasks identified by a common context key ({@link #getContextKey()}). This method
+ * won't be called if a context was already created by an previously executed task having the
+ * same context key.
+ */
+ C createQueueContext(CLCommandQueue queue);
+
+ /**
+ * Runs the task on a queue and returns a result.
+ */
+ R execute(C context);
+
+ /**
+ * Returns the context key for this task.
+ */
+ Object getContextKey();
+
+}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTask.java b/src/com/jogamp/opencl/util/concurrent/CLTask.java
index 287dc32..25f9fd4 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLTask.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLTask.java
@@ -29,35 +29,19 @@
*/
package com.jogamp.opencl.util.concurrent;
-import com.jogamp.opencl.CLCommandQueue;
-
/**
* A task executed on a command queue.
* @author Michael Bien
*/
-public abstract class CLTask<C extends CLQueueContext, R> {
-
-
- /**
- * Creates a CLQueueContext for this task. A context may contain static resources
- * like OpenCL program binaries or pre allocated buffers. A context can be used by an group
- * of tasks identified by a common context key ({@link #getContextKey()}). This method
- * won't be called if a context was already created by an previously executed task having the
- * same context key.
- */
- public abstract C createQueueContext(CLCommandQueue queue);
+public abstract class CLTask<C extends CLQueueContext, R> implements CLPoolable<C, R> {
/**
* Returns the context key for this task. Default implementation returns {@link #getClass()}.
*/
+ @Override
public Object getContextKey() {
return getClass();
}
- /**
- * Runs the task on a queue and returns a result.
- */
- public abstract R execute(C context);
-
}
diff --git a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
index fe0b7e1..815ffc8 100644
--- a/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
+++ b/src/com/jogamp/opencl/util/concurrent/CLTaskCompletionService.java
@@ -44,25 +44,25 @@ import java.util.concurrent.TimeUnit;
public class CLTaskCompletionService<R> {
private final ExecutorCompletionService<R> service;
- private final CLCommandQueuePool pool;
+ private final CLAbstractExecutorService executor;
/**
* Creates an CLTaskCompletionService using the supplied pool for base
* task execution and a LinkedBlockingQueue with the capacity of {@link Integer#MAX_VALUE}
* as a completion queue.
*/
- public CLTaskCompletionService(CLCommandQueuePool pool) {
- this.service = new ExecutorCompletionService<R>(pool.getExcecutor());
- this.pool = pool;
+ public CLTaskCompletionService(CLAbstractExecutorService executor) {
+ this.service = new ExecutorCompletionService<R>(executor.getExcecutor());
+ this.executor = executor;
}
/**
* Creates an CLTaskCompletionService using the supplied pool for base
* task execution the supplied queue as its completion queue.
*/
- public CLTaskCompletionService(CLCommandQueuePool pool, BlockingQueue<Future<R>> queue) {
+ public CLTaskCompletionService(CLAbstractExecutorService pool, BlockingQueue<Future<R>> queue) {
this.service = new ExecutorCompletionService<R>(pool.getExcecutor(), queue);
- this.pool = pool;
+ this.executor = pool;
}
/**
@@ -70,8 +70,8 @@ public class CLTaskCompletionService<R> {
* results of the task. Upon completion, this task may be taken or polled.
* @see CompletionService#submit(java.util.concurrent.Callable)
*/
- public Future<R> submit(CLTask<? extends CLQueueContext, R> task) {
- return service.submit(pool.wrapTask(task));
+ public Future<R> submit(CLPoolable<? extends CLQueueContext, R> task) {
+ return service.submit(executor.wrapTask(task));
}
/**
diff --git a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
index 9690fcd..a76f86f 100644
--- a/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
+++ b/test/com/jogamp/opencl/util/concurrent/CLMultiContextTest.java
@@ -223,7 +223,7 @@ public class CLMultiContextTest {
data = Buffers.newDirectIntBuffer(slice*taskCount);
tasks = createTasks(decrementProgramSource, data, taskCount, slice);
- CLTaskCompletionService<IntBuffer> service = new CLTaskCompletionService(pool);
+ CLTaskCompletionService<IntBuffer> service = new CLTaskCompletionService<IntBuffer>(pool);
for (CLTestTask task : tasks) {
service.submit(task);
}