Straightforward and flexible concurrent task queue library in modern >= C++20. It's thread-safe with support for single or multiple message types, customizable container backends, and configurable worker thread pools. It's header-only library.
- Overview
- Requirements
- Installation
- Core Components
- Usage
- Test Coverage
- Project Structure
- API Reference
- Thread Safety
CTQ provides a lightweight concurrent task queue implementation that allows you to define queues with:
- Custom container types (e.g.,
std::vector,std::list, customcircular_buffer) cirular_bufferis included into the libary- Single or multiple message types with type-safe dispatching to corresponding tasks (callbacks)
- Configurable number of worker threads
- Optional queue size limits with blocking behavior
#include "ctq/task_queue.h"
#include <vector>
#include <string>
#include <iostream>
int main() {
// Create a queue that handles multiple types
// with callbacks for each type
ctq::task_queue<std::vector, int, std::string, double> queue(
{
[](int n) { std::cout << "Int: " << n << std::endl; },
[](std::string s) { std::cout << "String: " << s << std::endl; },
[](double d) { std::cout << "Double: " << d << std::endl; }
},
std::nullopt, // No max size
2 // two worker threads
);
queue.push(42);
queue.push(std::string("hello"));
queue.push(3.14);
queue.push(100);
... // Workers process the queue in the background
// As a worker thread (std::jthread) becomes available,
// it picks the next item from the queue and invokes
// the corresponding callback based on the item type.
//
// The queue destructor will join all threads automatically.
}- C++20 compatible compiler
- CMake 3.15 or higher (for building tests)
- Google Test (for running unit tests)
git clone https://github.com/egfinch/ctq.git
cd ctq
mkdir build
cd build
cmake ..
make installNow you can include CTQ in your project by adding the following to your CMakeList.txt:
find_package(ctq REQUIRED)
target_link_libraries(your_target PRIVATE ctq::ctq)Since CTQ is header-only, you can simply copy the include/ctq directory to your project or add it to your include path:
cp -r include/ctq /path/to/your/project/include/To build and run the tests, in the build (after running cmake, above) execute:
make ctq_test
./ctq_testA simple high-level wrapper that:
- Supports single or multiple message types
- Uses
std::variantinternally for multi-type support - Maps each type to its corresponding callback function
- Provides a simple
push()interface - Works with
std::vector,std::list,std::deque, andcircular_buffer
Supported Containers:
std::vector<T>std::list<T>std::deque<T>ctq::circular_buffer<T>- your custom container with required interface
A simple circular buffer implementation with:
- Fixed capacity
- FIFO semantics
- Methods:
push_back(),pop_front(),emplace_back() - Additional
next()method (pop and return) empty(),size(),capacity(), andfront()queries- Can be used as underlying container for
basic_task_queue
The core task queue implementation (used by task_queue):
- Manages a pool of worker threads using
std::jthread - Processes items from the queue using a provided callback function
- Supports optional maximum queue size with blocking behavior
- RAII, automatically stops workers on destruction
#include "ctq/task_queue.h"
#include <vector>
#include <iostream>
int main() {
// Create a task queue with a single callback for integers
ctq::task_queue<std::vector, int> queue(
[](int n) {
std::cout << "Processing: " << n << std::endl;
},
2 // 2 worker threads
);
// Add tasks to the queue
queue.push(42);
queue.push(100);
queue.push(7);
// Workers process tasks in the background
// Queue destructor will wait for all tasks to complete
}Uses std::variant internally to handle multiple types with corresponding callbacks.
#include "ctq/task_queue.h"
#include <vector>
#include <string>
#include <iostream>
int main() {
// Create a queue that handles multiple types
ctq::task_queue<std::vector, int, std::string, double> queue(
{
[](int n) { std::cout << "Int: " << n << std::endl; },
[](std::string s) { std::cout << "String: " << s << std::endl; },
[](double d) { std::cout << "Double: " << d << std::endl; }
},
std::nullopt, // No max size
3 // 3 worker threads
);
queue.push(42);
queue.push(std::string("hello"));
queue.push(3.14);
queue.push(100);
}#include "ctq/task_queue.h"
#include <vector>
int main() {
// Create a queue with maximum 10 items
// Pushing to a full queue will block until space is available
ctq::task_queue<std::vector, int> queue(
[](int n) { /* process */ },
10, // max 10 items
2 // 2 workers
);
for (int i = 0; i < 100; ++i) {
queue.push(i); // May block if queue is full
}
}#include "ctq/task_queue.h"
#include <vector>
int main() {
ctq::basic_task_queue<std::vector<int>> queue(
[](int item) {
// Process item
},
std::nullopt, // No max size
4 // 4 worker threads
);
queue.push(1);
queue.emplace(2);
queue.push(3);
}The library supports multiple container types as the underlying queue storage:
#include "ctq/task_queue.h"
#include <list>
ctq::task_queue<std::list, int> queue(
[](int n) { /* process */ },
2 // workers
);
queue.push(42);#include "ctq/task_queue.h"
#include <deque>
ctq::task_queue<std::deque, int> queue(
[](int n) { /* process */ },
3 // workers
);
queue.push(42);#include "ctq/task_queue.h"
ctq::basic_task_queue<ctq::circular_buffer<int>> queue(
[](int n) { /* process */ },
100, // circular_buffer capacity
2 // workers
);
queue.push(42);The access_queue method provides thread-safe access to the underlying queue container. This is useful when you need to inspect or manipulate the queue directly, such as checking its size, clearing it, or performing custom operations. The provided function is executed with the queue's internal mutex locked, ensuring thread safety.
#include "ctq/task_queue.h"
#include <vector>
#include <iostream>
int main() {
ctq::task_queue<std::vector, int> queue(
[](int n) {
// Process items slowly
std::this_thread::sleep_for(std::chrono::milliseconds(100));
},
2 // workers
);
// Add some items
for (int i = 0; i < 10; ++i) {
queue.push(i);
}
// Check queue size in a thread-safe way
queue.access_queue([](auto& q) {
std::cout << "Current queue size: " << q.size() << std::endl;
});
// Clear the queue if needed
queue.access_queue([](auto& q) {
q.clear();
std::cout << "Queue cleared. New size: " << q.size() << std::endl;
});
}Important: The function passed to access_queue should be quick to execute, as it holds the queue's mutex and blocks all queue operations while running.
The unit test suite (test/ctq_test.cpp) includes comprehensive tests for all components:
- Constructor and capacity verification
push_back()and size trackingnext()method (pop and return)pop_front()operation- Circular wrapping behavior
emplace_back()operation- Complex type support
front()method verification
- Basic callback execution
- Multiple worker threads
- Queue size constraints and blocking behavior
emplace()method- Processing order verification
- Complex type handling
- Single type queue operations
- Multiple type queue with
std::variant - Max element constraints
- Multiple worker threads
- Complex multi-type scenarios
- Callback routing for different types
- std::list: Basic operations, single/multi-type queues, bounded queues, complex types
- std::deque tests: Basic operations, single/multi-type queues, bounded queues, order preservation
- circular_buffer tests: Basic operations, multiple workers, bounded behavior, complex types, order preservation
- Cross-container tests: Verification that all containers produce identical results
The tests verify:
- Thread safety with atomic counters and mutexes
- Correct callback invocation
- FIFO ordering (with single worker)
- Concurrent processing with multiple workers
- Blocking behavior on bounded queues
- Proper cleanup on destruction
ctq/
├── include/
│ └── ctq/
│ ├── circular_buffer.h # Circular buffer implementation
│ └── task_queue.h # Task queue implementations
├── test/
│ └── ctq_test.cpp # Comprehensive unit tests
├── CMakeLists.txt # CMake configuration
└── README.md # This file
Supported Containers:
std::vector<T>std::list<T>std::deque<T>ctq::circular_buffer<T>- your custom container with required interface
Constructor:
task_queue(callbacks cb, std::optional<size_t> max_elements, size_t workers = 1)task_queue(callbacks cb, size_t workers = 1) //Unbounded queue constructor
Note: Unbounded queue constructor is equivalent to passing std::nullopt for max_elements
Note: callbacks is std::function<void(Ts)>... for each type Ts
Methods:
void push(type item)- Add item to queuevoid emplace(Args&&... args)- Construct item in placevoid access_queue(std::function<void(queue&)> f)- Thread-safe queue access
Methods:
circular_buffer(size_t max_size)- Constructorvoid push_back(T&& v)- Add item to buffervoid emplace_back(Args&&... args)- Construct item in placeT next()- Get and remove front itemvoid pop_front()- Remove front itemT front()- Get front item without removingsize_t size() const- Get current sizesize_t capacity() const- Get maximum capacitybool empty() const- Check if empty
Note: Can be used as a container for task_queue
Constructor:
basic_task_queue(callback cb, std::optional<size_t> max_elements, size_t workers = 1)
Methods:
void push(type item)- Add item to queue (may block if bounded)void emplace(Args&&... args)- Construct item in placevoid access_queue(std::function<void(queue&)> f)- Thread-safe queue access
All queue operations are thread-safe:
- Multiple producers can call
put()/push()concurrently - Multiple workers process items concurrently