Commit 3d5e9585 authored by Sören Henning's avatar Sören Henning

worked on cassandra integration

parent d46cf2b9
package anomalydetection;
import java.time.Instant;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import anomalydetection.storage.CassandraDriver;
import anomalydetection.timeseries.TimeSeries;
import teetime.framework.Execution;
public class AnomalyDetectionTool {
public static void main(final String[] args) {
String ipAddress = "192.168.99.100";
int port = 32770;
String keyspace = "demo2";
Cluster cluster = Cluster.builder().addContactPoint(ipAddress).withPort(port).build();
Session session = cluster.connect(keyspace);
CassandraDriver cassandraDriver = new CassandraDriver(session, "measurements");
cassandraDriver.storeMeasurement("abc", new AnomalyScoredMeasurement(Instant.now(), 1200, 1100, 0.11));
TimeSeries timeSeries = cassandraDriver.retrieveTimeSeries("abc", Instant.now().minusSeconds(60 * 60), Instant.now());
System.out.println(timeSeries);
session.close();
final AnomalyDetectionConfiguration configuration = new AnomalyDetectionConfiguration();
final Execution<AnomalyDetectionConfiguration> analysis = new Execution<AnomalyDetectionConfiguration>(configuration);
analysis.executeBlocking();
......
......@@ -2,25 +2,73 @@ package anomalydetection.storage;
import java.time.Instant;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import anomalydetection.measurement.AnomalyScoredMeasurement;
import anomalydetection.timeseries.TimeSeries;
import anomalydetection.timeseries.TimeSeriesPoint;
public class CassandraDriver implements StorageDriver {
public CassandraDriver() {
// TODO Auto-generated constructor stub
private final Session session;
private final String table;
public CassandraDriver(final Session session, final String table) {
this.session = session;
this.table = table;
createTableIfNotExists();
}
@Override
public TimeSeries retrieveTimeSeries(Instant start, Instant end) {
// TODO Auto-generated method stub
return null;
public TimeSeries retrieveTimeSeries(final String seriesId, final Instant start, final Instant end) {
Select statement = QueryBuilder.select("time", "measurement")
.from(this.table)
.where(QueryBuilder.eq("series_id", seriesId))
.and(QueryBuilder.gte("time", start.toEpochMilli()))
.and(QueryBuilder.lte("time", end.toEpochMilli()))
.orderBy(QueryBuilder.asc("time"));
System.out.println(statement.toString());
ResultSet results = session.execute(statement);
TimeSeries timeSeries = new TimeSeries();
for (Row row : results) {
Instant time = row.getTimestamp("time").toInstant();
double value = row.getDouble("measurement");
timeSeries.appendEnd(new TimeSeriesPoint(time, value));
}
return timeSeries;
}
@Override
public void storeMeasurement(AnomalyScoredMeasurement measurement) {
// TODO Auto-generated method stub
public void storeMeasurement(final String seriesId, final AnomalyScoredMeasurement measurement) {
final Insert statement = QueryBuilder
.insertInto(this.table)
.value("series_id", seriesId)
.value("time", measurement.getTime().toEpochMilli())
.value("measurement", measurement.getValue())
.value("prediction", measurement.getPrediction())
.value("anomalyscore", measurement.getAnomalyScore());
session.execute(statement);
}
private void createTableIfNotExists() {
session.execute(
"CREATE TABLE IF NOT EXISTS " + this.table + " (" +
"series_id text," +
"time timestamp," +
"measurement double," +
"prediction double," +
"anomalyscore double," +
"PRIMARY KEY (series_id, time)" +
");");
}
}
......@@ -7,8 +7,8 @@ import anomalydetection.timeseries.TimeSeries;
public interface StorageDriver {
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end);
public TimeSeries retrieveTimeSeries(final String seriesid, final Instant start, final Instant end);
public void storeMeasurement(final AnomalyScoredMeasurement measurement);
public void storeMeasurement(final String seriesid, final AnomalyScoredMeasurement measurement);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment