rewrite pvr code so most of the non-write-out code resides in the main thread

This commit is contained in:
Andreas Öman 2007-08-31 15:15:45 +00:00
parent 27600110c9
commit 1f471c1dce
5 changed files with 261 additions and 258 deletions

View file

@ -19,6 +19,8 @@
#ifndef DISPATCH_H
#define DISPATCH_H
extern time_t dispatch_clock;
#define DISPATCH_READ 0x1
#define DISPATCH_WRITE 0x2
#define DISPATCH_ERR 0x4

291
pvr.c
View file

@ -17,6 +17,7 @@
*/
#include <pthread.h>
#include <assert.h>
#include <sys/types.h>
#include <sys/stat.h>
@ -41,14 +42,13 @@
#include "pvr.h"
#include "pvr_rec.h"
#include "epg.h"
#include "dispatch.h"
pthread_mutex_t pvr_mutex = PTHREAD_MUTEX_INITIALIZER;
struct pvr_rec_list pvrr_work_list[PVRR_WORK_MAX];
struct pvr_rec_list pvrr_global_list;
static void pvr_database_load(void);
static void *pvr_main_thread(void *aux);
static void pvr_unrecord(pvr_rec_t *pvrr);
static void pvrr_fsm(pvr_rec_t *pvrr);
/****************************************************************************
*
@ -59,17 +59,10 @@ static void pvr_unrecord(pvr_rec_t *pvrr);
void
pvr_init(void)
{
pthread_t ptid;
pthread_attr_t attr;
pvr_database_load();
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&ptid, &attr, pvr_main_thread, NULL);
}
char
pvr_prog_status(event_t *e)
{
@ -119,51 +112,6 @@ pvr_get_tag_entry(int e)
*
*/
static void
pvrr_launch(pvr_rec_t *pvrr)
{
pthread_t ptid;
pthread_attr_t attr;
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&ptid, &attr, pvr_recorder_thread, pvrr);
}
static void *
pvr_main_thread(void *aux)
{
pvr_rec_t *pvrr;
time_t now;
pthread_mutex_lock(&pvr_mutex);
while(1) {
pthread_mutex_unlock(&pvr_mutex);
sleep(1);
pthread_mutex_lock(&pvr_mutex);
time(&now);
LIST_FOREACH(pvrr, &pvrr_work_list[PVRR_WORK_SCHEDULED], pvrr_work_link) {
/* We start 30 seconds early to allow
* - Transponder to lock
* - MPEG I-frame lock
* - Acquire additional info from TV network (if program is delayed
* and such things)
*/
if(pvrr->pvrr_start - 30 < now) {
LIST_REMOVE(pvrr, pvrr_work_link);
pvrr_launch(pvrr);
}
}
}
}
void
pvr_inform_status_change(pvr_rec_t *pvrr)
@ -183,7 +131,9 @@ pvr_inform_status_change(pvr_rec_t *pvrr)
static void
pvr_free(pvr_rec_t *pvrr)
{
LIST_REMOVE(pvrr, pvrr_work_link);
if(pvrr->pvrr_timer != NULL)
stimer_del(pvrr->pvrr_timer);
LIST_REMOVE(pvrr, pvrr_global_link);
free(pvrr->pvrr_title);
free(pvrr->pvrr_desc);
@ -199,61 +149,49 @@ pvr_free(pvr_rec_t *pvrr)
static void
pvr_unrecord(pvr_rec_t *pvrr)
{
pvr_rec_t *x;
pvr_inform_status_change(pvrr);
if(pvrr->pvrr_status == HTSTV_PVR_STATUS_SCHEDULED) {
x = LIST_NEXT(pvrr, pvrr_work_link);
pvr_free(pvrr);
} else {
pvrr->pvrr_status = HTSTV_PVR_STATUS_ABORTED;
pvrr_fsm(pvrr);
}
clients_enq_ref(-1);
pvr_database_save();
clients_enq_ref(-1);
}
static int
pvr_rec_cmp(pvr_rec_t *a, pvr_rec_t *b)
{
return a->pvrr_start - b->pvrr_start;
}
static int
pvr_glob_cmp(pvr_rec_t *a, pvr_rec_t *b)
{
return b->pvrr_start - a->pvrr_start;
}
static void
pvr_link_pvrr(pvr_rec_t *pvrr)
{
struct pvr_rec_list *l;
pvrr->pvrr_ref = tag_get();
LIST_INSERT_HEAD(&pvrr_global_list, pvrr, pvrr_global_link);
switch(pvrr->pvrr_status) {
case HTSTV_PVR_STATUS_FILE_ERROR:
case HTSTV_PVR_STATUS_DISK_FULL:
case HTSTV_PVR_STATUS_ABORTED:
case HTSTV_PVR_STATUS_BUFFER_ERROR:
case HTSTV_PVR_STATUS_NONE:
break;
case HTSTV_PVR_STATUS_SCHEDULED:
l = &pvrr_work_list[PVRR_WORK_SCHEDULED];
break;
case HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION:
case HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START:
case HTSTV_PVR_STATUS_WAIT_KEY_FRAME:
case HTSTV_PVR_STATUS_RECORDING:
l = &pvrr_work_list[PVRR_WORK_RECORDING];
break;
default:
l = &pvrr_work_list[PVRR_WORK_DONE];
case HTSTV_PVR_STATUS_PAUSED_COMMERCIAL:
pvrr->pvrr_status = HTSTV_PVR_STATUS_SCHEDULED;
pvrr_fsm(pvrr);
break;
}
pvrr->pvrr_ref = tag_get();
LIST_INSERT_SORTED(&pvrr_global_list, pvrr, pvrr_global_link, pvr_glob_cmp);
LIST_INSERT_SORTED(l, pvrr, pvrr_work_link, pvr_rec_cmp);
clients_enq_ref(-1);
pvr_inform_status_change(pvrr);
clients_enq_ref(-1);
}
@ -346,6 +284,7 @@ pvr_database_save(void)
pvr_rec_t *pvrr, *next;
FILE *fp;
struct stat st;
char status;
fp = fopen(pvr_schedule_path_get(), "w");
if(fp == NULL)
@ -377,6 +316,8 @@ pvr_database_save(void)
if(pvrr->pvrr_desc)
fprintf(fp, "desc %s\n", pvrr->pvrr_desc);
status = pvrr->pvrr_status;
fprintf(fp, "status %c\n", pvrr->pvrr_status);
fprintf(fp, "end of record ------------------------------\n");
@ -437,7 +378,7 @@ pvr_database_load(void)
else if(!strcmp(key, "status"))
pvrr->pvrr_status = *val;
else if(!strcmp(key, "end")) {
if(pvrr->pvrr_channel == NULL ||
@ -447,6 +388,7 @@ pvr_database_load(void)
memset(pvrr, 0, sizeof(pvr_rec_t));
continue;
}
pvr_link_pvrr(pvrr);
pvrr = NULL;
}
@ -460,3 +402,170 @@ pvr_database_load(void)
}
/**
* wait for thread to exit
*/
static void
pvr_wait_thread(pvr_rec_t *pvrr)
{
pvr_data_t *pd;
pd = malloc(sizeof(pvr_data_t));
pd->tsb = NULL;
pthread_mutex_lock(&pvrr->pvrr_dq_mutex);
TAILQ_INSERT_TAIL(&pvrr->pvrr_dq, pd, link);
pthread_cond_signal(&pvrr->pvrr_dq_cond);
pthread_mutex_unlock(&pvrr->pvrr_dq_mutex);
pthread_join(pvrr->pvrr_ptid, NULL);
printf("%s: thread joined\n", pvrr->pvrr_printname);
}
/**
* pvrr finite state machine
*/
static void pvr_record_callback(struct th_subscription *s, uint8_t *pkt,
th_pid_t *pi);
static void
pvrr_fsm_timeout(void *aux)
{
pvr_rec_t *pvrr = aux;
pvrr->pvrr_timer = NULL;
pvrr_fsm(pvrr);
}
static void
pvrr_fsm(pvr_rec_t *pvrr)
{
time_t delta;
switch(pvrr->pvrr_status) {
case HTSTV_PVR_STATUS_NONE:
break;
case HTSTV_PVR_STATUS_SCHEDULED:
delta = pvrr->pvrr_start - 30 - dispatch_clock;
assert(pvrr->pvrr_timer == NULL);
if(delta > 0) {
pvrr->pvrr_timer = stimer_add(pvrr_fsm_timeout, pvrr, delta);
break;
}
delta = pvrr->pvrr_stop - dispatch_clock;
if(delta <= 0) {
syslog(LOG_NOTICE, "pvr: \"%s\" - Recording skipped, "
"program has already come to pass", pvrr->pvrr_printname);
pvrr->pvrr_status = HTSTV_PVR_STATUS_DONE;
pvr_inform_status_change(pvrr);
pvr_database_save();
break;
}
/* Add a timer that fires when recording ends */
pvrr->pvrr_timer = stimer_add(pvrr_fsm_timeout, pvrr, delta);
TAILQ_INIT(&pvrr->pvrr_dq);
pthread_cond_init(&pvrr->pvrr_dq_cond, NULL);
pthread_mutex_init(&pvrr->pvrr_dq_mutex, NULL);
pvrr->pvrr_s = channel_subscribe(pvrr->pvrr_channel, pvrr,
pvr_record_callback, 1000, "pvr");
printf("recording, stop timer fires at %ld\n", delta);
pvrr->pvrr_status = HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION;
pvr_inform_status_change(pvrr);
break;
case HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION:
subscription_unsubscribe(pvrr->pvrr_s);
pvrr->pvrr_status = HTSTV_PVR_STATUS_NO_TRANSPONDER;
pvr_inform_status_change(pvrr);
pvr_database_save();
break;
case HTSTV_PVR_STATUS_FILE_ERROR:
case HTSTV_PVR_STATUS_DISK_FULL:
case HTSTV_PVR_STATUS_ABORTED:
case HTSTV_PVR_STATUS_BUFFER_ERROR:
subscription_unsubscribe(pvrr->pvrr_s);
pvr_inform_status_change(pvrr);
pvr_database_save();
pvr_wait_thread(pvrr);
break;
case HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START:
case HTSTV_PVR_STATUS_WAIT_KEY_FRAME:
case HTSTV_PVR_STATUS_RECORDING:
case HTSTV_PVR_STATUS_PAUSED_COMMERCIAL:
delta = pvrr->pvrr_stop - dispatch_clock;
printf("____ DELTA = %ld\n", delta);
if(delta <= 0) {
/* recording completed */
printf("recording completed\n");
subscription_unsubscribe(pvrr->pvrr_s);
pvrr->pvrr_status = HTSTV_PVR_STATUS_DONE;
pvr_inform_status_change(pvrr);
pvr_database_save();
pvr_wait_thread(pvrr);
}
if(pvrr->pvrr_timer != NULL)
stimer_del(pvrr->pvrr_timer);
pvrr->pvrr_timer = stimer_add(pvrr_fsm_timeout, pvrr, delta);
break;
}
}
/*
* PVR data input callback
*/
static void
pvr_record_callback(struct th_subscription *s, uint8_t *pkt, th_pid_t *pi)
{
pvr_data_t *pd;
pvr_rec_t *pvrr = s->ths_opaque;
if(pkt == NULL)
return;
pd = malloc(sizeof(pvr_data_t));
pd->tsb = malloc(188);
memcpy(pd->tsb, pkt, 188);
pd->pi = *pi;
pthread_mutex_lock(&pvrr->pvrr_dq_mutex);
TAILQ_INSERT_TAIL(&pvrr->pvrr_dq, pd, link);
pvrr->pvrr_dq_len++;
pthread_cond_signal(&pvrr->pvrr_dq_cond);
pthread_mutex_unlock(&pvrr->pvrr_dq_mutex);
if(pvrr->pvrr_status == HTSTV_PVR_STATUS_WAIT_SUBSCRIPTION) {
/* ok, first packet, start recording thread */
printf("recording starting\n");
pvrr->pvrr_status = HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START;
pthread_create(&pvrr->pvrr_ptid, NULL, pvr_recorder_thread, pvrr);
}
}

