epgdb: deferred write

This commit is contained in:
Jaroslav Kysela 2015-04-24 15:38:17 +02:00
parent d4f1c108fe
commit 99920b83e3
4 changed files with 89 additions and 50 deletions

View file

@ -31,6 +31,7 @@
#include "epggrab.h" #include "epggrab.h"
#define EPG_DB_VERSION 2 #define EPG_DB_VERSION 2
#define EPG_DB_ALLOC_STEP (1024*1024)
extern epg_object_tree_t epg_brands; extern epg_object_tree_t epg_brands;
extern epg_object_tree_t epg_seasons; extern epg_object_tree_t epg_seasons;
@ -261,7 +262,7 @@ void epg_done ( void )
* Save * Save
* *************************************************************************/ * *************************************************************************/
static int _epg_write ( int fd, htsmsg_t *m ) static int _epg_write ( sbuf_t *sb, htsmsg_t *m )
{ {
int ret = 1; int ret = 1;
size_t msglen; size_t msglen;
@ -270,7 +271,11 @@ static int _epg_write ( int fd, htsmsg_t *m )
int r = htsmsg_binary_serialize(m, &msgdata, &msglen, 0x10000); int r = htsmsg_binary_serialize(m, &msgdata, &msglen, 0x10000);
htsmsg_destroy(m); htsmsg_destroy(m);
if (!r) { if (!r) {
ret = tvh_write(fd, msgdata, msglen); ret = 0;
/* allocation helper - we fight with megabytes */
if (sb->sb_size - sb->sb_ptr < 32 * 1024)
sbuf_realloc(sb, (sb->sb_size - (sb->sb_size % EPG_DB_ALLOC_STEP)) + EPG_DB_ALLOC_STEP);
sbuf_append(sb, msgdata, msglen);
free(msgdata); free(msgdata);
} }
} else { } else {
@ -279,11 +284,31 @@ static int _epg_write ( int fd, htsmsg_t *m )
return ret; return ret;
} }
static int _epg_write_sect ( int fd, const char *sect ) static int _epg_write_sect ( sbuf_t *sb, const char *sect )
{ {
htsmsg_t *m = htsmsg_create_map(); htsmsg_t *m = htsmsg_create_map();
htsmsg_add_str(m, "__section__", sect); htsmsg_add_str(m, "__section__", sect);
return _epg_write(fd, m); return _epg_write(sb, m);
}
static void epg_save_tsk_callback ( void *p, int dearmed )
{
sbuf_t *sb = p;
int fd, r;
tvhinfo("epgdb", "save start");
fd = hts_settings_open_file(1, "epgdb.v%d", EPG_DB_VERSION);
if (fd >= 0) {
r = tvh_write(fd, sb->sb_data, sb->sb_ptr);
close(fd);
if (r)
tvherror("epgdb", "write error (size %d)", sb->sb_ptr);
else
tvhinfo("epgdb", "stored (size %d)", sb->sb_ptr);
} else
tvherror("epgdb", "unable to open epgdb file");
sbuf_free(sb);
free(sb);
} }
void epg_save_callback ( void *p ) void epg_save_callback ( void *p )
@ -293,63 +318,68 @@ void epg_save_callback ( void *p )
void epg_save ( void ) void epg_save ( void )
{ {
int fd; sbuf_t *sb = malloc(sizeof(*sb));
epg_object_t *eo; epg_object_t *eo;
epg_broadcast_t *ebc; epg_broadcast_t *ebc;
channel_t *ch; channel_t *ch;
epggrab_stats_t stats; epggrab_stats_t stats;
extern gtimer_t epggrab_save_timer; extern gtimer_t epggrab_save_timer;
if (!sb)
return;
tvhinfo("epgdb", "snapshot start");
sbuf_init_fixed(sb, EPG_DB_ALLOC_STEP);
if (epggrab_epgdb_periodicsave) if (epggrab_epgdb_periodicsave)
gtimer_arm(&epggrab_save_timer, epg_save_callback, NULL, epggrab_epgdb_periodicsave); gtimer_arm(&epggrab_save_timer, epg_save_callback, NULL, epggrab_epgdb_periodicsave);
fd = hts_settings_open_file(1, "epgdb.v%d", EPG_DB_VERSION);
if (fd < 0)
return;
memset(&stats, 0, sizeof(stats)); memset(&stats, 0, sizeof(stats));
if ( _epg_write_sect(fd, "config") ) goto error; if ( _epg_write_sect(sb, "config") ) goto error;
if (_epg_write(fd, epg_config_serialize())) goto error; if (_epg_write(sb, epg_config_serialize())) goto error;
if ( _epg_write_sect(fd, "brands") ) goto error; if ( _epg_write_sect(sb, "brands") ) goto error;
RB_FOREACH(eo, &epg_brands, uri_link) { RB_FOREACH(eo, &epg_brands, uri_link) {
if (_epg_write(fd, epg_brand_serialize((epg_brand_t*)eo))) goto error; if (_epg_write(sb, epg_brand_serialize((epg_brand_t*)eo))) goto error;
stats.brands.total++; stats.brands.total++;
} }
if ( _epg_write_sect(fd, "seasons") ) goto error; if ( _epg_write_sect(sb, "seasons") ) goto error;
RB_FOREACH(eo, &epg_seasons, uri_link) { RB_FOREACH(eo, &epg_seasons, uri_link) {
if (_epg_write(fd, epg_season_serialize((epg_season_t*)eo))) goto error; if (_epg_write(sb, epg_season_serialize((epg_season_t*)eo))) goto error;
stats.seasons.total++; stats.seasons.total++;
} }
if ( _epg_write_sect(fd, "episodes") ) goto error; if ( _epg_write_sect(sb, "episodes") ) goto error;
RB_FOREACH(eo, &epg_episodes, uri_link) { RB_FOREACH(eo, &epg_episodes, uri_link) {
if (_epg_write(fd, epg_episode_serialize((epg_episode_t*)eo))) goto error; if (_epg_write(sb, epg_episode_serialize((epg_episode_t*)eo))) goto error;
stats.episodes.total++; stats.episodes.total++;
} }
if ( _epg_write_sect(fd, "serieslinks") ) goto error; if ( _epg_write_sect(sb, "serieslinks") ) goto error;
RB_FOREACH(eo, &epg_serieslinks, uri_link) { RB_FOREACH(eo, &epg_serieslinks, uri_link) {
if (_epg_write(fd, epg_serieslink_serialize((epg_serieslink_t*)eo))) goto error; if (_epg_write(sb, epg_serieslink_serialize((epg_serieslink_t*)eo))) goto error;
stats.seasons.total++; stats.seasons.total++;
} }
if ( _epg_write_sect(fd, "broadcasts") ) goto error; if ( _epg_write_sect(sb, "broadcasts") ) goto error;
CHANNEL_FOREACH(ch) { CHANNEL_FOREACH(ch) {
RB_FOREACH(ebc, &ch->ch_epg_schedule, sched_link) { RB_FOREACH(ebc, &ch->ch_epg_schedule, sched_link) {
if (_epg_write(fd, epg_broadcast_serialize(ebc))) goto error; if (_epg_write(sb, epg_broadcast_serialize(ebc))) goto error;
stats.broadcasts.total++; stats.broadcasts.total++;
} }
} }
close(fd);
tasklet_arm_alloc(epg_save_tsk_callback, sb);
/* Stats */ /* Stats */
tvhlog(LOG_INFO, "epgdb", "saved"); tvhinfo("epgdb", "queued to save (size %d)", sb->sb_ptr);
tvhlog(LOG_INFO, "epgdb", " brands %d", stats.brands.total); tvhinfo("epgdb", " brands %d", stats.brands.total);
tvhlog(LOG_INFO, "epgdb", " seasons %d", stats.seasons.total); tvhinfo("epgdb", " seasons %d", stats.seasons.total);
tvhlog(LOG_INFO, "epgdb", " episodes %d", stats.episodes.total); tvhinfo("epgdb", " episodes %d", stats.episodes.total);
tvhlog(LOG_INFO, "epgdb", " broadcasts %d", stats.broadcasts.total); tvhinfo("epgdb", " broadcasts %d", stats.broadcasts.total);
return; return;
error: error:
tvhlog(LOG_ERR, "epgdb", "failed to store epg to disk"); tvhlog(LOG_ERR, "epgdb", "failed to store epg to disk");
hts_settings_remove("epgdb.v%d", EPG_DB_VERSION); hts_settings_remove("epgdb.v%d", EPG_DB_VERSION);
close(fd); sbuf_free(sb);
free(sb);
} }

