aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--src/lib/filters/threaded_fork.cpp18
-rw-r--r--src/lib/utils/barrier.cpp39
-rw-r--r--src/lib/utils/barrier.h45
-rw-r--r--src/lib/utils/info.txt1
-rw-r--r--src/tests/test_filters.cpp2
5 files changed, 93 insertions, 12 deletions
diff --git a/src/lib/filters/threaded_fork.cpp b/src/lib/filters/threaded_fork.cpp
index ff54bcbc6..07df590f8 100644
--- a/src/lib/filters/threaded_fork.cpp
+++ b/src/lib/filters/threaded_fork.cpp
@@ -11,6 +11,7 @@
#if defined(BOTAN_TARGET_OS_HAS_THREADS)
#include <botan/internal/semaphore.h>
+#include <botan/internal/barrier.h>
namespace Botan {
@@ -23,14 +24,14 @@ struct Threaded_Fork_Data
Semaphore m_input_ready_semaphore;
/*
- * Ensures that all threads have completed processing data.
+ * Synchronises all threads to complete processing data in lock-step.
*/
- Semaphore m_input_complete_semaphore;
+ Barrier m_input_complete_barrier;
/*
* The work that needs to be done. This should be only when the threads
* are NOT running (i.e. before notifying the work condition, after
- * the input_complete_semaphore is completely reset.)
+ * the input_complete_barrier has reset.)
*/
const byte* m_input = nullptr;
@@ -121,11 +122,11 @@ void Threaded_Fork::thread_delegate_work(const byte input[], size_t length)
m_thread_data->m_input_length = length;
//Let the workers start processing.
+ m_thread_data->m_input_complete_barrier.wait(total_ports() + 1);
m_thread_data->m_input_ready_semaphore.release(total_ports());
//Wait for all the filters to finish processing.
- for(size_t i = 0; i != total_ports(); ++i)
- m_thread_data->m_input_complete_semaphore.acquire();
+ m_thread_data->m_input_complete_barrier.sync();
//Reset the thread data
m_thread_data->m_input = nullptr;
@@ -136,18 +137,13 @@ void Threaded_Fork::thread_entry(Filter* filter)
{
while(true)
{
- /*
- * This is plain wrong: a single thread can get the semaphore
- * more than one time, meaning it will process the input twice
- * and some other thread/filter will not see this input.
- */
m_thread_data->m_input_ready_semaphore.acquire();
if(!m_thread_data->m_input)
break;
filter->write(m_thread_data->m_input, m_thread_data->m_input_length);
- m_thread_data->m_input_complete_semaphore.release();
+ m_thread_data->m_input_complete_barrier.sync();
}
}
diff --git a/src/lib/utils/barrier.cpp b/src/lib/utils/barrier.cpp
new file mode 100644
index 000000000..81c578b72
--- /dev/null
+++ b/src/lib/utils/barrier.cpp
@@ -0,0 +1,39 @@
+/*
+* Barrier
+* (C) 2016 Joel Low
+*
+* Botan is released under the Simplified BSD License (see license.txt)
+*/
+
+#include <botan/internal/barrier.h>
+
+#if defined(BOTAN_TARGET_OS_HAS_THREADS)
+
+namespace Botan {
+
+void Barrier::wait(unsigned delta)
+ {
+ lock_guard_type<mutex_type> lock(m_mutex);
+ m_value += delta;
+ }
+
+void Barrier::sync()
+ {
+ std::unique_lock<mutex_type> lock(m_mutex);
+ --m_value;
+ if(m_value > 0)
+ {
+ unsigned current_syncs = m_syncs;
+ m_cond.wait(lock, [this, &current_syncs] { return m_syncs != current_syncs; });
+ }
+ else
+ {
+ m_value = 0;
+ ++m_syncs;
+ m_cond.notify_all();
+ }
+ }
+
+}
+
+#endif
diff --git a/src/lib/utils/barrier.h b/src/lib/utils/barrier.h
new file mode 100644
index 000000000..6d5cf9e58
--- /dev/null
+++ b/src/lib/utils/barrier.h
@@ -0,0 +1,45 @@
+/*
+* Barrier
+* (C) 2016 Joel Low
+*
+* Botan is released under the Simplified BSD License (see license.txt)
+*/
+
+#ifndef BOTAN_UTIL_BARRIER_H__
+#define BOTAN_UTIL_BARRIER_H__
+
+#include <botan/mutex.h>
+
+#if defined(BOTAN_TARGET_OS_HAS_THREADS)
+#include <condition_variable>
+#endif
+
+namespace Botan {
+
+#if defined(BOTAN_TARGET_OS_HAS_THREADS)
+// Barrier implements a barrier synchronization primitive. wait() will indicate
+// how many threads to synchronize; each thread needing synchronization should
+// call sync(). When sync() returns, the barrier is reset to zero, and the
+// m_syncs counter is incremented. m_syncs is a counter to ensure that wait()
+// can be called after a sync() even if the previously sleeping threads have
+// not awoken.)
+class Barrier
+ {
+ public:
+ explicit Barrier(int value = 0) : m_value(value), m_syncs(0) {}
+
+ void wait(unsigned delta);
+
+ void sync();
+
+ private:
+ int m_value;
+ unsigned m_syncs;
+ mutex_type m_mutex;
+ std::condition_variable m_cond;
+ };
+#endif
+
+}
+
+#endif
diff --git a/src/lib/utils/info.txt b/src/lib/utils/info.txt
index 189b2da1f..75a428a83 100644
--- a/src/lib/utils/info.txt
+++ b/src/lib/utils/info.txt
@@ -22,6 +22,7 @@ version.h
</header:public>
<header:internal>
+barrier.h
bit_ops.h
ct_utils.h
donna128.h
diff --git a/src/tests/test_filters.cpp b/src/tests/test_filters.cpp
index d9e9ac23b..6ce83ccba 100644
--- a/src/tests/test_filters.cpp
+++ b/src/tests/test_filters.cpp
@@ -35,7 +35,7 @@ class Filter_Tests : public Test
results.push_back(test_pipe_codec());
results.push_back(test_fork());
-#if defined(BOTAN_TARGET_OS_HAS_THREADS) && 0
+#if defined(BOTAN_TARGET_OS_HAS_THREADS)
// Threaded_Fork is broken
results.push_back(test_threaded_fork());
#endif