Stack overflow when thread number is large enough (i.e, 50)

0

My code runs ok when thread number is 15 or less, but when I run it with larger thread number (but still a very tiny number) say 50. I ran into following error when main function exits, seems like error occurs in the cleaning up process. I couldn't figure out where the bug is. My development tool is Visual Studio 2017. Here's my code:

threadsafe_queue class:

#pragma once
#include <memory>
#include <mutex>

template<typename T>
class threadsafe_queue
{
private:
    struct Node {
        std::shared_ptr<T> data;
        std::unique_ptr<Node> next;
    };
    Node* tail;
    std::unique_ptr<Node> head;
    std::mutex head_mutex;
    std::mutex tail_mutex;
    std::condition_variable data_cond;

    Node* get_tail();
    std::unique_ptr<Node> pop_head();
    std::unique_lock<std::mutex> wait_for_data();
public:
    threadsafe_queue();
    ~threadsafe_queue();
    threadsafe_queue(const threadsafe_queue& t) = delete;
    threadsafe_queue operator = (const threadsafe_queue& t) = delete;

    void push(T);
    bool try_pop(T&);
    std::shared_ptr<T> try_pop();
    void wait_and_pop(T&);
    std::shared_ptr<T> wait_and_pop();
    bool empty();
};

using namespace std;

template<typename T>
threadsafe_queue<T>::threadsafe_queue() {
    head = std::unique_ptr<Node>(new Node);
    tail = head.get();
}

template<typename T>
threadsafe_queue<T>::~threadsafe_queue()
{
}

template<typename T>
typename threadsafe_queue<T>::Node* threadsafe_queue<T>::get_tail() {
    lock_guard<mutex> lock(tail_mutex);
    return tail;
}

template<typename T>
unique_ptr<typename threadsafe_queue<T>::Node> threadsafe_queue<T>::pop_head()
{
    auto old_head = move(head);
    head = move(old_head->next);
    return old_head;
}

template<typename T>
unique_lock<mutex> threadsafe_queue<T>::wait_for_data()
{
    unique_lock<mutex> headLock(head_mutex);
    data_cond.wait(headLock, [&] {return head.get() != get_tail(); });
    return std::move(headLock);
}

template<typename T>
void threadsafe_queue<T>::wait_and_pop(T & value)
{
    unique_lock<mutex> lock(wait_for_data());
    value = move(pop_head()->data);
}

template<typename T>
shared_ptr<T> threadsafe_queue<T>::wait_and_pop()
{
    unique_lock<mutex> lock(wait_for_data());
    return pop_head()->data;
}

template<typename T>
void threadsafe_queue<T>::push(T newValue)
{
    shared_ptr<T> data(make_shared<T>(std::move(newValue)));
    unique_ptr<Node> new_tail(new Node);
    {
        lock_guard<mutex> lock(tail_mutex);
        tail->data = data;
        Node* new_tail_ptr = new_tail.get();
        tail->next = move(new_tail);
        tail = new_tail_ptr;
    }
    data_cond.notify_one();
}

template<typename T>
bool threadsafe_queue<T>::try_pop(T & value)
{
    lock_guard<mutex> headLock(head_mutex);
    if (head == get_tail())
        return false;
    value = move(pop_head()->data);
    return true;
}

template<typename T>
shared_ptr<T> threadsafe_queue<T>::try_pop()
{
    lock_guard<mutex> headLock(head_mutex);
    if (head == get_tail())
        return shared_ptr<T>();
    return pop_head()->data;
}

template<typename T>
bool threadsafe_queue<T>::empty()
{
    lock_guard<mutex> lock(head_mutex);
    return head.get() == get_tail();
}

main function:

#pragma once
#include "threadsafe_queue.h"
#include <assert.h>
#include <memory>
#include <atomic>
#include <vector>
#include <thread>

using namespace std;
void worker(threadsafe_queue<int>& queue, std::atomic<int>& count, int const & pushcount, int const & popcount) {
    for (unsigned i = 0; i < pushcount; i++) {
        queue.push(i);
        count++;
    }

    for (unsigned i = 0; i < popcount; i++) {
        queue.wait_and_pop();
        count--;
    }
}

int main() {
    threadsafe_queue<int> queue;
    std::atomic<int> item_count = 0;
    std::vector<thread*> threads;
    unsigned const THREAD_COUNT=50, PUSH_COUT=100, POP_COUNT=50;

    for (unsigned i = 0; i < THREAD_COUNT; i++) {
        threads.push_back(new thread(worker, ref(queue), ref(item_count), ref(PUSH_COUT), ref(POP_COUNT)));
    }

    for (auto thread : threads) {
        thread->join();
    }

    for (auto thread : threads) {
        delete thread;
    }
    assert(item_count == THREAD_COUNT * (PUSH_COUT-POP_COUNT));

    return 0;
}

error message:

Unhandled exception at 0x00862899 in Sample.exe: 0xC00000FD: Stack overflow 
(parameters: 0x00000001, 0x00E02FDC). occurred

The location of the error is in memory library code:

    const pointer& _Myptr() const _NOEXCEPT
    {   // return const reference to pointer
    return (_Mypair._Get_second());
    }
c++11
concurrency
shared-ptr
unique-ptr
asked on Stack Overflow Mar 31, 2018 by Alan Zeng

1 Answer

0

The answer is based on @IgorTandetnik 's comment above. Basically I needed to implement ~threadsafe_queue to destroy the nodes iteratively. The nodes are linked, so they will be destructed in recursive manner, which causes stack overflow when number of nodes remaining in the queue is relatively large. Below is the destructor code.

threadsafe_queue<T>::~threadsafe_queue(){
    Node* current = head.release();
    while (current != tail) {
        Node* temp = (current->next).release();
        delete current;
        current = temp;
    }
    delete tail;
}
answered on Stack Overflow Apr 1, 2018 by Alan Zeng

User contributions licensed under CC BY-SA 3.0