diff --git a/lib/hooks/pps_ts.cpp b/lib/hooks/pps_ts.cpp index 6a50a931d..6e35f060b 100644 --- a/lib/hooks/pps_ts.cpp +++ b/lib/hooks/pps_ts.cpp @@ -33,113 +33,28 @@ namespace villas { namespace node { -class TimeSync { -protected: - //Algorithms stability parameters - //Examples from Paper: - //const double p = 0.99; - //const double k1 = 1.1; - //const double k2 = 1.0; - //Tuned parameters: - const double p = 1.2; - const double k1 = 1.3; - const double k2 = 0.9; - const double c = 0.7; // ~= mü_max - const double tau = 1.; //Time between synchronizations - //We assume no skew of our internal clock - const double r = 1.; - - //Time estimate - uint64_t x; - //Skew correction - double s; - //Dampening Factor - double y; -public: - TimeSync() : x(0.5e9), s(1.), y(0.) - { - //Check stability - assert( tau < (p*(k2-p*(k1-k2)))/ - (c*(k1-p*(k1-k2))*(k1-p*(k1-k2))) ); - assert( 2*k1/3*p > k1-k2 ); - assert( k1-k2 > 0 ); - assert( 2 > p && p > 0 ); - } - - /** Returns the current time Error - * - * @actualTime The actual time from an external clock - * @nanoseconds the nanoseconds since the last synchronization event - */ - int64_t timeError(const uint64_t actualTime, const uint64_t nanoseconds) - { - uint64_t currentEstimate = timeFromNanoseconds(nanoseconds); - return (int64_t)actualTime + (int64_t)tau*1e9 - (int64_t)currentEstimate; - } - - /** synchronizes with external clock - * - * @actualTime nanoseconds at which event should occur - * @return nanoseconds estimate - */ - uint64_t synchronize(const uint64_t actualTime, const uint64_t nanoseconds) - { - double t_err; - uint64_t ret; - //Time estimate - x = timeFromNanoseconds(nanoseconds); - //Time Error - t_err = ((int64_t)actualTime + (int64_t)tau*1e9 - (int64_t)x)/1e9; - //Skew correction or internal frequency / external frequency - s = s + k1*(t_err) - k2*y; - //Dampening Factor - y = p*(t_err) + (1-p)*y; - - //Clip skew correction - if (s > 1.1) { - s = 1.1; - } else if (s < 0.9) { - s = 0.9; - } - printf("time error: %f, nanoseconds: %lu, x: %ld, s: %f, y: %f\n", t_err, nanoseconds, x, s, y); - - ret = x; - //If time estimate gets too large, reset x - if ( x > tau*1e9) { - x -= tau*1e9; - } - return ret; - } - - uint64_t time(const double partOfFullSecond) - { - return x + partOfFullSecond*tau*r*s*1e9; - } - - /** Returns time estimate based on the nanoseconds that passed since - * last synchrsonization - * - * @return nanoesecond estimate - */ - uint64_t timeFromNanoseconds(const uint64_t n) - { - return time(n / 1.e9); - } -}; - class PpsTsHook : public Hook { protected: double lastValue; double thresh; unsigned idx; - uint64_t lastSeqNr; - unsigned edgeCounter; - timespec realTime; - timespec lastEdge; uint64_t last_sequence; - TimeSync ts; - bool converged; + + bool isSynced; + bool isLocked; + struct timespec tsVirt; + double timeErr; // in seconds + double periodEst; // in seconds + double periodErrComp; // in seconds + double period; // in seconds + uintmax_t cntEdges; + uintmax_t cntSmps; + uintmax_t cntSmpsTotal; + unsigned horizonComp; + unsigned horizonEst; + uintmax_t filtLen; + uintmax_t *filtWin; public: PpsTsHook(struct vpath *p, struct vnode *n, int fl, int prio, bool en = true) : @@ -147,14 +62,23 @@ public: lastValue(0), thresh(1.5), idx(0), - lastSeqNr(0), - edgeCounter(0), - realTime({ 0, 0 }), - lastEdge({0,0}), last_sequence(0), - ts(), - converged(false) + isSynced(false), + isLocked(false), + timeErr(0.0), + period(0.0), + cntEdges(0), + horizonComp(1), + horizonEst(1), + filtLen(horizonEst + 1) { + filtWin = new uintmax_t[filtLen]; + memset(filtWin, 0, filtLen * sizeof(uintmax_t)); + } + + ~PpsTsHook() + { + delete[] filtWin; } virtual void parse(json_t *cfg) @@ -166,14 +90,18 @@ public: Hook::parse(cfg); - ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s?: f}", + double fSmps = 0; + ret = json_unpack_ex(cfg, &err, 0, "{ s: i, s?: f, s: F}", "signal_index", &idx, - "threshold", &thresh + "threshold", &thresh, + "expected_smp_rate", &fSmps ); if (ret) - throw ConfigError(cfg, err, "node-config-hook-pps_ts"); + throw ConfigError(cfg, err, "node-config-hook-pps_ts"); - info("parsed config thresh=%f signal_index=%d", thresh, idx); + period = 1.0 / fSmps; + + info("parsed config thresh=%f signal_index=%d nominal_period=%f", thresh, idx, period); state = State::PARSED; @@ -183,62 +111,59 @@ public: { assert(state == State::STARTED); - static const uint64_t targetVal = 0.5e9; /* Get value of PPS signal */ float value = smp->data[idx].f; // TODO check if it is really float - //uint64_t seqNr = smp->sequence; /* Detect Edge */ bool isEdge = lastValue < thresh && value > thresh; lastValue = value; - - auto now = time_now(); - uint64_t timediff = (now.tv_nsec - lastEdge.tv_nsec)+(now.tv_sec - lastEdge.tv_sec)*1e9; - - // nsec is between 0.5e9 and 1.5e9 when converged - realTime.tv_nsec = ts.timeFromNanoseconds(timediff); - realTime.tv_sec = lastEdge.tv_sec; - - // when not converged, nsec might be really high. - // using a loop assures no time jumps. - //TODO: can nsec also become negative? - while (realTime.tv_nsec >= 1e9) { - realTime.tv_sec++; - realTime.tv_nsec -= 1e9; - } - if (isEdge) { - edgeCounter++; - ts.synchronize(targetVal, timediff); - lastEdge = now; - converged = abs(ts.timeError(targetVal, timediff)) < 0.001; + if (isSynced) { + timeErr += 1.0 - (cntSmps * period); + filtWin[cntEdges % filtLen] = cntSmpsTotal; + /* Estimated sample period over last 'horizonEst' seconds */ + unsigned int tmp = cntEdges < filtLen ? cntEdges : horizonEst; + double cntSmpsAvg = (cntSmpsTotal - filtWin[(cntEdges - tmp) % filtLen]) / tmp; + periodEst = 1.0 / cntSmpsAvg; + info("cntSmpsAvg %f", cntSmpsAvg); + periodErrComp = timeErr / (cntSmpsAvg * horizonComp); + period = periodEst + periodErrComp; + } + else { + tsVirt.tv_sec = time(nullptr); + tsVirt.tv_nsec = 0; + isSynced = true; + cntEdges = 0; + cntSmpsTotal = 0; + } + cntSmps = 0; + cntEdges++; + + info("Time Error is: %f periodEst %f periodErrComp %f", timeErr, periodEst, periodErrComp); } - /*if(isEdge){ - info("Edge detected: seq=%lu, realTime.nsec=%lu, timeErr=%f , timePeriod=%f, changeVal=%f", seqNr,realTime.tv_nsec, timeErr, periodTime, changeVal); - }*/ + cntSmps++; + cntSmpsTotal++; - - //if (!converged || edgeCounter < 2) - if (edgeCounter < 2) + if (cntEdges < 5) return Hook::Reason::SKIP_SAMPLE; - /* Update timestamp */ - smp->ts.origin = realTime; + smp->ts.origin = tsVirt; smp->flags |= (int) SampleFlags::HAS_TS_ORIGIN; - if((smp->sequence - last_sequence) > 1 ) + struct timespec tsPeriod = time_from_double(period); + tsVirt = time_add(&tsVirt, &tsPeriod); + + + if ((smp->sequence - last_sequence) > 1) warning("Samples missed: %li sampled missed", smp->sequence - last_sequence); last_sequence = smp->sequence; return Hook::Reason::OK; } - - ~PpsTsHook(){ - } }; /* Register hook */