SkillAgentSearch skills...

StdWorkerThread

C++ std::thread Event Loop with Message Queue and Timer

Install / Use

/learn @endurodave/StdWorkerThread
About this skill

Quality Score

0/100

Supported Platforms

Universal

README

License MIT conan Ubuntu conan Ubuntu conan Windows

C++ std::thread Event Loop with Message Queue and Timer

Create a worker thread with an event loop, message queue and a timer using the C++11 thread support library.

Table of Contents

Preface

Originally published on CodeProject at: <a href="http://www.codeproject.com/Articles/1169105/Cplusplus-std-thread-Event-Loop-with-Message-Queue"><strong>C++ std::thread Event Loop with Message Queue and Timer</strong></a>

Getting Started

CMake is used to create the project build files on any Windows or Linux machine. The source code works on any C++ compiler with std::thread support.

  1. Clone the repository.
  2. From the repository root, run the following CMake command:
    cmake -B Build .
  3. Build and run the project within the Build directory.

Introduction

<p>An event loop, or sometimes called a message loop, is a thread that waits for and dispatches incoming events. The thread blocks waiting for requests to arrive and then dispatches the event to an event handler function. A message queue is typically used by the loop to hold incoming messages. Each message is sequentially dequeued, decoded, and then an action is performed. Event loops are one way to implement inter-process communication.</p> <p>All operating systems provide support for multi-threaded applications. Each OS has unique function calls for creating threads, message queues and timers. With the advent of the C++11 thread support library, it&rsquo;s now possible to create portable code and avoid the OS-specific function calls. This article provides a simple example of how to create a thread event loop, message queue and timer services while only relying upon the C++ Standard Library. Any C++11 compiler supporting the thread library should be able to compile the attached source.</p>

Background

<p>Typically, I need a thread to operate as an event loop. Incoming messages are dequeued by the thread and data is dispatched to an appropriate function handler based on a unique message identifier. Timer support capable of invoking a function is handy for low speed polling or to generate a timeout if something doesn&rsquo;t happen in the expected amount of time. Many times, the worker thread is created at startup and isn&rsquo;t destroyed until the application terminates.</p> <p>A key requirement for the implementation is that the incoming messages must execute on the same thread instance. Whereas say <code>std::async </code>may use a temporary thread from a pool, this class ensures that all incoming messages use the same thread. For instance, a subsystem could be implemented with code that is not thread-safe. A single <code>WorkerThread </code>instance is used to safely dispatch function calls into the subsystem.</p> <p>At first glance, the C++ thread support seems to be missing some key features. Yes, <code>std::thread </code>is available to spin off a thread but there is no thread-safe queue and no timers &ndash; services that most OS&rsquo;s provide. I&rsquo;ll show how to use the C++ Standard Library to create these &ldquo;missing&rdquo; features and provide an event processing loop familiar to many programmers.</p>

WorkerThread