View file

@ -1066,6 +1066,14 @@ main(int argc, char **argv)
tvhftrace("main", imagecache_done); tvhftrace("main", imagecache_done);
tvhftrace("main", lang_code_done); tvhftrace("main", lang_code_done);
tvhftrace("main", api_done); tvhftrace("main", api_done);
tvhtrace("main", "tasklet enter");
pthread_cond_signal(&tasklet_cond);
pthread_join(tasklet_tid, NULL);
tvhtrace("main", "tasklet thread end");
tasklet_flush();
tvhtrace("main", "tasklet leave");
tvhftrace("main", hts_settings_done); tvhftrace("main", hts_settings_done);
tvhftrace("main", dvb_done); tvhftrace("main", dvb_done);
tvhftrace("main", lang_str_done); tvhftrace("main", lang_str_done);
@ -1076,13 +1084,6 @@ main(int argc, char **argv)
tvhftrace("main", idnode_done); tvhftrace("main", idnode_done);
tvhftrace("main", spawn_done); tvhftrace("main", spawn_done);
tvhtrace("main", "tasklet enter");
pthread_cond_signal(&tasklet_cond);
pthread_join(tasklet_tid, NULL);
tvhtrace("main", "tasklet thread end");
tasklet_flush();
tvhtrace("main", "tasklet leave");
tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend"); tvhlog(LOG_NOTICE, "STOP", "Exiting HTS Tvheadend");
tvhlog_end(); tvhlog_end();

