本文共 10403 字,大约阅读时间需要 34 分钟。
背景:笔者之前一直从事嵌入式音视频相关的开发工作,对于音视频的数据的处理,生产者消费者队列必不可少,而如何实现一个高效稳定的生产者消费者队列则十分重要,不过按照笔者从业的经验,所看到的现象,不容乐观,很多知名大厂在这种基础组件的开发能力上十分堪忧。
视频数据的处理为什么需要生产者消费者队列?其实上面提到的音视频数据处理的特点就是答案。
不注重效率性能:
pthread_mutex_lock();state = check_some_state();if (state == xxx) { do_some_process();} else { usleep(x);}pthread_mutex_unlock();
以上代码loop一个状态,如果状态成立做有效的处理,不成立则睡眠一定时间,之后再次调用该段代码进行下一次的状态检测,而这个睡眠时间是一个随机经验值,很有可能下次仍然是无效的检测,接着睡眠再loop,多余的loop是一种资源的浪费。
数据的传递采用copy方式:
数据的传递,采用copy的方式,一帧数据在一个完整的处理流程中经过n次copy(笔者见过一个系统中一帧数据copy了8次之多)
生产者消费者队列和业务代码混杂在一起,没有分离:
对于开发者,都希望用最简单的接口完成某个功能,而不关心内部实现,在这里就是,只需要生产者生产数据,消费者消费数据,而内部的处理(同步,数据的处理等)完全不关心,这样开发者就不需要去弄很多锁,降低了开发难度,也可以使代码组件化,模块化。
有经验的开发者应该感觉以上都是基础点,不会有人犯这样的错误,不过笔者以自己的经历肯定的说,以上两种问题在某世界级大厂的视频设备上随处可见。
让我悲哀的是,当我指出这些问题时,某些开发者完全无动于衷。而我更无力的是,现在的cpu,memory性能实在是高,在某些不太高端的嵌入式芯片上,优化过的数据并没有十分明显,在一个实际项目上,经过优化后cpu大概降低1%(原总系统cpu占用7%),有经验的开发者又会说原7%的cpu占用率说明这个芯片做这个系统浪费了,不过也没办法其实已经用了比较低端的芯片了。。。
以上的吐槽主要是想说明:由于cpu,memory性能的提升,让很多开发者感觉软件的优化意义不大了,而我是一个理想主义者,对于某些设计ugly的代码真的是零容忍啊。
以上说了这么多,那如何操作呢?
#include#include #include #include #include #include #include #include #include #include #include #include #include #include "sfifo.h"//#define CONFIG_COND_FREE 1#define CONFIG_COND_ACTIVE 1#define MAX_SFIFO_NUM 32struct sfifo_des_s sfifo_des[MAX_SFIFO_NUM];struct sfifo_des_s *my_sfifo_des;struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p){ static long empty_count = 0; struct sfifo_s *sfifo = NULL; pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex));#ifdef CONFIG_COND_FREE while (sfifo_des_p->free_list.head == NULL) { pthread_cond_wait(&(sfifo_des_p->free_list.cond), &(sfifo_des_p->free_list.lock_mutex)); }#else if (sfifo_des_p->free_list.head == NULL) { if (empty_count++ % 120 == 0) { printf("free list empty\n"); } goto EXIT; }#endif sfifo = sfifo_des_p->free_list.head; sfifo_des_p->free_list.head = sfifo->next;EXIT: pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex)); return sfifo;}int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p){ int send_cond = 0; pthread_mutex_lock(&(sfifo_des_p->free_list.lock_mutex)); if (sfifo_des_p->free_list.head == NULL) { sfifo_des_p->free_list.head = sfifo; sfifo_des_p->free_list.tail = sfifo; sfifo_des_p->free_list.tail->next = NULL; send_cond = 1; } else { sfifo_des_p->free_list.tail->next = sfifo; sfifo_des_p->free_list.tail = sfifo; sfifo_des_p->free_list.tail->next = NULL; } pthread_mutex_unlock(&(sfifo_des_p->free_list.lock_mutex));#ifdef CONFIG_COND_FREE if (send_cond) { pthread_cond_signal(&(sfifo_des_p->free_list.cond)); }#endif return 0;}struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p){ struct sfifo_s *sfifo = NULL; pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex));#ifdef CONFIG_COND_ACTIVE while (sfifo_des_p->active_list.head == NULL) { //pthread_cond_timedwait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex), &outtime); pthread_cond_wait(&(sfifo_des_p->active_list.cond), &(sfifo_des_p->active_list.lock_mutex)); }#else if (sfifo_des_p->active_list.head == NULL) { printf("active list empty\n"); goto EXIT; }#endif sfifo = sfifo_des_p->active_list.head; sfifo_des_p->active_list.head = sfifo->next;EXIT: pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex)); return sfifo;}int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p){ int send_cond = 0; pthread_mutex_lock(&(sfifo_des_p->active_list.lock_mutex)); if (sfifo_des_p->active_list.head == NULL) { sfifo_des_p->active_list.head = sfifo; sfifo_des_p->active_list.tail = sfifo; sfifo_des_p->active_list.tail->next = NULL; send_cond = 1; } else { sfifo_des_p->active_list.tail->next = sfifo; sfifo_des_p->active_list.tail = sfifo; sfifo_des_p->active_list.tail->next = NULL; } pthread_mutex_unlock(&(sfifo_des_p->active_list.lock_mutex));#ifdef CONFIG_COND_ACTIVE if (send_cond) { pthread_cond_signal(&(sfifo_des_p->active_list.cond)); }#endif return 0;}int dump_sfifo_list(struct sfifo_list_des_s *list){ struct sfifo_s *sfifo = NULL; sfifo = list->head; do { printf("dump : %x\n", sfifo->buffer[0]); usleep(500 * 1000); } while (sfifo->next != NULL && (sfifo = sfifo->next)); return 0;}struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num){ int i = 0; struct sfifo_s *sfifo; struct sfifo_des_s *sfifo_des_p; sfifo_des_p = (struct sfifo_des_s *)malloc(sizeof(struct sfifo_des_s)); sfifo_des_p->sfifos_num = sfifo_num; sfifo_des_p->sfifos_active_max_num = sfifo_active_max_num; sfifo_des_p->free_list.sfifo_num = 0; sfifo_des_p->free_list.head = NULL; sfifo_des_p->free_list.tail = NULL; pthread_mutex_init(&sfifo_des_p->free_list.lock_mutex, NULL); pthread_cond_init(&sfifo_des_p->free_list.cond, NULL); sfifo_des_p->active_list.sfifo_num = 0; sfifo_des_p->active_list.head = NULL; sfifo_des_p->active_list.tail = NULL; pthread_mutex_init(&sfifo_des_p->active_list.lock_mutex, NULL); pthread_cond_init(&sfifo_des_p->active_list.cond, NULL); for (i = 0; i < sfifo_num; i++) { sfifo = (struct sfifo_s *)malloc(sizeof(struct sfifo_s)); sfifo->buffer = (unsigned char *)malloc(sfifo_buffer_size); printf("sfifo_init: %x\n", sfifo->buffer); memset(sfifo->buffer, i, sfifo_buffer_size); sfifo->size = sfifo_buffer_size; sfifo->next = NULL; sfifo_put_free_buf(sfifo, sfifo_des_p); } return sfifo_des_p;}void *productor_thread_func(void *arg){ struct sfifo_s *sfifo; while (1) { sfifo = sfifo_get_free_buf(my_sfifo_des); if (sfifo != NULL) { printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]); sfifo_put_active_buf(sfifo, my_sfifo_des); } //usleep(20*1000); }}void *comsumer_thread_func(void *arg){ struct sfifo_s *sfifo; int count = 0; while (1) { sfifo = sfifo_get_active_buf(my_sfifo_des); if (sfifo != NULL) { printf("---------------- get %x\n", sfifo->buffer[0]); sfifo_put_free_buf(sfifo, my_sfifo_des); } //usleep(10 * 1000); // if (count++ > 10000) { // exit(-1); // } }}int main(){ int ret; static pthread_t productor_thread; static pthread_t consumer_thread; struct sfifo_s *r_sfifo; my_sfifo_des = sfifo_init(10, 4096, 5); ret = pthread_create(&productor_thread, NULL, productor_thread_func, NULL); ret = pthread_create(&consumer_thread, NULL, comsumer_thread_func, NULL); while (1) { sleep(1); } return 0;}
以上是一个简单的生产者消费者队列的c语言的实现,对应的头文件在本文底部(贴代码太长看起来很崩溃)。
仔细的同学可能会发现,以上代码sfifo_get_free_buf()中默认是loop轮询检测free buffer链表的,你前面不是说了一大堆不能loop吗?怎么还用loop呢?
这里其实有两个原因:走读和运行以上代码的同学应该可以发现这里做了一个简单可运行的demo模拟了生产者和消费者双方:
void *productor_thread_func(void *arg){ struct sfifo_s *sfifo; while (1) { sfifo = sfifo_get_free_buf(my_sfifo_des); if (sfifo != NULL) { printf("+++++++++++++++++ put : %x\n", sfifo->buffer[0]); sfifo_put_active_buf(sfifo, my_sfifo_des); } //usleep(20*1000); }}void *comsumer_thread_func(void *arg){ struct sfifo_s *sfifo; int count = 0; while (1) { sfifo = sfifo_get_active_buf(my_sfifo_des); if (sfifo != NULL) { printf("---------------- get %x\n", sfifo->buffer[0]); sfifo_put_free_buf(sfifo, my_sfifo_des); } //usleep(10 * 1000); // if (count++ > 10000) { // exit(-1); // } }}
这里面对于使用者的优点有:
以上描述了一个生产者消费者队列c语言的实现,为什么是c语言版本的?因为其他高级语言,有很多成熟的库提供了该功能,完全不用自己写,而c就没这么完善了,不过这也说明了c的简单灵活。但悲哀的是很多人因此进行了很ugly的实现。
多吐槽几句,嵌入式行业由于各种技术原因,导致开发语言还是采用c,这样对开发人员有了不小的要求,而如何才能写一些优雅的代码,对人的素质有了要求,但现状是优秀的开发者都被互联网行业抢走了,导致嵌入式行业开发人员的水平参差不齐,本来应该是一个对编码能力要求很高的行业被一些水平低下的开发者占据。so,我离开了这个行业了。。。
附:模块头文件,类linux用户可通过gcc xxx.c命令build该demo,然后运行测试。
#ifndef SFIFO_H_#define SFIFO_H_struct sfifo_list_des_s { int sfifo_num; struct sfifo_s *head; struct sfifo_s *tail; pthread_mutex_t lock_mutex; pthread_cond_t cond;};struct sfifo_des_s { int sfifo_init; unsigned int sfifos_num; unsigned int sfifos_active_max_num; struct sfifo_list_des_s free_list; struct sfifo_list_des_s active_list;};struct sfifo_s { unsigned char *buffer; unsigned int size; struct sfifo_s *next;};extern struct sfifo_des_s *sfifo_init(int sfifo_num, int sfifo_buffer_size, int sfifo_active_max_num);/* productor */extern struct sfifo_s* sfifo_get_free_buf(struct sfifo_des_s *sfifo_des_p);extern int sfifo_put_free_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);/* consumer */extern struct sfifo_s* sfifo_get_active_buf(struct sfifo_des_s *sfifo_des_p);extern int sfifo_put_active_buf(struct sfifo_s *sfifo, struct sfifo_des_s *sfifo_des_p);#endif