C++20, Spans, Threads and Fun
In this post, we’ll have fun using C++20’s spans to process data on multiple threads. What’s more, we’ll be equipped with the latest concurrency features from C++20.
This text was motivated by the following comment under my recent article on std::span:
“But why does this article… not show the major use case? A span is non-owning, so passing it to a function it can work on a subset of data owned by someone else! You can have a vector of data, and let two threads process two halves of it! Brilliant!” (comment by VictorEijkhout)
Starting slow with std::thread
Our task is relatively simple but can be extended to various multi-phase computations.
We need to initialize a container with numbers, filter them, then build a histogram.
The overview of the process: generate random numbers, keep only the even values, and finally build a histogram of what’s left.
Here’s a slow start with std::thread:
#include <algorithm>
#include <array>
#include <format>
#include <iostream>
#include <iterator>
#include <map>
#include <mutex>
#include <random>
#include <span>
#include <thread>
#include <vector>

constexpr int DATA_SIZE = 1000;
constexpr int MAX_NUM = 256;

std::vector<int> filteredNumbers;
std::mutex filteredNumbersMutex;

void initializeNumbers(std::span<int> numbers) {
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(0, MAX_NUM);
    for (auto& num : numbers)
        num = distrib(gen);
}

void filterNumbers(std::span<int> numbers) {
    std::vector<int> local;
    std::ranges::copy_if(numbers, std::back_inserter(local),
                         [](int v) { return v % 2 == 0; });
    std::lock_guard<std::mutex> lock(filteredNumbersMutex);
    filteredNumbers.insert(filteredNumbers.end(), local.begin(), local.end());
}

void analyzeNumbers(std::span<int> numbers, std::map<int, int>& outHistogram) {
    for (int num : numbers)
        outHistogram[num]++;
}
Let’s break down the code:
- The code initiates the process by generating a series of random numbers. This is done in the initializeNumbers function, which takes a std::span<int> as its argument. This span points to a portion of the main data vector, allowing the function to work independently on either the first or the second half of the vector. Inside the function, a random number generator is used to populate the elements of the span with random integers. This demonstrates how std::span enables efficient and safe operations on container slices without explicitly copying data or managing pointers.
- After initializing the numbers, the next phase is to filter them based on a specific criterion, which, in this case, is selecting even numbers. The filterNumbers function again operates on a span of the main vector. It uses the std::ranges::copy_if algorithm to select numbers that meet the criterion (even numbers) and copies them into a local vector. This local vector is then merged into the shared filteredNumbers vector, with thread safety ensured by a mutex. This step illustrates how spans can be used in conjunction with standard algorithms and thread-safe practices to process and manipulate subsets of data in parallel.
- Finally, the code builds a histogram from the filtered numbers. This is accomplished in the analyzeNumbers function, which takes a span of the filtered numbers and a reference to a histogram map. Each thread processes a portion of the filtered numbers, counting the occurrences of each number and updating its local histogram map. After processing, these local histograms are merged into a final histogram in the main thread. This step highlights how spans can facilitate parallel data processing, with each thread working on its own segment and efficiently combining results for the final output.
Here’s the final main() that also coordinates the threads:
int main() {
    std::vector<int> numbers(DATA_SIZE);
    std::span<int> firstHalf(numbers.data(), numbers.size() / 2);
    std::span<int> secondHalf(numbers.data() + numbers.size() / 2, numbers.size() / 2);

    // Initialization phase
    std::thread initThread1(initializeNumbers, firstHalf);
    std::thread initThread2(initializeNumbers, secondHalf);
    initThread1.join();
    initThread2.join();

    // Filtering phase
    std::thread filterThread1(filterNumbers, firstHalf);
    std::thread filterThread2(filterNumbers, secondHalf);
    filterThread1.join();
    filterThread2.join();

    // Analysis phase
    std::array<std::map<int, int>, 2> localHistograms;
    std::span<int> filteredHalf(filteredNumbers.data(), filteredNumbers.size() / 2);
    // the second span takes the remainder in case the filtered count is odd
    std::span<int> filteredSecondHalf(filteredNumbers.data() + filteredNumbers.size() / 2,
                                      filteredNumbers.size() - filteredNumbers.size() / 2);
    std::thread analyzeThread1(analyzeNumbers, filteredHalf, std::ref(localHistograms[0]));
    std::thread analyzeThread2(analyzeNumbers, filteredSecondHalf, std::ref(localHistograms[1]));
    analyzeThread1.join();
    analyzeThread2.join();

    std::map<int, int> finalHistogram;
    for (auto&& hist : localHistograms)
        for (const auto& [num, count] : hist)
            finalHistogram[num] += count;

    // Output the final histogram
    for (const auto& [num, count] : finalHistogram)
        std::cout << std::format("{:>3}: {}\n", num, std::string(count, '*'));
}
Run it at Compiler Explorer
Spans simplify data handling in concurrent programming. Instead of making smaller copies of the data, handing over access to the whole container, or juggling iterator pairs (which can be inefficient, complex, or verbose), std::span offers a lightweight, non-owning view of the data. This approach allows for clear and efficient passing of container segments to different threads, enhancing both the readability and maintainability of the code.
Adding std::jthread
One of the simplest ways to add some C++20 is to use std::jthread. This new type of thread object ensures that the thread is joined when the owning object goes out of scope. This automatic resource management streamlines code and reduces the risk of concurrency-related bugs. Beyond this, std::jthread offers additional functionality, such as accepting stop tokens for more controlled thread interruption, which we’ll delve into in a future discussion.
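A quick note before we rewrite the code: the snippets below slice the data with std::span’s first() and last() member functions rather than pointer arithmetic, and they use allSpan, a span that covers the whole numbers vector, presumably created like this:

std::vector<int> numbers(DATA_SIZE);
std::span<int> allSpan(numbers);                           // assumed: a view over the whole vector
std::span<int> firstHalf  = allSpan.first(DATA_SIZE / 2);  // view of the first half
std::span<int> secondHalf = allSpan.last(DATA_SIZE / 2);   // view of the second half
// equivalent to the pointer-based spans constructed in main() earlier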
Our example already calls .join() in well-defined places, so we have to recreate this behavior with jthreads. Writing it like this:
// Initialization phase
std::jthread initThread1(initializeNumbers, allSpan.first(DATA_SIZE/2));
std::jthread initThread2(initializeNumbers, allSpan.last(DATA_SIZE/2));
// Filtering phase
std::jthread filterThread1(filterNumbers, allSpan.first(DATA_SIZE / 2));
std::jthread filterThread2(filterNumbers, allSpan.last(DATA_SIZE / 2));
This is not the best idea… as initThread1 is not guaranteed to finish before filterThread1 starts! The jthread destructors join only at the end of the enclosing scope, so we have to add braces:
// Initialization phase
{
    std::jthread initThread1(initializeNumbers, allSpan.first(DATA_SIZE / 2));
    std::jthread initThread2(initializeNumbers, allSpan.last(DATA_SIZE / 2));
}

// Filtering phase
{
    std::jthread filterThread1(filterNumbers, allSpan.first(DATA_SIZE / 2));
    std::jthread filterThread2(filterNumbers, allSpan.last(DATA_SIZE / 2));
}
Now the braces create a separate scope for each phase; when a scope ends, the jthreads created inside it join, so that phase fully completes before the next one starts.
Experiment here @Compiler Explorer
Barriers
We can now try a different approach. Instead of creating six threads, each responsible for a separate phase of the data processing workflow, why not streamline the process using fewer threads?
This is where we introduce the concept of worker threads that process data in steps, using a powerful C++20 feature called std::barrier
. By adopting this strategy, we can effectively manage the workflow with just two threads, reducing the overhead associated with thread management and context switching.
std::barrier
is a synchronization primitive that enables multiple threads to wait for each other to reach a certain point in their execution before any of them can proceed. This feature is handy in scenarios where different phases of a task need to be executed in a coordinated manner across multiple threads. In our case, we use std::barrier
to synchronize the transition between different data processing phases.
Here’s how it works: After completing each phase (e.g., initialization, filtering, and histogram analysis), each worker thread calls phaseBarrier.arrive_and_wait()
. This function call indicates that the thread has arrived at the barrier and must wait until all other participating threads have also reached this point. Once all threads have arrived, the barrier is ‘broken’, and all threads are allowed to proceed to the next phase of processing. This ensures that each phase of processing is completed by all threads before any of them move on to the next phase.
This mechanism significantly simplifies the coordination between threads. By using std::barrier
, we avoid the complexity of manually managing multiple threads for each phase and eliminate the need for intricate signaling or locking mechanisms. This leads to cleaner, more maintainable, and more efficient multi-threaded code, making it an ideal choice for our multi-phase data processing task.
Here’s the code for the processing function:
void processNumbers(std::span<int> numbers, std::barrier<>& phaseBarrier, std::map<int, int>& outHistogram) {
    // Phase 1: initialize the numbers
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(0, MAX_NUM);
    for (auto& num : numbers) {
        num = distrib(gen);
    }

    // Signal completion of phase 1 and wait for the other workers
    phaseBarrier.arrive_and_wait();

    // Phase 2: filter even numbers and build the local histogram
    std::vector<int> localFiltered;
    std::ranges::copy_if(numbers, std::back_inserter(localFiltered),
                         [](int v) { return v % 2 == 0; });
    for (int num : localFiltered)
        outHistogram[num]++;
}
And the main function:
int main() {
    std::vector<int> numbers(DATA_SIZE);
    std::barrier phaseBarrier(2);
    std::span<int> firstHalf(numbers.data(), numbers.size() / 2);
    std::span<int> secondHalf(numbers.data() + numbers.size() / 2, numbers.size() / 2);
    std::array<std::map<int, int>, 2> localHistograms;

    {
        std::jthread workerThread1(processNumbers, firstHalf, std::ref(phaseBarrier),
                                   std::ref(localHistograms[0]));
        std::jthread workerThread2(processNumbers, secondHalf, std::ref(phaseBarrier),
                                   std::ref(localHistograms[1]));
    }

    std::map<int, int> finalHistogram;
    for (auto&& hist : localHistograms)
        for (const auto& [num, count] : hist)
            finalHistogram[num] += count;

    // Output the final histogram
    for (const auto& [num, count] : finalHistogram)
        std::cout << std::format("{:>3}: {}\n", num, std::string(count, '*'));
}
We use the barrier only once: it’s initialized with the number of worker threads, and after each worker initializes its portion of the numbers, it waits at the barrier for the other workers. Once they have all arrived, each worker moves on to the next phase of filtering and building its local histogram.
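As a side note, std::barrier can also take an optional completion function, invoked exactly once per phase after all threads have arrived and before any of them is released. We don’t use it in the example above, but here’s a minimal, standalone sketch (the names sync and worker are just illustrative):

#include <barrier>
#include <cstdio>
#include <thread>

int main() {
    // the completion function must be nothrow-invocable; one of the
    // arriving threads runs it when the current phase completes
    std::barrier sync(2, []() noexcept { std::puts("-- phase done --"); });

    auto worker = [&sync](int id) {
        std::printf("worker %d: phase 1\n", id);
        sync.arrive_and_wait();   // wait for the other worker
        std::printf("worker %d: phase 2\n", id);
        sync.arrive_and_wait();
    };

    std::jthread t1(worker, 1);
    std::jthread t2(worker, 2);
}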
Latches
While we’ve focused on std::barrier
for our multi-phase data processing, another concurrency feature introduced in C++20 is std::latch
. A latch is a synchronization primitive but functions differently from a barrier. It acts as a single-use barrier with a countdown mechanism. A std::latch
is initialized with a counter of type std::ptrdiff_t
, representing the number of ‘arrivals’ needed to release all waiting threads.
Threads can signal their arrival at a latch by calling count_down()
, which decrements the latch’s counter. When the counter reaches zero, all threads waiting on the latch (using the wait() method) are released and can continue execution. Unlike std::barrier,
which can be reused after all threads have arrived, a latch is a one-time-use object. Once its counter reaches zero, it can’t be reset or reused.
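Our example below still uses arrive_and_wait(), but to illustrate the count_down()/wait() pair described above, here’s a minimal, standalone sketch (the names are just illustrative) where the main thread waits until both workers finish their setup:

#include <cstdio>
#include <latch>
#include <thread>

int main() {
    std::latch ready(2);              // released once two arrivals are counted

    auto worker = [&ready](int id) {
        std::printf("worker %d: setup done\n", id);
        ready.count_down();           // signal arrival without blocking
        // ...continue with other work...
    };

    std::jthread t1(worker, 1);
    std::jthread t2(worker, 2);

    ready.wait();                     // block until the counter reaches zero
    std::puts("both workers are ready");
}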
In our case, std::latch might be even more suitable and simpler than a barrier, since we need only a single synchronization point:
void processNumbers(std::span<int> numbers, std::latch& phaseLatch, std::map<int, int>& outHistogram) {
    // Phase 1: initialize the numbers
    std::random_device rd;
    std::mt19937 gen(rd());
    std::uniform_int_distribution<> distrib(0, MAX_NUM);
    for (auto& num : numbers) {
        num = distrib(gen);
    }

    // Signal completion of phase 1 and wait for the other workers
    phaseLatch.arrive_and_wait();

    // Phase 2: filter even numbers and build the local histogram
    std::vector<int> localFiltered;
    std::ranges::copy_if(numbers, std::back_inserter(localFiltered),
                         [](int v) { return v % 2 == 0; });
    for (int num : localFiltered)
        outHistogram[num]++;
}
int main() {
    std::vector<int> numbers(DATA_SIZE);
    std::latch phaseLatch(2);
    std::span<int> firstHalf(numbers.data(), numbers.size() / 2);
    std::span<int> secondHalf(numbers.data() + numbers.size() / 2, numbers.size() / 2);
    std::array<std::map<int, int>, 2> localHistograms;

    {
        std::jthread workerThread1(processNumbers, firstHalf, std::ref(phaseLatch),
                                   std::ref(localHistograms[0]));
        std::jthread workerThread2(processNumbers, secondHalf, std::ref(phaseLatch),
                                   std::ref(localHistograms[1]));
    }

    std::map<int, int> finalHistogram;
    for (auto&& hist : localHistograms)
        for (const auto& [num, count] : hist)
            finalHistogram[num] += count;

    // Output the final histogram
    for (const auto& [num, count] : finalHistogram)
        std::cout << std::format("{:>3}: {}\n", num, std::string(count, '*'));
}
It’s just a start
Ok, it was fun!
We went through a series of steps and created a simple processing application: it initializes numbers, and then uses threads to process that data. We explored jthreads that join automatically, and then we threw in some barriers and latches to coordinate execution phases between threads.
But that’s just a start. The examples used only two threads, but why not check std::thread::hardware_concurrency() and create as many worker threads as your system has CPU cores? We also worked only with random numbers, but you could just as well read data from files or the network. I’ll leave that to you to experiment with.
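For example, a rough sketch of that idea, reusing the processNumbers() function from the barrier example (the chunking logic and the names pool, all, and histograms are mine, not from the code above):

unsigned workers = std::thread::hardware_concurrency();
if (workers == 0)     // the call may return 0 if the value is not computable
    workers = 2;

std::vector<int> numbers(DATA_SIZE);
std::span<int> all(numbers);
std::vector<std::map<int, int>> histograms(workers);
std::barrier phaseBarrier(static_cast<std::ptrdiff_t>(workers));

{
    std::vector<std::jthread> pool;
    const std::size_t chunk = numbers.size() / workers;
    for (unsigned i = 0; i < workers; ++i) {
        // the last worker also takes any remainder
        auto part = (i + 1 == workers) ? all.subspan(i * chunk)
                                       : all.subspan(i * chunk, chunk);
        pool.emplace_back(processNumbers, part, std::ref(phaseBarrier),
                          std::ref(histograms[i]));
    }
}   // all worker jthreads join here

After the scope ends, the per-worker histograms can be merged exactly as before.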
What’s more, C++20 doesn’t just add jthread, barrier, and latch; here’s a list of more cool concurrency-related changes in that version of the Standard:
- std::atomic_ref: allows atomic operations on non-atomic objects (see the short sketch after this list).
- std::atomic<std::shared_ptr<T>> and std::atomic<std::weak_ptr<T>>: specializations for atomic operations on shared and weak pointers.
- std::counting_semaphore and std::binary_semaphore: new semaphore types for controlling access to shared resources.
- std::stop_token, std::stop_source, and std::stop_callback: mechanisms for cooperative thread interruption.
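For instance, std::atomic_ref lets several threads update a plain int atomically, without declaring it as std::atomic<int>. A minimal, standalone sketch (the names are just illustrative):

#include <atomic>
#include <cstdio>
#include <thread>
#include <vector>

int main() {
    int counter = 0;   // a regular, non-atomic int

    {
        std::vector<std::jthread> pool;
        for (int t = 0; t < 4; ++t)
            pool.emplace_back([&counter] {
                for (int i = 0; i < 1000; ++i)
                    std::atomic_ref<int>(counter).fetch_add(1);  // atomic access to the plain int
            });
    }   // all jthreads join here

    std::printf("counter = %d\n", counter);   // prints 4000
}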
See more @CppReference
Back to you
- Have you used C++20 multithreading/concurrency additions?
- What do you use to process data using worker threads?
- What else would you improve in my code?
Join the discussion below in the comments.