View file

@ -696,6 +696,8 @@ static inline void sbuf_alloc(sbuf_t *sb, int len)
sbuf_alloc_(sb, len); sbuf_alloc_(sb, len);
} }
void sbuf_realloc(sbuf_t *sb, int len);
void sbuf_append(sbuf_t *sb, const void *data, int len); void sbuf_append(sbuf_t *sb, const void *data, int len);
void sbuf_cut(sbuf_t *sb, int off); void sbuf_cut(sbuf_t *sb, int off);

View file

@ -333,20 +333,7 @@ sbuf_reset(sbuf_t *sb, int max_len)
void void
sbuf_reset_and_alloc(sbuf_t *sb, int len) sbuf_reset_and_alloc(sbuf_t *sb, int len)
{ {
if (sb->sb_data) { sbuf_realloc(sb, len);
if (len != sb->sb_size) {
void *n = realloc(sb->sb_data, len);
if (n) {
sb->sb_data = n;
sb->sb_size = len;
}
}
} else {
sb->sb_data = malloc(len);
sb->sb_size = len;
}
if (sb->sb_data == NULL)
sbuf_alloc_fail(len);
sb->sb_ptr = sb->sb_err = 0; sb->sb_ptr = sb->sb_err = 0;
} }
@ -366,6 +353,25 @@ sbuf_alloc_(sbuf_t *sb, int len)
sbuf_alloc_fail(sb->sb_size); sbuf_alloc_fail(sb->sb_size);
} }
void
sbuf_realloc(sbuf_t *sb, int len)
{
if (sb->sb_data) {
if (len != sb->sb_size) {
void *n = realloc(sb->sb_data, len);
if (n) {
sb->sb_data = n;
sb->sb_size = len;
}
}
} else {
sb->sb_data = malloc(len);
sb->sb_size = len;
}
if (sb->sb_data == NULL)
sbuf_alloc_fail(len);
}
void void
sbuf_append(sbuf_t *sb, const void *data, int len) sbuf_append(sbuf_t *sb, const void *data, int len)
{ {