tweaked data aggregation by introducing pdo based iterators

added new group interpreter
This commit is contained in:
Steffen Vogel 2010-07-28 00:41:19 +02:00
parent b23a36bad5
commit b14f0a6966
6 changed files with 404 additions and 134 deletions

View file

@ -0,0 +1,98 @@
<?php
/**
* @package default
* @copyright Copyright (c) 2010, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
namespace Volkszaehler;
/**
* @author Steffen Vogel <info@steffenvogel.de>
*/
use Doctrine\DBAL;
class DataAggregationIterator extends DataIterator {
protected $packageSize; // count of readings in tuple
protected $aggregatedSize; // total readings
protected $aggregatedKey = -1;
public function __construct(DBAL\Statement $stmt, $size, $tuples) {
parent::__construct($stmt, $size);
if ($tuples < $this->size) { // return $tuples values
$this->packageSize = floor($this->size / $tuples);
$this->aggregatedSize = $tuples;
}
else { // return all values or grouped by year, month, week...
$this->packageSize = 1;
$this->aggregatedSize = $this->size;
}
}
/**
* aggregate data
*/
public function next() {
$current = array (0, 0);
for ($c = 0; $c < $this->packageSize; $c++) {
parent::next();
if (parent::valid()) {
$tuple = parent::current();
$current[1] += $tuple[1];
}
else {
$this->current = FALSE;
return;
}
}
$this->aggregatedKey++;
$this->current = $current;
$this->current[0] = $tuple[0];
$this->current[2] = $this->packageSize;
}
public function current() {
return $this->current;
}
public function key() {
return $this->aggregatedKey;
}
public function rewind() {
parent::rewind();
$offset = $this->size - 1 - $this->aggregatedSize * $this->packageSize;
for ($i = 0; $i < $offset; $i++) {
parent::next();
}
$this->next();
}
/**
* getter & setter
*/
public function getPackageSize() { return $this->packageSize; }
}
?>

View file

@ -0,0 +1,72 @@
<?php
/**
* @package default
* @copyright Copyright (c) 2010, The volkszaehler.org project
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
namespace Volkszaehler;
/**
* @author Steffen Vogel <info@steffenvogel.de>
*/
use Doctrine\DBAL;
class DataIterator implements \Iterator, \Countable {
protected $current;
protected $key; // incrementing key
protected $pdoStmt; // PDOStatement
protected $size; // total readings in PDOStatement
public function __construct(DBAL\Statement $stmt, $size) {
$this->size = $size;
$this->pdoStmt = $stmt->getWrappedStatement();
$this->pdoStmt->setFetchMode(\PDO::FETCH_NUM);
}
public function current() {
return $this->current;
}
public function next() {
$this->key++;
$this->current = $this->pdoStmt->fetch();
}
public function key() {
return $this->key;
}
public function valid() {
return (boolean) $this->current;
}
/**
* NoRewindIterator
*/
public function rewind() {
$this->key = 0;
$this->current = $this->pdoStmt->fetch();
}
public function count() { return $this->size; }
}
?>

View file

@ -0,0 +1,92 @@
<?php
/**
* @copyright Copyright (c) 2010, The volkszaehler.org project
* @package default
* @license http://www.opensource.org/licenses/gpl-license.php GNU Public License
*/
/*
* This file is part of volkzaehler.org
*
* volkzaehler.org is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* any later version.
*
* volkzaehler.org is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
*/
namespace Volkszaehler\Interpreter;
use Doctrine\ORM;
use Volkszaehler\Model;
class GroupInterpreter {
protected $channelInterpreter;
public function __construct(Model\Group $group, ORM\EntityManager $em, $from, $to) {
foreach ($group->getChannels() as $channel) {
if (isset($indicator) && $indicator != $channel->getIndicator) {
throw new \Exception('we only can aggregate channels of the same indicator');
}
else {
$indicator = $channel->getIndicator();
}
$this->channelInterpreter[] = $channel->getInterpreter($em, $from, $to);
}
}
/**
*
* @todo to be implemented
*/
public function getValues($groupBy = NULL) {
}
/**
*
*/
public function getMin() {
$min = current($this->channelInterpreter)->getMax();
foreach ($this->channelInterpreter as $channel) {
$arr = $channel->getMax();
if ($arr['value '] < $min['value']) {
$min = $arr;
}
}
return $min;
}
/**
*
*/
public function getMax() {
$max = current($this->channelInterpreter)->getMax();
foreach ($this->channelInterpreter as $channel) {
$arr = $channel->getMax();
if ($arr['value '] > $max['value']) {
$max = $arr;
}
}
return $max;
}
/**
*
*/
public function getAverage() {
$sum = 0;
foreach ($this->channels as $channel) {
$sum += $channel->getAverage();
}
return ($sum / count($this->channelInterpreter));
}
}

