summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
authorSven Gothel <[email protected]>2020-10-19 13:25:42 +0200
committerSven Gothel <[email protected]>2020-10-19 13:25:42 +0200
commit3b249239525bed207ac1f9fd9231fbda5c33656f (patch)
tree28f9a7f0b68efc8431a0d39eefacdbf9e0773d84 /src
parent9cfa1a4530e7c0b071b5875e8857ee4e4abac209 (diff)
GATTHandler: Use read lock-free cow_vector for characteristicListenerList, avoiding locks in callback iteration
Also use relaxed_atomic_bool for sendIndicationConfirmation.
Diffstat (limited to 'src')
-rw-r--r--src/direct_bt/GATTHandler.cpp60
1 files changed, 34 insertions, 26 deletions
diff --git a/src/direct_bt/GATTHandler.cpp b/src/direct_bt/GATTHandler.cpp
index 0146e28e..25af8b40 100644
--- a/src/direct_bt/GATTHandler.cpp
+++ b/src/direct_bt/GATTHandler.cpp
@@ -99,20 +99,14 @@ bool GATTHandler::validateConnected() noexcept {
return true;
}
+static jau::cow_vector<std::shared_ptr<GATTCharacteristicListener>>::equal_comparator _characteristicListenerRefEqComparator =
+ [](const std::shared_ptr<GATTCharacteristicListener> &a, const std::shared_ptr<GATTCharacteristicListener> &b) -> bool { return *a == *b; };
+
bool GATTHandler::addCharacteristicListener(std::shared_ptr<GATTCharacteristicListener> l) {
if( nullptr == l ) {
throw IllegalArgumentException("GATTEventListener ref is null", E_FILE_LINE);
}
- const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor
- for(auto it = characteristicListenerList.begin(); it != characteristicListenerList.end(); ) {
- if ( **it == *l ) {
- return false; // already included
- } else {
- ++it;
- }
- }
- characteristicListenerList.push_back(l);
- return true;
+ return characteristicListenerList.push_back_unique(l, _characteristicListenerRefEqComparator);
}
bool GATTHandler::removeCharacteristicListener(std::shared_ptr<GATTCharacteristicListener> l) noexcept {
@@ -120,7 +114,8 @@ bool GATTHandler::removeCharacteristicListener(std::shared_ptr<GATTCharacteristi
ERR_PRINT("Given GATTCharacteristicListener ref is null");
return false;
}
- return removeCharacteristicListener( l.get() );
+ const int count = characteristicListenerList.erase_matching(l, false /* all_matching */, _characteristicListenerRefEqComparator);
+ return count > 0;
}
bool GATTHandler::removeCharacteristicListener(const GATTCharacteristicListener * l) noexcept {
@@ -128,15 +123,22 @@ bool GATTHandler::removeCharacteristicListener(const GATTCharacteristicListener
ERR_PRINT("Given GATTCharacteristicListener ref is null");
return false;
}
- const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor
- for(auto it = characteristicListenerList.begin(); it != characteristicListenerList.end(); ) {
+ const std::lock_guard<std::recursive_mutex> lock(characteristicListenerList.get_write_mutex());
+ std::shared_ptr<std::vector<std::shared_ptr<GATTCharacteristicListener>>> snapshot = characteristicListenerList.copy_store();
+ int count = 0;
+ for(auto it = snapshot->begin(); it != snapshot->end(); ) {
if ( **it == *l ) {
- it = characteristicListenerList.erase(it);
- return true;
+ it = snapshot->erase(it);
+ count++;
+ break;
} else {
++it;
}
}
+ if( 0 < count ) {
+ characteristicListenerList.set_store(std::move(snapshot));
+ return true;
+ }
return false;
}
@@ -153,32 +155,36 @@ int GATTHandler::removeAllAssociatedCharacteristicListener(const GATTCharacteris
ERR_PRINT("Given GATTCharacteristic ref is null");
return false;
}
- const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor
- for(auto it = characteristicListenerList.begin(); it != characteristicListenerList.end(); ) {
+ const std::lock_guard<std::recursive_mutex> lock(characteristicListenerList.get_write_mutex());
+ std::shared_ptr<std::vector<std::shared_ptr<GATTCharacteristicListener>>> snapshot = characteristicListenerList.copy_store();
+ int count = 0;
+ for(auto it = snapshot->begin(); it != snapshot->end(); ) {
if ( (*it)->match(*associatedCharacteristic) ) {
- it = characteristicListenerList.erase(it);
- return true;
+ it = snapshot->erase(it);
+ count++;
+ break;
} else {
++it;
}
}
+ if( 0 < count ) {
+ characteristicListenerList.set_store(std::move(snapshot));
+ return true;
+ }
return false;
}
int GATTHandler::removeAllCharacteristicListener() noexcept {
- const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor
int count = characteristicListenerList.size();
characteristicListenerList.clear();
return count;
}
void GATTHandler::setSendIndicationConfirmation(const bool v) {
- const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor
sendIndicationConfirmation = v;
}
bool GATTHandler::getSendIndicationConfirmation() noexcept {
- const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor
return sendIndicationConfirmation;
}
@@ -211,7 +217,7 @@ void GATTHandler::l2capReaderThreadImpl() {
const std::shared_ptr<TROOctets> data(new POctets(a->getValue()));
const uint64_t timestamp = a->ts_creation;
int i=0;
- for_each_idx_mtx(mtx_eventListenerList, characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) {
+ jau::for_each_cow(characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) {
try {
if( l->match(*decl) ) {
l->notificationReceived(decl, data, timestamp);
@@ -225,7 +231,8 @@ void GATTHandler::l2capReaderThreadImpl() {
});
} else if( AttPDUMsg::Opcode::ATT_HANDLE_VALUE_IND == opc ) {
const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get());
- COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd", a->toString().c_str(), sendIndicationConfirmation, characteristicListenerList.size());
+ COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd",
+ a->toString().c_str(), sendIndicationConfirmation.load(), characteristicListenerList.size());
bool cfmSent = false;
if( sendIndicationConfirmation ) {
AttHandleValueCfm cfm;
@@ -236,7 +243,7 @@ void GATTHandler::l2capReaderThreadImpl() {
const std::shared_ptr<TROOctets> data(new POctets(a->getValue()));
const uint64_t timestamp = a->ts_creation;
int i=0;
- for_each_idx_mtx(mtx_eventListenerList, characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) {
+ jau::for_each_cow(characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) {
try {
if( l->match(*decl) ) {
l->indicationReceived(decl, data, timestamp, cfmSent);
@@ -327,6 +334,7 @@ GATTHandler::GATTHandler(const std::shared_ptr<DBTDevice> &device) noexcept
GATTHandler::~GATTHandler() noexcept {
disconnect(false /* disconnectDevice */, false /* ioErrorCause */);
+ characteristicListenerList.clear();
services.clear();
genericAccess = nullptr;
}
@@ -350,7 +358,7 @@ bool GATTHandler::disconnect(const bool disconnectDevice, const bool ioErrorCaus
const std::lock_guard<std::recursive_mutex> lock(mtx_command); // RAII-style acquire and relinquish via destructor
DBG_PRINT("GATTHandler::disconnect: Start: disconnectDevice %d, ioErrorCause %d: GattHandler[%s], l2cap[%s]: %s",
disconnectDevice, ioErrorCause, getStateString().c_str(), l2cap.getStateString().c_str(), deviceString.c_str());
- removeAllCharacteristicListener();
+ characteristicListenerList.clear();
PERF3_TS_TD("GATTHandler::disconnect.1");
{