Commit 941be7e5 authored by Maximilian	Brütt's avatar Maximilian Brütt

PWS added. Comments. Multi Steal still bugged.

parent edb4c297
package teetime.framework;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedList;
/**
* Decorator for work stealing
*
* @param <T>
* type of the tasks the stage processes
*/
// BETTER Could be typed for a <T extends AbstractStage> if there were no CompositeStages and the like?
public class ThievishStage<T> extends AbstractStage {
// private final OutputPort<T> output = createOutputPort();
private final InputPort<T> input = createInputPort();
private final AbstractStage decoratedStage;
private final ArrayList<ThievishStage<T>> peers = new ArrayList<ThievishStage<T>>();
private final Deque<T> tasks = new LinkedList<T>();
// TODO strategy
public ThievishStage(final AbstractStage stage) {
decoratedStage = stage;
}
public void addPeer(final ThievishStage<T> stage) {
peers.add(stage);
}
@Override
protected void execute() {
// TODO Access the decorated stage's input?
// -> tasks.add(input)
// output.send(tasks.pollFirst());
decoratedStage.execute();
}
/**
* Intended to be called by other thieves.
*/
public T getTask() {
return tasks.pollLast();
}
/**
* Attempt to steal a task from a fellow thief.
*
* @return whether the theft was successful
*/
private boolean attemptTheft() {
int index = (int) (peers.size() * Math.random());
ThievishStage<T> victim = peers.get(index);
try {
T haul = victim.getTask();
if (haul != null) {
tasks.add(haul);
return true;
}
} catch (Exception e) {
peers.remove(index);
}
return false;
}
/**
* Signal intercept
*/
@Override
public void onTerminating() throws Exception {
if (attemptTheft()) {
// ignore?
} else { // own deque is empty and no tasks to steal
super.onTerminating();
}
}
}
......@@ -26,6 +26,9 @@ import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpMcArrayQueue;
/**
* Instances of this class should be created by the {@link WorkStealingPipeFactory} included in delivery.
*/
public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements IMonitorablePipe {
private final ObservableSpMcArrayQueue<Object> queue;
......@@ -34,19 +37,22 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
// statistics
private int numWaits;
private int numSteals;
// TODO strategy implementation to slim the paramters
// TODO strategy implementation to slim the parameters and attributes. Extension vs IStrategy?
private final int haulSize;
private final int instanceIndex;
private final int indexTheftBound;
private final boolean busyCheck;
protected WorkStealingPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int haulSize, final int instanceIndex,
final int indexTheftBound) {
final int indexTheftBound, final boolean busyCheck) {
super(sourcePort, targetPort, 1024);
this.peers = new ArrayList<WorkStealingPipe<T>>();
this.queue = new ObservableSpMcArrayQueue<Object>(1024);
this.haulSize = haulSize;
this.instanceIndex = instanceIndex;
this.indexTheftBound = indexTheftBound;
this.busyCheck = busyCheck;
}
@Override
......@@ -56,13 +62,18 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
return obj;
} else {
int index;
// indexTheftBound of 0 equals unbounded and thus randomized stealing behaviour.
if (indexTheftBound == 0) {
index = (new Random()).nextInt(peers.size());
WorkStealingPipe<T> victim;
if (busyCheck) {
victim = getBusiestPeer();
} else {
index = (instanceIndex + (new Random()).nextInt(indexTheftBound)) % peers.size();
// indexTheftBound of 0 equals unbounded and thus randomized stealing behaviour.
if (indexTheftBound == 0) {
index = (new Random()).nextInt(peers.size());
} else {
index = (instanceIndex + (new Random()).nextInt(indexTheftBound)) % peers.size();
}
victim = peers.get(index);
}
WorkStealingPipe<T> victim = peers.get(index);
// int maxSteal = Math.min(victim.size(), haulSize);
for (int i = 0; i < haulSize; i++) {
......@@ -88,10 +99,14 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
}
}
/**
* Used to avoid recursion in the method call above.
*/
private Object steal() {
return this.queue.poll();
}
// statistics
public int getNumSteals() {
return numSteals;
}
......@@ -100,7 +115,21 @@ public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements
peers.add(pipe);
}
/******************/
/**
* Return the peer with the most tasks in its queue.
*/
private WorkStealingPipe<T> getBusiestPeer() {
WorkStealingPipe<T> max = peers.get(0), tmp;
for (int i = 1; i < peers.size(); i++) {
tmp = peers.get(i);
if (tmp.size() > max.size()) {
max = tmp;
}
}
return max;
}
// TODO Everything below was copied from the BoundedSynchedPipe. A cleanup could be a good idea.
@Override
public boolean add(final Object element) {
......
......@@ -27,6 +27,7 @@ import teetime.framework.OutputPort;
public class WorkStealingPipeFactory<T> {
private final List<WorkStealingPipe<T>> instances = new ArrayList<WorkStealingPipe<T>>();
// Serves as index in the created pipes itself. Used for offsetting the pipe's peer group if GWS is chosen below.
private int numOfInstances = 0;
/**
......@@ -43,8 +44,9 @@ public class WorkStealingPipeFactory<T> {
* The number of peers the pipe should be able to steal from. Enter 0 for default (randomized) behaviour.
* @return The pipe, ready to use.
*/
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int theftBound, final int haulSize) {
WorkStealingPipe<T> newPipe = new WorkStealingPipe<T>(source, target, haulSize, numOfInstances++, theftBound);
private WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int theftBound, final int haulSize,
final boolean busyCheck) {
WorkStealingPipe<T> newPipe = new WorkStealingPipe<T>(source, target, haulSize, numOfInstances++, theftBound, busyCheck);
for (WorkStealingPipe<T> pipe : instances) {
pipe.addPeer(newPipe);
......@@ -54,12 +56,30 @@ public class WorkStealingPipeFactory<T> {
return newPipe;
}
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int theftBound) {
return create(source, target, theftBound, 1);
/**
* Create a pipe utilizing Randomized Work Stealing.
* It chooses its victim for stealing work from by utter randomness.
*/
public WorkStealingPipe<T> createRWSPipe(final OutputPort<? extends T> source, final InputPort<T> target) {
return create(source, target, 0, 1, false);
}
/**
* Create a pipe utilizing Grouped Work Stealing with a group of size (theftBound+1).
* It chooses its victim for stealing work from within [myPosition, myPosition+theftBound),
* meaning a group size of 1 will result in exactly one eligible victim.
* Note the myPosition mentioned above does not refer to the pipe itself as a WorkStealingPipe never is its own peer.
*/
public WorkStealingPipe<T> createGWSPipe(final OutputPort<? extends T> source, final InputPort<T> target, final int theftBound) {
return create(source, target, theftBound, 1, false);
}
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target) {
return create(source, target, 0, 1);
/**
* Create a pipe utilizing Prioritized Work Stealing, always attempting to steal from the busiest peer it has.
* Peer business is determined by the {@link IPipe.size()} call.
*/
public WorkStealingPipe<T> createPWSPipe(final OutputPort<? extends T> source, final InputPort<T> target) {
return create(source, target, 0, 1, true);
}
}
package teetime.stage;
import java.util.Deque;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import teetime.framework.AbstractStage;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.stage.basic.distributor.Distributor;
/**
* Should be considered a part of the preceding {@link Distributor} in terms of thread membership
*/
public class WorkstealingBalancer<T> extends AbstractStage {
private Class<T> type;
// output from the preceding stage is mapped to the input of the following up stage
private Map<OutputPort<T>, InputPort<T>> portMap;
// each input port is connected to its corresponding output via a deque
private List<Deque<T>> deques;
// TODO implement work stealing strategy as interface
/*
* public WorkstealingBalancer(final List<OutputPort<T>> sourcePorts, final List<InputPort<T>> targetPorts) throws Exception {
* // source ports have to match target ports in amount
* if (sourcePorts.size() != targetPorts.size()) {
* throw new Exception("WorkstealingBalancer: Argument counts don't match!");
* }
* for (int i = 0; i < sourcePorts.size(); i++) {
* portMap.put(sourcePorts.get(i), targetPorts.get(i));
* deques.add(new LinkedList<T>());
* }
* }
*/
public void addPorts(final OutputPort<T> source, final InputPort<T> target) {
// portMap.put(source, target);
portMap.put(this.createOutputPort(type), this.createInputPort(type));
deques.add(new LinkedList<T>());
}
@Override
protected void execute() {
}
}
......@@ -34,8 +34,8 @@ import teetime.stage.string.WordCounter;
import teetime.stage.util.CountingMap;
/**
* A simple configuration, which counts the words of a set of files.
* The execution of this configuration is demonstrated in {@link WordCountingTest}.
* A configuration which counts the words of a set of files utilizing work stealing pipes.
* The execution of this configuration is demonstrated in {@link WordCounterWorkStealingTest}.
*
* This configuration is divided into three parts. The first part reads files and distributes them to different {@link WordCounter} instances.
* The second part are a certain number of WordCounter instances. On construction of this class the number of concurrent WordCounter instances is specified with the
......@@ -59,7 +59,10 @@ public class WordCounterWorkStealingConfiguration extends Configuration {
private final Distributor<String> distributor;
private final WorkStealingPipeFactory<String> factory = new WorkStealingPipeFactory<String>();
private final boolean BALANCED = true;
// Distribute the work via NonBlockingRoundRobinStrategy / with no strategy at all
private final boolean BALANCED = false;
// Simple switch for standard issue WordCounterExecution / work stealing as specified below
private final boolean WS = true;
public WordCounterWorkStealingConfiguration(final int threads, final File... input) {
......@@ -91,8 +94,9 @@ public class WordCounterWorkStealingConfiguration extends Configuration {
// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
final WordCounter threadableStage = wc;
// This is where the magic happens. Ugly though, might be changed for the final release into something more... flexible.
if (WS) {
registerCustomPipe(factory.create(distributor.getNewOutputPort(), threadableStage.getInputPort(), 3));
registerCustomPipe(factory.createPWSPipe(distributor.getNewOutputPort(), threadableStage.getInputPort()));
} else {
connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1024);
}
......
......@@ -32,7 +32,6 @@ import com.google.common.primitives.Longs;
import teetime.framework.AbstractPort;
import teetime.framework.Execution;
import teetime.framework.pipe.IMonitorablePipe;
import teetime.framework.pipe.WorkStealingPipe;
import teetime.util.StopWatch;
public class WordCounterWorkStealingTest {
......@@ -121,8 +120,10 @@ public class WordCounterWorkStealingTest {
for (final AbstractPort<?> port : wcc.getDistributorPorts()) {
final IMonitorablePipe spscPipe = (IMonitorablePipe) port.getPipe();
System.out.println("numWaits: " + spscPipe.getNumWaits());
final WorkStealingPipe wsPipe = (WorkStealingPipe) port.getPipe();
System.out.println("numSteals: " + wsPipe.getNumSteals());
// Code below does not work if no WorkStealingPipes are used. Because illegal casts.
// final WorkStealingPipe wsPipe = (WorkStealingPipe) port.getPipe();
// System.out.println("numSteals: " + wsPipe.getNumSteals());
}
System.out.println("merger pipes:");
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment