/* * Packet / Buffer management * Copyright (C) 2007 Andreas Öman * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ #define _XOPEN_SOURCE 500 #include #include #include #include #include #include #include #include #include #include #include #include #include "tvhead.h" #include "buffer.h" int64_t store_mem_size; int64_t store_mem_size_max; int64_t store_disk_size; int64_t store_disk_size_max; int store_packets; static struct th_pkt_queue store_mem_queue; static struct th_pkt_queue store_disk_queue; static int store_chunk_size; static const char *store_path; static th_storage_t *curstore; static int store_tally; static void storage_wipe(void); static void storage_mem_enq(th_stream_t *st, th_pkt_t *pkt); static void storage_disk_enq(th_stream_t *st, th_pkt_t *pkt); static void storage_deref(th_storage_t *s); /* * */ void pkt_init(void) { store_path = NULL; if(store_path != NULL) storage_wipe(); TAILQ_INIT(&store_mem_queue); TAILQ_INIT(&store_disk_queue); store_mem_size_max = 1024 * 1024 * 10ULL; store_disk_size_max = 1024 * 1024 * 4000ULL; store_chunk_size = store_disk_size_max / 32; } /* * */ static void pkt_free(th_pkt_t *pkt) { assert(pkt->pkt_storage == NULL); free(pkt->pkt_payload); memset(pkt, 0xff, sizeof(th_pkt_t)); store_packets--; free(pkt); } /* * */ void pkt_deref(th_pkt_t *pkt) { assert(pkt->pkt_refcount > 0); if(pkt->pkt_refcount > 1) { pkt->pkt_refcount--; return; } pkt_free(pkt); } /* * */ th_pkt_t * pkt_ref(th_pkt_t *pkt) { pkt->pkt_refcount++; return pkt; } /* * */ th_pkt_t * pkt_alloc(void *data, size_t datalen, int64_t pts, int64_t dts) { th_pkt_t *pkt; pkt = calloc(1, sizeof(th_pkt_t)); pkt->pkt_payloadlen = datalen; if(datalen > 0) { pkt->pkt_payload = malloc(datalen + FF_INPUT_BUFFER_PADDING_SIZE); if(data != NULL) memcpy(pkt->pkt_payload, data, datalen); } pkt->pkt_dts = dts; pkt->pkt_pts = pts; pkt->pkt_refcount = 1; store_packets++; return pkt; } /* * */ th_pkt_t * pkt_copy(th_stream_t *st, th_pkt_t *orig) { th_pkt_t *pkt; pkt_load(st, orig); if(orig->pkt_payload == NULL) return NULL; pkt = malloc(sizeof(th_pkt_t)); memcpy(pkt, orig, sizeof(th_pkt_t)); pkt->pkt_payload = malloc(pkt->pkt_payloadlen + FF_INPUT_BUFFER_PADDING_SIZE); memcpy(pkt->pkt_payload, orig->pkt_payload, pkt->pkt_payloadlen); pkt->pkt_on_stream_queue = 0; pkt->pkt_storage = NULL; pkt->pkt_refcount = 1; return pkt; } /* * */ void pkt_store(th_stream_t *st, th_pkt_t *pkt) { if(pkt->pkt_on_stream_queue) return; pkt->pkt_on_stream_queue = 1; pkt->pkt_refcount++; TAILQ_INSERT_TAIL(&st->st_pktq, pkt, pkt_queue_link); /* Persistent buffer management */ storage_mem_enq(st, pkt); storage_disk_enq(st, pkt); if(pkt->pkt_storage) pwrite(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen, pkt->pkt_storage_offset); } /* * Force flush of a packet */ void pkt_unstore(th_stream_t *st, th_pkt_t *pkt) { assert(pkt->pkt_on_stream_queue == 1); TAILQ_REMOVE(&st->st_pktq, pkt, pkt_queue_link); pkt->pkt_on_stream_queue = 0; if(pkt->pkt_storage != NULL) { storage_deref(pkt->pkt_storage); TAILQ_REMOVE(&store_disk_queue, pkt, pkt_disk_link); store_disk_size -= pkt->pkt_payloadlen; pkt->pkt_storage = NULL; } if(pkt->pkt_payload != NULL) { TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link); store_mem_size -= pkt->pkt_payloadlen; } pkt_deref(pkt); } /* * */ int pkt_load(th_stream_t *st, th_pkt_t *pkt) { if(pkt->pkt_payload == NULL && pkt->pkt_storage != NULL) { pkt->pkt_payload = malloc(pkt->pkt_payloadlen + FF_INPUT_BUFFER_PADDING_SIZE); pread(pkt->pkt_storage->ts_fd, pkt->pkt_payload, pkt->pkt_payloadlen, pkt->pkt_storage_offset); storage_mem_enq(st, pkt); } return pkt->pkt_payload == NULL ? -1 : 0; } /* * */ static void storage_deref(th_storage_t *s) { if(s->ts_refcount > 1) { s->ts_refcount--; return; } if(curstore == s) curstore = NULL; close(s->ts_fd); unlink(s->ts_filename); free(s->ts_filename); free(s); } /* * */ static void storage_mem_enq(th_stream_t *st, th_pkt_t *pkt) { TAILQ_INSERT_TAIL(&store_mem_queue, pkt, pkt_mem_link); store_mem_size += pkt->pkt_payloadlen; while(store_mem_size >= store_mem_size_max) { pkt = TAILQ_FIRST(&store_mem_queue); TAILQ_REMOVE(&store_mem_queue, pkt, pkt_mem_link); store_mem_size -= pkt->pkt_payloadlen; free(pkt->pkt_payload); pkt->pkt_payload = NULL; if(pkt->pkt_storage == NULL) pkt_unstore(st, pkt); } } /* * */ static void storage_disk_enq(th_stream_t *st, th_pkt_t *pkt) { th_storage_t *s; char fbuf[500]; int fd; if(curstore == NULL) { snprintf(fbuf, sizeof(fbuf), "%s/s%d", store_path, ++store_tally); fd = open(fbuf, O_RDWR | O_CREAT | O_TRUNC, 0644); if(fd == -1) { s = NULL; } else { s = calloc(1, sizeof(th_storage_t)); s->ts_fd = fd; s->ts_filename = strdup(fbuf); } curstore = s; } else { s = curstore; } if(s != NULL) { TAILQ_INSERT_TAIL(&store_disk_queue, pkt, pkt_disk_link); store_disk_size += pkt->pkt_payloadlen; s->ts_refcount++; pkt->pkt_storage = s; pkt->pkt_storage_offset = s->ts_offset; s->ts_offset += pkt->pkt_payloadlen; if(s->ts_offset > store_chunk_size) curstore = NULL; } while(store_disk_size > store_disk_size_max) { pkt = TAILQ_FIRST(&store_disk_queue); if(pkt->pkt_refcount > 1) printf("UNSTORE of reference packet %p\n", pkt); pkt_unstore(st, pkt); } } /** * Erase all old files */ static void storage_wipe(void) { DIR *dir; struct dirent *d; char fbuf[500]; if((dir = opendir(store_path)) != NULL) { while((d = readdir(dir)) != NULL) { if(d->d_name[0] == '.') continue; snprintf(fbuf, sizeof(fbuf), "%s/%s", store_path, d->d_name); unlink(fbuf); } } closedir(dir); }