From ab4d284b9c882f9813c40a1033174f06d4eb17fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andreas=20=C3=96man?= Date: Sun, 18 May 2008 10:18:05 +0000 Subject: [PATCH] Make streaming over HTSP work --- Makefile | 2 +- htsclient.c | 2 +- htsp.c | 84 +++++++++++++++++++++++++++-- htsp.h | 3 ++ htsp_muxer.c | 139 +++++++++++++++++++++++++++++++++++------------- htsp_muxer.h | 5 +- iptv_output.c | 2 +- pvr.c | 2 +- rpc.c | 110 +++++++++----------------------------- rtsp.c | 2 +- subscriptions.c | 3 +- subscriptions.h | 3 +- xbmsp.c | 2 +- 13 files changed, 223 insertions(+), 136 deletions(-) diff --git a/Makefile b/Makefile index f37371e8..ace2b3fe 100644 --- a/Makefile +++ b/Makefile @@ -7,7 +7,7 @@ SRCS = main.c dispatch.c channels.c transports.c teletext.c psi.c \ SRCS += http.c -SRCS += htsp.c rpc.c +SRCS += htsp.c htsp_muxer.c rpc.c SRCS += pvr.c autorec.c ffmuxer.c diff --git a/htsclient.c b/htsclient.c index c95c6d1f..7aaf7d2e 100644 --- a/htsclient.c +++ b/htsclient.c @@ -230,7 +230,7 @@ cr_channel_subscribe(client_t *c, char **argv, int argc) snprintf(tmp, sizeof(tmp), "HTS Client @ %s", inet_ntoa(si->sin_addr)); - s = subscription_create(ch, weight, tmp, client_subscription_callback, c); + s = subscription_create(ch, weight, tmp, client_subscription_callback, c, 0); if(s == NULL) return 1; diff --git a/htsp.c b/htsp.c index b49320c1..ec85fcaa 100644 --- a/htsp.c +++ b/htsp.c @@ -38,6 +38,7 @@ #include "htsp_muxer.h" #include "tcp.h" #include "epg.h" +#include "access.h" #include @@ -140,6 +141,75 @@ htsp_async_channel_update(channel_t *ch) } +/** + * Subscribe to channel + */ +static htsmsg_t * +htsp_subscribe(rpc_session_t *ses, htsmsg_t *in, void *opaque) +{ + htsp_t *htsp = opaque; + channel_t *ch; + const char *txt; + th_subscription_t *s; + htsmsg_t *r; + uint32_t tag; + + if((txt = htsmsg_get_str(in, "channel")) == NULL) + return rpc_error(ses, "missing argument: channel"); + + if((ch = channel_find(txt, 0, NULL)) == NULL) + return rpc_error(ses, "Channel not found"); + + LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) { + if(s->ths_channel == ch) { + subscription_set_weight(s, 200); + return rpc_ok(ses); + } + } + + + tag = tag_get(); + + r = htsmsg_create(); + htsmsg_add_u32(r, "seq", ses->rs_seq); + htsmsg_add_u32(r, "id", tag); + htsp_send_msg(htsp, r, 0); + + htsp_muxer_subscribe(htsp, ch, 200, tag); + + return NULL; +} + + +/** + * Unsubscribe from channel + */ +static htsmsg_t * +htsp_unsubscribe(rpc_session_t *ses, htsmsg_t *in, void *opaque) +{ + htsp_t *htsp = opaque; + uint32_t id; + + if(htsmsg_get_u32(in, "id", &id)) + return rpc_error(ses, "missing argument: id"); + + htsp_muxer_unsubscribe(htsp, id); + + return rpc_ok(ses); +} + + + +/** + * HTSP specific methods + */ +static rpc_cmd_t htsp_rpc[] = { + { "subscribe", htsp_subscribe, ACCESS_STREAMING }, + { "unsubscribe", htsp_unsubscribe, ACCESS_STREAMING }, + { NULL, NULL, 0 }, +}; + + /* * */ @@ -165,15 +235,16 @@ htsp_input(htsp_t *htsp, const void *buf, int len) printf("INPUT:\n"); htsmsg_print(in); - out = rpc_dispatch(&htsp->htsp_rpc, in, NULL, htsp, + out = rpc_dispatch(&htsp->htsp_rpc, in, htsp_rpc, htsp, (struct sockaddr *)&htsp->htsp_tcp_session.tcp_peer_addr); htsmsg_destroy(in); - printf("OUTPUT:\n"); - htsmsg_print(out); - - htsp_send_msg(htsp, out, 0); + if(out != NULL) { + printf("OUTPUT:\n"); + htsmsg_print(out); + htsp_send_msg(htsp, out, 0); + } if(!was_async && htsp->htsp_rpc.rs_is_async) { printf("Session went into async mode\n"); @@ -244,6 +315,8 @@ htsp_data_input(htsp_t *htsp) static void htsp_disconnect(htsp_t *htsp) { + htsp_muxer_cleanup(htsp); + LIST_REMOVE(htsp, htsp_global_link); free(htsp->htsp_buf); @@ -279,6 +352,7 @@ htsp_tcp_callback(tcpevent_t event, void *tcpsession) break; case TCP_DISCONNECT: + htsp->htsp_zombie = 1; htsp_disconnect(htsp); break; diff --git a/htsp.h b/htsp.h index fb946d3e..03244afa 100644 --- a/htsp.h +++ b/htsp.h @@ -37,6 +37,9 @@ typedef struct htsp { rpc_session_t htsp_rpc; int htsp_async_init_sent; + int htsp_zombie; + + struct th_subscription_list htsp_subscriptions; } htsp_t; diff --git a/htsp_muxer.c b/htsp_muxer.c index 34882910..a2841112 100644 --- a/htsp_muxer.c +++ b/htsp_muxer.c @@ -28,89 +28,156 @@ #include #include #include -#include #include "tvhead.h" #include "channels.h" #include "subscriptions.h" #include "dispatch.h" -#include "epg.h" -#include "pvr.h" +#include "htsp.h" #include "htsp_muxer.h" +#include "mux.h" +static void +htsp_packet_input(void *opaque, th_muxstream_t *tms, th_pkt_t *pkt) +{ + htsp_t *htsp = opaque; + htsmsg_t *m = htsmsg_create(); + th_muxer_t *tm = tms->tms_muxer; + // th_stream_t *st = tms->tms_stream; + th_subscription_t *s = tm->tm_subscription; + /* + * Build a message for this frame + */ -/* + htsmsg_add_str(m, "method", "muxpkt"); + htsmsg_add_u32(m, "id", s->ths_u32); + + htsmsg_add_u64(m, "stream", tms->tms_index); + htsmsg_add_u64(m, "dts", pkt->pkt_dts); + htsmsg_add_u64(m, "pts", pkt->pkt_pts); + htsmsg_add_u32(m, "duration", pkt->pkt_duration); + htsmsg_add_u32(m, "com", pkt->pkt_commercial); + + /** + * Since we will serialize directly we use 'binptr' which is a binary + * object that just points to data, thus avoiding a copy. + */ + htsmsg_add_binptr(m, "payload", pkt->pkt_payload, pkt->pkt_payloadlen); + + htsp_send_msg(htsp, m, 1); +} + +/** * Called when a subscription gets/loses access to a transport */ static void -client_subscription_callback(struct th_subscription *s, - subscription_event_t event, void *opaque) +htsp_subscription_callback(struct th_subscription *s, + subscription_event_t event, void *opaque) { - htsp_connection_t *hc = opaque; + htsp_t *htsp = opaque; + th_muxer_t *tm; + th_muxstream_t *tms; + int index = 0; + htsmsg_t *m, *sub; + th_stream_t *st; switch(event) { case TRANSPORT_AVAILABLE: + tm = muxer_init(s, htsp_packet_input, htsp); + + m = htsmsg_create(); + + htsmsg_add_str(m, "method", "subscription_start"); + htsmsg_add_u32(m, "id", s->ths_u32); + + LIST_FOREACH(tms, &tm->tm_streams, tms_muxer_link0) { + tms->tms_index = index++; + + st = tms->tms_stream; + + sub = htsmsg_create(); + htsmsg_add_u32(sub, "index", tms->tms_index); + htsmsg_add_str(sub, "type", htstvstreamtype2txt(st->st_type)); + htsmsg_add_str(sub, "language", st->st_lang); + + htsmsg_add_msg(m, "stream", sub); + } + htsmsg_print(m); + htsp_send_msg(htsp, m, 0); + + muxer_play(tm, AV_NOPTS_VALUE); break; case TRANSPORT_UNAVAILABLE: + if(htsp->htsp_zombie == 0) { + m = htsmsg_create(); + htsmsg_add_str(m, "method", "subscription_stop"); + htsmsg_add_u32(m, "id", s->ths_u32); + + htsmsg_add_str(m, "reason", "unknown"); + htsp_send_msg(htsp, m, 0); + } + + muxer_deinit(s->ths_muxer, s); break; } } -/* + +/** * */ void -htsp_muxer_subscribe(htsp_connection_t *hc, th_channel_t *ch, int weight) +htsp_muxer_subscribe(htsp_t *htsp, channel_t *ch, int weight, uint32_t tag) { th_subscription_t *s; - th_muxer_t *m; - LIST_FOREACH(s, &hc->hc_subscriptions, ths_subscriber_link) { - if(s->ths_channel == ch) { - subscription_set_weight(s, weight); - return; - } - } - - m = calloc(1, sizeof(th_muxer_t)); - - LIST_INSERT_HEAD(&hc->hc_subscriptions, s, ths_subscriber_link); + s = subscription_create(ch, weight, "HTSP", htsp_subscription_callback, htsp, + tag); + LIST_INSERT_HEAD(&htsp->htsp_subscriptions, s, ths_subscriber_link); } -/* + +/** + * + */ +static void +htsp_subscription_destroy(th_subscription_t *s) +{ + LIST_REMOVE(s, ths_subscriber_link); + subscription_unsubscribe(s); +} + + +/** * */ void -htsp_muxer_unsubscribe(htsp_connection_t *hc, th_channel_t *ch) +htsp_muxer_unsubscribe(htsp_t *htsp, uint32_t id) { th_subscription_t *s; - htsp_muxerer_t *htsp; - LIST_FOREACH(s, &hc->hc_subscriptions, ths_subscriber_link) { - if(s->ths_channel == ch) + LIST_FOREACH(s, &htsp->htsp_subscriptions, ths_subscriber_link) { + if(s->ths_u32 == id) break; } - if(s != NULL) { - LIST_REMOVE(s, ths_subscriber_link); - free(s->hs_opaque); - subscription_unsubscribe(s); - } + if(s != NULL) + htsp_subscription_destroy(s); } -/* + +/** * */ void -htsp_muxer_cleanup(htsp_connection_t *hc) +htsp_muxer_cleanup(htsp_t *htsp) { th_subscription_t *s; - while((s = LIST_FIRST(&hc->hc_subscriptions)) != NULL) { - LIST_REMOVE(s, ths_subscriber_link); - subscription_unsubscribe(s); - } + while((s = LIST_FIRST(&htsp->htsp_subscriptions)) != NULL) + htsp_subscription_destroy(s); } + diff --git a/htsp_muxer.h b/htsp_muxer.h index e5a3adcf..0cbf0844 100644 --- a/htsp_muxer.h +++ b/htsp_muxer.h @@ -19,9 +19,10 @@ #ifndef HTSP_MUXER_H_ #define HTSP_MUXER_H_ -void htsp_muxer_subscribe(htsp_t *htsp, channel_t *ch, int weight); +void htsp_muxer_subscribe(htsp_t *htsp, channel_t *ch, + int weight, uint32_t tag); -void htsp_muxer_unsubscribe(htsp_t *htsp, channel_t *ch); +void htsp_muxer_unsubscribe(htsp_t *htsp, uint32_t id); void htsp_muxer_cleanup(htsp_t *htsp); diff --git a/iptv_output.c b/iptv_output.c index 21496af9..230bf4cf 100644 --- a/iptv_output.c +++ b/iptv_output.c @@ -187,7 +187,7 @@ output_multicast_load(struct config_head *head) snprintf(title2, sizeof(title2), "IPTV-OUT: %s", title); - subscription_create(ch, 900, title2, iptv_subscription_callback, om); + subscription_create(ch, 900, title2, iptv_subscription_callback, om, 0); return; err: diff --git a/pvr.c b/pvr.c index 29d8877f..a0dd501f 100644 --- a/pvr.c +++ b/pvr.c @@ -605,7 +605,7 @@ pvr_fsm(pvr_rec_t *pvrr) pvrr->pvrr_s = subscription_create(pvrr->pvrr_channel, 1000, "pvr", pvr_subscription_callback, - pvrr); + pvrr, 0); pvrr->pvrr_error = HTSTV_PVR_STATUS_DONE; /* assume everything will work out ok */ break; diff --git a/rpc.c b/rpc.c index 27c19647..3b5b8a6d 100644 --- a/rpc.c +++ b/rpc.c @@ -226,11 +226,11 @@ rpc_event_info(rpc_session_t *ses, htsmsg_t *in, void *opaque) #if 0 -/* +/** * record method */ -static void -htsp_record(htsp_connection_t *hc, htsp_msg_t *m) +static htsmsg_t * +rpc_record(rpc_session_t *ses, htsmsg_t *in, void *opaque) { htsp_msg_t *r; uint32_t u32; @@ -277,103 +277,39 @@ htsp_record(htsp_connection_t *hc, htsp_msg_t *m) } #endif -#if 0 - -/* - * subscribe +/** + * Common methods and their required auth level */ -static void -htsp_subscribe(htsp_connection_t *hc, htsp_msg_t *m) -{ - htsp_msg_t *r; - const char *str, *errtxt = NULL; - th_channel_t *ch; - int weight; - - r = htsp_create_msg(); - htsp_add_u32(r, "seq", hc->hc_seq); - - if((str = htsp_get_string(m, "channel")) != NULL) { - if((ch = channel_find(str, 0)) != NULL) { - - weight = htsp_get_u32(m, "weight") ?: 100; - if(weight > hc->hc_maxweight) - weight = hc->hc_maxweight; - - htsp_stream_subscribe(hc, ch, weight); - } else { - errtxt = "Channel not found"; - } - } else { - errtxt = "Missing 'channel' field"; - } - - htsp_send_msg(hc, NULL, r); - htsp_free_msg(r); -} - -/* - * unsubscribe - */ -static void -htsp_subscribe(htsp_connection_t *hc, htsp_msg_t *m) -{ - htsp_msg_t *r; - const char *str, *errtxt = NULL; - th_channel_t *ch; - - r = htsp_create_msg(); - htsp_add_u32(r, "seq", hc->hc_seq); - - if((str = htsp_get_string(m, "channel")) != NULL) { - if((ch = channel_find(str, 0)) != NULL) { - htsp_stream_unsubscribe(hc, ch); - } else { - errtxt = "Channel not found"; - } - } else { - errtxt = "Missing 'channel' field"; - } - - htsp_send_msg(hc, NULL, r); - htsp_free_msg(r); -} -#endif - -/* - * List of all methods and their required auth level - */ - static rpc_cmd_t common_rpc[] = { { "auth", rpc_auth, 0 }, { "async", rpc_async, ACCESS_STREAMING }, { "channelsList", rpc_channels_list, ACCESS_STREAMING }, { "eventInfo", rpc_event_info, ACCESS_STREAMING }, -#if 0 - { "subscribe", rpc_subscribe, 1 }, - { "unsubscribe", rpc_unsubscribe, 1 }, -#endif - // { "record", rpc_record, 2 }, { NULL, NULL, 0 }, }; -/* +/** * */ -static htsmsg_t * +static int rpc_dispatch_set(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds, - void *opaque, const char *method, struct sockaddr *peer) + void *opaque, const char *method, struct sockaddr *peer, + htsmsg_t **outp) { for(; cmds->rc_name; cmds++) { if(strcmp(cmds->rc_name, method)) continue; if(access_verify(ses->rs_username, ses->rs_password, peer, - cmds->rc_privmask)) - return rpc_unauthorized(ses); - return cmds->rc_func(ses, in, opaque); + cmds->rc_privmask)) { + *outp = rpc_unauthorized(ses); + return 0; + } + + *outp = cmds->rc_func(ses, in, opaque); + return 0; } - return NULL; + return -1; } @@ -386,6 +322,7 @@ rpc_dispatch(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds, void *opaque, { const char *method; htsmsg_t *out; + int r; if(htsmsg_get_u32(in, "seq", &ses->rs_seq)) ses->rs_seq = 0; @@ -393,11 +330,14 @@ rpc_dispatch(rpc_session_t *ses, htsmsg_t *in, rpc_cmd_t *cmds, void *opaque, if((method = htsmsg_get_str(in, "method")) == NULL) return rpc_error(ses, "Method not specified"); - out = rpc_dispatch_set(ses, in, common_rpc, opaque, method, peer); - if(out == NULL && cmds != NULL) - out = rpc_dispatch_set(ses, in, cmds, opaque, method, peer); + r = rpc_dispatch_set(ses, in, common_rpc, opaque, method, peer, &out); + if(r == -1 && cmds != NULL) + r = rpc_dispatch_set(ses, in, cmds, opaque, method, peer, &out); - return out ?: rpc_error(ses, "Method not found"); + if(r == -1) + return rpc_error(ses, "Method not found"); + + return out; } diff --git a/rtsp.c b/rtsp.c index b3ae0ec3..1715491f 100644 --- a/rtsp.c +++ b/rtsp.c @@ -193,7 +193,7 @@ rtsp_session_create(channel_t *ch, struct sockaddr_in *dst) LIST_INSERT_HEAD(&rtsp_sessions, rs, rs_global_link); rs->rs_s = subscription_create(ch, 600, "RTSP", - rtsp_subscription_callback, rs); + rtsp_subscription_callback, rs, 0); /* Initialize RTP */ diff --git a/subscriptions.c b/subscriptions.c index 6a166be7..4ac3babf 100644 --- a/subscriptions.c +++ b/subscriptions.c @@ -127,7 +127,7 @@ subscription_sort(th_subscription_t *a, th_subscription_t *b) th_subscription_t * subscription_create(channel_t *ch, unsigned int weight, const char *name, - subscription_callback_t *cb, void *opaque) + subscription_callback_t *cb, void *opaque, uint32_t u32) { th_subscription_t *s; @@ -137,6 +137,7 @@ subscription_create(channel_t *ch, unsigned int weight, const char *name, s->ths_title = strdup(name); s->ths_total_err = 0; s->ths_weight = weight; + s->ths_u32 = u32; time(&s->ths_start); LIST_INSERT_SORTED(&subscriptions, s, ths_global_link, subscription_sort); diff --git a/subscriptions.h b/subscriptions.h index e34e4451..ecb22488 100644 --- a/subscriptions.h +++ b/subscriptions.h @@ -26,7 +26,8 @@ void subscription_set_weight(th_subscription_t *s, unsigned int weight); th_subscription_t *subscription_create(channel_t *ch, unsigned int weight, const char *name, subscription_callback_t *cb, - void *opaque); + void *opaque, + uint32_t u32); void subscriptions_init(void); diff --git a/xbmsp.c b/xbmsp.c index ada7f811..e45da1d4 100644 --- a/xbmsp.c +++ b/xbmsp.c @@ -814,7 +814,7 @@ xbmsp_input_file_open(xbmsp_t *xbmsp, uint32_t msgid, uint8_t *buf, int len) xs->xs_xbmsp = xbmsp; xs->xs_subscription = subscription_create(ch, 100, xbmsp->xbmsp_logname, - xbmsp_subscription_callback, xs); + xbmsp_subscription_callback, xs, 0); LIST_INSERT_HEAD(&xbmsp->xbmsp_subscriptions, xs, xs_link); xbmsp_send_handle(xbmsp, msgid, xs->xs_handle);