View file

@ -29,11 +29,16 @@ namespace Volkszaehler\Interpreter;
* @author Steffen Vogel <info@steffenvogel.de>
*
*/
use Volkszaehler;
use Doctrine\ORM\Query;
interface InterpreterInterface {
public function getValues($from = NULL, $to = NULL, $groupBy = NULL);
public function getMin($from = NULL, $to = NULL);
public function getMax($from = NULL, $to = NULL);
public function getAverage($from = NULL, $to = NULL);
function getConsumption();
function getValues();
function getMin();
function getMax();
function getAverage();
}
/**
@ -47,14 +52,20 @@ abstract class Interpreter implements InterpreterInterface {
protected $channel;
protected $em;
protected $from;
protected $to;
/**
*
* @param $channel
* @param $em
*/
public function __construct(\Volkszaehler\Model\Channel $channel, \Doctrine\ORM\EntityManager $em) {
public function __construct(\Volkszaehler\Model\Channel $channel, \Doctrine\ORM\EntityManager $em, $from = NULL, $to = NULL) {
$this->channel = $channel;
$this->em = $em;
$this->from = $from;
$this->to = $to;
}
/**
@ -64,50 +75,90 @@ abstract class Interpreter implements InterpreterInterface {
* @param mixed $groupBy
* @todo split in two functions
*/
protected function getData($from = NULL, $to = NULL, $groupBy = NULL) {
protected function getData($groupBy = NULL) {
// get dbal connection from EntityManager
$conn = $this->em->getConnection();
// prepare sql
$params = array(':id' => $this->channel->getId());
$sqlFrom = ' FROM data';
$sqlWhere = ' WHERE channel_id = :id' . self::buildTimeFilterSQL($this->from, $this->to);
$sqlOrderBy = ' ORDER BY timestamp ASC';
if ($sqlGroupBy = self::buildGroupBySQL($groupBy)) {
$sqlRowCount = 'SELECT COUNT(DISTINCT ' . $sqlGroupBy . ')' . $sqlFrom . $sqlWhere;
$sqlGroupBy = ' GROUP BY ' . $sqlGroupBy;
$sqlFields = ' MAX(timestamp) AS timestamp, SUM(value) AS value, COUNT(timestamp) AS count';
}
else {
$sqlRowCount = 'SELECT COUNT(*)' . $sqlFrom . $sqlWhere;
$sqlFields = ' timestamp, value';
}
$rowCount = $conn->fetchColumn($sqlRowCount, $params, 0);
$stmt = $conn->prepare('SELECT ' . $sqlFields . $sqlFrom . $sqlWhere . $sqlGroupBy . $sqlOrderBy);
$stmt->execute($params);
if ($sqlGroupBy || is_null($groupBy)) { // aggregation by sql or skip it
return new Volkszaehler\DataIterator($stmt, $rowCount);
}
elseif (is_numeric($groupBy) ) { // aggregation by php
$tuples = (int) $groupBy;
return new Volkszaehler\DataAggregationIterator($stmt, $rowCount, $tuples);
}
else {
throw new \Exception('invalid groupBy parameter');
}
}
/**
* builds sql query part for grouping data by date functions
*
* @param string $groupBy
* @return string $sql the sql part
* @todo make compatible with: MSSql (Transact-SQL), Sybase, Firebird/Interbase, IBM, Informix, MySQL, Oracle, DB2, PostgreSQL, SQLite
*/
protected static function buildGroupBySQL($groupBy) {
$ts = 'FROM_UNIXTIME(timestamp/1000)'; // just for saving space
switch ($groupBy) {
case 'year':
$sqlGroupBy = 'YEAR(' . $ts . ')';
return 'YEAR(' . $ts . ')';
break;
case 'month':
$sqlGroupBy = 'YEAR(' . $ts . '), MONTH(' . $ts . ')';
return 'YEAR(' . $ts . '), MONTH(' . $ts . ')';
break;
case 'week':
$sqlGroupBy = 'YEAR(' . $ts . '), WEEKOFYEAR(' . $ts . ')';
return 'YEAR(' . $ts . '), WEEKOFYEAR(' . $ts . ')';
break;
case 'day':
$sqlGroupBy = 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . ')';
return 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . ')';
break;
case 'hour':
$sqlGroupBy = 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . '), HOUR(' . $ts . ')';
return 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . '), HOUR(' . $ts . ')';
break;
case 'minute':
$sqlGroupBy = 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . '), HOUR(' . $ts . '), MINUTE(' . $ts . ')';
return 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . '), HOUR(' . $ts . '), MINUTE(' . $ts . ')';
break;
case 'second':
$sqlGroupBy = 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . '), HOUR(' . $ts . '), MINUTE(' . $ts . '), SECOND(' . $ts . ')';
return 'YEAR(' . $ts . '), DAYOFYEAR(' . $ts . '), HOUR(' . $ts . '), MINUTE(' . $ts . '), SECOND(' . $ts . ')';
break;
default:
if (is_numeric($groupBy)) { // lets agrregate it with php
$groupBy = (int) $groupBy;
$sqlGroupBy = FALSE;
}
else {
throw new \Exception('\'' . $groupBy . '\' is not an unknown grouping mode');
}
return FALSE;
}
}
$sql = 'SELECT';
$sql .= ($sqlGroupBy === FALSE) ? ' timestamp, value' : ' MAX(timestamp) AS timestamp, SUM(value) AS value, COUNT(timestamp) AS count';
$sql .= ' FROM data WHERE channel_id = ' . (int) $this->channel->getId();
protected static function buildTimeFilterSQL($from = NULL, $to = NULL) {
$sql = '';
if (isset($from)) {
$sql .= ' && timestamp > ' . $from;
@ -117,52 +168,7 @@ abstract class Interpreter implements InterpreterInterface {
$sql .= ' && timestamp < ' . $to;
}
if ($sqlGroupBy !== FALSE) {
$sql .= ' GROUP BY ' . $sqlGroupBy;
}
$sql .= ' ORDER BY timestamp DESC';
$rsm = new \Doctrine\ORM\Query\ResultsetMapping;
$rsm->addScalarResult('timestamp', 'timestamp');
$rsm->addScalarResult('value', 'value');
if ($sqlGroupBy) {
$rsm->addScalarResult('count', 'count');
}
$query = $this->em->createNativeQuery($sql, $rsm);
$result = $query->getResult();
$totalCount = count($result);
if (is_int($groupBy) && $groupBy < $totalCount) { // return $groupBy values
$packageSize = floor($totalCount / $groupBy);
$packageCount = $groupBy;
}
else { // return all values or grouped by year, month, week...
$packageSize = 1;
$packageCount = $totalCount;
}
$packages = array();
$reading = reset($result);
for ($i = 1; $i <= $packageCount; $i++) {
$package = array('timestamp' => (int) $reading['timestamp'], // last timestamp in package
'value' => (float) $reading['value'], // sum of values
'count' => ($sqlGroupBy === FALSE) ? 1 : $reading['count']); // total count of values or pulses in the package
while ($package['count'] < $packageSize) {
$reading = next($result);
$package['value'] += $reading['value'];
$package['count']++;
}
$packages[] = $package;
$reading = next($result);
}
return array_reverse($packages); // start with oldest ts and ends with newest ts (reverse array order due to descending order in sql statement)
return $sql;
}
}

View file

@ -30,20 +30,21 @@ namespace Volkszaehler\Interpreter;
* @author Steffen Vogel (info@steffenvogel.de)
*
*/
use Volkszaehler;
class MeterInterpreter extends Interpreter {
/**
* calculates the consumption for interval speciefied by $from and $to
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* @todo untested
*/
public function getConsumption($from = NULL, $to = NULL) { // TODO untested
public function getConsumption() {
$sql = 'SELECT SUM(value) AS count
FROM data
WHERE
channel_id = ' . (int) $this->id . ' &&
' . $this->getTimeFilter($from, $to) . '
' . self::buildTimeFilterSQL($this->from, $this->to) . '
GROUP BY channel_id';
$result = $this->dbh->query($sql)->rewind();
@ -53,11 +54,9 @@ class MeterInterpreter extends Interpreter {
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
*/
public function getMin($from = NULL, $to = NULL) {
$data = $this->getData($from, $to);
public function getMin() {
$data = $this->getData();
$min = current($data);
foreach ($data as $reading) {
@ -69,60 +68,62 @@ class MeterInterpreter extends Interpreter {
}
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* @return Ambigous <mixed, unknown>
*/
public function getMax($from = NULL, $to = NULL) {
$data = $this->getData($from, $to);
public function getMax() {
$data = $this->getData();
$min = current($data);
$max = current($data);
foreach ($data as $reading) {
if ($reading['value '] > $min['value']) {
$min = $reading;
if ($reading['value '] > $max['value']) {
$max = $reading;
}
}
return $min;
return $max;
}
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* @todo calculate timeinterval if no params were given
*/
public function getAverage($from = NULL, $to = NULL) {
return $this->getConsumption($from, $to) / ($to - $from) / 1000; // return W
public function getAverage() {
return $this->getConsumption() / ($this->to - $this->from) / 1000; // return W
}
/**
* just a passthru of raw data
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* just a passthrough of raw data
*/
public function getPulses($from = NULL, $to = NULL, $groupBy = NULL) {
return parent::getData($from, $to, $groupBy);
public function getPulses($groupBy = NULL) {
return parent::getData($groupBy);
}
/**
* raw pulses to power conversion
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* @todo untested
* @return array with timestamp and values in [W]
*/
public function getValues($from = NULL, $to = NULL, $groupBy = NULL) {
$pulses = parent::getData($from, $to, $groupBy);
$pulseCount = count($pulses);
public function getValues($groupBy = NULL) {
$pulses = parent::getData($groupBy);
$count = $pulses->count();
for ($i = 1; $i < $pulseCount; $i++) {
$delta = $pulses[$i]['timestamp'] - $pulses[$i-1]['timestamp'];
$values = array();
foreach ($pulses as $pulse) {
if (isset($last)) {
$delta = $pulse[0] - $last[0];
$last = $pulse;
$pulses[$i]['timestamp'] -= $delta/2;
$pulses[$i]['value'] *= 3600000/(($this->channel->getResolution() / 1000) * $delta); // TODO untested
$values[] = array(
(int) ($pulse[0] - $delta / 2), // timestamp
round($pulse[1] * (3600000 / (($this->channel->getResolution() / 1000) * $delta)), 5), // value
$pulse[2]
);
}
else {
$last = $pulse;
}
}
return $pulses; // returns W
return $values;
}
}

View file

@ -32,51 +32,52 @@ namespace Volkszaehler\Interpreter;
class SensorInterpreter extends Interpreter {
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
* @todo untested
*/
public function getValues($from = NULL, $to = NULL, $groupBy = NULL) {
$data = parent::getData($from, $to, $groupBy);
public function getValues($groupBy = NULL) {
$data = parent::getData($groupBy);
array_walk($data, function(&$reading) {
$reading['value'] /= $reading['count']; // calculate average (ungroup the sql sum() function)
});
$values = array();
foreach ($data as $reading) {
$values[] = array(
$reading[0],
$reading[1] / $reading[2], // calculate average (ungroup the sql sum() function)
$reading[2]
);
}
return $data;
return $values;
}
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
*
* @todo adapt to doctrine orm
* @todo untested
*/
public function getMin($from = NULL, $to = NULL) {
return $this->dbh->query('SELECT value, timestamp FROM data WHERE channel_id = ' . (int) $this->id . self::buildFilterTime($from, $to) . ' ORDER BY value ASC', 1)->current();
public function getMin() {
return $this->dbh->query('SELECT value, timestamp FROM data WHERE channel_id = ' . (int) $this->id . self::buildFilterTime($this->from, $this->to) . ' ORDER BY value ASC', 1)->current();
}
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
*
* @todo adapt to doctrine orm
* @todo untested
*/
public function getMax($from = NULL, $to = NULL) { // TODO untested
return $this->dbh->query('SELECT value, timestamp FROM data WHERE channel_id = ' . (int) $this->id . self::buildFilterTime($from, $to) . ' ORDER BY value DESC', 1)->current();
public function getMax() {
return $this->dbh->query('SELECT value, timestamp FROM data WHERE channel_id = ' . (int) $this->id . self::buildFilterTime($this->from, $this->to) . ' ORDER BY value DESC', 1)->current();
}
/**
*
* @param integer $from timestamp in ms since 1970
* @param integer $to timestamp in ms since 1970
*
* @todo adapt to doctrine orm
* @todo untested
*/
public function getAverage($from = NULL, $to = NULL) { // TODO untested
return $this->dbh->query('SELECT AVG(value) AS value FROM data WHERE channel_id = ' . (int) $this->id . self::buildFilterTime($from, $to))->current();
public function getAverage() {
return $this->dbh->query('SELECT AVG(value) AS value FROM data WHERE channel_id = ' . (int) $this->id . self::buildFilterTime($this->from, $this->to))->current();
}
/**
* @todo to be implemented
*/
public function getConsumption() {
}
}