simplified data processing
This commit is contained in:
parent
7aea1823e9
commit
e5a9b706b2
7 changed files with 65 additions and 159 deletions
|
@ -40,8 +40,11 @@ class DataController extends Controller {
|
|||
public function get(Model\Entity $entity) {
|
||||
$from = $this->view->request->getParameter('from');
|
||||
$to = $this->view->request->getParameter('to');
|
||||
$tuples = $this->view->request->getParameter('tuples');
|
||||
$groupBy = $this->view->request->getParameter('group');
|
||||
$class = $entity->getDefinition()->getInterpreter();
|
||||
|
||||
return $entity->getInterpreter($this->em, $from, $to);
|
||||
return new $class($entity, $this->em, $from, $to, $tuples, $groupBy);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -21,21 +21,25 @@
|
|||
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
namespace Volkszaehler\Interpreter\Iterator;
|
||||
namespace Volkszaehler\Interpreter;
|
||||
|
||||
use Volkszaehler\Util;
|
||||
|
||||
use Doctrine\DBAL;
|
||||
|
||||
/**
|
||||
* @author Steffen Vogel <info@steffenvogel.de>
|
||||
* @package default
|
||||
*/
|
||||
class DataAggregationIterator implements \Iterator, \Countable {
|
||||
class DataIterator implements \Iterator, \Countable {
|
||||
protected $stmt; // PDO statement
|
||||
|
||||
protected $current; // the current data
|
||||
protected $key; // key
|
||||
protected $size; // total readings in PDOStatement
|
||||
protected $iterator; // subiterator
|
||||
private $rowKey; // internal key for PDO statement
|
||||
|
||||
protected $rowCount; // num of readings in PDOStatement
|
||||
protected $tupleCount; // num of requested tuples
|
||||
protected $packageSize; // num of rows we aggregate in each tuple
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
|
@ -44,28 +48,40 @@ class DataAggregationIterator implements \Iterator, \Countable {
|
|||
* @param integer $size
|
||||
* @param integer $tuples
|
||||
*/
|
||||
public function __construct(\PDOStatement $stmt, $rows, $count) {
|
||||
$this->iterator = new DataIterator($stmt, $rows);
|
||||
public function __construct(\PDOStatement $stmt, $rowCount, $tupleCount) {
|
||||
$this->rowCount = $rowCount;
|
||||
$this->tupleCount = $tupleCount;
|
||||
|
||||
$this->stmt = $stmt;
|
||||
$this->stmt->setFetchMode(\PDO::FETCH_NUM);
|
||||
|
||||
$this->packageSize = floor($rows / $count);
|
||||
$this->size = $count;
|
||||
if ($this->rowCount > $this->tupleCount) {
|
||||
$this->packageSize = floor($this->rowCount / $this->tupleCount);
|
||||
$this->tupleCount = floor($this->rowCount / $this->packageSize) + $this->rowCount % $this->packageSize;
|
||||
}
|
||||
else {
|
||||
$this->packageSize = 1;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Aggregate data
|
||||
*/
|
||||
public function next() {
|
||||
$this->current = array(0, 0, 0);
|
||||
for ($i = 0; $i < $this->packageSize; $i++, $this->iterator->next()) {
|
||||
$tuple = $this->iterator->current();
|
||||
$package = array(0, 0, 0);
|
||||
for ($i = 0; $i < $this->packageSize && $this->valid(); $i++) {
|
||||
$tuple = $this->stmt->fetch();
|
||||
|
||||
$this->current[0] = $tuple[0];
|
||||
$this->current[1] += $tuple[1];
|
||||
$this->current[2] += $tuple[2];
|
||||
$package[0] = $tuple[0];
|
||||
$package[1] += $tuple[1];
|
||||
$package[2] += $tuple[2];
|
||||
|
||||
$this->rowKey++;
|
||||
}
|
||||
|
||||
|
||||
$this->key++;
|
||||
return $this->current;
|
||||
Util\Debug::log('key++', $this->key);
|
||||
return $this->current = $package;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -75,24 +91,18 @@ class DataAggregationIterator implements \Iterator, \Countable {
|
|||
* PDOStatement hasn't a rewind()
|
||||
*/
|
||||
public function rewind() {
|
||||
$this->iterator->rewind();
|
||||
// skip first readings to get an even divisor
|
||||
$skip = count($this->iterator) - count($this) * $this->packageSize;
|
||||
for ($i = 0; $i < $skip; $i++) {
|
||||
$this->iterator->next();
|
||||
}
|
||||
return $this->next();
|
||||
$this->key = $this->rowKey = 0;
|
||||
}
|
||||
|
||||
public function valid() {
|
||||
return $this->key <= $this->size;
|
||||
return $this->rowKey < $this->rowCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Getter & setter
|
||||
*/
|
||||
public function getPackageSize() { return $this->packageSize; }
|
||||
public function count() { return $this->size; }
|
||||
public function count() { return $this->tupleCount; }
|
||||
public function key() { return $this->key; }
|
||||
public function current() { return $this->current; }
|
||||
}
|
|
@ -43,6 +43,7 @@ abstract class Interpreter {
|
|||
|
||||
protected $from;
|
||||
protected $to;
|
||||
protected $groupBy;
|
||||
|
||||
protected $rowCount = NULL;
|
||||
protected $tupleCount = NULL;
|
||||
|
@ -55,14 +56,25 @@ abstract class Interpreter {
|
|||
* @param integer $from timestamp in ms since 1970
|
||||
* @param integer $to timestamp in ms since 1970
|
||||
*/
|
||||
public function __construct(Model\Channel $channel, ORM\EntityManager $em, $from, $to) {
|
||||
public function __construct(Model\Channel $channel, ORM\EntityManager $em, $from, $to, $tupleCount, $groupBy) {
|
||||
$this->channel = $channel;
|
||||
$this->tupleCount = $tupleCount;
|
||||
$this->groupBy = $groupBy;
|
||||
|
||||
// get dbal connection from EntityManager
|
||||
$this->conn = $em->getConnection();
|
||||
|
||||
$this->from = (isset($from)) ? self::parseDateTimeString($from, time() * 1000) : NULL;
|
||||
$this->to = (isset($to)) ? self::parseDateTimeString($to, (isset($this->from)) ? $this->from : time() * 1000) : NULL;
|
||||
$this->from = $from;
|
||||
$this->to = $to;
|
||||
|
||||
// parse interval
|
||||
if (isset($from)) {
|
||||
$this->from = self::parseDateTimeString($from, time() * 1000);
|
||||
}
|
||||
|
||||
if (isset($to)) {
|
||||
$this->to = self::parseDateTimeString($to, (isset($this->from)) ? $this->from : time() * 1000);
|
||||
}
|
||||
|
||||
if (isset($this->from) && isset($this->to) && $this->from > $this->to) {
|
||||
throw new \Exception('&from is larger than &to parameter');
|
||||
|
@ -75,13 +87,13 @@ abstract class Interpreter {
|
|||
* @param string|integer $groupBy
|
||||
* @return Volkszaehler\DataIterator
|
||||
*/
|
||||
protected function getData($count = NULL, $groupBy = NULL) {
|
||||
protected function getData() {
|
||||
// prepare sql
|
||||
$sql['from'] = ' FROM data';
|
||||
$sql['where'] = ' WHERE channel_id = ?' . self::buildDateTimeFilterSQL($this->from, $this->to);
|
||||
$sql['orderBy'] = ' ORDER BY timestamp ASC';
|
||||
|
||||
if ($groupBy && $sql['groupFields'] = self::buildGroupBySQL($groupBy)) {
|
||||
if ($this->groupBy && $sql['groupFields'] = self::buildGroupBySQL($this->groupBy)) {
|
||||
$sql['rowCount'] = 'SELECT COUNT(DISTINCT ' . $sql['groupFields'] . ')' . $sql['from'] . $sql['where'];
|
||||
$sql['fields'] = ' MAX(timestamp) AS timestamp, SUM(value) AS value, COUNT(timestamp) AS count';
|
||||
$sql['groupBy'] = ' GROUP BY ' . $sql['groupFields'];
|
||||
|
@ -98,15 +110,10 @@ abstract class Interpreter {
|
|||
// query for data
|
||||
$stmt = $this->conn->executeQuery('SELECT ' . $sql['fields'] . $sql['from'] . $sql['where'] . $sql['groupBy'] . $sql['orderBy'], array($this->channel->getId()));
|
||||
|
||||
// return iterators
|
||||
if ($sql['groupBy'] || is_null($count) || $this->rowCount < $count) {
|
||||
$this->tupleCount = $this->rowCount;
|
||||
return new Iterator\DataIterator($stmt, $this->rowCount);
|
||||
}
|
||||
else {
|
||||
$this->tupleCount = $count;
|
||||
return new Iterator\DataAggregationIterator($stmt, $this->rowCount, $count);
|
||||
}
|
||||
Util\Debug::log('rowcount', $this->rowCount);
|
||||
Util\Debug::log('tuplecount', $this->tupleCount);
|
||||
|
||||
return new DataIterator($stmt, $this->rowCount, $this->tupleCount);
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,100 +0,0 @@
|
|||
<?php
|
||||
/**
|
||||
* @package default
|
||||
* @copyright Copyright (c) 2010, The volkszaehler.org project
|
||||
* @license http://www.gnu.org/licenses/gpl.txt GNU Public License
|
||||
*/
|
||||
/*
|
||||
* This file is part of volkzaehler.org
|
||||
*
|
||||
* volkzaehler.org is free software: you can redistribute it and/or modify
|
||||
* it under the terms of the GNU General Public License as published by
|
||||
* the Free Software Foundation, either version 3 of the License, or
|
||||
* any later version.
|
||||
*
|
||||
* volkzaehler.org is distributed in the hope that it will be useful,
|
||||
* but WITHOUT ANY WARRANTY; without even the implied warranty of
|
||||
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
|
||||
* GNU General Public License for more details.
|
||||
*
|
||||
* You should have received a copy of the GNU General Public License
|
||||
* along with volkszaehler.org. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
namespace Volkszaehler\Interpreter\Iterator;
|
||||
|
||||
use Volkszaehler\Util;
|
||||
use Doctrine\DBAL;
|
||||
|
||||
/**
|
||||
* @author Steffen Vogel <info@steffenvogel.de>
|
||||
* @package default
|
||||
*/
|
||||
class DataIterator implements \Iterator, \Countable {
|
||||
protected $current; // the current data
|
||||
protected $key; // key
|
||||
protected $stmt; // PDOStatement
|
||||
protected $size; // total readings in PDOStatement
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
*
|
||||
* @param \PDOStatement $stmt
|
||||
* @param integer $size
|
||||
*/
|
||||
public function __construct(\PDOStatement $stmt, $size) {
|
||||
$this->size = $size;
|
||||
$this->stmt = $stmt;
|
||||
$this->stmt->setFetchMode(\PDO::FETCH_NUM);
|
||||
}
|
||||
|
||||
/**
|
||||
* @return array with data
|
||||
*/
|
||||
public function current() {
|
||||
return $this->current;
|
||||
}
|
||||
|
||||
/**
|
||||
* Fetch next row from database
|
||||
*/
|
||||
public function next() {
|
||||
$this->key++;
|
||||
return $this->current = $this->stmt->fetch();
|
||||
}
|
||||
|
||||
/**
|
||||
* @return integer the nth data row
|
||||
*/
|
||||
public function key() {
|
||||
return $this->key;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return boolean do we have another row in the resultset?
|
||||
*/
|
||||
public function valid() {
|
||||
return is_array($this->current);
|
||||
}
|
||||
|
||||
/**
|
||||
* Rewind the iterator
|
||||
*
|
||||
* Should only be called once
|
||||
* PDOStatement hasn't a rewind() aquivalent
|
||||
*/
|
||||
public function rewind() {
|
||||
$this->key = 0;
|
||||
return $this->current = $this->stmt->fetch();
|
||||
}
|
||||
|
||||
/**
|
||||
* Get total num of rows
|
||||
* @return integer
|
||||
*/
|
||||
public function count() {
|
||||
return $this->size;
|
||||
}
|
||||
}
|
||||
|
||||
?>
|
|
@ -85,14 +85,15 @@ class MeterInterpreter extends Interpreter {
|
|||
* @todo untested
|
||||
* @return array with timestamp, values, and pulse count
|
||||
*/
|
||||
public function processData($count, $groupBy, $callback) {
|
||||
$pulses = parent::getData($count, $groupBy);
|
||||
public function processData($callback) {
|
||||
$pulses = parent::getData();
|
||||
|
||||
$this->resolution = $this->channel->getProperty('resolution');
|
||||
$this->pulseCount = 0;
|
||||
|
||||
$tuples = array();
|
||||
$last = $pulses->rewind();
|
||||
$pulses->rewind();
|
||||
$last = $pulses->next();
|
||||
$next = $pulses->next();
|
||||
|
||||
while ($pulses->valid()) {
|
||||
|
|
|
@ -185,19 +185,6 @@ abstract class Entity {
|
|||
public function getUuid() { return $this->uuid; } // read only
|
||||
public function getType() { return $this->type; } // read only
|
||||
public function getDefinition() { return Definition\EntityDefinition::get($this->type); }
|
||||
|
||||
/**
|
||||
* Get interpreter to obtain data and statistical information for a given time interval
|
||||
*
|
||||
* @param Doctrine\ORM\EntityManager $em
|
||||
* @param integer $from timestamp in ms since 1970
|
||||
* @param integer $to timestamp in ms since 1970
|
||||
* @return Interpreter
|
||||
*/
|
||||
public function getInterpreter(\Doctrine\ORM\EntityManager $em, $from, $to) {
|
||||
$class = $this->getDefinition()->getInterpreter();
|
||||
return new $class($this, $em, $from, $to);
|
||||
}
|
||||
}
|
||||
|
||||
?>
|
||||
|
|
|
@ -202,8 +202,6 @@ class JSON extends View {
|
|||
$this->json['data']['uuid'] = $interpreter->getEntity()->getUuid();
|
||||
|
||||
$data = $interpreter->processData(
|
||||
$this->request->getParameter('tuples'),
|
||||
$this->request->getParameter('group'),
|
||||
function($tuple) {
|
||||
return array(
|
||||
$tuple[0],
|
||||
|
|
Loading…
Add table
Reference in a new issue