Commit 1055c338 authored by Maximilian	Brütt's avatar Maximilian Brütt

decorator and facadish approaches

parent f94895a2
package teetime.framework;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
/**
* Decorator for work stealing
*
* @param <T>
* type of the tasks the stage processes
*/
// BETTER Could be typed for a <T extends AbstractStage> if there were no CompositeStages and the like?
public class ThievishStage<T> extends AbstractStage {
// private final OutputPort<T> output = createOutputPort();
private final InputPort<T> input = createInputPort();
private final AbstractStage decoratedStage;
private final ArrayList<ThievishStage<T>> peers = new ArrayList<ThievishStage<T>>();
private final Deque<T> tasks = new LinkedList<T>();
// TODO strategy
public ThievishStage(final AbstractStage stage) {
decoratedStage = stage;
}
public void addPeer(final ThievishStage<T> stage) {
peers.add(stage);
}
@Override
protected void execute() {
// TODO Access the decorated stage's input?
// -> tasks.add(input)
// output.send(tasks.pollFirst());
decoratedStage.execute();
}
/**
* Intended to be called by other thieves.
*/
public T getTask() {
return tasks.pollLast();
}
/**
* Attempt to steal a task from a fellow thief.
*
* @return whether the theft was successful
*/
private boolean attemptTheft() {
int index = (int) (peers.size() * Math.random());
ThievishStage<T> victim = peers.get(index);
try {
T haul = victim.getTask();
if (haul != null) {
tasks.add(haul);
return true;
}
} catch (Exception e) {
peers.remove(index);
}
return false;
}
/**
* Signal intercept
*/
@Override
public void onTerminating() throws Exception {
if (attemptTheft()) {
// ignore?
} else { // own deque is empty and no tasks to steal
super.onTerminating();
}
}
}
package teetime.stage;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.Distributor;
/**
* Should be considered a part of the preceding {@link Distributor} in terms of thread membership
*/
public class WorkstealingBalancer<T> extends AbstractStage {
private Class<T> type;
// output from the preceding stage is mapped to the input of the following up stage
private Map<OutputPort<T>, InputPort<T>> portMap;
// each input port is connected to its corresponding output via a deque
private List<Deque<T>> deques;
// TODO implement work stealing strategy as interface
/*
* public WorkstealingBalancer(final List<OutputPort<T>> sourcePorts, final List<InputPort<T>> targetPorts) throws Exception {
* // source ports have to match target ports in amount
* if (sourcePorts.size() != targetPorts.size()) {
* throw new Exception("WorkstealingBalancer: Argument counts don't match!");
* }
* for (int i = 0; i < sourcePorts.size(); i++) {
* portMap.put(sourcePorts.get(i), targetPorts.get(i));
* deques.add(new LinkedList<T>());
* }
* }
*/
public void addPorts(final OutputPort<T> source, final InputPort<T> target) {
// portMap.put(source, target);
portMap.put(this.createOutputPort(type), this.createInputPort(type));
deques.add(new LinkedList<T>());
}
@Override
protected void execute() {
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment