diff --git a/lib/hooks/stats.cpp b/lib/hooks/stats.cpp index c96d30819..e422f3bd4 100644 --- a/lib/hooks/stats.cpp +++ b/lib/hooks/stats.cpp @@ -37,11 +37,103 @@ namespace villas { namespace node { +class StatsHook; + +class StatsWriteHook : public Hook { + +protected: + StatsHook *parent; + +public: + StatsWriteHook(struct path *p, struct node *n, int fl, int prio, bool en = true) : + Hook(p, n, fl, prio, en) + { + state = STATE_CHECKED; + } + + virtual int process(sample *smp) + { + stats *s = node->stats; + + timespec now = time_now(); + + stats_update(s, STATS_METRIC_AGE, time_delta(&smp->ts.received, &now)); + + return HOOK_OK; + } +}; + +class StatsReadHook : public Hook { + +protected: + sample *last; + +public: + StatsReadHook(struct path *p, struct node *n, int fl, int prio, bool en = true) : + Hook(p, n, fl, prio, en) + { + state = STATE_CHECKED; + } + + virtual void start() + { + assert(state == STATE_PREPARED); + + last = nullptr; + + state = STATE_STARTED; + } + + virtual void stop() + { + assert(state == STATE_STARTED); + + if (last) + sample_decref(last); + + state = STATE_STOPPED; + } + + virtual int process(sample *smp) + { + stats *s = node->stats; + + if (last) { + if (smp->flags & last->flags & SAMPLE_HAS_TS_RECEIVED) + stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&last->ts.received, &smp->ts.received)); + + if (smp->flags & last->flags & SAMPLE_HAS_TS_ORIGIN) + stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&last->ts.origin, &smp->ts.origin)); + + if ((smp->flags & SAMPLE_HAS_TS_ORIGIN) && (smp->flags & SAMPLE_HAS_TS_RECEIVED)) + stats_update(s, STATS_METRIC_OWD, time_delta(&smp->ts.origin, &smp->ts.received)); + + if (smp->flags & last->flags & SAMPLE_HAS_SEQUENCE) { + int dist = smp->sequence - (int32_t) last->sequence; + if (dist != 1) + stats_update(s, STATS_METRIC_SMPS_REORDERED, dist); + } + } + + sample_incref(smp); + + if (last) + sample_decref(last); + + last = smp; + + return HOOK_OK; + } +}; + class StatsHook : public Hook { protected: struct stats stats; + StatsReadHook *readHook; + StatsWriteHook *writeHook; + enum stats_format format; int verbose; int warmup; @@ -50,8 +142,6 @@ protected: AFILE *output; char *uri; - sample *last; - public: StatsHook(struct path *p, struct node *n, int fl, int prio, bool en = true) : @@ -68,8 +158,14 @@ public: /* Register statistic object to path. * * This allows the path code to update statistics. */ - if (node) - node->stats = &stats; + node->stats = &stats; + + /* Add child hooks */ + readHook = new StatsReadHook(p, n, fl, prio, en); + writeHook = new StatsWriteHook(p, n, fl, prio, en); + + vlist_push(&node->in.hooks, (void *) readHook); + vlist_push(&node->out.hooks, (void *) writeHook); } ~StatsHook() @@ -165,37 +261,6 @@ public: state = STATE_PARSED; } - - virtual int process(sample *smp) - { - struct stats *s = &stats; - - if (last) { - if (smp->flags & last->flags & SAMPLE_HAS_TS_RECEIVED) - stats_update(s, STATS_METRIC_GAP_RECEIVED, time_delta(&last->ts.received, &smp->ts.received)); - - if (smp->flags & last->flags & SAMPLE_HAS_TS_ORIGIN) - stats_update(s, STATS_METRIC_GAP_SAMPLE, time_delta(&last->ts.origin, &smp->ts.origin)); - - if ((smp->flags & SAMPLE_HAS_TS_ORIGIN) && (smp->flags & SAMPLE_HAS_TS_RECEIVED)) - stats_update(s, STATS_METRIC_OWD, time_delta(&smp->ts.origin, &smp->ts.received)); - - if (smp->flags & last->flags & SAMPLE_HAS_SEQUENCE) { - int dist = smp->sequence - (int32_t) last->sequence; - if (dist != 1) - stats_update(s, STATS_METRIC_SMPS_REORDERED, dist); - } - } - - sample_incref(smp); - - if (last) - sample_decref(last); - - last = smp; - - return HOOK_OK; - } }; /* Register hook */