Process-shared condition variable : how to recover after one process dies?

2

I'm working on a simple FIFO queue to synchronize multiple instances of a server process.

This is very similar to Linux synchronization with FIFO waiting queue, except dealing with multiple processes instead of threads. I adapted caf's ticket lock to use process-shared mutex and condition variable from a shared memory segment. It also handles timeouts in case one process dies while processing a request:

#include <assert.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <sys/mman.h>
#include <pthread.h>
#include <signal.h>
#include <errno.h>

static inline void fail(char *str)
{
    perror(str);
    exit(1);
}

/***************************************************************************************************/

/* Simple ticket lock queue with pthreads
 * https://stackoverflow.com/questions/3050083/linux-synchronization-with-fifo-waiting-queue
 */

typedef struct ticket_lock {
    pthread_mutex_t mutex;
    pthread_cond_t  cond;
    int queue_head, queue_tail;
} ticket_lock_t;

static void
ticket_init(ticket_lock_t *t)
{
    pthread_mutexattr_t mattr;
    pthread_mutexattr_init(&mattr);
    pthread_mutexattr_setpshared(&mattr, PTHREAD_PROCESS_SHARED);
    pthread_mutexattr_setrobust(&mattr, PTHREAD_MUTEX_ROBUST);
    pthread_mutex_init(&t->mutex, &mattr);
    pthread_mutexattr_destroy(&mattr);

    pthread_condattr_t cattr;
    pthread_condattr_init(&cattr);
    pthread_condattr_setpshared(&cattr, PTHREAD_PROCESS_SHARED);
    pthread_cond_init(&t->cond, &cattr);
    pthread_condattr_destroy(&cattr);

    t->queue_head = t->queue_tail = 0;
}

static void
ticket_broadcast(ticket_lock_t *ticket)
{
    pthread_cond_broadcast(&ticket->cond);
}

static int
ticket_lock(ticket_lock_t *ticket)
{
    pthread_mutex_lock(&ticket->mutex);
    int queue_me = ticket->queue_tail++;

    while (queue_me > ticket->queue_head) {
        time_t sec = time(NULL) + 5;  /* 5s timeout */
        struct timespec ts = { .tv_sec = sec, .tv_nsec = 0 };

        fprintf(stderr, "%i: waiting, current: %i  me: %i\n", getpid(), ticket->queue_head, queue_me);

        if (pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts) == 0)
            continue;

        if (errno != ETIMEDOUT) fail("pthread_cond_timedwait");

        /* Timeout, kick current user... */
        fprintf(stderr, "kicking stale ticket %i\n", ticket->queue_head);
        ticket->queue_head++;
        ticket_broadcast(ticket);
    }

    pthread_mutex_unlock(&ticket->mutex);
    return queue_me;
}

static void
ticket_unlock(ticket_lock_t *ticket, int me)
{
    pthread_mutex_lock(&ticket->mutex);
    if (ticket->queue_head == me) {  /* Normal case: we haven't timed out. */
        ticket->queue_head++;
        ticket_broadcast(ticket);
    }
    pthread_mutex_unlock(&ticket->mutex);
}


/***************************************************************************************************/
/* Shared memory */

#define SHM_NAME    "fifo_sched"
#define SHM_MAGIC   0xdeadbeef

struct sched_shm {
    int  size;
    int  magic;
    int  ready;

    /* sched stuff */
    ticket_lock_t queue;
};

static unsigned int shm_size = 256;
static struct sched_shm *shm = 0;

/* Create new shared memory segment */
static void
create_shm()
{
       int fd = shm_open(SHM_NAME, O_RDWR | O_CREAT | O_TRUNC, 0644);
       assert(fd != -1);
       int r = ftruncate(fd, shm_size);  assert(r == 0);
       void *pt = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
       assert(pt != MAP_FAILED);
       fprintf(stderr, "Created shared memory.\n");

       shm = pt;
       memset(shm, 0, sizeof(*shm));
       shm->size = shm_size;
       shm->magic = SHM_MAGIC;
       shm->ready = 0;

       ticket_init(&shm->queue);

       shm->ready = 1;
}

