Reimplement recording, still lacks a few details

* Starts too early
* Subscription sorting is not stable (qsort) so if multiple recording colides there will be problems
* Settings is not configurable
This commit is contained in:
Andreas Öman 2008-09-20 06:06:41 +00:00
parent 0504a58304
commit 3086de63e0
4 changed files with 441 additions and 52 deletions

View file

@ -5,7 +5,7 @@ SRCS = main.c access.c dtable.c tcp.c http.c notify.c epg.c xmltv.c spawn.c
SRCS += packet.c streaming.c
VPATH += dvr
SRCS += dvr_db.c
SRCS += dvr_db.c dvr_rec.c
SRCS += channels.c subscriptions.c transports.c

View file

@ -29,6 +29,8 @@ extern char *dvr_format;
extern char *dvr_file_postfix;
LIST_HEAD(dvr_rec_stream_list, dvr_rec_stream);
typedef enum {
DVR_SCHEDULED, /* Scheduled for recording (in the future) */
@ -40,12 +42,28 @@ typedef enum {
typedef struct dvr_entry {
int de_refcnt; /* Modification is protected under global_lock */
/**
* Upon dvr_entry_remove() this fields will be invalidated (and pointers
* NULLed)
*/
LIST_ENTRY(dvr_entry) de_global_link;
int de_id;
channel_t *de_channel;
LIST_ENTRY(dvr_entry) de_channel_link;
gtimer_t de_timer;
/**
* These meta fields will stay valid as long as reference count > 0
*/
time_t de_start;
time_t de_stop;
@ -58,15 +76,31 @@ typedef struct dvr_entry {
char *de_error;
dvr_entry_sched_state_t de_sched_state;
gtimer_t de_timer;
uint32_t de_dont_reschedule;
/**
* Fields for recording
*/
th_subscription_t *de_s;
/**
* Initialized upon SUBSCRIPTION_TRANSPORT_RUN
*/
struct dvr_rec_stream_list de_streams;
streaming_target_t de_st;
AVFormatContext *de_fctx;
enum {
DE_RS_WAIT_AUDIO_LOCK = 0,
DE_RS_WAIT_VIDEO_LOCK,
DE_RS_RUNNING,
DE_RS_COMMERCIAL,
} de_rec_state;
int de_header_written;
} dvr_entry_t;
/**
@ -76,12 +110,16 @@ void dvr_entry_create_by_event(event_t *e, const char *creator);
void dvr_init(void);
void dvr_rec_start(dvr_entry_t *de);
void dvr_rec_subscribe(dvr_entry_t *de);
void dvr_rec_unsubscribe(dvr_entry_t *de);
dvr_entry_t *dvr_entry_find_by_id(int id);
void dvr_entry_cancel(dvr_entry_t *de);
void dvr_entry_dec_ref(dvr_entry_t *de);
/**
* Query interface

View file

@ -64,13 +64,15 @@ dvr_entry_link(dvr_entry_t *de)
{
time_t now, preamble;
de->de_refcnt = 1;
LIST_INSERT_HEAD(&dvrentries, de, de_global_link);
time(&now);
preamble = de->de_start - 30;
if(now >= de->de_stop) {
if(now >= de->de_stop || de->de_dont_reschedule) {
de->de_sched_state = DVR_COMPLETED;
gtimer_arm_abs(&de->de_timer, dvr_timer_expire, de,
de->de_stop + dvr_retention_time);
@ -97,7 +99,7 @@ dvr_entry_create_by_event(event_t *e, const char *creator)
return;
LIST_FOREACH(de, &e->e_channel->ch_dvrs, de_channel_link)
if(de->de_start == e->e_start)
if(de->de_start == e->e_start && de->de_sched_state != DVR_COMPLETED)
return;
de = calloc(1, sizeof(dvr_entry_t));
@ -130,27 +132,42 @@ dvr_entry_create_by_event(event_t *e, const char *creator)
/**
*
*/
static void
dvr_entry_destroy(dvr_entry_t *de)
void
dvr_entry_dec_ref(dvr_entry_t *de)
{
assert(de->de_s == NULL); /* Can't have any subscriptions running */
hts_settings_remove("dvrdb/%d", de->de_id);
lock_assert(&global_lock);
LIST_REMOVE(de, de_channel_link);
gtimer_disarm(&de->de_timer);
LIST_REMOVE(de, de_global_link);
if(de->de_refcnt > 1) {
de->de_refcnt--;
return;
}
free(de->de_creator);
free(de->de_title);
free(de->de_desc);
free(de);
}
/**
*
*/
static void
dvr_entry_remove(dvr_entry_t *de)
{
hts_settings_remove("dvrdb/%d", de->de_id);
gtimer_disarm(&de->de_timer);
LIST_REMOVE(de, de_channel_link);
LIST_REMOVE(de, de_global_link);
dvrdb_changed();
dvr_entry_dec_ref(de);
}
@ -197,6 +214,8 @@ dvr_db_load_one(htsmsg_t *c, int id)
tvh_str_set(&de->de_desc, htsmsg_get_str(c, "description"));
tvh_str_set(&de->de_filename, htsmsg_get_str(c, "filename"));
tvh_str_set(&de->de_error, htsmsg_get_str(c, "error"));
htsmsg_get_u32(c, "noresched", &de->de_dont_reschedule);
dvr_entry_link(de);
}
@ -249,6 +268,8 @@ dvr_save(dvr_entry_t *de)
if(de->de_error != NULL)
htsmsg_add_str(m, "error", de->de_error);
htsmsg_add_u32(m, "noresched", de->de_dont_reschedule);
hts_settings_save(m, "dvrdb/%d", de->de_id);
htsmsg_destroy(m);
}
@ -261,7 +282,7 @@ static void
dvr_timer_expire(void *aux)
{
dvr_entry_t *de = aux;
dvr_entry_destroy(de);
dvr_entry_remove(de);
}
@ -272,16 +293,19 @@ dvr_timer_expire(void *aux)
static void
dvr_stop_recording(dvr_entry_t *de, const char *errmsg)
{
// dvr_rec_stop(de);
dvr_rec_unsubscribe(de);
de->de_sched_state = DVR_COMPLETED;
tvh_str_set(&de->de_error, errmsg);
errmsg = errmsg ?: "Recording completed OK";
tvhlog(LOG_INFO, "dvr", "\"%s\" on \"%s\": %s",
de->de_title, de->de_channel->ch_name, errmsg);
tvhlog(LOG_INFO, "dvr", "\"%s\" on \"%s\": "
"End of program: %s",
de->de_title, de->de_channel->ch_name,
de->de_error ?: "Program ended");
dvrdb_changed();
dvr_save(de);
gtimer_arm_abs(&de->de_timer, dvr_timer_expire, de,
de->de_stop + dvr_retention_time);
}
@ -315,7 +339,7 @@ dvr_timer_start_recording(void *aux)
dvrdb_changed();
// dvr_rec_start(de);
dvr_rec_subscribe(de);
gtimer_arm_abs(&de->de_timer, dvr_timer_stop_recording, de, de->de_stop);
}
@ -342,15 +366,16 @@ dvr_entry_cancel(dvr_entry_t *de)
{
switch(de->de_sched_state) {
case DVR_SCHEDULED:
dvr_entry_destroy(de);
dvr_entry_remove(de);
break;
case DVR_RECORDING:
de->de_dont_reschedule = 1;
dvr_stop_recording(de, "Aborted by user");
break;
case DVR_COMPLETED:
dvr_entry_destroy(de);
dvr_entry_remove(de);
break;
}
}
@ -362,7 +387,7 @@ dvr_entry_cancel(dvr_entry_t *de)
void
dvr_init(void)
{
dvr_storage = strdup("/tmp");
dvr_storage = strdup("/home/andoma/media/dvr");
dvr_format = strdup("matroska");
dvr_file_postfix = strdup("mkv");

View file

@ -22,12 +22,31 @@
#include <sys/stat.h>
#include <libavutil/avstring.h>
#include <libavcodec/avcodec.h>
#include "tvhead.h"
#include "streaming.h"
#include "dvr.h"
static void dvr_transport_available(dvr_entry_t *de);
static void dvr_transport_unavailable(dvr_entry_t *de, const char *errmsg);
typedef struct dvr_rec_stream {
LIST_ENTRY(dvr_rec_stream) drs_link;
int drs_source_index;
AVStream *drs_lavf_stream;
int drs_decoded;
} dvr_rec_stream_t;
/**
*
*/
static void dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp);
static void dvr_rec_stop(dvr_entry_t *de);
static void *dvr_thread(void *aux);
static void dvr_thread_new_pkt(dvr_entry_t *de, th_pkt_t *pkt);
static void dvr_thread_epilog(dvr_entry_t *de);
/**
*
@ -38,13 +57,15 @@ dvr_subscription_callback(struct th_subscription *s,
{
dvr_entry_t *de = opaque;
const char *notifymsg = NULL;
th_transport_t *t;
switch(event) {
case SUBSCRIPTION_EVENT_INVALID:
abort();
case SUBSCRIPTION_TRANSPORT_RUN:
dvr_transport_available(de);
t = s->ths_transport;
dvr_rec_start(de, &t->tht_streaming_pad);
return;
case SUBSCRIPTION_NO_INPUT:
@ -71,11 +92,12 @@ dvr_subscription_callback(struct th_subscription *s,
break;
case SUBSCRIPTION_TRANSPORT_LOST:
dvr_transport_unavailable(de, "Lost transport");
return;
dvr_rec_stop(de);
notifymsg = "Lost transport";
break;
case SUBSCRIPTION_DESTROYED:
dvr_transport_unavailable(de, NULL); /* Recording completed */
dvr_rec_stop(de); /* Recording completed */
return;
}
if(notifymsg != NULL)
@ -88,7 +110,7 @@ dvr_subscription_callback(struct th_subscription *s,
*
*/
void
dvr_rec_start(dvr_entry_t *de)
dvr_rec_subscribe(dvr_entry_t *de)
{
if(de->de_s != NULL)
return;
@ -99,6 +121,19 @@ dvr_rec_start(dvr_entry_t *de)
}
/**
*
*/
void
dvr_rec_unsubscribe(dvr_entry_t *de)
{
if(de->de_s == NULL)
return;
subscription_unsubscribe(de->de_s);
de->de_s = NULL;
}
@ -173,16 +208,18 @@ dvr_rec_fatal_error(dvr_entry_t *de, const char *fmt, ...)
{
}
/**
*
*/
static void
dvr_transport_available(dvr_entry_t *de)
dvr_rec_start(dvr_entry_t *de, streaming_pad_t *sp)
{
streaming_component_t *sc;
dvr_rec_stream_t *drs;
AVOutputFormat *fmt;
AVFormatContext *fctx;
th_stream_t *st;
th_muxstream_t *tms;
AVCodecContext *ctx;
AVCodec *codec;
enum CodecID codec_id;
@ -190,8 +227,8 @@ dvr_transport_available(dvr_entry_t *de)
const char *codec_name;
char urlname[512];
int err;
th_transport_t *t = de->de_s->ths_transport;
pthread_t ptid;
pthread_attr_t attr;
pvr_generate_filename(de);
@ -233,32 +270,35 @@ dvr_transport_available(dvr_entry_t *de)
av_set_parameters(fctx, NULL);
pthread_mutex_lock(sp->sp_mutex);
/**
* Setup each stream
*/
LIST_FOREACH(st, &t->tht_streams, st_link) {
switch(st->st_type) {
LIST_FOREACH(sc, &sp->sp_components, sc_link) {
switch(sc->sc_type) {
default:
continue;
case HTSTV_MPEG2VIDEO:
case SCT_MPEG2VIDEO:
codec_id = CODEC_ID_MPEG2VIDEO;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "mpeg2 video";
break;
case HTSTV_MPEG2AUDIO:
case SCT_MPEG2AUDIO:
codec_id = CODEC_ID_MP2;
codec_type = CODEC_TYPE_AUDIO;
codec_name = "mpeg2 audio";
break;
case HTSTV_AC3:
case SCT_AC3:
codec_id = CODEC_ID_AC3;
codec_type = CODEC_TYPE_AUDIO;
codec_name = "AC3 audio";
break;
case HTSTV_H264:
case SCT_H264:
codec_id = CODEC_ID_H264;
codec_type = CODEC_TYPE_VIDEO;
codec_name = "h.264 video";
@ -273,13 +313,12 @@ dvr_transport_available(dvr_entry_t *de)
continue;
}
tms = calloc(1, sizeof(th_muxstream_t));
tms->tms_stream = st;
tms->tms_index = fctx->nb_streams;
drs = calloc(1, sizeof(dvr_rec_stream_t));
drs->drs_source_index = sc->sc_index;
tms->tms_avstream = av_new_stream(fctx, fctx->nb_streams);
drs->drs_lavf_stream = av_new_stream(fctx, fctx->nb_streams);
ctx = tms->tms_avstream->codec;
ctx = drs->drs_lavf_stream->codec;
ctx->codec_id = codec_id;
ctx->codec_type = codec_type;
@ -291,18 +330,305 @@ dvr_transport_available(dvr_entry_t *de)
continue;
}
// LIST_INSERT_HEAD(&tm->tm_streams, tms, tms_muxer_link0);
memcpy(tms->tms_avstream->language, tms->tms_stream->st_lang, 4);
memcpy(drs->drs_lavf_stream->language, sc->sc_lang, 4);
LIST_INSERT_HEAD(&de->de_streams, drs, drs_link);
}
/* Link to the pad */
streaming_target_init(&de->de_st);
streaming_target_connect(sp, &de->de_st);
de->de_st.st_status = ST_RUNNING;
de->de_fctx = fctx;+
pthread_mutex_unlock(sp->sp_mutex);
de->de_refcnt++;
/* Start the recorder thread */
pthread_attr_init(&attr);
pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED);
pthread_create(&ptid, &attr, dvr_thread, de);
}
/**
* Called from subscription callback when we no longer have
* access to stream
*/
static void
dvr_rec_stop(dvr_entry_t *de)
{
streaming_target_t *st = &de->de_st;
pthread_mutex_lock(&st->st_mutex);
if(st->st_status == ST_RUNNING) {
st->st_status = ST_STOP_REQ;
while(st->st_status != ST_ZOMBIE)
pthread_cond_wait(&st->st_cond, &st->st_mutex);
}
pthread_mutex_unlock(&st->st_mutex);
}
/**
*
*/
static void *
dvr_thread(void *aux)
{
dvr_entry_t *de = aux;
streaming_target_t *st = &de->de_st;
th_pktref_t *pr;
pthread_mutex_lock(&st->st_mutex);
de->de_header_written = 0;
de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK;
while(st->st_status == ST_RUNNING) {
pr = TAILQ_FIRST(&st->st_queue);
if(pr == NULL) {
pthread_cond_wait(&st->st_cond, &st->st_mutex);
continue;
}
TAILQ_REMOVE(&st->st_queue, pr, pr_link);
pthread_mutex_unlock(&st->st_mutex);
dvr_thread_new_pkt(de, pr->pr_pkt);
pkt_ref_dec(pr->pr_pkt);
free(pr);
pthread_mutex_lock(&st->st_mutex);
}
dvr_thread_epilog(de);
streaming_target_disconnect(&de->de_st);
/* Signal back that we no longer is running */
st->st_status = ST_ZOMBIE;
pthread_cond_signal(&st->st_cond);
pthread_mutex_unlock(&st->st_mutex);
pthread_mutex_lock(&global_lock);
dvr_entry_dec_ref(de); /* Past this we may no longer
dereference de */
pthread_mutex_unlock(&global_lock);
/* Fade out ... */
return NULL;
}
/**
* Check if all streams of the given type has been decoded
*/
static int
is_all_decoded(dvr_entry_t *de, enum CodecType type)
{
dvr_rec_stream_t *drs;
AVStream *st;
LIST_FOREACH(drs, &de->de_streams, drs_link) {
st = drs->drs_lavf_stream;
if(st->codec->codec->type == type && drs->drs_decoded == 0)
return 0;
}
return 1;
}
/**
*
*/
static void
dvr_thread_new_pkt(dvr_entry_t *de, th_pkt_t *pkt)
{
AVFormatContext *fctx = de->de_fctx;
dvr_rec_stream_t *drs;
AVStream *st, *stx;
AVCodecContext *ctx;
AVPacket avpkt;
void *abuf;
AVFrame pic;
int r, data_size, i;
void *buf = pkt->pkt_payload;
size_t bufsize = pkt->pkt_payloadlen;
char txt[100];
LIST_FOREACH(drs, &de->de_streams, drs_link)
if(drs->drs_source_index == pkt->pkt_componentindex)
break;
if(drs == NULL)
return;
st = drs->drs_lavf_stream;
ctx = st->codec;
switch(de->de_rec_state) {
default:
break;
case DE_RS_WAIT_AUDIO_LOCK:
if(ctx->codec_type != CODEC_TYPE_AUDIO || drs->drs_decoded)
break;
data_size = AVCODEC_MAX_AUDIO_FRAME_SIZE;
abuf = av_malloc(data_size);
r = avcodec_decode_audio2(ctx, abuf, &data_size, buf, bufsize);
av_free(abuf);
if(r != 0 && data_size) {
tvhlog(LOG_DEBUG, "dvr", "%s - "
"Stream #%d: \"%s\" decoded a complete audio frame: "
"%d channels in %d Hz",
de->de_title, st->index, ctx->codec->name,
ctx->channels, ctx->sample_rate);
drs->drs_decoded = 1;
}
if(is_all_decoded(de, CODEC_TYPE_AUDIO))
de->de_rec_state = DE_RS_WAIT_VIDEO_LOCK;
break;
case DE_RS_WAIT_VIDEO_LOCK:
if(ctx->codec_type != CODEC_TYPE_VIDEO || drs->drs_decoded)
break;
r = avcodec_decode_video(st->codec, &pic, &data_size, buf, bufsize);
if(r != 0 && data_size) {
tvhlog(LOG_DEBUG, "dvr", "%s - "
"Stream #%d: \"%s\" decoded a complete video frame: "
"%d x %d at %.2fHz",
de->de_title, st->index, ctx->codec->name,
ctx->width, st->codec->height,
(float)ctx->time_base.den / (float)ctx->time_base.num);
drs->drs_decoded = 1;
}
if(!is_all_decoded(de, CODEC_TYPE_VIDEO))
break;
/* All Audio & Video decoded, start recording */
de->de_rec_state = DE_RS_RUNNING;
if(!de->de_header_written) {
if(av_write_header(fctx)) {
tvhlog(LOG_ERR, "dvr",
"%s - Unable to write header",
de->de_title);
break;
}
de->de_header_written = 1;
tvhlog(LOG_ERR, "dvr",
"%s - Header written to file, stream dump:",
de->de_title);
for(i = 0; i < fctx->nb_streams; i++) {
stx = fctx->streams[i];
avcodec_string(txt, sizeof(txt), stx->codec, 1);
tvhlog(LOG_ERR, "dvr", "%s - Stream #%d: %s [%d/%d]",
de->de_title, i, txt,
stx->time_base.num, stx->time_base.den);
}
}
/* FALLTHRU */
case DE_RS_RUNNING:
if(de->de_header_written == 0)
break;
if(pkt->pkt_commercial == COMMERCIAL_YES) {
de->de_rec_state = DE_RS_COMMERCIAL;
break;
}
av_init_packet(&avpkt);
avpkt.stream_index = st->index;
avpkt.dts = av_rescale_q(pkt->pkt_dts, AV_TIME_BASE_Q, st->time_base);
avpkt.pts = av_rescale_q(pkt->pkt_pts, AV_TIME_BASE_Q, st->time_base);
avpkt.data = buf;
avpkt.size = bufsize;
avpkt.duration =
av_rescale_q(pkt->pkt_duration, AV_TIME_BASE_Q, st->time_base);
avpkt.flags = pkt->pkt_frametype >= PKT_P_FRAME ? 0 : PKT_FLAG_KEY;
r = av_interleaved_write_frame(fctx, &avpkt);
break;
case DE_RS_COMMERCIAL:
if(pkt->pkt_commercial != COMMERCIAL_YES) {
LIST_FOREACH(drs, &de->de_streams, drs_link)
drs->drs_decoded = 0;
de->de_rec_state = DE_RS_WAIT_AUDIO_LOCK;
}
break;
}
}
/**
*
*/
static void
dvr_transport_unavailable(dvr_entry_t *de, const char *errmsg)
dvr_thread_epilog(dvr_entry_t *de)
{
AVFormatContext *fctx = de->de_fctx;
dvr_rec_stream_t *drs;
AVStream *st;
int i;
assert(fctx != NULL);
/* Write trailer if we've written anything at all */
if(de->de_header_written)
av_write_trailer(fctx);
/* Close lavf streams and format */
for(i = 0; i < fctx->nb_streams; i++) {
st = fctx->streams[i];
avcodec_close(st->codec);
free(st->codec);
free(st);
}
url_fclose(fctx->pb);
free(fctx);
de->de_fctx = NULL;
while((drs = LIST_FIRST(&de->de_streams)) != NULL) {
LIST_REMOVE(drs, drs_link);
free(drs);
}
}