Make MutexQueue use jsemaphore for signaling
authorsapier <Sapier at GMX dot net>
Mon, 6 Jan 2014 11:45:42 +0000 (12:45 +0100)
committersapier <Sapier at GMX dot net>
Fri, 10 Jan 2014 09:10:45 +0000 (10:10 +0100)
13 files changed:
src/client.cpp
src/client.h
src/connection.cpp
src/game.cpp
src/httpfetch.cpp
src/itemdef.cpp
src/jthread/jsemaphore.h
src/jthread/pthread/jsemaphore.cpp
src/jthread/win32/jsemaphore.cpp
src/shader.cpp
src/tile.cpp
src/util/container.h
src/util/thread.h

index b830bcdf321c6813f3e04e3dad087e7799a6d378..721c413c008c97c0245fc03add3d4ada2108c903 100644 (file)
@@ -286,6 +286,20 @@ Client::Client(
        }
 }
 
+void Client::Stop()
+{
+       //request all client managed threads to stop
+       m_mesh_update_thread.Stop();
+}
+
+bool Client::isShutdown()
+{
+
+       if (!m_mesh_update_thread.IsRunning()) return true;
+
+       return false;
+}
+
 Client::~Client()
 {
        {
@@ -296,7 +310,7 @@ Client::~Client()
        m_mesh_update_thread.Stop();
        m_mesh_update_thread.Wait();
        while(!m_mesh_update_thread.m_queue_out.empty()) {
-               MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
+               MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
                delete r.mesh;
        }
 
@@ -692,7 +706,7 @@ void Client::step(float dtime)
                while(!m_mesh_update_thread.m_queue_out.empty())
                {
                        num_processed_meshes++;
-                       MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_front();
+                       MeshUpdateResult r = m_mesh_update_thread.m_queue_out.pop_frontNoEx();
                        MapBlock *block = m_env.getMap().getBlockNoCreateNoEx(r.p);
                        if(block)
                        {
index 1ed80a2b0225dd58ef3c02f00c7b779b67a99cd0..1b7ad48e66d96556ecf6e182dcd6db1f6f5a0819 100644 (file)
@@ -289,6 +289,14 @@ public:
        );
        
        ~Client();
+
+       /*
+        request all threads managed by client to be stopped
+        */
+       void Stop();
+
+
+       bool isShutdown();
        /*
                The name of the local player should already be set when
                calling this, as it is sent in the initialization.
index 8f83f62192c097f696469c4162b46d073fb3a07f..bc9279649d5b0a52e49c8fc7a642617a5e35b558 100644 (file)
@@ -592,8 +592,9 @@ void * Connection::Thread()
                
                runTimeouts(dtime);
 
+               //NOTE this is only thread safe for ONE consumer thread!
                while(!m_command_queue.empty()){
-                       ConnectionCommand c = m_command_queue.pop_front();
+                       ConnectionCommand c = m_command_queue.pop_frontNoEx();
                        processCommand(c);
                }
 
@@ -1556,7 +1557,7 @@ ConnectionEvent Connection::getEvent()
                e.type = CONNEVENT_NONE;
                return e;
        }
-       return m_event_queue.pop_front();
+       return m_event_queue.pop_frontNoEx();
 }
 
 ConnectionEvent Connection::waitEvent(u32 timeout_ms)
index b751a2b627e8eef9955a5b1f0bad0fdb3e2ab6d0..aef60484fed175602bd07c212d7ec3504723e9d1 100644 (file)
@@ -813,7 +813,7 @@ public:
                services->setVertexShaderConstant("animationTimer", &animation_timer_f, 1);
 
                LocalPlayer* player = m_client->getEnv().getLocalPlayer();
-               v3f eye_position = player->getEyePosition(); 
+               v3f eye_position = player->getEyePosition();
                services->setPixelShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
                services->setVertexShaderConstant("eyePosition", (irr::f32*)&eye_position, 3);
 
@@ -1876,12 +1876,12 @@ void the_game(
                }
                else if(input->wasKeyDown(getKeySetting("keymap_screenshot")))
                {
-                       irr::video::IImage* const image = driver->createScreenShot(); 
-                       if (image) { 
-                               irr::c8 filename[256]; 
-                               snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png", 
+                       irr::video::IImage* const image = driver->createScreenShot();
+                       if (image) {
+                               irr::c8 filename[256];
+                               snprintf(filename, 256, "%s" DIR_DELIM "screenshot_%u.png",
                                                 g_settings->get("screenshot_path").c_str(),
-                                                device->getTimer()->getRealTime()); 
+                                                device->getTimer()->getRealTime());
                                if (driver->writeImageToFile(image, filename)) {
                                        std::wstringstream sstr;
                                        sstr<<"Saved screenshot to '"<<filename<<"'";
@@ -1891,8 +1891,8 @@ void the_game(
                                } else{
                                        infostream<<"Failed to save screenshot '"<<filename<<"'"<<std::endl;
                                }
-                               image->drop(); 
-                       }                        
+                               image->drop();
+                       }
                }
                else if(input->wasKeyDown(getKeySetting("keymap_toggle_hud")))
                {
@@ -2263,7 +2263,7 @@ void the_game(
                                                        new MainRespawnInitiator(
                                                                        &respawn_menu_active, &client);
                                        GUIDeathScreen *menu =
-                                                       new GUIDeathScreen(guienv, guiroot, -1, 
+                                                       new GUIDeathScreen(guienv, guiroot, -1,
                                                                &g_menumgr, respawner);
                                        menu->drop();
                                        
@@ -2755,7 +2755,7 @@ void the_game(
                                
                                // Sign special case, at least until formspec is properly implemented.
                                // Deprecated?
-                               if(meta && meta->getString("formspec") == "hack:sign_text_input" 
+                               if(meta && meta->getString("formspec") == "hack:sign_text_input"
                                                && !random_input
                                                && !input->isKeyDown(getKeySetting("keymap_sneak")))
                                {
@@ -3222,7 +3222,7 @@ void the_game(
 
                                driver->getOverrideMaterial().Material.ColorMask = irr::video::ECP_RED;
                                driver->getOverrideMaterial().EnableFlags  = irr::video::EMF_COLOR_MASK;
-                               driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX + 
+                               driver->getOverrideMaterial().EnablePasses = irr::scene::ESNRP_SKY_BOX +
                                                                                                                         irr::scene::ESNRP_SOLID +
                                                                                                                         irr::scene::ESNRP_TRANSPARENT +
                                                                                                                         irr::scene::ESNRP_TRANSPARENT_EFFECT +
@@ -3433,6 +3433,16 @@ void the_game(
        chat_backend.addMessage(L"", L"# Disconnected.");
        chat_backend.addMessage(L"", L"");
 
+       client.Stop();
+
+       //force answer all texture and shader jobs (TODO return empty values)
+
+       while(!client.isShutdown()) {
+               tsrc->processQueue();
+               shsrc->processQueue();
+               sleep_ms(100);
+       }
+
        // Client scope (client is destructed before destructing *def and tsrc)
        }while(0);
        } // try-catch
index 9eed045fedbc66bafe1f6bb045cc8befcf65a791..176a3b22a057792d335da6e374f4d0ee0f201926 100644 (file)
@@ -594,7 +594,7 @@ protected:
                        */
 
                        while (!m_requests.empty()) {
-                               Request req = m_requests.pop_front();
+                               Request req = m_requests.pop_frontNoEx();
                                processRequest(req);
                        }
                        processQueued(&pool);
index f77a198b5bce1e4c40ea67653262965283a8dd36..d5e03f7b367242d29471d13790779a81b74a6088 100644 (file)
@@ -642,6 +642,7 @@ public:
        void processQueue(IGameDef *gamedef)
        {
 #ifndef SERVER
+               //NOTE this is only thread safe for ONE consumer thread!
                while(!m_get_clientcached_queue.empty())
                {
                        GetRequest<std::string, ClientCached*, u8, u8>
index 70318d5da78b94373c68099fc09a986791d9b077..b62add253d64cedb295a73eecd6ec979df07cc6f 100644 (file)
@@ -36,6 +36,7 @@ public:
 
        void Post();
        void Wait();
+       bool Wait(unsigned int time_ms);
 
        int GetValue();
 
index 962b582f17004396e035e8a3c64a0e5fdd3719d8..ee14310653524b5635f25ae2bd3d202590cc080a 100644 (file)
@@ -17,8 +17,12 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 */
 #include <assert.h>
+#include <errno.h>
+#include <sys/time.h>
 #include "jthread/jsemaphore.h"
+
 #define UNUSED(expr) do { (void)(expr); } while (0)
+
 JSemaphore::JSemaphore() {
        int sem_init_retval = sem_init(&m_semaphore,0,0);
        assert(sem_init_retval == 0);
@@ -49,6 +53,33 @@ void JSemaphore::Wait() {
        UNUSED(sem_wait_retval);
 }
 
+bool JSemaphore::Wait(unsigned int time_ms) {
+       struct timespec waittime;
+       struct timeval now;
+
+       if (gettimeofday(&now, NULL) == -1) {
+               assert("Unable to get time by clock_gettime!" == 0);
+               return false;
+       }
+
+       waittime.tv_nsec = ((time_ms % 1000) * 1000 * 1000) + (now.tv_usec * 1000);
+       waittime.tv_sec  = (time_ms / 1000) + (waittime.tv_nsec / (1000*1000*1000)) + now.tv_sec;
+       waittime.tv_nsec %= 1000*1000*1000;
+
+       errno = 0;
+       int sem_wait_retval = sem_timedwait(&m_semaphore,&waittime);
+
+       if (sem_wait_retval == 0)
+       {
+               return true;
+       }
+       else {
+               assert((errno == ETIMEDOUT) || (errno == EINTR));
+               return false;
+       }
+       return sem_wait_retval == 0 ? true : false;
+}
+
 int JSemaphore::GetValue() {
 
        int retval = 0;
index 3a1f2715ca782a61fb45d6b7f2fc11e59a110732..34167f391c4575b7eca61fb4caae13ee595f1a57 100755 (executable)
@@ -51,6 +51,21 @@ void JSemaphore::Wait() {
                        INFINITE);
 }
 
+bool JSemaphore::Wait(unsigned int time_ms) {
+       unsigned int retval = WaitForSingleObject(
+                       m_hSemaphore,
+                       time_ms);
+
+       if (retval == WAIT_OBJECT_0)
+       {
+               return true;
+       }
+       else {
+               assert(retval == WAIT_TIMEOUT);
+               return false;
+       }
+}
+
 int JSemaphore::GetValue() {
 
        long int retval = 0;
index 39296f6a3a60fcfac5cfebf1e63d55aa855f336e..d29c9d3a707e9166bd2a2a0ee26b471b2ddb254e 100644 (file)
@@ -427,21 +427,18 @@ u32 ShaderSource::getShaderId(const std::string &name)
                /* infostream<<"Waiting for shader from main thread, name=\""
                                <<name<<"\""<<std::endl;*/
 
-               try{
-                       while(true) {
-                               // Wait result for a second
-                               GetResult<std::string, u32, u8, u8>
-                                       result = result_queue.pop_front(1000);
-
-                               if (result.key == name) {
-                                       return result.item;
-                               }
+               while(true) {
+                       GetResult<std::string, u32, u8, u8>
+                               result = result_queue.pop_frontNoEx();
+
+                       if (result.key == name) {
+                               return result.item;
+                       }
+                       else {
+                               errorstream << "Got shader with invalid name: " << result.key << std::endl;
                        }
                }
-               catch(ItemNotFoundException &e){
-                       errorstream<<"Waiting for shader " << name << " timed out."<<std::endl;
-                       return 0;
-               }
+
        }
 
        infostream<<"getShaderId(): Failed"<<std::endl;
@@ -537,6 +534,7 @@ void ShaderSource::processQueue()
        /*
                Fetch shaders
        */
+       //NOTE this is only thread safe for ONE consumer thread!
        if(!m_get_shader_queue.empty()){
                GetRequest<std::string, u32, u8, u8>
                                request = m_get_shader_queue.pop();
index e003c302074b2573d2640c0596c9e1087ba23fe0..b8080c7084b9742a7749bef5b17b7e1daf872634 100644 (file)
@@ -775,6 +775,7 @@ void TextureSource::processQueue()
        /*
                Fetch textures
        */
+       //NOTE this is only thread safe for ONE consumer thread!
        if(!m_get_texture_queue.empty())
        {
                GetRequest<std::string, u32, u8, u8>
index e83c3cd375a9b7ea4f755d4aa7f9817163974641..6d836a4d55d20f5fc7751b8938d01041f5397959 100644 (file)
@@ -24,7 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "../exceptions.h"
 #include "../jthread/jmutex.h"
 #include "../jthread/jmutexautolock.h"
-#include "../porting.h" // For sleep_ms
+#include "../jthread/jsemaphore.h"
 #include <list>
 #include <vector>
 #include <map>
@@ -201,6 +201,12 @@ public:
                ++m_list_size;
        }
        
+       void push_front(T t)
+       {
+               m_list.push_front(t);
+               ++m_list_size;
+       }
+
        T pop_front()
        {
                if(m_list.empty())
@@ -247,86 +253,141 @@ template<typename T>
 class MutexedQueue
 {
 public:
+       template<typename Key, typename U, typename Caller, typename CallerData>
+       friend class RequestQueue;
+
        MutexedQueue()
        {
        }
        bool empty()
        {
                JMutexAutoLock lock(m_mutex);
-               return m_list.empty();
+               return (m_size.GetValue() == 0);
        }
        void push_back(T t)
        {
                JMutexAutoLock lock(m_mutex);
                m_list.push_back(t);
+               m_size.Post();
        }
-       T pop_front(u32 wait_time_max_ms=0)
+
+       /* this version of pop_front returns a empty element of T on timeout.
+        * Make sure default constructor of T creates a recognizable "empty" element
+        */
+       T pop_frontNoEx(u32 wait_time_max_ms)
        {
-               u32 wait_time_ms = 0;
+               if (m_size.Wait(wait_time_max_ms))
+               {
+                       JMutexAutoLock lock(m_mutex);
 
-               for(;;)
+                       typename std::list<T>::iterator begin = m_list.begin();
+                       T t = *begin;
+                       m_list.erase(begin);
+                       return t;
+               }
+               else
                {
-                       {
-                               JMutexAutoLock lock(m_mutex);
-
-                               if(!m_list.empty())
-                               {
-                                       typename std::list<T>::iterator begin = m_list.begin();
-                                       T t = *begin;
-                                       m_list.erase(begin);
-                                       return t;
-                               }
-
-                               if(wait_time_ms >= wait_time_max_ms)
-                                       throw ItemNotFoundException("MutexedQueue: queue is empty");
-                       }
-
-                       // Wait a while before trying again
-                       sleep_ms(10);
-                       wait_time_ms += 10;
+                       return T();
                }
        }
+
+       T pop_front(u32 wait_time_max_ms)
+       {
+               if (m_size.Wait(wait_time_max_ms))
+               {
+                       JMutexAutoLock lock(m_mutex);
+
+                       typename std::list<T>::iterator begin = m_list.begin();
+                       T t = *begin;
+                       m_list.erase(begin);
+                       return t;
+               }
+               else
+               {
+                       throw ItemNotFoundException("MutexedQueue: queue is empty");
+               }
+       }
+
+       T pop_frontNoEx()
+       {
+               m_size.Wait();
+
+               JMutexAutoLock lock(m_mutex);
+
+               typename std::list<T>::iterator begin = m_list.begin();
+               T t = *begin;
+               m_list.erase(begin);
+               return t;
+       }
+
        T pop_back(u32 wait_time_max_ms=0)
        {
-               u32 wait_time_ms = 0;
+               if (m_size.Wait(wait_time_max_ms))
+               {
+                       JMutexAutoLock lock(m_mutex);
+
+                       typename std::list<T>::iterator last = m_list.end();
+                       last--;
+                       T t = *last;
+                       m_list.erase(last);
+                       return t;
+               }
+               else
+               {
+                       throw ItemNotFoundException("MutexedQueue: queue is empty");
+               }
+       }
+
+       /* this version of pop_back returns a empty element of T on timeout.
+        * Make sure default constructor of T creates a recognizable "empty" element
+        */
+       T pop_backNoEx(u32 wait_time_max_ms=0)
+       {
+               if (m_size.Wait(wait_time_max_ms))
+               {
+                       JMutexAutoLock lock(m_mutex);
 
-               for(;;)
+                       typename std::list<T>::iterator last = m_list.end();
+                       last--;
+                       T t = *last;
+                       m_list.erase(last);
+                       return t;
+               }
+               else
                {
-                       {
-                               JMutexAutoLock lock(m_mutex);
-
-                               if(!m_list.empty())
-                               {
-                                       typename std::list<T>::iterator last = m_list.end();
-                                       last--;
-                                       T t = *last;
-                                       m_list.erase(last);
-                                       return t;
-                               }
-
-                               if(wait_time_ms >= wait_time_max_ms)
-                                       throw ItemNotFoundException("MutexedQueue: queue is empty");
-                       }
-
-                       // Wait a while before trying again
-                       sleep_ms(10);
-                       wait_time_ms += 10;
+                       return T();
                }
        }
 
+       T pop_backNoEx()
+       {
+               m_size.Wait();
+
+               JMutexAutoLock lock(m_mutex);
+
+               typename std::list<T>::iterator last = m_list.end();
+               last--;
+               T t = *last;
+               m_list.erase(last);
+               return t;
+       }
+
+protected:
        JMutex & getMutex()
        {
                return m_mutex;
        }
 
+       // NEVER EVER modify the >>list<< you got by using this function!
+       // You may only modify it's content
        std::list<T> & getList()
        {
                return m_list;
        }
 
-protected:
        JMutex m_mutex;
        std::list<T> m_list;
+       JSemaphore m_size;
 };
 
 #endif
index bb8e03317b8a685a2c3834b72903d85ee1fee175..8b3c33621c494f3b5f2b7d73a1635c6372a6cad9 100644 (file)
@@ -24,6 +24,7 @@ with this program; if not, write to the Free Software Foundation, Inc.,
 #include "../jthread/jthread.h"
 #include "../jthread/jmutex.h"
 #include "../jthread/jmutexautolock.h"
+#include "porting.h"
 
 template<typename T>
 class MutexedVariable
@@ -123,36 +124,38 @@ public:
        void add(Key key, Caller caller, CallerData callerdata,
                        ResultQueue<Key, T, Caller, CallerData> *dest)
        {
-               JMutexAutoLock lock(m_queue.getMutex());
-               
-               /*
-                       If the caller is already on the list, only update CallerData
-               */
-               for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
-                               i = m_queue.getList().begin();
-                               i != m_queue.getList().end(); ++i)
                {
-                       GetRequest<Key, T, Caller, CallerData> &request = *i;
-
-                       if(request.key == key)
+                       JMutexAutoLock lock(m_queue.getMutex());
+
+                       /*
+                               If the caller is already on the list, only update CallerData
+                       */
+                       for(typename std::list< GetRequest<Key, T, Caller, CallerData> >::iterator
+                                       i = m_queue.getList().begin();
+                                       i != m_queue.getList().end(); ++i)
                        {
-                               for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
-                                               i = request.callers.begin();
-                                               i != request.callers.end(); ++i)
+                               GetRequest<Key, T, Caller, CallerData> &request = *i;
+
+                               if(request.key == key)
                                {
-                                       CallerInfo<Caller, CallerData, Key, T> &ca = *i;
-                                       if(ca.caller == caller)
+                                       for(typename std::list< CallerInfo<Caller, CallerData, Key, T> >::iterator
+                                                       i = request.callers.begin();
+                                                       i != request.callers.end(); ++i)
                                        {
-                                               ca.data = callerdata;
-                                               return;
+                                               CallerInfo<Caller, CallerData, Key, T> &ca = *i;
+                                               if(ca.caller == caller)
+                                               {
+                                                       ca.data = callerdata;
+                                                       return;
+                                               }
                                        }
+                                       CallerInfo<Caller, CallerData, Key, T> ca;
+                                       ca.caller = caller;
+                                       ca.data = callerdata;
+                                       ca.dest = dest;
+                                       request.callers.push_back(ca);
+                                       return;
                                }
-                               CallerInfo<Caller, CallerData, Key, T> ca;
-                               ca.caller = caller;
-                               ca.data = callerdata;
-                               ca.dest = dest;
-                               request.callers.push_back(ca);
-                               return;
                        }
                }
 
@@ -168,12 +171,17 @@ public:
                ca.dest = dest;
                request.callers.push_back(ca);
                
-               m_queue.getList().push_back(request);
+               m_queue.push_back(request);
+       }
+
+       GetRequest<Key, T, Caller, CallerData> pop(unsigned int timeout_ms)
+       {
+               return m_queue.pop_front(timeout_ms);
        }
 
-       GetRequest<Key, T, Caller, CallerData> pop(bool wait_if_empty=false)
+       GetRequest<Key, T, Caller, CallerData> pop()
        {
-               return m_queue.pop_front(wait_if_empty);
+               return m_queue.pop_frontNoEx();
        }
 
        void pushResult(GetRequest<Key, T, Caller, CallerData> req,