/* Attach existing shared memory segment */
static int
attach_shm()
{
    int fd = shm_open(SHM_NAME, O_RDWR, 0);
    if (fd == -1)  return 0;  /* Doesn't exist yet... */

    shm = mmap(0, shm_size, PROT_READ | PROT_WRITE, MAP_SHARED, fd, 0);
    if (shm == MAP_FAILED)  fail("mmap");
    fprintf(stderr, "Mapped shared memory.\n");

    assert(shm->magic == SHM_MAGIC);
    assert(shm->ready);
    return 1;
}

static void
shm_init()
{  
    fprintf(stderr, "shm_init()\n");
    assert(shm_size >= sizeof(struct sched_shm));
    if (!attach_shm())
        create_shm();
}


/***************************************************************************************************/

int main()
{
    shm_init();

    while (1) {
        int ticket = ticket_lock(&shm->queue);
        printf("%i: start %i\n", getpid(), ticket);
        printf("%i: done  %i\n", getpid(), ticket);
        ticket_unlock(&shm->queue, ticket);
    }
    return 0;
}

This works well standalone and while adding extra processes:

$ gcc -g -Wall -std=gnu99 -o foo foo.c -lpthread -lrt
$ ./foo
$ ./foo     # (in other term)
...
26370: waiting, current: 134803  me: 134804
26370: start 134804
26370: done  134804
26370: waiting, current: 134805  me: 134806
26370: start 134806
26370: done  134806
26370: waiting, current: 134807  me: 134808

However killing the 2nd instance breaks pthread_cond_timedwait() in the 1st:

pthread_cond_timedwait: No such file or directory

Which makes sense in a way, the condition variable was tracking this process and it's not there anymore.

Surely there must be a way to recover from this ?

c
linux
pthreads
condition-variable
asked on Stack Overflow Dec 28, 2017 by lemonsqueeze

2 Answers

1

[too long for a comment]

 pthread_cond_timedwait: No such file or directory

Hu! :-)

The pthread_*() family of functions does not set errno to any error code but returns it.

So to get any usable results change this

    if (pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts) == 0)
        continue;

    if (errno != ETIMEDOUT) fail("pthread_cond_timedwait");

to be

    if ((errno = pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts)) == 0)
        continue;

    if (errno != ETIMEDOUT) fail("pthread_cond_timedwait");
answered on Stack Overflow Dec 28, 2017 by alk
1

Ok, quoting posix pthread_mutex_lock() reference:

If mutex is a robust mutex and the process containing the owning thread terminated while holding the mutex lock, a call to pthread_mutex_lock() shall return the error value [EOWNERDEAD]. [...] In these cases, the mutex is locked by the thread but the state it protects is marked as inconsistent. The application should ensure that the state is made consistent for reuse and when that is complete call pthread_mutex_consistent(). If the application is unable to recover the state, it should unlock the mutex without a prior call to pthread_mutex_consistent(), after which the mutex is marked permanently unusable.

So in addition to alk's comment to robustly handle processes dying with the mutex locked we need to watch for EOWNERDEAD when calling pthread_mutex_lock() and pthread_cond_timedwait(), and call pthread_mutex_consistent() on it.

Something like:

if ((errno = pthread_cond_timedwait(&ticket->cond, &ticket->mutex, &ts)) == 0)
    continue;
if (errno == EOWNERDEAD)  /* Recover mutex owned by dead process */
    pthread_mutex_consistent(&ticket->mutex);
else if (errno != ETIMEDOUT)
    fail("pthread_cond_timedwait");
answered on Stack Overflow Dec 28, 2017 by lemonsqueeze • edited Dec 28, 2017 by lemonsqueeze

User contributions licensed under CC BY-SA 3.0