<p>The <code>WorkerThread </code>class encapsulates all the necessary event loop mechanisms. A simple class interface allows thread creation, posting messages to the event loop, and eventual thread termination. The interface is shown below:</p> <pre lang="C++"> class WorkerThread { public: &nbsp; &nbsp; /// Constructor &nbsp; &nbsp; WorkerThread(const char* threadName); &nbsp; &nbsp; /// Destructor &nbsp; &nbsp; ~WorkerThread(); &nbsp; &nbsp; /// Called once to create the worker thread &nbsp; &nbsp; /// @return True if thread is created. False otherwise.&nbsp; &nbsp; &nbsp; bool CreateThread(); &nbsp; &nbsp; /// Called once a program exit to exit the worker thread &nbsp; &nbsp; void ExitThread(); &nbsp; &nbsp; /// Get the ID of this thread instance &nbsp; &nbsp; /// @return The worker thread ID &nbsp; &nbsp; std::thread::id GetThreadId(); &nbsp; &nbsp; /// Get the ID of the currently executing thread &nbsp; &nbsp; /// @return The current thread ID &nbsp; &nbsp; static std::thread::id GetCurrentThreadId(); &nbsp; &nbsp; /// Add a message to the thread queue &nbsp; &nbsp; /// @param[in] data - thread specific message information &nbsp; &nbsp; void PostMsg(std::shared_ptr&lt;UserData&gt; msg); private: &nbsp; &nbsp; WorkerThread(const WorkerThread&amp;) = delete; &nbsp; &nbsp; WorkerThread&amp; operator=(const WorkerThread&amp;) = delete; &nbsp; &nbsp; /// Entry point for the worker thread &nbsp; &nbsp; void Process(); &nbsp; &nbsp; /// Entry point for timer thread &nbsp; &nbsp; void TimerThread(); &nbsp; &nbsp; std::unique_ptr&lt;std::thread&gt; m_thread; &nbsp; &nbsp; std::queue&lt;std::shared_ptr&lt;ThreadMsg&gt;&gt; m_queue; &nbsp; &nbsp; std::mutex m_mutex; &nbsp; &nbsp; std::condition_variable m_cv; &nbsp; &nbsp; std::atomic&lt;bool&gt; m_timerExit; &nbsp; &nbsp; const char* THREAD_NAME; };</pre> <p>The first thing to notice is that <code>std::thread </code>is used to create a main worker thread. The main worker thread function is <code>Process()</code>.</p> <pre lang="C++"> bool WorkerThread::CreateThread() { if (!m_thread) m_thread = new thread(&amp;WorkerThread::Process, this); return true; }</pre>

Event Loop

<p>The <code>Process() </code>event loop is shown below. The thread relies upon a <code>std::queue&lt;ThreadMsg*&gt; </code>for the message queue. <code>std::queue </code>is not thread-safe so all access to the queue must be protected by mutex. A <code>std::condition_variable </code>is used to suspend the thread until notified that a new message has been added to the queue.</p> <pre lang="C++"> void WorkerThread::Process() { &nbsp; &nbsp; m_timerExit = false; &nbsp; &nbsp; std::thread timerThread(&amp;WorkerThread::TimerThread, this); &nbsp;&nbsp; &nbsp;while (1) &nbsp;&nbsp; &nbsp;{ &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;std::shared_ptr&lt;ThreadMsg&gt; msg; &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;{ &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;// Wait for a message to be added to the queue &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;std::unique_lock&lt;std::mutex&gt; lk(m_mutex); &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;while (m_queue.empty()) &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;m_cv.wait(lk); &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;if (m_queue.empty()) &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;continue; &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;msg = m_queue.front(); &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;m_queue.pop(); &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;} &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;switch (msg-&gt;id) &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;{ &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;case MSG_POST_USER_DATA: &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;{ &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;ASSERT_TRUE(msg-&gt;msg != NULL); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; auto userData = std::static_pointer_cast&lt;UserData&gt;(msg-&gt;msg); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cout &lt;&lt; userData-&gt;msg.c_str() &lt;&lt; &quot; &quot; &lt;&lt; userData-&gt;year &lt;&lt; &quot; on &quot; &lt;&lt; THREAD_NAME &lt;&lt; endl; &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;break; &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;} &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; case MSG_TIMER: &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; cout &lt;&lt; &quot;Timer expired on &quot; &lt;&lt; THREAD_NAME &lt;&lt; endl; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; break; &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;case MSG_EXIT_THREAD: &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;{ &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; m_timerExit = true; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; timerThread.join(); &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; return; &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;} &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;default: &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;ASSERT(); &nbsp;&nbsp; &nbsp;&nbsp;&nbsp; &nbsp;} &nbsp;&nbsp; &nbsp;} }</pre> <p><code>PostMsg() </code>creates a new <code>ThreadMsg </code>on the heap, adds the message to the queue, and then notifies the worker thread using a condition variable.</p> <pre lang="C++"> void WorkerThread::PostMsg(std::shared_ptr&lt;UserData&gt; data) { &nbsp;&nbsp; &nbsp;ASSERT
View on GitHub
GitHub Stars51
CategoryDevelopment
Updated6d ago
Forks17

Languages

C++

Security Score

100/100

Audited on Mar 21, 2026

No findings