From f98f580ee003b2e4d4570ade214edb0d0368fbaa Mon Sep 17 00:00:00 2001 From: Jaroslav Kysela Date: Thu, 11 Dec 2014 20:29:51 +0100 Subject: [PATCH] SAT>IP: Move complete I/O to the separate thread --- src/input/mpegts/satip/satip_frontend.c | 687 ++++++++++++++---------- src/input/mpegts/satip/satip_private.h | 29 +- 2 files changed, 417 insertions(+), 299 deletions(-) diff --git a/src/input/mpegts/satip/satip_frontend.c b/src/input/mpegts/satip/satip_frontend.c index d10480b8..d1547131 100644 --- a/src/input/mpegts/satip/satip_frontend.c +++ b/src/input/mpegts/satip/satip_frontend.c @@ -29,10 +29,6 @@ #include #endif -static int -satip_frontend_tune1 - ( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi ); - /* * */ @@ -56,6 +52,43 @@ satip_frontend_bindaddr( satip_frontend_t *lfe ) return bindaddr; } +static void +satip_frontend_signal_cb( void *aux ) +{ + satip_frontend_t *lfe = aux; + mpegts_mux_instance_t *mmi = LIST_FIRST(&lfe->mi_mux_active); + streaming_message_t sm; + signal_status_t sigstat; + service_t *svc; + + if (mmi == NULL) + return; + if (!lfe->sf_tables) { + psi_tables_default(mmi->mmi_mux); + psi_tables_dvb(mmi->mmi_mux); + lfe->sf_tables = 1; + } + sigstat.status_text = signal2str(lfe->sf_status); + sigstat.snr = mmi->mmi_stats.snr; + sigstat.signal = mmi->mmi_stats.signal; + sigstat.ber = mmi->mmi_stats.ber; + sigstat.unc = mmi->mmi_stats.unc; + sigstat.signal_scale = mmi->mmi_stats.signal_scale; + sigstat.snr_scale = mmi->mmi_stats.snr_scale; + sigstat.ec_bit = mmi->mmi_stats.ec_bit; + sigstat.tc_bit = mmi->mmi_stats.tc_bit; + sigstat.ec_block = mmi->mmi_stats.ec_block; + sigstat.tc_block = mmi->mmi_stats.tc_block; + sm.sm_type = SMT_SIGNAL_STATUS; + sm.sm_data = &sigstat; + LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) { + pthread_mutex_lock(&svc->s_stream_mutex); + streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm)); + pthread_mutex_unlock(&svc->s_stream_mutex); + } + gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 250); +} + /* ************************************************************************** * Class definition * *************************************************************************/ @@ -346,9 +379,9 @@ satip_frontend_match_satcfg ( satip_frontend_t *lfe2, mpegts_mux_t *mm2 ) dvb_mux_conf_t *mc1, *mc2; int position, high1, high2; - if (lfe2->sf_mmi == NULL) + if (lfe2->sf_req == NULL || lfe2->sf_req->sf_mmi == NULL) return 0; - mm1 = lfe2->sf_mmi->mmi_mux; + mm1 = lfe2->sf_req->sf_mmi->mmi_mux; position = satip_satconf_get_position(lfe2, mm2); if (position <= 0 || lfe2->sf_position != position) return 0; @@ -391,7 +424,7 @@ satip_frontend_is_enabled ( mpegts_input_t *mi, mpegts_mux_t *mm, int flags ) return satip_frontend_match_satcfg(lfe2, mm); } if (lfe2->sf_master == lfe->sf_number && lfe2->sf_running) { - if (lfe2->sf_mmi == NULL) + if (lfe2->sf_req == NULL || lfe2->sf_req->sf_mmi == NULL) return 0; return satip_frontend_match_satcfg(lfe2, mm); } @@ -404,6 +437,7 @@ satip_frontend_stop_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { satip_frontend_t *lfe = (satip_frontend_t*)mi; + satip_tune_req_t *tr; char buf1[256], buf2[256]; mi->mi_display_name(mi, buf1, sizeof(buf1)); @@ -412,25 +446,19 @@ satip_frontend_stop_mux gtimer_disarm(&lfe->sf_monitor_timer); - /* Stop thread */ - lfe->sf_shutdown = 1; - if (lfe->sf_dvr_pipe.wr > 0) { - tvh_write(lfe->sf_dvr_pipe.wr, "", 1); - tvhtrace("satip", "%s - waiting for dvr thread", buf1); - pthread_join(lfe->sf_dvr_thread, NULL); - tvh_pipe_close(&lfe->sf_dvr_pipe); - tvhdebug("satip", "%s - stopped dvr thread", buf1); + /* Stop tune */ + tvh_write(lfe->sf_dvr_pipe.wr, "", 1); + + pthread_mutex_lock(&lfe->sf_dvr_lock); + tr = lfe->sf_req; + if (tr && tr != lfe->sf_req_thread) { + free(tr->sf_pids_tuned); + free(tr->sf_pids); + free(tr); } - lfe->sf_shutdown = 0; - lfe->sf_running = 0; - lfe->sf_mmi = NULL; - - udp_close(lfe->sf_rtp); lfe->sf_rtp = NULL; - udp_close(lfe->sf_rtcp); lfe->sf_rtcp = NULL; - - free(lfe->sf_pids); lfe->sf_pids = NULL; - free(lfe->sf_pids_tuned); lfe->sf_pids_tuned = NULL; + lfe->sf_req = NULL; + pthread_mutex_unlock(&lfe->sf_dvr_lock); } static int @@ -438,33 +466,62 @@ satip_frontend_start_mux ( mpegts_input_t *mi, mpegts_mux_instance_t *mmi ) { satip_frontend_t *lfe = (satip_frontend_t*)mi; + satip_tune_req_t *tr; + char buf1[256], buf2[256]; + if (lfe->sf_positions > 0) { lfe->sf_position = satip_satconf_get_position(lfe, mmi->mmi_mux); if (lfe->sf_position <= 0) return SM_CODE_TUNING_FAILED; } - return satip_frontend_tune1((satip_frontend_t*)mi, mmi); + + lfe->mi_display_name((mpegts_input_t*)lfe, buf1, sizeof(buf1)); + mpegts_mux_nice_name(mmi->mmi_mux, buf2, sizeof(buf2)); + tvhdebug("satip", "%s - starting %s", buf1, buf2); + + tr = calloc(1, sizeof(*tr)); + tr->sf_mmi = mmi; + + tr->sf_pids_size = 512; + tr->sf_pids = calloc(tr->sf_pids_size, sizeof(uint16_t)); + tr->sf_pids_tuned = calloc(tr->sf_pids_size, sizeof(uint16_t)); + + pthread_mutex_lock(&lfe->sf_dvr_lock); + lfe->sf_req = tr; + lfe->sf_running = 1; + lfe->sf_tables = 0; + lfe->sf_status = SIGNAL_NONE; + pthread_mutex_unlock(&lfe->sf_dvr_lock); + + /* notify thread that we are ready */ + tvh_write(lfe->sf_dvr_pipe.wr, "s", 1); + + gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 50); + + return 0; } static int satip_frontend_add_pid( satip_frontend_t *lfe, int pid) { + satip_tune_req_t *tr; int mid, div; if (pid < 0 || pid >= 8191) return 0; pthread_mutex_lock(&lfe->sf_dvr_lock); - if (lfe->sf_pids_count >= lfe->sf_pids_size) { - lfe->sf_pids_size += 64; - lfe->sf_pids = realloc(lfe->sf_pids, - lfe->sf_pids_size * sizeof(uint16_t)); - lfe->sf_pids_tuned = realloc(lfe->sf_pids_tuned, - lfe->sf_pids_size * sizeof(uint16_t)); + tr = lfe->sf_req; + if (tr->sf_pids_count >= tr->sf_pids_size) { + tr->sf_pids_size += 64; + tr->sf_pids = realloc(tr->sf_pids, + tr->sf_pids_size * sizeof(uint16_t)); + tr->sf_pids_tuned = realloc(tr->sf_pids_tuned, + tr->sf_pids_size * sizeof(uint16_t)); } - if (lfe->sf_pids_count == 0) { - lfe->sf_pids[lfe->sf_pids_count++] = pid; + if (tr->sf_pids_count == 0) { + tr->sf_pids[tr->sf_pids_count++] = pid; pthread_mutex_unlock(&lfe->sf_dvr_lock); return 1; } @@ -473,48 +530,48 @@ satip_frontend_add_pid( satip_frontend_t *lfe, int pid) printf("Insert PID: %i\n", pid); if (pid == 0) printf("HERE!!!\n"); - { int i; for (i = 0; i < lfe->sf_pids_count; i++) - printf("Bpid[%i] = %i\n", i, lfe->sf_pids[i]); } + { int i; for (i = 0; i < tr->sf_pids_count; i++) + printf("Bpid[%i] = %i\n", i, tr->sf_pids[i]); } #endif /* insert pid to the sorted array */ - mid = div = lfe->sf_pids_count / 2; + mid = div = tr->sf_pids_count / 2; while (1) { - assert(mid >= 0 && mid < lfe->sf_pids_count); + assert(mid >= 0 && mid < tr->sf_pids_count); if (div > 1) div /= 2; - if (lfe->sf_pids[mid] == pid) { + if (tr->sf_pids[mid] == pid) { pthread_mutex_unlock(&lfe->sf_dvr_lock); return 0; } - if (lfe->sf_pids[mid] < pid) { - if (mid + 1 >= lfe->sf_pids_count) { - lfe->sf_pids[lfe->sf_pids_count++] = pid; + if (tr->sf_pids[mid] < pid) { + if (mid + 1 >= tr->sf_pids_count) { + tr->sf_pids[tr->sf_pids_count++] = pid; break; } - if (lfe->sf_pids[mid + 1] > pid) { + if (tr->sf_pids[mid + 1] > pid) { mid++; - if (mid < lfe->sf_pids_count) - memmove(&lfe->sf_pids[mid + 1], &lfe->sf_pids[mid], - (lfe->sf_pids_count - mid) * sizeof(uint16_t)); - lfe->sf_pids[mid] = pid; - lfe->sf_pids_count++; + if (mid < tr->sf_pids_count) + memmove(&tr->sf_pids[mid + 1], &tr->sf_pids[mid], + (tr->sf_pids_count - mid) * sizeof(uint16_t)); + tr->sf_pids[mid] = pid; + tr->sf_pids_count++; break; } mid += div; } else { - if (mid == 0 || lfe->sf_pids[mid - 1] < pid) { - memmove(&lfe->sf_pids[mid+1], &lfe->sf_pids[mid], - (lfe->sf_pids_count - mid) * sizeof(uint16_t)); - lfe->sf_pids[mid] = pid; - lfe->sf_pids_count++; + if (mid == 0 || tr->sf_pids[mid - 1] < pid) { + memmove(&tr->sf_pids[mid+1], &tr->sf_pids[mid], + (tr->sf_pids_count - mid) * sizeof(uint16_t)); + tr->sf_pids[mid] = pid; + tr->sf_pids_count++; break; } mid -= div; } } #if 0 - { int i; for (i = 0; i < lfe->sf_pids_count; i++) - printf("Apid[%i] = %i\n", i, lfe->sf_pids[i]); } + { int i; for (i = 0; i < tr->sf_pids_count; i++) + printf("Apid[%i] = %i\n", i, tr->sf_pids[i]); } #endif pthread_mutex_unlock(&lfe->sf_dvr_lock); return 1; @@ -525,6 +582,7 @@ satip_frontend_open_pid ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ) { satip_frontend_t *lfe = (satip_frontend_t*)mi; + satip_tune_req_t *tr; mpegts_pid_t *mp; int change = 0; @@ -532,9 +590,11 @@ satip_frontend_open_pid return NULL; if (pid == MPEGTS_FULLMUX_PID) { + pthread_mutex_lock(&lfe->sf_dvr_lock); + tr = lfe->sf_req; if (lfe->sf_device->sd_fullmux_ok) { - if (!lfe->sf_pids_any) - lfe->sf_pids_any = change = 1; + if (!tr->sf_pids_any) + tr->sf_pids_any = change = 1; } else { mpegts_service_t *s; elementary_stream_t *st; @@ -545,14 +605,16 @@ satip_frontend_open_pid change |= satip_frontend_add_pid(lfe, st->es_pid); } } + pthread_mutex_unlock(&lfe->sf_dvr_lock); } else { change |= satip_frontend_add_pid(lfe, mp->mp_pid); } if (change) { pthread_mutex_lock(&lfe->sf_dvr_lock); - if (!lfe->sf_pids_any_tuned || - lfe->sf_pids_any != lfe->sf_pids_any_tuned) + tr = lfe->sf_req; + if (!tr->sf_pids_any_tuned || + tr->sf_pids_any != tr->sf_pids_any_tuned) tvh_write(lfe->sf_dvr_pipe.wr, "c", 1); pthread_mutex_unlock(&lfe->sf_dvr_lock); } @@ -565,43 +627,48 @@ satip_frontend_close_pid ( mpegts_input_t *mi, mpegts_mux_t *mm, int pid, int type, void *owner ) { satip_frontend_t *lfe = (satip_frontend_t*)mi; + satip_tune_req_t *tr; int change = 0; int mid, div, cnt; /* remove PID */ if (pid == MPEGTS_FULLMUX_PID) { + pthread_mutex_lock(&lfe->sf_dvr_lock); + tr = lfe->sf_req; if (lfe->sf_device->sd_fullmux_ok) { - if (lfe->sf_pids_any) { - lfe->sf_pids_any = 0; + if (tr->sf_pids_any) { + tr->sf_pids_any = 0; change = 1; } } + pthread_mutex_unlock(&lfe->sf_dvr_lock); goto finish; } pthread_mutex_lock(&lfe->sf_dvr_lock); - if (lfe->sf_pids) { - mid = div = (cnt = lfe->sf_pids_count) / 2; + tr = lfe->sf_req; + if (tr->sf_pids) { + mid = div = (cnt = tr->sf_pids_count) / 2; while (cnt > 0) { if (div > 1) div /= 2; - if (lfe->sf_pids[mid] == pid) { - if (mid + 1 < lfe->sf_pids_count) - memmove(&lfe->sf_pids[mid], &lfe->sf_pids[mid+1], - (lfe->sf_pids_count - mid - 1) * sizeof(uint16_t)); - lfe->sf_pids_count--; + if (tr->sf_pids[mid] == pid) { + if (mid + 1 < tr->sf_pids_count) + memmove(&tr->sf_pids[mid], &tr->sf_pids[mid+1], + (tr->sf_pids_count - mid - 1) * sizeof(uint16_t)); + tr->sf_pids_count--; change = 1; break; - } else if (lfe->sf_pids[mid] < pid) { - if (mid + 1 > lfe->sf_pids_count) + } else if (tr->sf_pids[mid] < pid) { + if (mid + 1 > tr->sf_pids_count) break; - if (lfe->sf_pids[mid + 1] > pid) + if (tr->sf_pids[mid + 1] > pid) break; mid += div; } else { if (mid == 0) break; - if (lfe->sf_pids[mid - 1] < pid) + if (tr->sf_pids[mid - 1] < pid) break; mid -= div; } @@ -615,8 +682,8 @@ finish: if (change) { pthread_mutex_lock(&lfe->sf_dvr_lock); - if (!lfe->sf_pids_any_tuned || - lfe->sf_pids_any != lfe->sf_pids_any_tuned) + if (!tr->sf_pids_any_tuned || + tr->sf_pids_any != tr->sf_pids_any_tuned) tvh_write(lfe->sf_dvr_pipe.wr, "c", 1); pthread_mutex_unlock(&lfe->sf_dvr_lock); } @@ -800,51 +867,55 @@ static int satip_frontend_pid_changed( http_client_t *rtsp, satip_frontend_t *lfe, const char *name ) { + satip_tune_req_t *tr; char *add, *del; - int i, j, r, count, any = lfe->sf_pids_any; + int i, j, r, count, any; int deleted; int max_pids_len = lfe->sf_device->sd_pids_len; - if (!lfe->sf_running || lfe->sf_shutdown) + if (!lfe->sf_running || !lfe->sf_req) return 0; pthread_mutex_lock(&lfe->sf_dvr_lock); - if (lfe->sf_pids_count > lfe->sf_device->sd_pids_max) + tr = lfe->sf_req_thread; + any = tr->sf_pids_any; + + if (tr->sf_pids_count > lfe->sf_device->sd_pids_max) any = lfe->sf_device->sd_fullmux_ok ? 1 : 0; if (any) { - if (lfe->sf_pids_any_tuned) { + if (tr->sf_pids_any_tuned) { pthread_mutex_unlock(&lfe->sf_dvr_lock); return 0; } - lfe->sf_pids_any_tuned = 1; - memcpy(lfe->sf_pids_tuned, lfe->sf_pids, - lfe->sf_pids_count * sizeof(uint16_t)); - lfe->sf_pids_tcount = lfe->sf_pids_count; + tr->sf_pids_any_tuned = 1; + memcpy(tr->sf_pids_tuned, tr->sf_pids, + tr->sf_pids_count * sizeof(uint16_t)); + tr->sf_pids_tcount = tr->sf_pids_count; pthread_mutex_unlock(&lfe->sf_dvr_lock); r = satip_rtsp_play(rtsp, "all", NULL, NULL, max_pids_len); r = r == 0 ? 1 : r; } else if (!lfe->sf_device->sd_pids_deladd || - lfe->sf_pids_any_tuned || - lfe->sf_pids_tcount == 0) { + tr->sf_pids_any_tuned || + tr->sf_pids_tcount == 0) { - lfe->sf_pids_any_tuned = 0; - count = lfe->sf_pids_count; + tr->sf_pids_any_tuned = 0; + count = tr->sf_pids_count; if (count > lfe->sf_device->sd_pids_max) count = lfe->sf_device->sd_pids_max; add = alloca(1 + count * 5); add[0] = '\0'; /* prioritize higher PIDs (tables are low prio) */ satip_frontend_store_pids(add, - &lfe->sf_pids[lfe->sf_pids_count - count], + &tr->sf_pids[tr->sf_pids_count - count], count); - memcpy(lfe->sf_pids_tuned, lfe->sf_pids, - lfe->sf_pids_count * sizeof(uint16_t)); - lfe->sf_pids_tcount = lfe->sf_pids_count; + memcpy(tr->sf_pids_tuned, tr->sf_pids, + tr->sf_pids_count * sizeof(uint16_t)); + tr->sf_pids_tcount = tr->sf_pids_count; pthread_mutex_unlock(&lfe->sf_dvr_lock); if (!count || add[0] == '\0') @@ -855,65 +926,65 @@ satip_frontend_pid_changed( http_client_t *rtsp, } else { - add = alloca(1 + lfe->sf_pids_count * 5); - del = alloca(1 + lfe->sf_pids_tcount * 5); + add = alloca(1 + tr->sf_pids_count * 5); + del = alloca(1 + tr->sf_pids_tcount * 5); add[0] = del[0] = '\0'; #if 0 - for (i = 0; i < lfe->sf_pids_count; i++) - printf("pid[%i] = %i\n", i, lfe->sf_pids[i]); - for (i = 0; i < lfe->sf_pids_tcount; i++) - printf("tuned[%i] = %i\n", i, lfe->sf_pids_tuned[i]); + for (i = 0; i < tr->sf_pids_count; i++) + printf("pid[%i] = %i\n", i, tr->sf_pids[i]); + for (i = 0; i < tr->sf_pids_tcount; i++) + printf("tuned[%i] = %i\n", i, tr->sf_pids_tuned[i]); #endif i = j = deleted = 0; - while (i < lfe->sf_pids_count && j < lfe->sf_pids_tcount) { - if (lfe->sf_pids[i] == lfe->sf_pids_tuned[j]) { + while (i < tr->sf_pids_count && j < tr->sf_pids_tcount) { + if (tr->sf_pids[i] == tr->sf_pids_tuned[j]) { i++; j++; - } else if (lfe->sf_pids[i] < lfe->sf_pids_tuned[j]) { + } else if (tr->sf_pids[i] < tr->sf_pids_tuned[j]) { i++; } else { - sprintf(del + strlen(del), ",%i", lfe->sf_pids_tuned[j++]); + sprintf(del + strlen(del), ",%i", tr->sf_pids_tuned[j++]); deleted++; } } - while (j < lfe->sf_pids_tcount) { - sprintf(del + strlen(del), ",%i", lfe->sf_pids_tuned[j++]); + while (j < tr->sf_pids_tcount) { + sprintf(del + strlen(del), ",%i", tr->sf_pids_tuned[j++]); deleted++; } - count = lfe->sf_pids_count + (lfe->sf_pids_tcount - deleted); + count = tr->sf_pids_count + (tr->sf_pids_tcount - deleted); if (count > lfe->sf_device->sd_pids_max) count = lfe->sf_device->sd_pids_max; /* prioritize higher PIDs (tables are low prio) */ /* count means "skip count" in following code */ - count = lfe->sf_pids_count - count; + count = tr->sf_pids_count - count; i = j = 0; - while (i < lfe->sf_pids_count && j < lfe->sf_pids_tcount) { - if (lfe->sf_pids[i] == lfe->sf_pids_tuned[j]) { + while (i < tr->sf_pids_count && j < tr->sf_pids_tcount) { + if (tr->sf_pids[i] == tr->sf_pids_tuned[j]) { i++; j++; - } else if (lfe->sf_pids[i] < lfe->sf_pids_tuned[j]) { + } else if (tr->sf_pids[i] < tr->sf_pids_tuned[j]) { if (count > 0) { count--; } else { - sprintf(add + strlen(add), ",%i", lfe->sf_pids[i]); + sprintf(add + strlen(add), ",%i", tr->sf_pids[i]); } i++; } else { j++; } } - while (i < lfe->sf_pids_count) { + while (i < tr->sf_pids_count) { if (count > 0) count--; else - sprintf(add + strlen(add), ",%i", lfe->sf_pids[i++]); + sprintf(add + strlen(add), ",%i", tr->sf_pids[i++]); } - memcpy(lfe->sf_pids_tuned, lfe->sf_pids, - lfe->sf_pids_count * sizeof(uint16_t)); - lfe->sf_pids_tcount = lfe->sf_pids_count; + memcpy(tr->sf_pids_tuned, tr->sf_pids, + tr->sf_pids_count * sizeof(uint16_t)); + tr->sf_pids_tcount = tr->sf_pids_count; pthread_mutex_unlock(&lfe->sf_dvr_lock); if (add[0] == '\0' && del[0] == '\0') @@ -935,46 +1006,118 @@ satip_frontend_input_thread ( void *aux ) #define UDP_PKT_SIZE 1472 /* this is maximum UDP payload (standard ethernet) */ #define RTP_PKT_SIZE (UDP_PKT_SIZE - 12) /* minus RTP minimal RTP header */ #define HTTP_CMD_NONE 9874 - satip_frontend_t *lfe = aux, *lfe_master = lfe; - mpegts_mux_instance_t *mmi = lfe->sf_mmi; + satip_frontend_t *lfe = aux, *lfe_master; + satip_tune_req_t *tr = NULL; + mpegts_mux_instance_t *mmi; http_client_t *rtsp; + udp_connection_t *rtp = NULL, *rtcp = NULL; dvb_mux_t *lm; char buf[256]; struct iovec *iovec; - uint8_t rtcp[2048]; + uint8_t b[2048]; uint8_t *p; sbuf_t sb; - int pos, nfds, i, r, tc; + int pos, nfds, i, r, tc, rtp_port, start = 0; size_t c; tvhpoll_event_t ev[3]; tvhpoll_t *efd; - int changing = 0, ms = -1, fatal = 0, running = 1; - uint32_t seq = -1, nseq; + int changing, ms, fatal, running, play2, exit_flag, rtsp_flags, position, reply; + uint32_t seq, nseq, unc; udp_multirecv_t um; - int play2 = 1, position, rtsp_flags = 0, reply; uint64_t u64, u64_2; - lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf)); - - if (lfe->sf_rtp == NULL || lfe->sf_rtcp == NULL || mmi == NULL) - return NULL; - - lm = (dvb_mux_t *)mmi->mmi_mux; - - if (lfe->sf_master) { - lfe_master = satip_frontend_find_by_number(lfe->sf_device, lfe->sf_master); - if (lfe_master == NULL) - lfe_master = lfe; - } + /* If set - the thread will be cancelled */ + exit_flag = 0; /* Setup poll */ efd = tvhpoll_create(4); + + /* + * New tune + */ +new_tune: + udp_close(rtcp); + udp_close(rtp); + rtcp = rtp = NULL; + rtsp = NULL; + lfe_master = NULL; + memset(ev, 0, sizeof(ev)); ev[0].events = TVHPOLL_IN; ev[0].fd = lfe->sf_dvr_pipe.rd; ev[0].data.ptr = NULL; tvhpoll_add(efd, ev, 1); + lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf)); + + while (!start) { + + nfds = tvhpoll_wait(efd, ev, 1, -1); + + if (!tvheadend_running) { exit_flag = 1; goto done; } + if (nfds <= 0) continue; + + if (ev[0].data.ptr == NULL) { + c = read(lfe->sf_dvr_pipe.rd, b, 1); + if (c == 1) { + if (b[0] == 'e') { + tvhtrace("satip", "%s - input thread received shutdown", buf); + exit_flag = 1; + goto done; + } else if (b[0] == 's') { + tvhtrace("satip", "%s - start", buf); + start = 1; + } + } + } + + } + + start = 0; + + lfe->mi_display_name((mpegts_input_t*)lfe, buf, sizeof(buf)); + + pthread_mutex_lock(&lfe->sf_dvr_lock); + lfe->sf_req_thread = tr = lfe->sf_req; + pthread_mutex_unlock(&lfe->sf_dvr_lock); + + if (!lfe->sf_req_thread) + goto new_tune; + + mmi = tr->sf_mmi; + changing = 0; + ms = -1; + fatal = 0; + running = 1; + seq = -1; + play2 = 1; + rtsp_flags = 0; + + if (udp_bind_double(&rtp, &rtcp, + "satip", "rtp", "rtpc", + satip_frontend_bindaddr(lfe), lfe->sf_udp_rtp_port, + NULL, SATIP_BUF_SIZE, 16384) < 0) + goto done; + + rtp_port = ntohs(IP_PORT(rtp->ip)); + + tvhtrace("satip", "%s - local RTP port %i RTCP port %i", + lfe->mi_name, + ntohs(IP_PORT(rtp->ip)), + ntohs(IP_PORT(rtcp->ip))); + + if (rtp == NULL || rtcp == NULL || mmi == NULL) + goto done; + + lm = (dvb_mux_t *)mmi->mmi_mux; + + lfe_master = lfe; + if (lfe->sf_master) { + lfe_master = satip_frontend_find_by_number(lfe->sf_device, lfe->sf_master); + if (lfe_master == NULL) + lfe_master = lfe; + } + pthread_mutex_lock(&lfe->sf_device->sd_tune_mutex); u64 = lfe_master->sf_last_tune; i = lfe_master->sf_tdelay; @@ -999,22 +1142,29 @@ satip_frontend_input_thread ( void *aux ) tc = 1; nfds = tvhpoll_wait(efd, ev, 1, r); - if (!tvheadend_running) goto fast_exit; - + if (!tvheadend_running) { exit_flag = 1; goto done; } if (nfds < 0) continue; - if (nfds == 0) break; if (ev[0].data.ptr == NULL) { - c = read(lfe->sf_dvr_pipe.rd, rtcp, 1); - if (c == 1 && rtcp[0] == 'c') { - tc = 0; - ms = 20; - changing = 1; - continue; + c = read(lfe->sf_dvr_pipe.rd, b, 1); + if (c == 1) { + if (b[0] == 'c') { + tc = 0; + ms = 20; + changing = 1; + continue; + } else if (b[0] == 'e') { + tvhtrace("satip", "%s - input thread received shutdown", buf); + exit_flag = 1; + goto done; + } else if (b[0] == 's') { + start = 1; + goto done; + } } - tvhtrace("satip", "%s - input thread received fast shutdown", buf); - goto fast_exit; + tvhtrace("satip", "%s - input thread received mux close", buf); + goto done; } } @@ -1026,20 +1176,17 @@ satip_frontend_input_thread ( void *aux ) rtsp = http_client_connect(lfe, RTSP_VERSION_1_0, "rstp", lfe->sf_device->sd_info.addr, 554, satip_frontend_bindaddr(lfe)); - if (rtsp == NULL) { -fast_exit: - tvhpoll_destroy(efd); - return NULL; - } + if (rtsp == NULL) + goto done; /* Setup poll */ memset(ev, 0, sizeof(ev)); ev[0].events = TVHPOLL_IN; - ev[0].fd = lfe->sf_rtp->fd; - ev[0].data.ptr = lfe->sf_rtp; + ev[0].fd = rtp->fd; + ev[0].data.ptr = rtp; ev[1].events = TVHPOLL_IN; - ev[1].fd = lfe->sf_rtcp->fd; - ev[1].data.ptr = lfe->sf_rtcp; + ev[1].fd = rtcp->fd; + ev[1].data.ptr = rtcp; ev[2].events = TVHPOLL_IN; ev[2].fd = rtsp->hc_fd; ev[2].data.ptr = rtsp; @@ -1051,12 +1198,17 @@ fast_exit: rtsp_flags |= SATIP_SETUP_PIDS0; if (lfe->sf_device->sd_pilot_on) rtsp_flags |= SATIP_SETUP_PILOT_ON; - r = satip_rtsp_setup(rtsp, - position, lfe->sf_number, - lfe->sf_rtp_port, &lm->lm_tuning, - rtsp_flags); + r = 0xa59234; + pthread_mutex_lock(&lfe->sf_dvr_lock); + if (lfe->sf_req == lfe->sf_req_thread) + r = satip_rtsp_setup(rtsp, + position, lfe->sf_number, + rtp_port, &lm->lm_tuning, + rtsp_flags); + pthread_mutex_unlock(&lfe->sf_dvr_lock); if (r < 0) { - tvherror("satip", "%s - failed to tune", buf); + if (r != 0xa59234) + tvherror("satip", "%s - failed to tune", buf); goto done; } reply = 1; @@ -1068,17 +1220,28 @@ fast_exit: nfds = tvhpoll_wait(efd, ev, 1, ms); - if (!tvheadend_running) + if (!tvheadend_running) { + exit_flag = 1; running = 0; + } if (nfds > 0 && ev[0].data.ptr == NULL) { - c = read(lfe->sf_dvr_pipe.rd, rtcp, 1); - if (c == 1 && rtcp[0] == 'c') { - ms = 20; - changing = 1; - continue; + c = read(lfe->sf_dvr_pipe.rd, b, 1); + if (c == 1) { + if (b[0] == 'c') { + ms = 20; + changing = 1; + continue; + } else if (b[0] == 'e') { + tvhtrace("satip", "%s - input thread received shutdown", buf); + exit_flag = 1; running = 0; + continue; + } else if (b[0] == 's') { + start = 1; running = 0; + goto done; + } } - tvhtrace("satip", "%s - input thread received shutdown", buf); + tvhtrace("satip", "%s - input thread received mux close", buf); running = 0; continue; } @@ -1116,8 +1279,8 @@ fast_exit: r = rtsp_setup_decode(rtsp, 1); if (!running) break; - if (r < 0 || rtsp->hc_rtp_port != lfe->sf_rtp_port || - rtsp->hc_rtpc_port != lfe->sf_rtp_port + 1) { + if (r < 0 || rtsp->hc_rtp_port != rtp_port || + rtsp->hc_rtpc_port != rtp_port + 1) { tvhlog(LOG_ERR, "satip", "%s - RTSP SETUP error %d (%s) [%i-%i]", buf, r, strerror(-r), rtsp->hc_cmd, rtsp->hc_code); fatal = 1; @@ -1126,11 +1289,16 @@ fast_exit: rtsp->hc_host, lfe->sf_number, rtsp->hc_rtsp_session, rtsp->hc_rtsp_stream_id); if (lfe->sf_play2) { - r = satip_rtsp_setup(rtsp, position, lfe->sf_number, - lfe->sf_rtp_port, &lm->lm_tuning, - rtsp_flags | SATIP_SETUP_PLAY); + r = 0xa59234; + pthread_mutex_lock(&lfe->sf_dvr_lock); + if (lfe->sf_req == lfe->sf_req_thread) + r = satip_rtsp_setup(rtsp, position, lfe->sf_number, + rtp_port, &lm->lm_tuning, + rtsp_flags | SATIP_SETUP_PLAY); + pthread_mutex_unlock(&lfe->sf_dvr_lock); if (r < 0) { - tvherror("satip", "%s - failed to tune2", buf); + if (r != 0xa59234) + tvherror("satip", "%s - failed to tune2", buf); fatal = 1; } reply = 1; @@ -1176,17 +1344,21 @@ fast_exit: reply = 1; } - if (ev[0].data.ptr == lfe->sf_rtcp) { - c = recv(lfe->sf_rtcp->fd, rtcp, sizeof(rtcp), MSG_DONTWAIT); - if (c > 0) - satip_frontend_decode_rtcp(lfe, buf, mmi, rtcp, c); + if (ev[0].data.ptr == rtcp) { + c = recv(rtcp->fd, b, sizeof(b), MSG_DONTWAIT); + if (c > 0) { + pthread_mutex_lock(&lfe->sf_dvr_lock); + if (lfe->sf_req == lfe->sf_req_thread) + satip_frontend_decode_rtcp(lfe, buf, mmi, b, c); + pthread_mutex_unlock(&lfe->sf_dvr_lock); + } continue; } - if (ev[0].data.ptr != lfe->sf_rtp) + if (ev[0].data.ptr != rtp) continue; - tc = udp_multirecv_read(&um, lfe->sf_rtp->fd, RTP_PKTS, &iovec); + tc = udp_multirecv_read(&um, rtp->fd, RTP_PKTS, &iovec); if (tc < 0) { if (ERRNO_AGAIN(errno)) @@ -1200,7 +1372,7 @@ fast_exit: break; } - for (i = 0; i < tc; i++) { + for (i = 0, unc = 0; i < tc; i++) { p = iovec[i].iov_base; c = iovec[i].iov_len; @@ -1224,15 +1396,20 @@ fast_exit: if (seq == -1) seq = nseq; else if (((seq + 1) & 0xffff) != nseq) - mmi->mmi_stats.unc += ((c - pos) / 188) * - (uint32_t)((uint16_t)nseq-(uint16_t)(seq+1)); + unc += ((c - pos) / 188) * (uint32_t)((uint16_t)nseq-(uint16_t)(seq+1)); seq = nseq; /* Process */ tsdebug_write((mpegts_mux_t *)lm, p + pos, c - pos); sbuf_append(&sb, p + pos, c - pos); } - mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, - &sb, NULL, NULL); + pthread_mutex_lock(&lfe->sf_dvr_lock); + if (lfe->sf_req == lfe->sf_req_thread) { + mmi->mmi_stats.unc += unc; + mpegts_input_recv_packets((mpegts_input_t*)lfe, mmi, + &sb, NULL, NULL); + } else + fatal = 1; + pthread_mutex_unlock(&lfe->sf_dvr_lock); } /* Do not send the SMT_SIGNAL_STATUS packets - we are out of service */ @@ -1242,19 +1419,19 @@ fast_exit: udp_multirecv_free(&um); ev[0].events = TVHPOLL_IN; - ev[0].fd = lfe->sf_rtp->fd; - ev[0].data.ptr = lfe->sf_rtp; + ev[0].fd = rtp->fd; + ev[0].data.ptr = rtp; ev[1].events = TVHPOLL_IN; - ev[1].fd = lfe->sf_rtcp->fd; - ev[1].data.ptr = lfe->sf_rtcp; + ev[1].fd = rtcp->fd; + ev[1].data.ptr = rtcp; ev[2].events = TVHPOLL_IN; ev[2].fd = lfe->sf_dvr_pipe.rd; ev[2].data.ptr = NULL; tvhpoll_rem(efd, ev, 3); if (rtsp->hc_rtsp_stream_id >= 0) { - snprintf((char *)rtcp, sizeof(rtcp), "/stream=%li", rtsp->hc_rtsp_stream_id); - r = rtsp_teardown(rtsp, (char *)rtcp, NULL); + snprintf((char *)b, sizeof(b), "/stream=%li", rtsp->hc_rtsp_stream_id); + r = rtsp_teardown(rtsp, (char *)b, NULL); if (r < 0) { tvhtrace("satip", "%s - bad teardown", buf); } else { @@ -1275,7 +1452,7 @@ fast_exit: } } /* for sure - the second sequence */ - r = rtsp_teardown(rtsp, (char *)rtcp, NULL); + r = rtsp_teardown(rtsp, (char *)b, NULL); if (r < 0) { tvhtrace("satip", "%s - bad teardown2", buf); } else { @@ -1298,115 +1475,36 @@ fast_exit: } done: - if (lfe->sf_teardown_delay) { + pthread_mutex_lock(&lfe->sf_dvr_lock); + if (tr && tr != lfe->sf_req) { + free(tr->sf_pids); + free(tr->sf_pids_tuned); + free(tr); + } + lfe->sf_req_thread = tr = NULL; + pthread_mutex_unlock(&lfe->sf_dvr_lock); + + udp_close(rtcp); + udp_close(rtp); + rtcp = rtp = NULL; + + if (lfe->sf_teardown_delay && lfe_master) { pthread_mutex_lock(&lfe->sf_device->sd_tune_mutex); lfe->sf_last_tune = lfe_master->sf_last_tune = getmonoclock(); pthread_mutex_unlock(&lfe->sf_device->sd_tune_mutex); } - http_client_close(rtsp); + if (rtsp) + http_client_close(rtsp); + + if (!exit_flag) + goto new_tune; + tvhpoll_destroy(efd); return NULL; #undef PKTS } -/* ************************************************************************** - * Tuning - * *************************************************************************/ - -static void -satip_frontend_signal_cb( void *aux ) -{ - satip_frontend_t *lfe = aux; - mpegts_mux_instance_t *mmi = LIST_FIRST(&lfe->mi_mux_active); - streaming_message_t sm; - signal_status_t sigstat; - service_t *svc; - - if (mmi == NULL) - return; - if (!lfe->sf_tables) { - psi_tables_default(mmi->mmi_mux); - psi_tables_dvb(mmi->mmi_mux); - lfe->sf_tables = 1; - } - sigstat.status_text = signal2str(lfe->sf_status); - sigstat.snr = mmi->mmi_stats.snr; - sigstat.signal = mmi->mmi_stats.signal; - sigstat.ber = mmi->mmi_stats.ber; - sigstat.unc = mmi->mmi_stats.unc; - sigstat.signal_scale = mmi->mmi_stats.signal_scale; - sigstat.snr_scale = mmi->mmi_stats.snr_scale; - sigstat.ec_bit = mmi->mmi_stats.ec_bit; - sigstat.tc_bit = mmi->mmi_stats.tc_bit; - sigstat.ec_block = mmi->mmi_stats.ec_block; - sigstat.tc_block = mmi->mmi_stats.tc_block; - sm.sm_type = SMT_SIGNAL_STATUS; - sm.sm_data = &sigstat; - LIST_FOREACH(svc, &lfe->mi_transports, s_active_link) { - pthread_mutex_lock(&svc->s_stream_mutex); - streaming_pad_deliver(&svc->s_streaming_pad, streaming_msg_clone(&sm)); - pthread_mutex_unlock(&svc->s_stream_mutex); - } - gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 250); -} - -static int -satip_frontend_tune0 - ( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi ) -{ - if (udp_bind_double(&lfe->sf_rtp, &lfe->sf_rtcp, - "satip", "rtp", "rtpc", - satip_frontend_bindaddr(lfe), lfe->sf_udp_rtp_port, - NULL, SATIP_BUF_SIZE, 16384) < 0) - return SM_CODE_TUNING_FAILED; - - lfe->sf_rtp_port = ntohs(IP_PORT(lfe->sf_rtp->ip)); - - assert(lfe->sf_pids == NULL); - assert(lfe->sf_pids_tuned == NULL); - - lfe->sf_pids_count = 0; - lfe->sf_pids_tcount = 0; - lfe->sf_pids_size = 512; - lfe->sf_pids = calloc(lfe->sf_pids_size, sizeof(uint16_t)); - lfe->sf_pids_tuned = calloc(lfe->sf_pids_size, sizeof(uint16_t)); - lfe->sf_pids_any = 0; - lfe->sf_pids_any_tuned = 0; - lfe->sf_status = SIGNAL_NONE; - - lfe->sf_mmi = mmi; - lfe->sf_running = 1; - lfe->sf_tables = 0; - - tvhtrace("satip", "%s - local RTP port %i RTCP port %i", - lfe->mi_name, - ntohs(IP_PORT(lfe->sf_rtp->ip)), - ntohs(IP_PORT(lfe->sf_rtcp->ip))); - - tvh_pipe(O_NONBLOCK, &lfe->sf_dvr_pipe); - tvhthread_create(&lfe->sf_dvr_thread, NULL, - satip_frontend_input_thread, lfe); - - gtimer_arm_ms(&lfe->sf_monitor_timer, satip_frontend_signal_cb, lfe, 50); - - return 0; -} - -static int -satip_frontend_tune1 - ( satip_frontend_t *lfe, mpegts_mux_instance_t *mmi ) -{ - char buf1[256], buf2[256]; - - lfe->mi_display_name((mpegts_input_t*)lfe, buf1, sizeof(buf1)); - mpegts_mux_nice_name(mmi->mmi_mux, buf2, sizeof(buf2)); - tvhdebug("satip", "%s - starting %s", buf1, buf2); - - /* Tune */ - return satip_frontend_tune0(lfe, mmi); -} - /* ************************************************************************** * Creation/Config * *************************************************************************/ @@ -1542,6 +1640,10 @@ satip_frontend_create } } + tvh_pipe(O_NONBLOCK, &lfe->sf_dvr_pipe); + tvhthread_create(&lfe->sf_dvr_thread, NULL, + satip_frontend_input_thread, lfe); + return lfe; } @@ -1578,11 +1680,24 @@ satip_frontend_save ( satip_frontend_t *lfe, htsmsg_t *fe ) void satip_frontend_delete ( satip_frontend_t *lfe ) { + char buf1[256]; + lock_assert(&global_lock); + lfe->mi_display_name((mpegts_input_t *)lfe, buf1, sizeof(buf1)); + /* Ensure we're stopped */ mpegts_input_stop_all((mpegts_input_t*)lfe); + /* Stop thread */ + if (lfe->sf_dvr_pipe.wr > 0) { + tvh_write(lfe->sf_dvr_pipe.wr, "e", 1); + tvhtrace("satip", "%s - waiting for control thread", buf1); + pthread_join(lfe->sf_dvr_thread, NULL); + tvh_pipe_close(&lfe->sf_dvr_pipe); + tvhdebug("satip", "%s - stopped control thread", buf1); + } + /* Stop timer */ gtimer_disarm(&lfe->sf_monitor_timer); diff --git a/src/input/mpegts/satip/satip_private.h b/src/input/mpegts/satip/satip_private.h index b87f6eff..43cc1ff6 100644 --- a/src/input/mpegts/satip/satip_private.h +++ b/src/input/mpegts/satip/satip_private.h @@ -30,6 +30,7 @@ typedef struct satip_device_info satip_device_info_t; typedef struct satip_device satip_device_t; +typedef struct satip_tune_req satip_tune_req_t; typedef struct satip_frontend satip_frontend_t; typedef struct satip_satconf satip_satconf_t; @@ -85,6 +86,18 @@ struct satip_device pthread_mutex_t sd_tune_mutex; }; +struct satip_tune_req { + mpegts_mux_instance_t *sf_mmi; + + uint16_t *sf_pids; + uint16_t *sf_pids_tuned; + int sf_pids_any; + int sf_pids_any_tuned; + int sf_pids_size; + int sf_pids_count; + int sf_pids_tcount; /*< tuned count */ +}; + struct satip_frontend { mpegts_input_t; @@ -114,25 +127,15 @@ struct satip_frontend pthread_t sf_dvr_thread; th_pipe_t sf_dvr_pipe; pthread_mutex_t sf_dvr_lock; - pthread_cond_t sf_dvr_cond; - uint16_t *sf_pids; - uint16_t *sf_pids_tuned; - int sf_pids_any; - int sf_pids_any_tuned; - int sf_pids_size; - int sf_pids_count; - int sf_pids_tcount; /*< tuned count */ + int sf_thread; int sf_running; - int sf_shutdown; int sf_tables; int sf_position; - udp_connection_t *sf_rtp; - udp_connection_t *sf_rtcp; - int sf_rtp_port; - mpegts_mux_instance_t *sf_mmi; signal_state_t sf_status; gtimer_t sf_monitor_timer; uint64_t sf_last_tune; + satip_tune_req_t *sf_req; + satip_tune_req_t *sf_req_thread; /* * Configuration