#include #include "pooledthrd.h" ////////////////////////////////////////////////////////// PPooledThread::PPooledThread( PINDEX stackSize, int deletion, int priorityLevel, const PString & threadName, PPooledThreadManager * manager ) : PPooledThreadManager::PPooledThreadBase( manager, deletion==PThread::AutoDeleteThread?TRUE:FALSE ) { SetPriority( (PThread::Priority)priorityLevel ); SetThreadName( threadName ); } PPooledThread::~PPooledThread() { } void PPooledThread::Main() { } PPooledThread * PPooledThread::Current() { PThreadIdentifer pthreadId = PThread::GetCurrentThreadId(); PPooledThreadManager::PPooledBaseThread * pooledThread = get_singleton().GetPooledThread( pthreadId ); if( pooledThread == NULL ) return NULL; PString baseThreadId = pooledThread->GetBaseThreadId(); return (PPooledThread *)::get_singleton().GetBaseThread( baseThreadId ); return NULL; } void PPooledThread::PrintOn( ostream & strm /// Stream to output text representation ) const { if( m_Thread != NULL ) m_Thread->PrintOn( strm ); } void PPooledThread::Restart() { /// DO NOTHING! } BOOL PPooledThread::WaitForTermination( DWORD wait ) { return PPooledThreadBase::WaitForTermination( wait ); } void PPooledThread::Terminate() { /// DO NOTHING! } BOOL PPooledThread::IsTerminated() { /// Pooled htreads never terminate return WaitForTermination( 1 ); } void PPooledThread::Suspend( BOOL susp /// Flag to suspend or resume a thread. ) { if( m_Thread != NULL ) m_Thread->Suspend(); } BOOL PPooledThread::IsSuspended() const { if( m_Thread != NULL ) return m_Thread->IsSuspended(); return TRUE; } /// Suspend the current thread for the specified amount of time. void PPooledThread::Sleep( const PTimeInterval & delay /// Time interval to sleep for. ) { PThread::Sleep( delay ); } void PPooledThread::SetPriority( PThread::Priority priorityLevel /// New priority for thread. ) { if( m_Thread != NULL ) m_Thread->SetPriority( priorityLevel ); } PThread::Priority PPooledThread::GetPriority() const { if( m_Thread != NULL ) return m_Thread->GetPriority(); return PThread::NormalPriority; } void PPooledThread::SetAutoDelete( PThread::AutoDeleteFlag deletion /// New auto delete setting. ) { m_IsAutoDelete = ( deletion == PThread::AutoDeleteThread ? TRUE: FALSE ); } PString PPooledThread::GetThreadName() const { if( m_Thread != NULL ) return m_Thread->GetThreadName(); return ""; } void PPooledThread::SetThreadName( const PString & name /// New name for the thread. ) { if( m_Thread != NULL ) m_Thread->SetThreadName( name ); } PThreadIdentifer PPooledThread::GetThreadId() const { if( m_Thread != NULL ) return m_Thread->GetThreadId(); return 0; } PThreadIdentifer PPooledThread::GetCurrentThreadId() { return PThread::GetCurrentThreadId(); } void PPooledThread::Yield() { PThread::Yield(); } PPooledThread * PPooledThread::Create( const PNotifier & notifier, /// Function to execute in thread. INT parameter, /// Parameter value to pass to notifier. int deletion, /// Automatically delete PThread instance on termination of thread. int priorityLevel, /// Initial priority of thread. const PString & threadName, /// The name of the thread (for Debug/Trace) PINDEX stackSize /// Stack size on some platforms ) { PPooledThread * thread = new PPooledCallbackThread(notifier, parameter, deletion, priorityLevel, threadName, stackSize); if (deletion != PThread::AutoDeleteThread) return thread; // Do not return a pointer to the thread if it is auto-delete as this // pointer is extremely dangerous to use, it could be deleted at any moment // from now on so using the pointer could crash the program. return NULL; } //////////////////////////////////////////////////////////////////////////////// PPooledCallbackThread::PPooledCallbackThread(const PNotifier & notifier, INT param, int deletion, int priorityLevel, const PString & threadName, PINDEX stackSize) : PPooledThread(stackSize, deletion, priorityLevel, threadName), callback(notifier), parameter(param) { Resume(); } void PPooledCallbackThread::Main() { callback(*this, parameter); } ////////////////////////////////////////////////////////////////////////////////// PPooledThreadManager::PPooledThreadManager() { m_BaseThreadId = 0; m_MaxPoolSize = 1000; m_LastUsedThreadIndex = 0; m_PermanentThreads.DisallowDeleteObjects(); m_BaseThreads.DisallowDeleteObjects(); m_SurplusThreads.DisallowDeleteObjects(); m_ActiveThreads.DisallowDeleteObjects(); int initSize = 5; int stackSize = 30000; for( int i = 0; i < initSize; i++ ) { PPooledBaseThread * thrd = new PPooledBaseThread( *this, stackSize ); m_PermanentThreads.Append( thrd ); m_FreeThreads.Enqueue( thrd ); } } PPooledThreadManager::~PPooledThreadManager() { m_SyncPoint.Signal(); for(;;) { if( m_PermanentThreads.GetSize() == 0 ) break; PPooledBaseThread * thrd = (PPooledBaseThread *)m_PermanentThreads.RemoveAt( 0 ); thrd->DestroyThread(); delete thrd; } for(;;) { if( m_SurplusThreads.GetSize() == 0 ) break; PPooledBaseThread * thrd = (PPooledBaseThread *)m_SurplusThreads.RemoveAt( 0 ); thrd->DestroyThread(); delete thrd; } } PPooledThreadManager::PPooledBaseThread * PPooledThreadManager::GetPooledThread( const PThreadIdentifer & threadId ) { PPooledThreadManager::PPooledBaseThread * pooledThread = NULL; PWaitAndSignal lock( m_ThreadPoolMutex ); for( PINDEX i = 0; i < m_PermanentThreads.GetSize(); i++ ) { if( m_PermanentThreads[i].GetThreadId() == threadId ) { pooledThread = (PPooledThreadManager::PPooledBaseThread*)m_PermanentThreads.GetAt( i ); break; } } return pooledThread; } PPooledThreadManager::PPooledBaseThread * PPooledThreadManager::ExecuteInThread( const PString & baseThreadId ) { if( m_PermanentThreads.GetSize() == 0 ) return NULL; PWaitAndSignal lock( m_ThreadPoolMutex ); PPooledBaseThread * thrd = m_FreeThreads.Dequeue(); if( thrd != NULL ) { thrd->SetBaseThreadId( baseThreadId ); return thrd; } if( (unsigned)m_PermanentThreads.GetSize() < m_MaxPoolSize ) { /// Grow the pool thrd = new PPooledBaseThread( *this ); thrd->SetBaseThreadId( baseThreadId ); m_PermanentThreads.Append( thrd ); return thrd; } /// Surplus thread thrd = new PPooledBaseThread( *this, TRUE /* Auto delete */ ); m_SurplusThreadsMutex.Wait(); m_SurplusThreads.Append( thrd ); m_SurplusThreadsMutex.Signal(); thrd->SetBaseThreadId( baseThreadId ); return thrd; } void PPooledThreadManager::DeleteSurplusThread( PPooledBaseThread * thrd ) { PWaitAndSignal lock( m_SurplusThreadsMutex ); m_SurplusThreads.Remove( thrd ); thrd->DestroyThread(); } void PPooledThreadManager::EnqueueThread( PPooledBaseThread * thrd ) { PWaitAndSignal lock( m_ThreadPoolMutex ); m_FreeThreads.Enqueue( thrd ); } PPooledThreadManager::PPooledBaseThread * PPooledThreadManager::DequeueThread() { PWaitAndSignal lock( m_ThreadPoolMutex ); return m_FreeThreads.Dequeue(); } PString PPooledThreadManager::GetNextThreadId() { PWaitAndSignal lock( m_ThreadIdMutex ); if( m_BaseThreadId == UINT_MAX ) m_BaseThreadId = 0; PString id( ++m_BaseThreadId ); return id; } void PPooledThreadManager::AddBaseThread( PPooledThreadBase * thrd ) { PWaitAndSignal lock( m_BaseThreadMutex ); m_BaseThreads.SetAt( thrd->GetId(), thrd ); } PPooledThreadManager::PPooledThreadBase * PPooledThreadManager::GetBaseThread( const PString & id ) { PWaitAndSignal lock( m_BaseThreadMutex ); return (PPooledThreadBase *)m_BaseThreads.GetAt( id ); } void PPooledThreadManager::RemoveBaseThread( const PString & id ) { PWaitAndSignal lock( m_BaseThreadMutex ); m_BaseThreads.RemoveAt( id ); } void PPooledThreadManager::AddActiveThread( PPooledBaseThread * thrd ) { PWaitAndSignal lock( m_ActiveThreadMutex ); m_ActiveThreads.Append( thrd ); } void PPooledThreadManager::RemoveActiveThread( PPooledBaseThread * thrd ) { PWaitAndSignal lock( m_ActiveThreadMutex ); m_ActiveThreads.Remove( thrd ); } ///////////////////////////////////////////////////////////////////////////////// PPooledThreadManager::PPooledBaseThread::PPooledBaseThread( PPooledThreadManager & manager, int stackSize, BOOL autoDeleteThread ) : PThread( stackSize, NoAutoDeleteThread, NormalPriority, "" ), m_Manager( manager ) { m_AutoDeleteThread = autoDeleteThread; m_ExitThread = FALSE; m_IsBusy = FALSE; } PPooledThreadManager::PPooledBaseThread::~PPooledBaseThread() { m_ExitThread = TRUE; Signal(); } void PPooledThreadManager::PPooledBaseThread::DestroyThread() { m_ExitThread = TRUE; Signal(); if( IsPermanentThread() && m_IsBusy ) { PAssert( WaitForTermination( 10000 ), "vxThread: " + GetThreadName() + " failed to terminate" ); } } void PPooledThreadManager::PPooledBaseThread::Main() { for(;;) { m_SyncPoint.Wait(); if( m_ExitThread ) { m_CompleteEvent.SetEvent(); return; } m_IsBusy = TRUE; PPooledThreadBase * base = m_Manager.GetBaseThread( m_BaseThreadId ); if( base == NULL ) { m_IsBusy = FALSE; m_CompleteEvent.SetEvent(); continue; } base->m_InnerMutex.Wait(); m_Manager.AddActiveThread( this ); base->InternalMain(); Yield(); m_Manager.RemoveActiveThread( this ); m_Manager.RemoveBaseThread( m_BaseThreadId ); if( base->IsAutoDelete() ) { base->m_InnerMutex.Signal(); delete base; }else{ base->m_InnerMutex.Signal(); } m_IsBusy = FALSE; m_CompleteEvent.SetEvent(); m_CompleteEvent.ResetEvent(); if( !IsPermanentThread() ) { /// This is a surplus thread so delete it m_Manager.DeleteSurplusThread( this ); return; } /// return this thread to the pool m_Manager.EnqueueThread( this ); } } BOOL PPooledThreadManager::PPooledBaseThread::IsBusy() { return m_IsBusy; } void PPooledThreadManager::PPooledBaseThread::Signal() { m_SyncPoint.Signal(); } BOOL PPooledThreadManager::PPooledBaseThread::IsPermanentThread() { return !m_AutoDeleteThread; } BOOL PPooledThreadManager::PPooledBaseThread::WaitForCompleteEvent( DWORD wait ) { if( wait == 0 ) return m_CompleteEvent.WaitEvent(); return m_CompleteEvent.WaitEvent( wait ); } //////////////////////////////////////////////////////////////////////////////////// PPooledThreadManager::PPooledThreadBase::PPooledThreadBase( PPooledThreadManager * manager, BOOL autoDelete ) { if( manager == NULL ) manager = ::get_singleton_instance(); m_Manager = manager; m_IsAutoDelete = autoDelete; m_Id = m_Manager->GetNextThreadId(); m_Manager->AddBaseThread( this ); m_Thread = m_Manager->ExecuteInThread( m_Id ); PAssertNULL( m_Thread ); } PPooledThreadManager::PPooledThreadBase::~PPooledThreadBase() { } void PPooledThreadManager::PPooledThreadBase::InternalMain() { Main(); m_Thread = NULL; } void PPooledThreadManager::PPooledThreadBase::Main() { } BOOL PPooledThreadManager::PPooledThreadBase::Resume() { PAssertNULL( m_Thread ); if( m_Thread!= NULL ) { m_Thread->Resume(); m_Thread->Signal(); return TRUE; } return FALSE; } void PPooledThreadManager::PPooledThreadBase::Suspend( BOOL susp ) { if( m_Thread!= NULL ) { m_Thread->Suspend( susp ); } } BOOL PPooledThreadManager::PPooledThreadBase::WaitForTermination( DWORD wait ) { if( m_Thread == NULL ) return TRUE; return m_Thread->WaitForCompleteEvent( wait ); } ///////////////////////////////////////////////////////////////////// PEventSync::PEventSync() { triggered = FALSE; } BOOL PEventSync::WaitEvent( const PTimeInterval & waitTime ) { if( !mutex.Wait( waitTime ) ) return FALSE; if( triggered ) { mutex.Signal(); return TRUE; } BOOL ok = syncPoint.Wait( waitTime ); mutex.Signal(); return ok; } void PEventSync::SetEvent() { triggered = TRUE; syncPoint.Signal(); } void PEventSync::ResetEvent() { PWaitAndSignal lock( mutex ); triggered = FALSE; } void PEventSync::SetAndResetEvent() { syncPoint.Signal(); PWaitAndSignal lock( mutex ); triggered = FALSE; }