diff options
author | Sven Gothel <[email protected]> | 2020-10-19 13:25:42 +0200 |
---|---|---|
committer | Sven Gothel <[email protected]> | 2020-10-19 13:25:42 +0200 |
commit | 3b249239525bed207ac1f9fd9231fbda5c33656f (patch) | |
tree | 28f9a7f0b68efc8431a0d39eefacdbf9e0773d84 /src | |
parent | 9cfa1a4530e7c0b071b5875e8857ee4e4abac209 (diff) |
GATTHandler: Use read lock-free cow_vector for characteristicListenerList, avoiding locks in callback iteration
Also use relaxed_atomic_bool for sendIndicationConfirmation.
Diffstat (limited to 'src')
-rw-r--r-- | src/direct_bt/GATTHandler.cpp | 60 |
1 files changed, 34 insertions, 26 deletions
diff --git a/src/direct_bt/GATTHandler.cpp b/src/direct_bt/GATTHandler.cpp index 0146e28e..25af8b40 100644 --- a/src/direct_bt/GATTHandler.cpp +++ b/src/direct_bt/GATTHandler.cpp @@ -99,20 +99,14 @@ bool GATTHandler::validateConnected() noexcept { return true; } +static jau::cow_vector<std::shared_ptr<GATTCharacteristicListener>>::equal_comparator _characteristicListenerRefEqComparator = + [](const std::shared_ptr<GATTCharacteristicListener> &a, const std::shared_ptr<GATTCharacteristicListener> &b) -> bool { return *a == *b; }; + bool GATTHandler::addCharacteristicListener(std::shared_ptr<GATTCharacteristicListener> l) { if( nullptr == l ) { throw IllegalArgumentException("GATTEventListener ref is null", E_FILE_LINE); } - const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor - for(auto it = characteristicListenerList.begin(); it != characteristicListenerList.end(); ) { - if ( **it == *l ) { - return false; // already included - } else { - ++it; - } - } - characteristicListenerList.push_back(l); - return true; + return characteristicListenerList.push_back_unique(l, _characteristicListenerRefEqComparator); } bool GATTHandler::removeCharacteristicListener(std::shared_ptr<GATTCharacteristicListener> l) noexcept { @@ -120,7 +114,8 @@ bool GATTHandler::removeCharacteristicListener(std::shared_ptr<GATTCharacteristi ERR_PRINT("Given GATTCharacteristicListener ref is null"); return false; } - return removeCharacteristicListener( l.get() ); + const int count = characteristicListenerList.erase_matching(l, false /* all_matching */, _characteristicListenerRefEqComparator); + return count > 0; } bool GATTHandler::removeCharacteristicListener(const GATTCharacteristicListener * l) noexcept { @@ -128,15 +123,22 @@ bool GATTHandler::removeCharacteristicListener(const GATTCharacteristicListener ERR_PRINT("Given GATTCharacteristicListener ref is null"); return false; } - const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor - for(auto it = characteristicListenerList.begin(); it != characteristicListenerList.end(); ) { + const std::lock_guard<std::recursive_mutex> lock(characteristicListenerList.get_write_mutex()); + std::shared_ptr<std::vector<std::shared_ptr<GATTCharacteristicListener>>> snapshot = characteristicListenerList.copy_store(); + int count = 0; + for(auto it = snapshot->begin(); it != snapshot->end(); ) { if ( **it == *l ) { - it = characteristicListenerList.erase(it); - return true; + it = snapshot->erase(it); + count++; + break; } else { ++it; } } + if( 0 < count ) { + characteristicListenerList.set_store(std::move(snapshot)); + return true; + } return false; } @@ -153,32 +155,36 @@ int GATTHandler::removeAllAssociatedCharacteristicListener(const GATTCharacteris ERR_PRINT("Given GATTCharacteristic ref is null"); return false; } - const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor - for(auto it = characteristicListenerList.begin(); it != characteristicListenerList.end(); ) { + const std::lock_guard<std::recursive_mutex> lock(characteristicListenerList.get_write_mutex()); + std::shared_ptr<std::vector<std::shared_ptr<GATTCharacteristicListener>>> snapshot = characteristicListenerList.copy_store(); + int count = 0; + for(auto it = snapshot->begin(); it != snapshot->end(); ) { if ( (*it)->match(*associatedCharacteristic) ) { - it = characteristicListenerList.erase(it); - return true; + it = snapshot->erase(it); + count++; + break; } else { ++it; } } + if( 0 < count ) { + characteristicListenerList.set_store(std::move(snapshot)); + return true; + } return false; } int GATTHandler::removeAllCharacteristicListener() noexcept { - const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor int count = characteristicListenerList.size(); characteristicListenerList.clear(); return count; } void GATTHandler::setSendIndicationConfirmation(const bool v) { - const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor sendIndicationConfirmation = v; } bool GATTHandler::getSendIndicationConfirmation() noexcept { - const std::lock_guard<std::recursive_mutex> lock(mtx_eventListenerList); // RAII-style acquire and relinquish via destructor return sendIndicationConfirmation; } @@ -211,7 +217,7 @@ void GATTHandler::l2capReaderThreadImpl() { const std::shared_ptr<TROOctets> data(new POctets(a->getValue())); const uint64_t timestamp = a->ts_creation; int i=0; - for_each_idx_mtx(mtx_eventListenerList, characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) { + jau::for_each_cow(characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) { try { if( l->match(*decl) ) { l->notificationReceived(decl, data, timestamp); @@ -225,7 +231,8 @@ void GATTHandler::l2capReaderThreadImpl() { }); } else if( AttPDUMsg::Opcode::ATT_HANDLE_VALUE_IND == opc ) { const AttHandleValueRcv * a = static_cast<const AttHandleValueRcv*>(attPDU.get()); - COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd", a->toString().c_str(), sendIndicationConfirmation, characteristicListenerList.size()); + COND_PRINT(env.DEBUG_DATA, "GATTHandler::reader: IND: %s, sendIndicationConfirmation %d, listener %zd", + a->toString().c_str(), sendIndicationConfirmation.load(), characteristicListenerList.size()); bool cfmSent = false; if( sendIndicationConfirmation ) { AttHandleValueCfm cfm; @@ -236,7 +243,7 @@ void GATTHandler::l2capReaderThreadImpl() { const std::shared_ptr<TROOctets> data(new POctets(a->getValue())); const uint64_t timestamp = a->ts_creation; int i=0; - for_each_idx_mtx(mtx_eventListenerList, characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) { + jau::for_each_cow(characteristicListenerList, [&](std::shared_ptr<GATTCharacteristicListener> &l) { try { if( l->match(*decl) ) { l->indicationReceived(decl, data, timestamp, cfmSent); @@ -327,6 +334,7 @@ GATTHandler::GATTHandler(const std::shared_ptr<DBTDevice> &device) noexcept GATTHandler::~GATTHandler() noexcept { disconnect(false /* disconnectDevice */, false /* ioErrorCause */); + characteristicListenerList.clear(); services.clear(); genericAccess = nullptr; } @@ -350,7 +358,7 @@ bool GATTHandler::disconnect(const bool disconnectDevice, const bool ioErrorCaus const std::lock_guard<std::recursive_mutex> lock(mtx_command); // RAII-style acquire and relinquish via destructor DBG_PRINT("GATTHandler::disconnect: Start: disconnectDevice %d, ioErrorCause %d: GattHandler[%s], l2cap[%s]: %s", disconnectDevice, ioErrorCause, getStateString().c_str(), l2cap.getStateString().c_str(), deviceString.c_str()); - removeAllCharacteristicListener(); + characteristicListenerList.clear(); PERF3_TS_TD("GATTHandler::disconnect.1"); { |