Data race during nested thread creation

When I create nested threads (threads that themselves create more threads), Helgrind reports several different kinds of data races:

==4429== Possible data race during write of size 8 at 0x5673830 by thread #13
==4429== Locks held: none
==4429==    at 0x4C379EF: memset (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x5060C85: get_cached_stack (allocatestack.c:250)
==4429==    by 0x5060C85: allocate_stack (allocatestack.c:501)
==4429==    by 0x5060C85: pthread_create@@GLIBC_2.2.5 (pthread_create.c:537)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429==  Address 0x5673830 is 16 bytes inside a block of size 560 alloc'd
==4429==    at 0x4C2EFB5: calloc (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x40134C4: allocate_dtv (dl-tls.c:322)
==4429==    by 0x40134C4: _dl_allocate_tls (dl-tls.c:544)
==4429==    by 0x50610D2: allocate_stack (allocatestack.c:588)
==4429==    by 0x50610D2: pthread_create@@GLIBC_2.2.5 (pthread_create.c:537)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429==  Block was alloc'd by thread #3


==4429== Possible data race during write of size 1 at 0x724368F by thread #13
==4429== Locks held: none
==4429==    at 0x4C3856C: mempcpy (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x40132F6: _dl_allocate_tls_init (dl-tls.c:520)
==4429==    by 0x5060C8D: get_cached_stack (allocatestack.c:253)
==4429==    by 0x5060C8D: allocate_stack (allocatestack.c:501)
==4429==    by 0x5060C8D: pthread_create@@GLIBC_2.2.5 (pthread_create.c:537)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429== 
==4429== This conflicts with a previous write of size 1 by thread #4
==4429== Locks held: none
==4429==    at 0x5060612: start_thread (pthread_create.c:265)
==4429==  Address 0x724368f is in a rw- anonymous segment


==4429== Possible data race during read of size 8 at 0x7243728 by thread #14
==4429== Locks held: none
==4429==    at 0x40178C: read_record (c_esp.c:171)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429== 
==4429== This conflicts with a previous write of size 8 by thread #13
==4429== Locks held: none
==4429==    at 0x5060DB7: pthread_create@@GLIBC_2.2.5 (pthread_create.c:589)
==4429==    by 0x4C32BF7: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x4022D7: read_group (c_esp.c:318)
==4429==    by 0x4C32DF6: ??? (in /usr/lib/valgrind/vgpreload_helgrind-amd64-linux.so)
==4429==    by 0x50606A9: start_thread (pthread_create.c:333)
==4429==  Address 0x7243728 is in a rw- anonymous segment

The lines of c_esp.c mentioned in these reports are:

  • 318: pthread_create(pth_array + k, &attrs, read_record, args_array + k);

  • 171: void *read_record(void *_args){

So it seems that there is a data race while the threads are being created. Is it possible that the stacks allocated for threads in "different branches" overlap? Or did I screw up somewhere?

I have already tried reducing the stack size of each thread, without success.

As requested, a minimal reproducible example:

#include <pthread.h>

void *func_b(void *args){
    return args;
}

void *func_a(void *args){
    pthread_t pth[5];
    int i;
    for(i = 0; i < 5; i++){
        pthread_create(&pth[i], NULL, func_b, NULL);
    }

    for(i = 0; i < 5; i++){
        pthread_join(pth[i], NULL);
    }
    return args;
}

int main(void){
    pthread_t pth[100];
    int i;
    for(i = 0; i < 100; i++){
        pthread_create(&pth[i], NULL, func_a, NULL);
    }

    for(i = 0; i < 100; i++){
        pthread_join(pth[i], NULL);
    }
}

The full output from Helgrind is here, and all the relevant code follows. The snippet spans lines 171 to 477 of c_esp.c.

void *read_record(void *_args){
    struct _thread_args *args = (struct _thread_args *)_args;
    uint32_t data_size = 0;

    args->item->record->type = calloc(5, sizeof *args->item->record->type);
    memcpy(args->item->record->type, *args->map, 4);
    *args->map += 4;
    memcpy(&data_size, *args->map, 4);
    *args->map += 4;
    memcpy(&args->item->record->flags, *args->map, 4);
    *args->map += 4;
    memcpy(&args->item->record->id, *args->map, 4);
    *args->map += 4;
    memcpy(&args->item->record->revision, *args->map, 4);
    *args->map += 4;
    memcpy(&args->item->record->version, *args->map, 2);
    *args->map += 2;
    memcpy(&args->item->record->unknown, *args->map, 2);
    *args->map += 2;

    args->item->record->misc_data = NULL;
    args->item->record->children = NULL;
    args->item->record->last = NULL;
    // args->item->record->next = NULL;
    // args->item->record->previous = NULL;
    args->item->record->_proxy = NULL;
    args->item->record->compression_level = 0x7f;  // some invalid number, doesn't matter

    if(args->parse_records){
        args->item->record->is_parsed = TRUE;
        if(args->item->record->flags & 0x00040000){
            uLongf uncompressed_size = 0;
            memcpy(&uncompressed_size, *args->map, 4);
            *args->map += 4;

            if(*(*args->map + 1) & (1 << 6)){
                if(*(*args->map + 1) & (1 << 7)){
                    args->item->record->compression_level = Z_BEST_COMPRESSION;  // both high bits are set  11

                } else{
                    args->item->record->compression_level = Z_BEST_SPEED;  // 01
                }
            } else{
                if(*(*args->map + 1) & (1 << 7)){
                    args->item->record->compression_level = Z_DEFAULT_COMPRESSION;  // 10

                } else{
                    args->item->record->compression_level = Z_NO_COMPRESSION;  // 00
                }
            }

            uint8_t *uncompressed_data = malloc(uncompressed_size * sizeof *uncompressed_data);
            uint8_t *start = uncompressed_data;
            uncompress(uncompressed_data, &uncompressed_size, *args->map, data_size - 4);
            parse_record_data((uint8_t **)&uncompressed_data, uncompressed_size, &args->item->record->children, &args->item->record->last);
            *args->map += data_size - 4;
            free(start);
        } else{
            parse_record_data(args->map, data_size, &args->item->record->children, &args->item->record->last);
        }
    } else{
        args->item->record->is_parsed = FALSE;
        args->item->record->misc_data = malloc(sizeof *args->item->record->misc_data);
        args->item->record->misc_data->data = malloc(data_size * sizeof *args->item->record->misc_data->data);
        args->item->record->misc_data->data_size = data_size;
        memcpy(args->item->record->misc_data->data, *args->map, data_size);
        *args->map += data_size;
    }

    return NULL;
}

void *read_group(void *_args){
    struct _thread_args *args = (struct _thread_args *)_args;
    uint32_t data_size = 0;

    args->item->group->type = "GRUP";
    *args->map += 4;
    memcpy(&data_size, *args->map, 4);
    *args->map += 4;
    memset(args->item->group->label, 0, 5);
    memcpy(args->item->group->label, *args->map, 4);
    *args->map += 4;
    memcpy(&args->item->group->group_type, *args->map, 4);
    *args->map += 4;
    memcpy(&args->item->group->stamp, *args->map, 2);
    *args->map += 2;
    memcpy(&args->item->group->unknown1, *args->map, 2);
    *args->map += 2;
    memcpy(&args->item->group->version, *args->map, 2);
    *args->map += 2;
    memcpy(&args->item->group->unknown2, *args->map, 2);
    *args->map += 2;

    args->item->group->children = NULL;
    args->item->group->last = NULL;
    args->item->group->_proxy = NULL;

    uint8_t *data_start = *args->map;
    int item_count = 0;
    while(*args->map < (data_start + data_size - 24)){
        item_count++;
        char next_type[5] = {0};
        memcpy(next_type, *args->map, 4);
        uint32_t next_data_size = 0;
        memcpy(&next_data_size, *args->map + 4, 4);
        *args->map += next_data_size;
        if(strcmp(next_type, "GRUP") != 0){
            *args->map += 24;
        }
    }
    *args->map = data_start;

    pthread_attr_t  attrs;
    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 1000);

    struct _group_item *current = NULL;
    bool current_is_record = FALSE;
    pthread_t *pth_array = malloc(item_count * sizeof *pth_array);
    uint8_t **map_array = malloc(item_count * sizeof *map_array);
    struct _thread_args *args_array = malloc(item_count * sizeof *args_array);
    int k;
    for(k = 0; k < item_count; k++){
        char next_type[5] = {0};
        memcpy(next_type, *args->map, 4);
        uint32_t new_data_size = 0;
        memcpy(&new_data_size, *args->map + 4, 4);
        *(map_array + k) = *args->map;
        *args->map += new_data_size;

        (*(args_array + k)).map = map_array + k;
        (*(args_array + k)).parse_records = args->parse_records;
        (*(args_array + k)).item = malloc(sizeof *(*(args_array + k)).item);
        if(strcmp(next_type, "GRUP") == 0){
            (*(args_array + k)).is_record = FALSE;
            (*(args_array + k)).item->group = malloc(sizeof *(*(args_array + k)).item->group);
            pthread_create(pth_array + k, &attrs, read_group, args_array + k);
            //read_group(args_array + k);
        } else{
            *args->map += 24;
            (*(args_array + k)).is_record = TRUE;
            (*(args_array + k)).item->record = malloc(sizeof *(*(args_array + k)).item->record);
            pthread_create(pth_array + k, &attrs, read_record, args_array + k);
            //read_record(args_array + k);
        }

        if(args->item->group->children == NULL){
            args->item->group->children = current = (*(args_array + k)).item;
            args->item->group->children_is_record = current_is_record = (*(args_array + k)).is_record;
            if((*(args_array + k)).is_record){
                current->record->previous = NULL;
            } else{
                current->group->previous = NULL;
            }
            continue;
        }

        if((*(args_array + k)).is_record){
            if(current_is_record){
                current->record->next = (*(args_array + k)).item;
                current->record->next_is_record = (*(args_array + k)).is_record;
                (*(args_array + k)).item->record->previous = current;
                (*(args_array + k)).item->record->previous_is_record = current_is_record;
            } else{
                current->group->next = (*(args_array + k)).item;
                current->group->next_is_record = (*(args_array + k)).is_record;
                (*(args_array + k)).item->record->previous = current;
                (*(args_array + k)).item->record->previous_is_record = current_is_record;
            }
        } else{
            if(current_is_record){
                current->record->next = (*(args_array + k)).item;
                current->record->next_is_record = (*(args_array + k)).is_record;
                (*(args_array + k)).item->group->previous = current;
                (*(args_array + k)).item->group->previous_is_record = current_is_record;
            } else{
                current->group->next = (*(args_array + k)).item;
                current->group->next_is_record = (*(args_array + k)).is_record;
                (*(args_array + k)).item->group->previous = current;
                (*(args_array + k)).item->group->previous_is_record = current_is_record;
            }
        }

        current = (*(args_array + k)).item;
        current_is_record = (*(args_array + k)).is_record;
    }

    args->item->group->last = current;
    args->item->group->last_is_record = current_is_record;
    if(current != NULL){
        if(current_is_record){
            current->record->next = NULL;
        } else{
            current->group->next = NULL;
        }
    }

    int i;
    for(i = 0; i < item_count; i++){
       pthread_join(*(pth_array + i), NULL);
    }

    free(pth_array);
    free(map_array);
    free(args_array);

    assert(*args->map == (data_start + data_size - 24));
    return NULL;
}

Plugin *plugin_read(const char *filename, const bool parse_records){
    Plugin *plugin = malloc(sizeof *plugin);
    plugin->children = NULL;
    plugin->last = NULL;
    plugin->_proxy = NULL;

    FILE *fileobject = fopen(filename, "rb");
    if(fileobject == NULL){
        printf("No file found.\n");
        free(plugin);  // don't leak the partially built plugin on failure
        return NULL;
    }
    fseek(fileobject, 0, SEEK_END);
    long int filesize = ftell(fileobject);
    rewind(fileobject);
    uint8_t *map = malloc(filesize * sizeof *map);
    fread(map, filesize, 1, fileobject);
    fclose(fileobject);

    uint8_t *start = map;

    pthread_attr_t  attrs;
    pthread_attr_init(&attrs);
    pthread_attr_setstacksize(&attrs, 1000);

    pthread_t header_pth;
    plugin->header = malloc(sizeof *plugin->header);
    struct _group_item header_item;
    header_item.record = plugin->header;
    uint8_t *header_map = map;
    struct _thread_args header_args;
    header_args.item = &header_item;
    header_args.map = &header_map;
    header_args.parse_records = parse_records;
    pthread_create(&header_pth, &attrs, read_record, &header_args);
    uint32_t header_data_size = 0;
    memcpy(&header_data_size, map + 4, 4);
    map += header_data_size + 24;

    uint8_t *after_header = map;
    int group_count = 0;
    while(map < (start + filesize)){
        group_count++;
        uint32_t next_data_size = 0;
        memcpy(&next_data_size, map + 4, 4);
        map += next_data_size;
    }
    map = after_header;

    pthread_t *pth = malloc(group_count * sizeof *pth);
    uint8_t **map_array = malloc(group_count * sizeof *map_array);
    struct _thread_args *args = malloc(group_count * sizeof *args);
    struct _group_item *current = NULL;
    int k;
    for(k = 0; k < group_count; k++){
        uint32_t new_data_size = 0;
        memcpy(&new_data_size, map + 4, 4);
        *(map_array + k) = map;
        map += new_data_size;

        (*(args + k)).item = malloc(sizeof *(*(args + k)).item);
        (*(args + k)).item->group = malloc(sizeof *(*(args + k)).item->group);
        (*(args + k)).map = map_array + k;
        (*(args + k)).parse_records = parse_records;
        pthread_create(pth + k, &attrs, read_group, args + k);

        if(plugin->children == NULL){
            plugin->children = (*(args + k)).item;
            current = (*(args + k)).item;
            current->group->previous = NULL;
            continue;
        }

        current->group->next = (*(args + k)).item;
        current->group->next_is_record = FALSE;
        (*(args + k)).item->group->previous = current;
        (*(args + k)).item->group->previous_is_record = FALSE;
        current = (*(args + k)).item;
    }
    plugin->last = current;
    plugin->last->group->next = NULL;

    pthread_join(header_pth, NULL);
    int i;
    for(i = 0; i < group_count; i++){
        pthread_join(*(pth + i), NULL);
    }

    assert(map == (start + filesize));

    free(start);
    free(pth);
    free(args);
    free(map_array);

    return plugin;
}

Thanks!

c
multithreading
pthreads
valgrind
asked on Stack Overflow Oct 18, 2016 by Daniel Nunes • edited Oct 19, 2016 by Daniel Nunes

1 Answer

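Try running Helgrind with --sim-hints=no-nptl-pthread-stackcache.

These reports are most likely false positives. glibc keeps the stacks of terminated threads (and some thread-local-storage data) in a cache and hands them to newly created threads. Helgrind does not understand the synchronisation glibc uses inside that cache, so it reports races on the reused memory. The hint disables the cache while the program runs under Valgrind.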

See http://www.valgrind.org/docs/manual/manual-core.html#manual-core.rareopts for more information.
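
To make the stack cache concrete, here is a small sketch, not taken from the question, that runs two threads one after the other and records an address inside each thread's stack. The names sample_two_stacks and report_stack_address are illustrative. With glibc the two addresses often turn out equal because the second thread is given the cached stack of the first, though nothing guarantees that:

```c
#include <assert.h>
#include <pthread.h>
#include <stdint.h>

/* Thread body: store the address of a local variable, i.e. a location
 * inside this thread's own stack, into the caller-provided slot. */
static void *report_stack_address(void *out)
{
    int local = 0;
    *(uintptr_t *)out = (uintptr_t)&local;
    return NULL;
}

/* Illustrative helper: run two threads strictly one after the other and
 * record where each one's stack was. Returns 0 on success, otherwise
 * the error number returned by pthread_create or pthread_join. */
int sample_two_stacks(uintptr_t *first, uintptr_t *second)
{
    pthread_t t;
    int rc;

    assert(first != NULL && second != NULL);

    rc = pthread_create(&t, NULL, report_stack_address, first);
    if (rc != 0) return rc;
    rc = pthread_join(t, NULL);   /* first thread ends; its stack may be cached */
    if (rc != 0) return rc;

    rc = pthread_create(&t, NULL, report_stack_address, second);
    if (rc != 0) return rc;
    return pthread_join(t, NULL);
}
```

Printing both values from main and comparing them is enough to see the reuse. In this sequential sketch the join orders the two threads, so Helgrind should have nothing to report. In the nested case from the question, the thread that picks up a cached stack is usually not the one that joined its previous owner, which is presumably why the reports point into get_cached_stack. The hint itself goes on the command line, for example: valgrind --tool=helgrind --sim-hints=no-nptl-pthread-stackcache ./a.out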

answered on Stack Overflow Oct 21, 2016 by phd

User contributions licensed under CC BY-SA 3.0