From 1f471c1dcea1984a24ff46fd196a9681de5ca215 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Fri, 31 Aug 2007 15:15:45 +0000 Subject: [PATCH] rewrite pvr code so most of the non-write-out code resides in the main thread --- dispatch.h | 2 + pvr.c | 291 ++++++++++++++++++++++++++++++++++++----------------- pvr.h | 2 - pvr_rec.c | 211 ++++++++++---------------------------- tvhead.h | 13 ++- 5 files changed, 261 insertions(+), 258 deletions(-) diff --git a/dispatch.h b/dispatch.h index 6e9d8653..7a50fbec 100644 --- a/dispatch.h +++ b/dispatch.h @@ -19,6 +19,8 @@ #ifndef DISPATCH_H #define DISPATCH_H +extern time_t dispatch_clock; + #define DISPATCH_READ 0x1 #define DISPATCH_WRITE 0x2 #define DISPATCH_ERR 0x4 diff --git a/pvr.c b/pvr.c index 7f4952a1..2b82668b 100644 --- a/pvr.c +++ b/pvr.c @@ -17,6 +17,7 @@ */ #include +#include #include #include @@ -41,14 +42,13 @@ #include "pvr.h" #include "pvr_rec.h" #include "epg.h" +#include "dispatch.h" -pthread_mutex_t pvr_mutex = PTHREAD_MUTEX_INITIALIZER; -struct pvr_rec_list pvrr_work_list[PVRR_WORK_MAX]; struct pvr_rec_list pvrr_global_list; static void pvr_database_load(void); -static void *pvr_main_thread(void *aux); static void pvr_unrecord(pvr_rec_t *pvrr); +static void pvrr_fsm(pvr_rec_t *pvrr); /**************************************************************************** * @@ -59,17 +59,10 @@ static void pvr_unrecord(pvr_rec_t *pvrr); void pvr_init(void) { - pthread_t ptid; - pthread_attr_t attr; - pvr_database_load(); - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - - pthread_create(&ptid, &attr, pvr_main_thread, NULL); } + char pvr_prog_status(event_t *e) { @@ -119,51 +112,6 @@ pvr_get_tag_entry(int e) * */ -static void -pvrr_launch(pvr_rec_t *pvrr) -{ - pthread_t ptid; - pthread_attr_t attr; - - pthread_attr_init(&attr); - pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); - pthread_create(&ptid, &attr, pvr_recorder_thread, pvrr); -} - - -static void * -pvr_main_thread(void *aux) -{ - pvr_rec_t *pvrr; - time_t now; - - pthread_mutex_lock(&pvr_mutex); - - while(1) { - - pthread_mutex_unlock(&pvr_mutex); - sleep(1); - pthread_mutex_lock(&pvr_mutex); - - time(&now); - - LIST_FOREACH(pvrr, &pvrr_work_list[PVRR_WORK_SCHEDULED], pvrr_work_link) { - - /* We start 30 seconds early to allow - * - Transponder to lock - * - MPEG I-frame lock - * - Acquire additional info from TV network (if program is delayed - * and such things) - */ - - if(pvrr->pvrr_start - 30 < now) { - LIST_REMOVE(pvrr, pvrr_work_link); - pvrr_launch(pvrr); - } - } - } -} - void pvr_inform_status_change(pvr_rec_t *pvrr) @@ -183,7 +131,9 @@ pvr_inform_status_change(pvr_rec_t *pvrr) static void pvr_free(pvr_rec_t *pvrr) { - LIST_REMOVE(pvrr, pvrr_work_link); + if(pvrr->pvrr_timer != NULL) + stimer_del(pvrr->pvrr_timer); + LIST_REMOVE(pvrr, pvrr_global_link); free(pvrr->pvrr_title); free(pvrr->pvrr_desc); @@ -199,61 +149,49 @@ pvr_free(pvr_rec_t *pvrr) static void pvr_unrecord(pvr_rec_t *pvrr) { - pvr_rec_t *x; - - pvr_inform_status_change(pvrr); - if(pvrr->pvrr_status == HTSTV_PVR_STATUS_SCHEDULED) { - x = LIST_NEXT(pvrr, pvrr_work_link); pvr_free(pvrr); } else { pvrr->pvrr_status = HTSTV_PVR_STATUS_ABORTED; + pvrr_fsm(pvrr); } - - clients_enq_ref(-1); + pvr_database_save(); - + clients_enq_ref(-1); } -static int -pvr_rec_cmp(pvr_rec_t *a, pvr_rec_t *b) -{ - return a->pvrr_start - b->pvrr_start; -} - -static int -pvr_glob_cmp(pvr_rec_t *a, pvr_rec_t *b) -{ - return b->pvrr_start - a->pvrr_start; -} - static void pvr_link_pvrr(pvr_rec_t *pvrr) { - struct pvr_rec_list *l; + pvrr->pvrr_ref = tag_get(); + + LIST_INSERT_HEAD(&pvrr_global_list, pvrr, pvrr_global_link); + switch(pvrr->pvrr_status) { + case HTSTV_PVR_STATUS_FILE_ERROR: + case HTSTV_PVR_STATUS_DISK_FULL: + case HTSTV_PVR_STATUS_ABORTED: + case HTSTV_PVR_STATUS_BUFFER_ERROR: + case HTSTV_PVR_STATUS_NONE: + break; + case HTSTV_PVR_STATUS_SCHEDULED: - l = &pvrr_work_list[PVRR_WORK_SCHEDULED]; - break; - + case HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION: + case HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START: + case HTSTV_PVR_STATUS_WAIT_KEY_FRAME: case HTSTV_PVR_STATUS_RECORDING: - l = &pvrr_work_list[PVRR_WORK_RECORDING]; - break; - - default: - l = &pvrr_work_list[PVRR_WORK_DONE]; + case HTSTV_PVR_STATUS_PAUSED_COMMERCIAL: + pvrr->pvrr_status = HTSTV_PVR_STATUS_SCHEDULED; + pvrr_fsm(pvrr); break; } - pvrr->pvrr_ref = tag_get(); - LIST_INSERT_SORTED(&pvrr_global_list, pvrr, pvrr_global_link, pvr_glob_cmp); - LIST_INSERT_SORTED(l, pvrr, pvrr_work_link, pvr_rec_cmp); - clients_enq_ref(-1); pvr_inform_status_change(pvrr); + clients_enq_ref(-1); } @@ -346,6 +284,7 @@ pvr_database_save(void) pvr_rec_t *pvrr, *next; FILE *fp; struct stat st; + char status; fp = fopen(pvr_schedule_path_get(), "w"); if(fp == NULL) @@ -377,6 +316,8 @@ pvr_database_save(void) if(pvrr->pvrr_desc) fprintf(fp, "desc %s\n", pvrr->pvrr_desc); + status = pvrr->pvrr_status; + fprintf(fp, "status %c\n", pvrr->pvrr_status); fprintf(fp, "end of record ------------------------------\n"); @@ -437,7 +378,7 @@ pvr_database_load(void) else if(!strcmp(key, "status")) pvrr->pvrr_status = *val; - + else if(!strcmp(key, "end")) { if(pvrr->pvrr_channel == NULL || @@ -447,6 +388,7 @@ pvr_database_load(void) memset(pvrr, 0, sizeof(pvr_rec_t)); continue; } + pvr_link_pvrr(pvrr); pvrr = NULL; } @@ -460,3 +402,170 @@ pvr_database_load(void) } +/** + * wait for thread to exit + */ + +static void +pvr_wait_thread(pvr_rec_t *pvrr) +{ + pvr_data_t *pd; + + pd = malloc(sizeof(pvr_data_t)); + pd->tsb = NULL; + pthread_mutex_lock(&pvrr->pvrr_dq_mutex); + TAILQ_INSERT_TAIL(&pvrr->pvrr_dq, pd, link); + pthread_cond_signal(&pvrr->pvrr_dq_cond); + pthread_mutex_unlock(&pvrr->pvrr_dq_mutex); + + pthread_join(pvrr->pvrr_ptid, NULL); + printf("%s: thread joined\n", pvrr->pvrr_printname); +} + + + + +/** + * pvrr finite state machine + */ + + +static void pvr_record_callback(struct th_subscription *s, uint8_t *pkt, + th_pid_t *pi); + + +static void +pvrr_fsm_timeout(void *aux) +{ + pvr_rec_t *pvrr = aux; + + pvrr->pvrr_timer = NULL; + pvrr_fsm(pvrr); +} + + + +static void +pvrr_fsm(pvr_rec_t *pvrr) +{ + time_t delta; + + switch(pvrr->pvrr_status) { + case HTSTV_PVR_STATUS_NONE: + break; + + case HTSTV_PVR_STATUS_SCHEDULED: + delta = pvrr->pvrr_start - 30 - dispatch_clock; + + assert(pvrr->pvrr_timer == NULL); + + if(delta > 0) { + pvrr->pvrr_timer = stimer_add(pvrr_fsm_timeout, pvrr, delta); + break; + } + + delta = pvrr->pvrr_stop - dispatch_clock; + + if(delta <= 0) { + syslog(LOG_NOTICE, "pvr: \"%s\" - Recording skipped, " + "program has already come to pass", pvrr->pvrr_printname); + pvrr->pvrr_status = HTSTV_PVR_STATUS_DONE; + pvr_inform_status_change(pvrr); + pvr_database_save(); + break; + } + + /* Add a timer that fires when recording ends */ + + pvrr->pvrr_timer = stimer_add(pvrr_fsm_timeout, pvrr, delta); + + TAILQ_INIT(&pvrr->pvrr_dq); + pthread_cond_init(&pvrr->pvrr_dq_cond, NULL); + pthread_mutex_init(&pvrr->pvrr_dq_mutex, NULL); + + pvrr->pvrr_s = channel_subscribe(pvrr->pvrr_channel, pvrr, + pvr_record_callback, 1000, "pvr"); + + printf("recording, stop timer fires at %ld\n", delta); + + pvrr->pvrr_status = HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION; + pvr_inform_status_change(pvrr); + break; + + case HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION: + subscription_unsubscribe(pvrr->pvrr_s); + + pvrr->pvrr_status = HTSTV_PVR_STATUS_NO_TRANSPONDER; + pvr_inform_status_change(pvrr); + pvr_database_save(); + break; + + case HTSTV_PVR_STATUS_FILE_ERROR: + case HTSTV_PVR_STATUS_DISK_FULL: + case HTSTV_PVR_STATUS_ABORTED: + case HTSTV_PVR_STATUS_BUFFER_ERROR: + subscription_unsubscribe(pvrr->pvrr_s); + pvr_inform_status_change(pvrr); + pvr_database_save(); + pvr_wait_thread(pvrr); + break; + + case HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START: + case HTSTV_PVR_STATUS_WAIT_KEY_FRAME: + case HTSTV_PVR_STATUS_RECORDING: + case HTSTV_PVR_STATUS_PAUSED_COMMERCIAL: + + delta = pvrr->pvrr_stop - dispatch_clock; + printf("____ DELTA = %ld\n", delta); + if(delta <= 0) { + /* recording completed */ + printf("recording completed\n"); + subscription_unsubscribe(pvrr->pvrr_s); + + pvrr->pvrr_status = HTSTV_PVR_STATUS_DONE; + pvr_inform_status_change(pvrr); + pvr_database_save(); + pvr_wait_thread(pvrr); + } + + if(pvrr->pvrr_timer != NULL) + stimer_del(pvrr->pvrr_timer); + + pvrr->pvrr_timer = stimer_add(pvrr_fsm_timeout, pvrr, delta); + break; + } +} + + + +/* + * PVR data input callback + */ + +static void +pvr_record_callback(struct th_subscription *s, uint8_t *pkt, th_pid_t *pi) +{ + pvr_data_t *pd; + pvr_rec_t *pvrr = s->ths_opaque; + + if(pkt == NULL) + return; + + pd = malloc(sizeof(pvr_data_t)); + pd->tsb = malloc(188); + memcpy(pd->tsb, pkt, 188); + pd->pi = *pi; + pthread_mutex_lock(&pvrr->pvrr_dq_mutex); + TAILQ_INSERT_TAIL(&pvrr->pvrr_dq, pd, link); + pvrr->pvrr_dq_len++; + pthread_cond_signal(&pvrr->pvrr_dq_cond); + pthread_mutex_unlock(&pvrr->pvrr_dq_mutex); + + if(pvrr->pvrr_status == HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION) { + /* ok, first packet, start recording thread */ + printf("recording starting\n"); + + pvrr->pvrr_status = HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START; + pthread_create(&pvrr->pvrr_ptid, NULL, pvr_recorder_thread, pvrr); + } +} diff --git a/pvr.h b/pvr.h index 161b4728..69aa1674 100644 --- a/pvr.h +++ b/pvr.h @@ -20,8 +20,6 @@ #define PVR_H extern char *pvrpath; -extern pthread_mutex_t pvr_mutex; -extern struct pvr_rec_list pvrr_work_list[PVRR_WORK_MAX]; extern struct pvr_rec_list pvrr_global_list; typedef enum { diff --git a/pvr_rec.c b/pvr_rec.c index a1216b38..f66b5012 100644 --- a/pvr_rec.c +++ b/pvr_rec.c @@ -79,129 +79,72 @@ static int pwo_end(pvr_rec_t *pvrr); static void pvr_generate_filename(pvr_rec_t *pvrr); -static void pvr_record_callback(struct th_subscription *s, uint8_t *pkt, - th_pid_t *pi); - - /* - * Main decoder thread + * Recording thread */ - - void * pvr_recorder_thread(void *aux) { pvr_rec_t *pvrr = aux; - th_channel_t *ch = pvrr->pvrr_channel; pvr_data_t *pd; - time_t now; - int x; + char *t, txt2[50]; + int x, run = 1; struct ts_pid_head pids; ts_pid_t *tsp; - void *opaque = NULL; - th_subscription_t *s; - char txt[50], txt2[50], *t; + th_subscription_t *s = pvrr->pvrr_s; + void *opaque; + time_t now; - LIST_INIT(&pids); - - pthread_mutex_lock(&pvr_mutex); - - LIST_INSERT_HEAD(&pvrr_work_list[PVRR_WORK_RECORDING], pvrr, pvrr_work_link); - pvr_generate_filename(pvrr); - - time(&now); - - if(pvrr->pvrr_stop <= now) { - syslog(LOG_NOTICE, - "pvr: \"%s\" - Recording skipped, program has already come to pass", - pvrr->pvrr_printname); - goto done; - } - - TAILQ_INIT(&pvrr->pvrr_dq); - pthread_cond_init(&pvrr->pvrr_dq_cond, NULL); - pthread_mutex_init(&pvrr->pvrr_dq_mutex, NULL); - - s = channel_subscribe(ch, pvrr, pvr_record_callback, 1000, "pvr"); - - /* Wait for a transponder to become available */ - - x = 0; - - while(1) { - if(s->ths_transport != NULL) - break; - - x++; - - pthread_mutex_unlock(&pvr_mutex); - sleep(1); - pthread_mutex_lock(&pvr_mutex); - - time(&now); - - if(now >= pvrr->pvrr_stop) { - syslog(LOG_ERR, - "pvr: \"%s\" - Could not allocate transponder, recording failed", - pvrr->pvrr_printname); - pvrr->pvrr_status = HTSTV_PVR_STATUS_NO_TRANSPONDER; - goto err; - } - } - - - pthread_mutex_unlock(&pvr_mutex); - - time(&now); + opaque = pwo_init(s, pvrr); if(opaque == NULL) { pvrr->pvrr_status = HTSTV_PVR_STATUS_FILE_ERROR; - goto err; + return NULL; } - if(x > 2) { - snprintf(txt, sizeof(txt), - ", %d seconds delayed due to unavailable transponder", x); - } else { - txt[0] = 0; - } - ctime_r(&pvrr->pvrr_stop, txt2); t = strchr(txt2, '\n'); if(t != NULL) *t = 0; - syslog(LOG_INFO, "pvr: \"%s\" - Recording started%s, ends at %s", - pvrr->pvrr_printname, txt, txt2); + syslog(LOG_INFO, "pvr: \"%s\" - Recording started, ends at %s", + pvrr->pvrr_printname, txt2); - pvrr->pvrr_status = HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START; - pvr_inform_status_change(pvrr); - while( - pvrr->pvrr_status == HTSTV_PVR_STATUS_RECORDING || - pvrr->pvrr_status == HTSTV_PVR_STATUS_WAIT_KEY_FRAME || - pvrr->pvrr_status == HTSTV_PVR_STATUS_PAUSED_COMMERCIAL || - pvrr->pvrr_status == HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START - ) { - - time(&now); + LIST_INIT(&pids); - if(pvrr->pvrr_status == HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START && - now >= pvrr->pvrr_start) { - pvrr->pvrr_status = HTSTV_PVR_STATUS_WAIT_KEY_FRAME; + while(run) { + + switch(pvrr->pvrr_status) { + case HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START: + + time(&now); + if(now >= pvrr->pvrr_start) + pvrr->pvrr_status = HTSTV_PVR_STATUS_WAIT_KEY_FRAME; + + break; + + case HTSTV_PVR_STATUS_RECORDING: + case HTSTV_PVR_STATUS_WAIT_KEY_FRAME: + case HTSTV_PVR_STATUS_PAUSED_COMMERCIAL: + break; + + default: + run = 0; + continue; } - + if(pvrr->pvrr_stop < now) { - pvrr->pvrr_status = HTSTV_PVR_STATUS_DONE; syslog(LOG_INFO, "pvr: \"%s\" - Recording completed", pvrr->pvrr_printname); break; } + pthread_mutex_lock(&pvrr->pvrr_dq_mutex); while((pd = TAILQ_FIRST(&pvrr->pvrr_dq)) == NULL) @@ -221,25 +164,30 @@ pvr_recorder_thread(void *aux) break; } - x = pvr_proc_tsb(pvrr, &pids, pd, s); - free(pd->tsb); - free(pd); + if(pd->tsb == NULL) { + run = 0; + } else { - if(x != 0) { + x = pvr_proc_tsb(pvrr, &pids, pd, s); + free(pd->tsb); - switch(errno) { - case ENOSPC: - pvrr->pvrr_status = HTSTV_PVR_STATUS_DISK_FULL; - syslog(LOG_INFO, "pvr: \"%s\" - Disk full, aborting", - pvrr->pvrr_printname); - break; - default: - pvrr->pvrr_status = HTSTV_PVR_STATUS_FILE_ERROR; - syslog(LOG_INFO, "pvr: \"%s\" - File error, aborting", - pvrr->pvrr_printname); - break; + if(x != 0) { + + switch(errno) { + case ENOSPC: + pvrr->pvrr_status = HTSTV_PVR_STATUS_DISK_FULL; + syslog(LOG_INFO, "pvr: \"%s\" - Disk full, aborting", + pvrr->pvrr_printname); + break; + default: + pvrr->pvrr_status = HTSTV_PVR_STATUS_FILE_ERROR; + syslog(LOG_INFO, "pvr: \"%s\" - File error, aborting", + pvrr->pvrr_printname); + break; + } } } + free(pd); } pwo_end(pvrr); @@ -250,65 +198,12 @@ pvr_recorder_thread(void *aux) free(tsp); } - pthread_mutex_lock(&pvr_mutex); - - err: - - subscription_unsubscribe(s); - - /* - * Drain any pending blocks - */ - - pthread_mutex_lock(&pvrr->pvrr_dq_mutex); - - while((pd = TAILQ_FIRST(&pvrr->pvrr_dq)) != NULL) { - TAILQ_REMOVE(&pvrr->pvrr_dq, pd, link); - free(pd->tsb); - free(pd); - } - pthread_mutex_unlock(&pvrr->pvrr_dq_mutex); - - - done: - pvr_inform_status_change(pvrr); - - LIST_REMOVE(pvrr, pvrr_work_link); - LIST_INSERT_HEAD(&pvrr_work_list[PVRR_WORK_DONE], pvrr, pvrr_work_link); - - pvr_database_save(); - - pthread_mutex_unlock(&pvr_mutex); + pvrr->pvrr_ptid = 0; return NULL; } -/* - * Data input callback - */ - -static void -pvr_record_callback(struct th_subscription *s, uint8_t *pkt, th_pid_t *pi) -{ - pvr_data_t *pd; - pvr_rec_t *pvrr = s->ths_opaque; - - if(pkt == NULL) - return; - - pd = malloc(sizeof(pvr_data_t)); - pd->tsb = malloc(188); - memcpy(pd->tsb, pkt, 188); - pd->pi = *pi; - pthread_mutex_lock(&pvrr->pvrr_dq_mutex); - TAILQ_INSERT_TAIL(&pvrr->pvrr_dq, pd, link); - pvrr->pvrr_dq_len++; - pthread_cond_signal(&pvrr->pvrr_dq_cond); - pthread_mutex_unlock(&pvrr->pvrr_dq_mutex); -} - - /** diff --git a/tvhead.h b/tvhead.h index 71e079fb..5dc9ab74 100644 --- a/tvhead.h +++ b/tvhead.h @@ -490,7 +490,6 @@ typedef struct pvr_data { typedef struct pvr_rec { LIST_ENTRY(pvr_rec) pvrr_global_link; - LIST_ENTRY(pvr_rec) pvrr_work_link; th_channel_t *pvrr_channel; @@ -518,14 +517,14 @@ typedef struct pvr_rec { void *pvrr_opaque; /* For write out code */ + th_subscription_t *pvrr_s; + + pthread_t pvrr_ptid; + + void *pvrr_timer; + } pvr_rec_t; -#define PVRR_WORK_SCHEDULED 0 -#define PVRR_WORK_RECORDING 1 -#define PVRR_WORK_DONE 2 -#define PVRR_WORK_MAX 3 - -extern struct pvr_rec_list pvrr_work_list[PVRR_WORK_MAX]; extern struct pvr_rec_list pvrr_global_list; config_entry_t *find_mux_config(const char *muxtype, const char *muxname);