2
pvr.h
View file

@ -20,8 +20,6 @@
#define PVR_H
extern char *pvrpath;
extern pthread_mutex_t pvr_mutex;
extern struct pvr_rec_list pvrr_work_list[PVRR_WORK_MAX];
extern struct pvr_rec_list pvrr_global_list;
typedef enum {

211
pvr_rec.c
View file

@ -79,129 +79,72 @@ static int pwo_end(pvr_rec_t *pvrr);
static void pvr_generate_filename(pvr_rec_t *pvrr);
static void pvr_record_callback(struct th_subscription *s, uint8_t *pkt,
th_pid_t *pi);
/*
* Main decoder thread
* Recording thread
*/
void *
pvr_recorder_thread(void *aux)
{
pvr_rec_t *pvrr = aux;
th_channel_t *ch = pvrr->pvrr_channel;
pvr_data_t *pd;
time_t now;
int x;
char *t, txt2[50];
int x, run = 1;
struct ts_pid_head pids;
ts_pid_t *tsp;
void *opaque = NULL;
th_subscription_t *s;
char txt[50], txt2[50], *t;
th_subscription_t *s = pvrr->pvrr_s;
void *opaque;
time_t now;
LIST_INIT(&pids);
pthread_mutex_lock(&pvr_mutex);
LIST_INSERT_HEAD(&pvrr_work_list[PVRR_WORK_RECORDING], pvrr, pvrr_work_link);
pvr_generate_filename(pvrr);
time(&now);
if(pvrr->pvrr_stop <= now) {
syslog(LOG_NOTICE,
"pvr: \"%s\" - Recording skipped, program has already come to pass",
pvrr->pvrr_printname);
goto done;
}
TAILQ_INIT(&pvrr->pvrr_dq);
pthread_cond_init(&pvrr->pvrr_dq_cond, NULL);
pthread_mutex_init(&pvrr->pvrr_dq_mutex, NULL);
s = channel_subscribe(ch, pvrr, pvr_record_callback, 1000, "pvr");
/* Wait for a transponder to become available */
x = 0;
while(1) {
if(s->ths_transport != NULL)
break;
x++;
pthread_mutex_unlock(&pvr_mutex);
sleep(1);
pthread_mutex_lock(&pvr_mutex);
time(&now);
if(now >= pvrr->pvrr_stop) {
syslog(LOG_ERR,
"pvr: \"%s\" - Could not allocate transponder, recording failed",
pvrr->pvrr_printname);
pvrr->pvrr_status = HTSTV_PVR_STATUS_NO_TRANSPONDER;
goto err;
}
}
pthread_mutex_unlock(&pvr_mutex);
time(&now);
opaque = pwo_init(s, pvrr);
if(opaque == NULL) {
pvrr->pvrr_status = HTSTV_PVR_STATUS_FILE_ERROR;
goto err;
return NULL;
}
if(x > 2) {
snprintf(txt, sizeof(txt),
", %d seconds delayed due to unavailable transponder", x);
} else {
txt[0] = 0;
}
ctime_r(&pvrr->pvrr_stop, txt2);
t = strchr(txt2, '\n');
if(t != NULL)
*t = 0;
syslog(LOG_INFO, "pvr: \"%s\" - Recording started%s, ends at %s",
pvrr->pvrr_printname, txt, txt2);
syslog(LOG_INFO, "pvr: \"%s\" - Recording started, ends at %s",
pvrr->pvrr_printname, txt2);
pvrr->pvrr_status = HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START;
pvr_inform_status_change(pvrr);
while(
pvrr->pvrr_status == HTSTV_PVR_STATUS_RECORDING ||
pvrr->pvrr_status == HTSTV_PVR_STATUS_WAIT_KEY_FRAME ||
pvrr->pvrr_status == HTSTV_PVR_STATUS_PAUSED_COMMERCIAL ||
pvrr->pvrr_status == HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START
) {
time(&now);
LIST_INIT(&pids);
if(pvrr->pvrr_status == HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START &&
now >= pvrr->pvrr_start) {
pvrr->pvrr_status = HTSTV_PVR_STATUS_WAIT_KEY_FRAME;
while(run) {
switch(pvrr->pvrr_status) {
case HTSTV_PVR_STATUS_PAUSED_WAIT_FOR_START:
time(&now);
if(now >= pvrr->pvrr_start)
pvrr->pvrr_status = HTSTV_PVR_STATUS_WAIT_KEY_FRAME;
break;
case HTSTV_PVR_STATUS_RECORDING:
case HTSTV_PVR_STATUS_WAIT_KEY_FRAME:
case HTSTV_PVR_STATUS_PAUSED_COMMERCIAL:
break;
default:
run = 0;
continue;
}
if(pvrr->pvrr_stop < now) {
pvrr->pvrr_status = HTSTV_PVR_STATUS_DONE;
syslog(LOG_INFO, "pvr: \"%s\" - Recording completed",
pvrr->pvrr_printname);
break;
}
pthread_mutex_lock(&pvrr->pvrr_dq_mutex);
while((pd = TAILQ_FIRST(&pvrr->pvrr_dq)) == NULL)
@ -221,25 +164,30 @@ pvr_recorder_thread(void *aux)
break;
}
x = pvr_proc_tsb(pvrr, &pids, pd, s);
free(pd->tsb);
free(pd);
if(pd->tsb == NULL) {
run = 0;
} else {
if(x != 0) {
x = pvr_proc_tsb(pvrr, &pids, pd, s);
free(pd->tsb);
switch(errno) {
case ENOSPC:
pvrr->pvrr_status = HTSTV_PVR_STATUS_DISK_FULL;
syslog(LOG_INFO, "pvr: \"%s\" - Disk full, aborting",
pvrr->pvrr_printname);
break;
default:
pvrr->pvrr_status = HTSTV_PVR_STATUS_FILE_ERROR;
syslog(LOG_INFO, "pvr: \"%s\" - File error, aborting",
pvrr->pvrr_printname);
break;
if(x != 0) {
switch(errno) {
case ENOSPC:
pvrr->pvrr_status = HTSTV_PVR_STATUS_DISK_FULL;
syslog(LOG_INFO, "pvr: \"%s\" - Disk full, aborting",
pvrr->pvrr_printname);
break;
default:
pvrr->pvrr_status = HTSTV_PVR_STATUS_FILE_ERROR;
syslog(LOG_INFO, "pvr: \"%s\" - File error, aborting",
pvrr->pvrr_printname);
break;
}
}
}
free(pd);
}
pwo_end(pvrr);
@ -250,65 +198,12 @@ pvr_recorder_thread(void *aux)
free(tsp);
}
pthread_mutex_lock(&pvr_mutex);
err:
subscription_unsubscribe(s);
/*
* Drain any pending blocks
*/
pthread_mutex_lock(&pvrr->pvrr_dq_mutex);
while((pd = TAILQ_FIRST(&pvrr->pvrr_dq)) != NULL) {
TAILQ_REMOVE(&pvrr->pvrr_dq, pd, link);
free(pd->tsb);
free(pd);
}
pthread_mutex_unlock(&pvrr->pvrr_dq_mutex);
done:
pvr_inform_status_change(pvrr);
LIST_REMOVE(pvrr, pvrr_work_link);
LIST_INSERT_HEAD(&pvrr_work_list[PVRR_WORK_DONE], pvrr, pvrr_work_link);
pvr_database_save();
pthread_mutex_unlock(&pvr_mutex);
pvrr->pvrr_ptid = 0;
return NULL;
}
/*
* Data input callback
*/
static void
pvr_record_callback(struct th_subscription *s, uint8_t *pkt, th_pid_t *pi)
{
pvr_data_t *pd;
pvr_rec_t *pvrr = s->ths_opaque;
if(pkt == NULL)
return;
pd = malloc(sizeof(pvr_data_t));
pd->tsb = malloc(188);
memcpy(pd->tsb, pkt, 188);
pd->pi = *pi;
pthread_mutex_lock(&pvrr->pvrr_dq_mutex);
TAILQ_INSERT_TAIL(&pvrr->pvrr_dq, pd, link);
pvrr->pvrr_dq_len++;
pthread_cond_signal(&pvrr->pvrr_dq_cond);
pthread_mutex_unlock(&pvrr->pvrr_dq_mutex);
}
/**

View file

@ -490,7 +490,6 @@ typedef struct pvr_data {
typedef struct pvr_rec {
LIST_ENTRY(pvr_rec) pvrr_global_link;
LIST_ENTRY(pvr_rec) pvrr_work_link;
th_channel_t *pvrr_channel;
@ -518,14 +517,14 @@ typedef struct pvr_rec {
void *pvrr_opaque; /* For write out code */
th_subscription_t *pvrr_s;
pthread_t pvrr_ptid;
void *pvrr_timer;
} pvr_rec_t;
#define PVRR_WORK_SCHEDULED 0
#define PVRR_WORK_RECORDING 1
#define PVRR_WORK_DONE 2
#define PVRR_WORK_MAX 3
extern struct pvr_rec_list pvrr_work_list[PVRR_WORK_MAX];
extern struct pvr_rec_list pvrr_global_list;
config_entry_t *find_mux_config(const char *muxtype, const char *muxname);