diff --git a/Makefile b/Makefile index a957fe71..31bb7a95 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,7 @@ -include ../config.mak -SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c +SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \ + subscriptions.c SRCS += pvr.c pvr_rec.c diff --git a/dvb.c b/dvb.c index b08d16bb..2d017996 100644 --- a/dvb.c +++ b/dvb.c @@ -39,6 +39,7 @@ #include "dvb.h" #include "channels.h" #include "transports.h" +#include "subscriptions.h" #include "teletext.h" #include "epg.h" #include "psi.h" diff --git a/htsclient.c b/htsclient.c index 9ff5d60b..9d5b0361 100644 --- a/htsclient.c +++ b/htsclient.c @@ -30,7 +30,7 @@ #include "tvhead.h" #include "channels.h" -#include "transports.h" +#include "subscriptions.h" #include "pvr.h" #include "epg.h" #include "teletext.h" diff --git a/iptv_output.c b/iptv_output.c index 485ad9ee..6ba85297 100644 --- a/iptv_output.c +++ b/iptv_output.c @@ -32,7 +32,7 @@ #include "iptv_output.h" #include "dispatch.h" #include "channels.h" -#include "transports.h" +#include "subscriptions.h" typedef struct output_multicast { int fd; diff --git a/main.c b/main.c index c12ba213..83f686f4 100644 --- a/main.c +++ b/main.c @@ -46,6 +46,7 @@ #include "pvr.h" #include "dispatch.h" #include "transports.h" +#include "subscriptions.h" #include "iptv_output.h" #include "rtsp.h" @@ -177,7 +178,7 @@ main(int argc, char **argv) pvr_init(); output_multicast_setup(); - transport_scheduler_init(); + subscriptions_init(); rtsp_start(); diff --git a/pvr.c b/pvr.c index eb36e629..17d7e40b 100644 --- a/pvr.c +++ b/pvr.c @@ -37,7 +37,7 @@ #include "tvhead.h" #include "channels.h" -#include "transports.h" +#include "subscriptions.h" #include "htsclient.h" #include "pvr.h" #include "pvr_rec.h" diff --git a/rtsp.c b/rtsp.c index 923fb44e..df9e883c 100644 --- a/rtsp.c +++ b/rtsp.c @@ -30,7 +30,7 @@ #include "tvhead.h" #include "channels.h" -#include "transports.h" +#include "subscriptions.h" #include "pvr.h" #include "epg.h" #include "teletext.h" diff --git a/subscriptions.c b/subscriptions.c new file mode 100644 index 00000000..df87f684 --- /dev/null +++ b/subscriptions.c @@ -0,0 +1,209 @@ +/* + * tvheadend, transport and subscription functions + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include +#include + +#include + +#include + +#include "tvhead.h" +#include "dispatch.h" +#include "dvb_dvr.h" +#include "teletext.h" +#include "transports.h" + +#include "v4l.h" +#include "dvb_dvr.h" +#include "iptv_input.h" +#include "psi.h" + +/* + * subscriptions_mutex protects all operations concerning subscription lists + */ + +static pthread_mutex_t subscription_mutex = PTHREAD_MUTEX_INITIALIZER; + +/* + * + */ +void +subscription_lock(void) +{ + pthread_mutex_lock(&subscription_mutex); +} + +void +subscription_unlock(void) +{ + pthread_mutex_unlock(&subscription_mutex); +} + + +struct th_subscription_list subscriptions; + +static void +subscription_reschedule(void) +{ + th_subscription_t *s; + th_transport_t *t; + + LIST_FOREACH(s, &subscriptions, ths_global_link) { + if(s->ths_transport != NULL) + continue; /* Got a transport, we're happy */ + + t = transport_find(s->ths_channel, s->ths_weight); + + if(t == NULL) + continue; + + s->ths_transport = t; + LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); + } +} + + + + +static void +auto_reschedule(void *aux) +{ + stimer_add(auto_reschedule, NULL, 10); + + pthread_mutex_lock(&subscription_mutex); + subscription_reschedule(); + pthread_mutex_unlock(&subscription_mutex); +} + + + + + +void +subscription_unsubscribe(th_subscription_t *s) +{ + pthread_mutex_lock(&subscription_mutex); + + s->ths_callback(s, NULL, NULL, AV_NOPTS_VALUE); + + LIST_REMOVE(s, ths_global_link); + LIST_REMOVE(s, ths_channel_link); + + if(s->ths_transport != NULL) { + LIST_REMOVE(s, ths_transport_link); + transport_purge(s->ths_transport); + } + + if(s->ths_pkt != NULL) + free(s->ths_pkt); + + free(s->ths_title); + free(s); + + subscription_reschedule(); + + pthread_mutex_unlock(&subscription_mutex); +} + + + + + +static int +subscription_sort(th_subscription_t *a, th_subscription_t *b) +{ + return b->ths_weight - a->ths_weight; +} + + +th_subscription_t * +subscription_create(th_channel_t *ch, void *opaque, + void (*callback)(struct th_subscription *s, + uint8_t *pkt, th_pid_t *pi, int64_t pcr), + unsigned int weight, + const char *name) +{ + th_subscription_t *s; + + pthread_mutex_lock(&subscription_mutex); + + s = malloc(sizeof(th_subscription_t)); + s->ths_pkt = NULL; + s->ths_callback = callback; + s->ths_opaque = opaque; + s->ths_title = strdup(name); + s->ths_total_err = 0; + time(&s->ths_start); + s->ths_weight = weight; + LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); + + s->ths_channel = ch; + LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); + + s->ths_transport = NULL; + + subscription_reschedule(); + + if(s->ths_transport == NULL) + syslog(LOG_NOTICE, "No transponder available for subscription \"%s\" " + "to channel \"%s\"", + s->ths_title, s->ths_channel->ch_name); + + pthread_mutex_unlock(&subscription_mutex); + + return s; +} + +void +subscription_set_weight(th_subscription_t *s, unsigned int weight) +{ + if(s->ths_weight == weight) + return; + + pthread_mutex_lock(&subscription_mutex); + + LIST_REMOVE(s, ths_global_link); + s->ths_weight = weight; + LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); + + subscription_reschedule(); + + pthread_mutex_unlock(&subscription_mutex); +} + + +void +subscriptions_init(void) +{ + stimer_add(auto_reschedule, NULL, 60); +} diff --git a/subscriptions.h b/subscriptions.h new file mode 100644 index 00000000..c90bb2a5 --- /dev/null +++ b/subscriptions.h @@ -0,0 +1,43 @@ +/* + * tvheadend, subscription functions + * Copyright (C) 2007 Andreas Öman + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ + +#ifndef SUBSCRIPTIONS_H +#define SUBSCRIPTIONS_H + +#include + +void subscription_unsubscribe(th_subscription_t *s); + +void subscription_set_weight(th_subscription_t *s, unsigned int weight); + +void subscription_lock(void); + +void subscription_unlock(void); + +typedef void (subscription_callback_t)(struct th_subscription *s, + uint8_t *pkt, th_pid_t *pi, + int64_t pcr); + +th_subscription_t *subscription_create(th_channel_t *ch, void *opaque, + subscription_callback_t *ths_callback, + unsigned int weight, + const char *name); + +void subscriptions_init(void); + +#endif /* SUBSCRIPTIONS_H */ diff --git a/transports.c b/transports.c index 1ec67cd1..d7bd9e07 100644 --- a/transports.c +++ b/transports.c @@ -42,55 +42,14 @@ #include "dvb_dvr.h" #include "teletext.h" #include "transports.h" +#include "subscriptions.h" #include "v4l.h" #include "dvb_dvr.h" #include "iptv_input.h" #include "psi.h" -/* - * transport_mutex protects all operations concerning subscription lists - */ - -static pthread_mutex_t subscription_mutex = PTHREAD_MUTEX_INITIALIZER; - -/* - * - */ void -subscription_lock(void) -{ - pthread_mutex_lock(&subscription_mutex); -} - -void -subscription_unlock(void) -{ - pthread_mutex_unlock(&subscription_mutex); -} - - -/* - * - */ - -static void -subscription_lock_check(const char *file, const int line) -{ - if(pthread_mutex_trylock(&subscription_mutex) == EBUSY) - return; - - fprintf(stderr, "GLW lock not held at %s : %d, crashing\n", - file, line); - abort(); -} - -#define subscription_lock_assert() subscription_lock_check(__FILE__, __LINE__) - - -struct th_subscription_list subscriptions; - -static void transport_purge(th_transport_t *t) { if(LIST_FIRST(&t->tht_subscriptions)) @@ -149,7 +108,7 @@ transport_start(th_transport_t *t, unsigned int weight) -static th_transport_t * +th_transport_t * transport_find(th_channel_t *ch, unsigned int weight) { th_transport_t *t; @@ -176,134 +135,6 @@ transport_find(th_channel_t *ch, unsigned int weight) -static void -subscription_reschedule(void) -{ - th_subscription_t *s; - th_transport_t *t; - - LIST_FOREACH(s, &subscriptions, ths_global_link) { - if(s->ths_transport != NULL) - continue; /* Got a transport, we're happy */ - - t = transport_find(s->ths_channel, s->ths_weight); - - if(t == NULL) - continue; - - s->ths_transport = t; - LIST_INSERT_HEAD(&t->tht_subscriptions, s, ths_transport_link); - } -} - - - - -static void -auto_reschedule(void *aux) -{ - stimer_add(auto_reschedule, NULL, 10); - - pthread_mutex_lock(&subscription_mutex); - subscription_reschedule(); - pthread_mutex_unlock(&subscription_mutex); -} - - - - - -void -subscription_unsubscribe(th_subscription_t *s) -{ - pthread_mutex_lock(&subscription_mutex); - - s->ths_callback(s, NULL, NULL, AV_NOPTS_VALUE); - - LIST_REMOVE(s, ths_global_link); - LIST_REMOVE(s, ths_channel_link); - - if(s->ths_transport != NULL) { - LIST_REMOVE(s, ths_transport_link); - transport_purge(s->ths_transport); - } - - if(s->ths_pkt != NULL) - free(s->ths_pkt); - - free(s->ths_title); - free(s); - - subscription_reschedule(); - - pthread_mutex_unlock(&subscription_mutex); -} - - - - - -static int -subscription_sort(th_subscription_t *a, th_subscription_t *b) -{ - return b->ths_weight - a->ths_weight; -} - - -th_subscription_t * -subscription_create(th_channel_t *ch, void *opaque, - void (*callback)(struct th_subscription *s, - uint8_t *pkt, th_pid_t *pi, int64_t pcr), - unsigned int weight, - const char *name) -{ - th_subscription_t *s; - - pthread_mutex_lock(&subscription_mutex); - - s = malloc(sizeof(th_subscription_t)); - s->ths_pkt = NULL; - s->ths_callback = callback; - s->ths_opaque = opaque; - s->ths_title = strdup(name); - s->ths_total_err = 0; - time(&s->ths_start); - s->ths_weight = weight; - LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); - - s->ths_channel = ch; - LIST_INSERT_HEAD(&ch->ch_subscriptions, s, ths_channel_link); - - s->ths_transport = NULL; - - subscription_reschedule(); - - if(s->ths_transport == NULL) - syslog(LOG_NOTICE, "No transponder available for subscription \"%s\" " - "to channel \"%s\"", - s->ths_title, s->ths_channel->ch_name); - - pthread_mutex_unlock(&subscription_mutex); - - return s; -} - -void -subscription_set_weight(th_subscription_t *s, unsigned int weight) -{ - if(s->ths_weight == weight) - return; - - pthread_mutex_lock(&subscription_mutex); - - LIST_REMOVE(s, ths_global_link); - s->ths_weight = weight; - LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); - - subscription_reschedule(); - - pthread_mutex_unlock(&subscription_mutex); -} /* @@ -315,8 +146,6 @@ transport_flush_subscribers(th_transport_t *t) { th_subscription_t *s; - subscription_lock_assert(); - while((s = LIST_FIRST(&t->tht_subscriptions)) != NULL) { LIST_REMOVE(s, ths_transport_link); s->ths_transport = NULL; @@ -331,8 +160,6 @@ transport_compute_weight(struct th_transport_list *head) th_subscription_t *s; int w = 0; - subscription_lock_assert(); - LIST_FOREACH(t, head, tht_adapter_link) { LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link) { if(s->ths_weight > w) @@ -428,12 +255,12 @@ transport_recv_tsb(th_transport_t *t, int pid, uint8_t *tsb, int scanpcr, break; default: - pthread_mutex_lock(&subscription_mutex); + subscription_lock(); LIST_FOREACH(s, &t->tht_subscriptions, ths_transport_link) { s->ths_total_err += err; s->ths_callback(s, tsb, pi, pcr); } - pthread_mutex_unlock(&subscription_mutex); + subscription_unlock(); break; } } @@ -515,14 +342,6 @@ transport_monitor_init(th_transport_t *t) stimer_add(transport_monitor, t, 5); } - -void -transport_scheduler_init(void) -{ - stimer_add(auto_reschedule, NULL, 60); -} - - th_pid_t * transport_add_pid(th_transport_t *t, uint16_t pid, tv_streamtype_t type) { diff --git a/transports.h b/transports.h index 198681f3..afc440f4 100644 --- a/transports.h +++ b/transports.h @@ -1,5 +1,5 @@ /* - * tvheadend, transport and subscription functions + * tvheadend, transport functions * Copyright (C) 2007 Andreas Öman * * This program is free software: you can redistribute it and/or modify @@ -21,12 +21,6 @@ #include -void subscription_unsubscribe(th_subscription_t *s); -void subscription_set_weight(th_subscription_t *s, unsigned int weight); - -void subscription_lock(void); -void subscription_unlock(void); - unsigned int transport_compute_weight(struct th_transport_list *head); void transport_flush_subscribers(th_transport_t *t); @@ -40,18 +34,12 @@ th_pid_t *transport_add_pid(th_transport_t *t, uint16_t pid, tv_streamtype_t type); int transport_set_channel(th_transport_t *th, th_channel_t *ch); + void transport_link(th_transport_t *t, th_channel_t *ch); -void transport_scheduler_init(void); +th_transport_t *transport_find(th_channel_t *ch, unsigned int weight); -typedef void (subscription_callback_t)(struct th_subscription *s, - uint8_t *pkt, th_pid_t *pi, - int64_t pcr); - -th_subscription_t *subscription_create(th_channel_t *ch, void *opaque, - subscription_callback_t *ths_callback, - unsigned int weight, - const char *name); +void transport_purge(th_transport_t *t); #endif /* TRANSPORTS_H */