remove cqueue, use class SafeQueue (#19774)
* class SafeQueue
* cleanup includes
* space
* add timeout
use try_poll
* add function empty() & size()
* class SafeQueue
* add timeout
use try_poll
* add function empty() & size()
* rebase master
* rebase master
* for loop
* fix bug
old-commit-hash: e6783f4d9f
commatwo_master
parent
7a086b3f3d
commit
da9b0e8baa
10 changed files with 105 additions and 153 deletions
@ -1,54 +0,0 @@ |
|||||||
#include <stdlib.h> |
|
||||||
#include <string.h> |
|
||||||
#include <assert.h> |
|
||||||
|
|
||||||
#include "cqueue.h" |
|
||||||
|
|
||||||
// TODO: replace by C++ queue and CV. See camerad
|
|
||||||
|
|
||||||
void queue_init(Queue *q) { |
|
||||||
memset(q, 0, sizeof(*q)); |
|
||||||
TAILQ_INIT(&q->q); |
|
||||||
pthread_mutex_init(&q->lock, NULL); |
|
||||||
pthread_cond_init(&q->cv, NULL); |
|
||||||
} |
|
||||||
|
|
||||||
void* queue_pop(Queue *q) { |
|
||||||
pthread_mutex_lock(&q->lock); |
|
||||||
while (TAILQ_EMPTY(&q->q)) { |
|
||||||
pthread_cond_wait(&q->cv, &q->lock); |
|
||||||
} |
|
||||||
QueueEntry *entry = TAILQ_FIRST(&q->q); |
|
||||||
TAILQ_REMOVE(&q->q, entry, entries); |
|
||||||
pthread_mutex_unlock(&q->lock); |
|
||||||
|
|
||||||
void* r = entry->data; |
|
||||||
free(entry); |
|
||||||
return r; |
|
||||||
} |
|
||||||
|
|
||||||
void* queue_try_pop(Queue *q) { |
|
||||||
pthread_mutex_lock(&q->lock); |
|
||||||
|
|
||||||
void* r = NULL; |
|
||||||
if (!TAILQ_EMPTY(&q->q)) { |
|
||||||
QueueEntry *entry = TAILQ_FIRST(&q->q); |
|
||||||
TAILQ_REMOVE(&q->q, entry, entries); |
|
||||||
r = entry->data; |
|
||||||
free(entry); |
|
||||||
} |
|
||||||
|
|
||||||
pthread_mutex_unlock(&q->lock); |
|
||||||
return r; |
|
||||||
} |
|
||||||
|
|
||||||
void queue_push(Queue *q, void *data) { |
|
||||||
QueueEntry *entry = calloc(1, sizeof(QueueEntry)); |
|
||||||
assert(entry); |
|
||||||
entry->data = data; |
|
||||||
|
|
||||||
pthread_mutex_lock(&q->lock); |
|
||||||
TAILQ_INSERT_TAIL(&q->q, entry, entries); |
|
||||||
pthread_cond_signal(&q->cv); |
|
||||||
pthread_mutex_unlock(&q->lock); |
|
||||||
} |
|
@ -1,33 +0,0 @@ |
|||||||
#ifndef COMMON_CQUEUE_H |
|
||||||
#define COMMON_CQUEUE_H |
|
||||||
|
|
||||||
#include <sys/queue.h> |
|
||||||
#include <pthread.h> |
|
||||||
|
|
||||||
#ifdef __cplusplus |
|
||||||
extern "C" { |
|
||||||
#endif |
|
||||||
|
|
||||||
// a blocking queue
|
|
||||||
|
|
||||||
typedef struct QueueEntry { |
|
||||||
TAILQ_ENTRY(QueueEntry) entries; |
|
||||||
void *data; |
|
||||||
} QueueEntry; |
|
||||||
|
|
||||||
typedef struct Queue { |
|
||||||
TAILQ_HEAD(queue, QueueEntry) q; |
|
||||||
pthread_mutex_t lock; |
|
||||||
pthread_cond_t cv; |
|
||||||
} Queue; |
|
||||||
|
|
||||||
void queue_init(Queue *q); |
|
||||||
void* queue_pop(Queue *q); |
|
||||||
void* queue_try_pop(Queue *q); |
|
||||||
void queue_push(Queue *q, void *data); |
|
||||||
|
|
||||||
#ifdef __cplusplus |
|
||||||
} // extern "C"
|
|
||||||
#endif |
|
||||||
|
|
||||||
#endif |
|
@ -0,0 +1,52 @@ |
|||||||
|
#pragma once |
||||||
|
|
||||||
|
#include <condition_variable> |
||||||
|
#include <mutex> |
||||||
|
#include <queue> |
||||||
|
|
||||||
|
template <class T> |
||||||
|
class SafeQueue { |
||||||
|
public: |
||||||
|
SafeQueue() = default; |
||||||
|
|
||||||
|
void push(const T& v) { |
||||||
|
{ |
||||||
|
std::unique_lock lk(m); |
||||||
|
q.push(v); |
||||||
|
} |
||||||
|
cv.notify_one(); |
||||||
|
} |
||||||
|
|
||||||
|
T pop() { |
||||||
|
std::unique_lock lk(m); |
||||||
|
cv.wait(lk, [this] { return !q.empty(); }); |
||||||
|
T v = q.front(); |
||||||
|
q.pop(); |
||||||
|
return v; |
||||||
|
} |
||||||
|
|
||||||
|
bool try_pop(T& v, int timeout_ms = 0) { |
||||||
|
std::unique_lock lk(m); |
||||||
|
if (!cv.wait_for(lk, std::chrono::milliseconds(timeout_ms), [this] { return !q.empty(); })) { |
||||||
|
return false; |
||||||
|
} |
||||||
|
v = q.front(); |
||||||
|
q.pop(); |
||||||
|
return true; |
||||||
|
} |
||||||
|
|
||||||
|
bool empty() const { |
||||||
|
std::scoped_lock lk(m); |
||||||
|
return q.empty(); |
||||||
|
} |
||||||
|
|
||||||
|
size_t size() const { |
||||||
|
std::scoped_lock lk(m); |
||||||
|
return q.size(); |
||||||
|
} |
||||||
|
|
||||||
|
private: |
||||||
|
mutable std::mutex m; |
||||||
|
std::condition_variable cv; |
||||||
|
std::queue<T> q; |
||||||
|
}; |
Loading…
Reference in new issue