Commit 49cba9e5 authored by Sören Henning's avatar Sören Henning

seriesId is bound to storage driver

parent 43ea1d49
......@@ -43,7 +43,7 @@ public class AnomalyDetectionConfiguration extends Configuration {
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),new
// RegressionForecaster());
final AnomalyDetectionStage anomalyDetector = new StorableAnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),
new RegressionForecaster(), new CassandraDriver(session, "measurements"));
new RegressionForecaster(), new CassandraDriver(session, "measurements", "temp"));
final AlertPrinterStage alerter = new AlertPrinterStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExportStage jsonExporter = new JSONExportStage(new File("values.json")); // TODO Temp
......
......@@ -5,7 +5,7 @@ import anomalydetection.storage.StorageDriver;
import teetime.framework.AbstractConsumerStage;
/**
* Dummy stage that should later handles the storage of measurements
* This stage stores incoming {@code Measurement}s using a given {@code StorageDriver}.
*
* @author Sören Henning
*
......@@ -20,7 +20,7 @@ public class StorageStage extends AbstractConsumerStage<AnomalyScoredMeasurement
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
storageDriver.storeMeasurement("temp", measurement); // TODO seriesid
storageDriver.storeMeasurement(measurement);
}
}
......@@ -17,6 +17,7 @@ public class CassandraDriver implements StorageDriver {
private final Session session;
private final String table;
private final String seriesId;
private String seriesIdColumn = "series_id";
private String timeColumn = "time";
......@@ -24,9 +25,10 @@ public class CassandraDriver implements StorageDriver {
private String predictionColumn = "prediction";
private String anomalyscoreColumn = "anomalyscore";
public CassandraDriver(final Session session, final String table) {
public CassandraDriver(final Session session, final String table, final String seriesId) {
this.session = session;
this.table = table;
this.seriesId = seriesId;
if (session.getLoggedKeyspace() == null) {
throw new IllegalArgumentException("No keyspace set.");
......@@ -76,10 +78,10 @@ public class CassandraDriver implements StorageDriver {
}
@Override
public TimeSeries retrieveTimeSeries(final String seriesId, final Instant start, final Instant end) {
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end) {
final Select statement = QueryBuilder.select(this.timeColumn, this.measurementColumn)
.from(this.table)
.where(QueryBuilder.eq(this.seriesIdColumn, seriesId))
.where(QueryBuilder.eq(this.seriesIdColumn, this.seriesId))
.and(QueryBuilder.gte(this.timeColumn, start.toEpochMilli()))
.and(QueryBuilder.lte(this.timeColumn, end.toEpochMilli()))
.orderBy(QueryBuilder.asc(this.timeColumn));
......@@ -97,10 +99,10 @@ public class CassandraDriver implements StorageDriver {
}
@Override
public void storeMeasurement(final String seriesId, final AnomalyScoredMeasurement measurement) {
public void storeMeasurement(final AnomalyScoredMeasurement measurement) {
final Insert statement = QueryBuilder
.insertInto(this.table)
.value(this.seriesIdColumn, seriesId)
.value(this.seriesIdColumn, this.seriesId)
.value(this.timeColumn, measurement.getTime().toEpochMilli())
.value(this.measurementColumn, measurement.getValue())
.value(this.predictionColumn, measurement.getPrediction())
......
......@@ -7,8 +7,8 @@ import anomalydetection.timeseries.TimeSeries;
public interface StorageDriver {
public TimeSeries retrieveTimeSeries(final String seriesid, final Instant start, final Instant end);
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end);
public void storeMeasurement(final String seriesid, final AnomalyScoredMeasurement measurement);
public void storeMeasurement(final AnomalyScoredMeasurement measurement);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment