Commit c044bf1e authored by Sören Henning's avatar Sören Henning

worked on #16

parent 683c888a
package teead;
import java.io.File;
import java.time.Duration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import teead.aggregation.MeanAggregator;
import teead.forecast.ARIMAForecaster;
import teead.storage.CassandraDriver;
import teetime.framework.Configuration;
public class AnomalyDetectionConfiguration extends Configuration {
public AnomalyDetectionConfiguration() {
String ipAddress = "192.168.99.100";
int port = 32770;
String keyspace = "demo3";
Cluster cluster = Cluster.builder().addContactPoint(ipAddress).withPort(port).build();
Session session = cluster.connect(keyspace);
final MeasurementGenerator gen = new MeasurementGenerator();
gen.setFunction(x -> 500 * Math.sin(x / 60) + 2000);
gen.setNoise(250);
gen.setAnomalyProbability(0.01);
gen.setAnomayStrength(1000);
gen.setMinStepDistance(Duration.ofMillis(500));
gen.setMinStepDistance(Duration.ofSeconds(2));
// Create the stages
final RealTimeMeasurementGeneratorStage generator = new RealTimeMeasurementGeneratorStage(gen, 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 60) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(1),3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.pow(Math.sin(x / 60), 1 / 101) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(5), 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 0.5 * x + 1000, 250, 0.01, 1000, Duration.ofSeconds(1),
// 900);
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(30), new MeanAggregator(), new
// ARIMAForecaster("192.168.99.100", 6311));
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),new
// RegressionForecaster());
final AnomalyDetectionStage anomalyDetector = new StorableAnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),
new ARIMAForecaster("192.168.99.100", 32777), new CassandraDriver(session, "measurements", "temp2"));
final AlertPrinterStage alerter = new AlertPrinterStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExportStage jsonExporter = new JSONExportStage(new File("values.json")); // TODO Temp
// Connect the stages
super.connectPorts(generator.getOutputPort(), anomalyDetector.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(0.9), alerter.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), printer.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), jsonExporter.getInputPort());
// cluster.close();
}
}
package teead;
import java.io.File;
import java.time.Duration;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.Session;
import teead.aggregation.MeanAggregator;
import teead.forecast.ARIMAForecaster;
import teead.storage.CassandraAdapter;
import teetime.framework.Configuration;
public class AnomalyDetectionConfiguration extends Configuration {
public AnomalyDetectionConfiguration() {
String ipAddress = "192.168.99.100";
int port = 32770;
String keyspace = "demo3";
Cluster cluster = Cluster.builder().addContactPoint(ipAddress).withPort(port).build();
Session session = cluster.connect(keyspace);
final MeasurementGenerator gen = new MeasurementGenerator();
gen.setFunction(x -> 500 * Math.sin(x / 60) + 2000);
gen.setNoise(250);
gen.setAnomalyProbability(0.01);
gen.setAnomayStrength(1000);
gen.setMinStepDistance(Duration.ofMillis(500));
gen.setMinStepDistance(Duration.ofSeconds(2));
// Create the stages
final RealTimeMeasurementGeneratorStage generator = new RealTimeMeasurementGeneratorStage(gen, 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.sin(x / 60) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(1),3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 500 * Math.pow(Math.sin(x / 60), 1 / 101) + 2000, 250, 0.01, 1000,
// Duration.ofSeconds(5), 3600);
// final MeassurementsGeneratorStage generator = new MeassurementsGeneratorStage(x -> 0.5 * x + 1000, 250, 0.01, 1000, Duration.ofSeconds(1),
// 900);
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(30), new MeanAggregator(), new
// ARIMAForecaster("192.168.99.100", 6311));
// final AnomalyDetectionStage anomalyDetector = new AnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),new
// RegressionForecaster());
final AnomalyDetectionStage anomalyDetector = new StorableAnomalyDetectionStage(Duration.ofHours(1), Duration.ofSeconds(5), new MeanAggregator(),
new ARIMAForecaster("192.168.99.100", 32777), new CassandraAdapter(session, "measurements", "temp2"));
final AlertPrinterStage alerter = new AlertPrinterStage();
final PrinterStage printer = new PrinterStage(); // TODO Temp
final JSONExportStage jsonExporter = new JSONExportStage(new File("values.json")); // TODO Temp
// Connect the stages
super.connectPorts(generator.getOutputPort(), anomalyDetector.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(0.9), alerter.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), printer.getInputPort());
super.connectPorts(anomalyDetector.getNewOutputPort(), jsonExporter.getInputPort());
// cluster.close();
}
}
package teead;
import java.time.Duration;
import java.time.Instant;
import teead.aggregation.Aggregator;
import teead.forecast.Forecaster;
import teead.storage.StorageDriver;
import teead.timeseries.BoundedTimeSeries;
public class StorableAnomalyDetectionStage extends AnomalyDetectionStage {
public StorableAnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster, final StorageDriver storageDriver) {
super(new BoundedTimeSeries(slidingWindowDuration, storageDriver.retrieveTimeSeries(Instant.now().minus(slidingWindowDuration), Instant.now())),
normalizationDuration, aggregator, forecaster);
final StorageStage storageStage = new StorageStage(storageDriver);
super.connectPorts(super.getNewOutputPort(), storageStage.getInputPort());
}
}
package teead;
import java.time.Duration;
import java.time.Instant;
import teead.aggregation.Aggregator;
import teead.forecast.Forecaster;
import teead.storage.StorageAdapter;
import teead.timeseries.BoundedTimeSeries;
public class StorableAnomalyDetectionStage extends AnomalyDetectionStage {
public StorableAnomalyDetectionStage(final Duration slidingWindowDuration, final Duration normalizationDuration, final Aggregator aggregator,
final Forecaster forecaster, final StorageAdapter storageDriver) {
super(new BoundedTimeSeries(slidingWindowDuration, storageDriver.retrieveTimeSeries(Instant.now().minus(slidingWindowDuration), Instant.now())),
normalizationDuration, aggregator, forecaster);
final StorageStage storageStage = new StorageStage(storageDriver);
super.connectPorts(super.getNewOutputPort(), storageStage.getInputPort());
}
}
package teead;
import teead.measurement.AnomalyScoredMeasurement;
import teead.storage.StorageDriver;
import teetime.framework.AbstractConsumerStage;
/**
* This stage stores incoming {@code Measurement}s using a given {@code StorageDriver}.
*
* @author Sören Henning
*
*/
public class StorageStage extends AbstractConsumerStage<AnomalyScoredMeasurement> {
private final StorageDriver storageDriver;
public StorageStage(final StorageDriver storageDriver) {
this.storageDriver = storageDriver;
}
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
storageDriver.storeMeasurement(measurement);
}
}
package teead;
import teead.measurement.AnomalyScoredMeasurement;
import teead.storage.StorageAdapter;
import teetime.framework.AbstractConsumerStage;
/**
* This stage stores incoming {@code Measurement}s using a given {@code StorageDriver}.
*
* @author Sören Henning
*
*/
public class StorageStage extends AbstractConsumerStage<AnomalyScoredMeasurement> {
private final StorageAdapter storageDriver;
public StorageStage(final StorageAdapter storageDriver) {
this.storageDriver = storageDriver;
}
@Override
protected void execute(final AnomalyScoredMeasurement measurement) {
storageDriver.storeMeasurement(measurement);
}
}
package teead.storage;
import java.time.Instant;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import teead.measurement.AnomalyScoredMeasurement;
import teead.timeseries.TimeSeries;
import teead.timeseries.TimeSeriesPoint;
public class CassandraDriver implements StorageDriver {
private final Session session;
private final String table;
private final String seriesId;
private String seriesIdColumn = "series_id";
private String timeColumn = "time";
private String nanoColumn = "nanos";
private String measurementColumn = "measurement";
private String predictionColumn = "prediction";
private String anomalyscoreColumn = "anomalyscore";
public CassandraDriver(final Session session, final String table, final String seriesId) {
this.session = session;
this.table = table;
this.seriesId = seriesId;
if (session.getLoggedKeyspace() == null) {
throw new IllegalArgumentException("No keyspace set.");
}
createTableIfNotExists();
}
public String getSeriesIdColumn() {
return seriesIdColumn;
}
public void setSeriesIdColumn(final String seriesIdColumn) {
this.seriesIdColumn = seriesIdColumn;
}
public String getTimeColumn() {
return timeColumn;
}
public void setTimeColumn(final String timeColumn) {
this.timeColumn = timeColumn;
}
public String getNanoColumn() {
return nanoColumn;
}
public void setNanoColumn(final String nanoColumn) {
this.nanoColumn = nanoColumn;
}
public String getMeasurementColumn() {
return measurementColumn;
}
public void setMeasurementColumn(final String measurementColumn) {
this.measurementColumn = measurementColumn;
}
public String getPredictionColumn() {
return predictionColumn;
}
public void setPredictionColumn(final String predictionColumn) {
this.predictionColumn = predictionColumn;
}
public String getAnomalyscoreColumn() {
return anomalyscoreColumn;
}
public void setAnomalyscoreColumn(final String anomalyscoreColumn) {
this.anomalyscoreColumn = anomalyscoreColumn;
}
@Override
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end) {
// Retrieving time series just works on milliseconds precision
final Select statement = QueryBuilder.select(this.timeColumn, this.nanoColumn, this.measurementColumn)
.from(this.table)
.where(QueryBuilder.eq(this.seriesIdColumn, this.seriesId))
.and(QueryBuilder.gte(this.timeColumn, start.toEpochMilli()))
.and(QueryBuilder.lte(this.timeColumn, end.toEpochMilli()))
.orderBy(QueryBuilder.asc(this.timeColumn));
final ResultSet results = session.execute(statement);
final TimeSeries timeSeries = new TimeSeries();
for (Row row : results) {
final Instant millisTime = row.getTimestamp(this.timeColumn).toInstant();
final Instant time = millisTime.plusNanos(row.getInt(this.nanoColumn));
final double value = row.getDouble(this.measurementColumn);
timeSeries.appendEnd(new TimeSeriesPoint(time, value));
}
return timeSeries;
}
@Override
public void storeMeasurement(final AnomalyScoredMeasurement measurement) {
final Insert statement = QueryBuilder
.insertInto(this.table)
.value(this.seriesIdColumn, this.seriesId)
.value(this.timeColumn, measurement.getTime().toEpochMilli())
.value(this.nanoColumn, measurement.getTime().getNano() % 1_000_000)
.value(this.measurementColumn, measurement.getValue())
.value(this.predictionColumn, measurement.getPrediction())
.value(this.anomalyscoreColumn, measurement.getAnomalyScore());
session.execute(statement);
}
private void createTableIfNotExists() {
session.execute(
"CREATE TABLE IF NOT EXISTS " + this.table + " (" +
this.seriesIdColumn + " text," +
this.timeColumn + " timestamp," +
this.nanoColumn + " int," +
this.measurementColumn + " double," +
this.predictionColumn + " double," +
this.anomalyscoreColumn + " double," +
"PRIMARY KEY (" + this.seriesIdColumn + ", " + this.timeColumn + ", " + this.nanoColumn + ")" +
");");
}
}
package teead.storage;
import java.time.Instant;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import com.datastax.driver.core.querybuilder.Insert;
import com.datastax.driver.core.querybuilder.QueryBuilder;
import com.datastax.driver.core.querybuilder.Select;
import teead.measurement.AnomalyScoredMeasurement;
import teead.timeseries.TimeSeries;
import teead.timeseries.TimeSeriesPoint;
public class CassandraAdapter implements StorageAdapter {
private final CassandraManager manager;
private final String table;
private final String seriesId;
private String seriesIdColumn = "series_id";
private String timeColumn = "time";
private String nanoColumn = "nanos";
private String measurementColumn = "measurement";
private String predictionColumn = "prediction";
private String anomalyscoreColumn = "anomalyscore";
public CassandraAdapter(final Session session, final String table, final String seriesId) {
this(new StaticInstanceCassandraManager(session), table, seriesId);
}
public CassandraAdapter(final CassandraManager manager, final String table, final String seriesId) {
this.manager = manager;
this.table = table;
this.seriesId = seriesId;
createTableIfNotExists();
}
public String getSeriesIdColumn() {
return seriesIdColumn;
}
public void setSeriesIdColumn(final String seriesIdColumn) {
this.seriesIdColumn = seriesIdColumn;
}
public String getTimeColumn() {
return timeColumn;
}
public void setTimeColumn(final String timeColumn) {
this.timeColumn = timeColumn;
}
public String getNanoColumn() {
return nanoColumn;
}
public void setNanoColumn(final String nanoColumn) {
this.nanoColumn = nanoColumn;
}
public String getMeasurementColumn() {
return measurementColumn;
}
public void setMeasurementColumn(final String measurementColumn) {
this.measurementColumn = measurementColumn;
}
public String getPredictionColumn() {
return predictionColumn;
}
public void setPredictionColumn(final String predictionColumn) {
this.predictionColumn = predictionColumn;
}
public String getAnomalyscoreColumn() {
return anomalyscoreColumn;
}
public void setAnomalyscoreColumn(final String anomalyscoreColumn) {
this.anomalyscoreColumn = anomalyscoreColumn;
}
@Override
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end) {
// Retrieving time series just works on milliseconds precision
final Select statement = QueryBuilder.select(this.timeColumn, this.nanoColumn, this.measurementColumn)
.from(this.table)
.where(QueryBuilder.eq(this.seriesIdColumn, this.seriesId))
.and(QueryBuilder.gte(this.timeColumn, start.toEpochMilli()))
.and(QueryBuilder.lte(this.timeColumn, end.toEpochMilli()))
.orderBy(QueryBuilder.asc(this.timeColumn));
final ResultSet results = this.manager.getSession().execute(statement);
final TimeSeries timeSeries = new TimeSeries();
for (Row row : results) {
final Instant millisTime = row.getTimestamp(this.timeColumn).toInstant();
final Instant time = millisTime.plusNanos(row.getInt(this.nanoColumn));
final double value = row.getDouble(this.measurementColumn);
timeSeries.appendEnd(new TimeSeriesPoint(time, value));
}
return timeSeries;
}
@Override
public void storeMeasurement(final AnomalyScoredMeasurement measurement) {
final Insert statement = QueryBuilder
.insertInto(this.table)
.value(this.seriesIdColumn, this.seriesId)
.value(this.timeColumn, measurement.getTime().toEpochMilli())
.value(this.nanoColumn, measurement.getTime().getNano() % 1_000_000)
.value(this.measurementColumn, measurement.getValue())
.value(this.predictionColumn, measurement.getPrediction())
.value(this.anomalyscoreColumn, measurement.getAnomalyScore());
this.manager.getSession().execute(statement);
}
private void createTableIfNotExists() {
this.manager.getSession().execute(
"CREATE TABLE IF NOT EXISTS " + this.table + " (" +
this.seriesIdColumn + " text," +
this.timeColumn + " timestamp," +
this.nanoColumn + " int," +
this.measurementColumn + " double," +
this.predictionColumn + " double," +
this.anomalyscoreColumn + " double," +
"PRIMARY KEY (" + this.seriesIdColumn + ", " + this.timeColumn + ", " + this.nanoColumn + ")" +
");");
}
}
package teead.storage;
import com.datastax.driver.core.Session;
/**
* Classes that implements this interface must provide logic to return a
* {@link Session} on a {@link #getSession()} method call.
*
* @author Sören Henning
*
*/
public interface CassandraManager {
public Session getSession();
}
package teead.storage;
import com.datastax.driver.core.Session;
/**
* {@link CassandraManager} that holds a given instance of a Cassandra
* {@link Session}, returns it on {@link #getSession()} and does nothing more.
*
* It is required that the {@link Session} has a set keyspace.
*
* @author Sören Henning
*
*/
public class StaticInstanceCassandraManager implements CassandraManager {
private final Session session;
public StaticInstanceCassandraManager(final Session session) {
if (session.getLoggedKeyspace() == null) {
throw new IllegalArgumentException("No keyspace set.");
}
this.session = session;
}
@Override
public Session getSession() {
return this.session;
}
}
package teead.storage;
import java.time.Instant;
import teead.measurement.AnomalyScoredMeasurement;
import teead.timeseries.TimeSeries;
public interface StorageDriver {
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end);
public void storeMeasurement(final AnomalyScoredMeasurement measurement);
}
package teead.storage;
import java.time.Instant;
import teead.measurement.AnomalyScoredMeasurement;
import teead.timeseries.TimeSeries;
public interface StorageAdapter {
public TimeSeries retrieveTimeSeries(final Instant start, final Instant end);
public void storeMeasurement(final AnomalyScoredMeasurement measurement);
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment