Commit f1097240 authored by Maximilian	Brütt's avatar Maximilian Brütt

WorkStealingPipes added. NF for multi steal

parent 1055c338
......@@ -17,7 +17,7 @@
<attribute name="maven.pomderived" value="true"/>
</attributes>
</classpathentry>
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.6">
<classpathentry kind="con" path="org.eclipse.jdt.launching.JRE_CONTAINER/org.eclipse.jdt.internal.debug.ui.launcher.StandardVMType/JavaSE-1.7">
<attributes>
<attribute name="maven.pomderived" value="true"/>
</attributes>
......
......@@ -7,9 +7,9 @@ org.eclipse.jdt.core.compiler.annotation.nullable=org.eclipse.jdt.annotation.Nul
org.eclipse.jdt.core.compiler.annotation.nullanalysis=disabled
org.eclipse.jdt.core.compiler.codegen.inlineJsrBytecode=enabled
org.eclipse.jdt.core.compiler.codegen.methodParameters=do not generate
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.6
org.eclipse.jdt.core.compiler.codegen.targetPlatform=1.7
org.eclipse.jdt.core.compiler.codegen.unusedLocal=preserve
org.eclipse.jdt.core.compiler.compliance=1.6
org.eclipse.jdt.core.compiler.compliance=1.7
org.eclipse.jdt.core.compiler.debug.lineNumber=generate
org.eclipse.jdt.core.compiler.debug.localVariable=generate
org.eclipse.jdt.core.compiler.debug.sourceFile=generate
......@@ -97,7 +97,7 @@ org.eclipse.jdt.core.compiler.problem.unusedPrivateMember=warning
org.eclipse.jdt.core.compiler.problem.unusedTypeParameter=ignore
org.eclipse.jdt.core.compiler.problem.unusedWarningToken=warning
org.eclipse.jdt.core.compiler.problem.varargsArgumentNeedCast=warning
org.eclipse.jdt.core.compiler.source=1.6
org.eclipse.jdt.core.compiler.source=1.7
org.eclipse.jdt.core.compiler.taskCaseSensitive=enabled
org.eclipse.jdt.core.compiler.taskPriorities=NORMAL,HIGH,NORMAL,LOW
org.eclipse.jdt.core.compiler.taskTags=TODO,FIXME,XXX,BETTER
......
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import teetime.framework.AbstractSynchedPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpMcArrayQueue;
public class BoundedSynchedMcPipe<T> extends AbstractSynchedPipe<T> implements IMonitorablePipe {
// private static final Logger LOGGER = LoggerFactory.getLogger(SpScPipe.class);
private final ObservableSpMcArrayQueue<Object> queue;
// statistics
private int numWaits;
public BoundedSynchedMcPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int capacity) {
super(sourcePort, targetPort, capacity);
this.queue = new ObservableSpMcArrayQueue<Object>(capacity);
}
public BoundedSynchedMcPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort) {
this(sourcePort, targetPort, 4);
}
// BETTER introduce a QueueIsFullStrategy
@Override
public boolean add(final Object element) {
while (!addNonBlocking(element)) {
// the following sending*-related lines are commented out since they are computationally too expensive
// this.getSourcePort().getOwningStage().sendingFailed();
// Thread.yield();
if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED ||
Thread.currentThread().isInterrupted()) {
throw TerminateException.INSTANCE;
}
this.numWaits++;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw TerminateException.INSTANCE;
}
}
// this.getSourcePort().getOwningStage().sendingSucceeded();
// this.reportNewElement();
return true;
}
@Override
public int capacity() {
return this.queue.capacity();
}
@Override
public boolean addNonBlocking(final Object element) {
return this.queue.offer(element);
}
@Override
public Object removeLast() {
return this.queue.poll();
}
@Override
public boolean isEmpty() {
return this.queue.isEmpty();
}
@Override
public int size() {
return this.queue.size();
}
@Override
public int getNumWaits() {
return this.numWaits;
}
@Override
public long getPushThroughput() {
return queue.getNumPushes();
}
@Override
public long getPullThroughput() {
return queue.getNumPulls();
}
@Override
public long getNumPushes() {
return queue.getNumPushesSinceAppStart();
}
@Override
public long getNumPulls() {
return queue.getNumPullsSinceAppStart();
}
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.framework.pipe;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import teetime.framework.AbstractSynchedPipe;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
import teetime.framework.StageState;
import teetime.framework.exceptionHandling.TerminateException;
import teetime.util.framework.concurrent.queue.ObservableSpMcArrayQueue;
public final class WorkStealingPipe<T> extends AbstractSynchedPipe<T> implements IMonitorablePipe {
private final ObservableSpMcArrayQueue<Object> queue;
private final List<WorkStealingPipe<T>> peers;
// statistics
private int numWaits;
private int numSteals;
// TODO strategy
// private final int WORK_STEALING_MAX_DEPTH = 1;
private final int haulSize;
protected WorkStealingPipe(final OutputPort<? extends T> sourcePort, final InputPort<T> targetPort, final int haulSize) {
super(sourcePort, targetPort, 1024);
this.peers = new ArrayList<WorkStealingPipe<T>>();
this.queue = new ObservableSpMcArrayQueue<Object>(1024);
this.haulSize = haulSize;
}
@Override
public Object removeLast() {
Object obj = this.queue.poll();
if (obj != null) {
return obj;
} else { // when the pipe's queue is empty...
// choose victim
WorkStealingPipe<T> victim = peers.get((new Random()).nextInt(peers.size()));
// int maxSteal = Math.min(victim.size(), haulSize);
for (int i = 0; i < haulSize; i++) {
// this.queue.add(victim.removeLast());
Object haul = victim.steal();
if (haul != null) {
numSteals++;
this.queue.offer(haul);
} else {
if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED ||
Thread.currentThread().isInterrupted()) {
throw TerminateException.INSTANCE;
}
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw TerminateException.INSTANCE;
}
break;
}
}
return this.queue.poll();
// return null;
}
}
private Object steal() {
return this.queue.poll();
}
public int getNumSteals() {
return numSteals;
}
protected void addPeer(final WorkStealingPipe<T> pipe) {
peers.add(pipe);
}
/******************/
@Override
public boolean add(final Object element) {
while (!addNonBlocking(element)) {
// the following sending*-related lines are commented out since they are computationally too expensive
// this.getSourcePort().getOwningStage().sendingFailed();
// Thread.yield();
if (this.cachedTargetStage.getCurrentState() == StageState.TERMINATED ||
Thread.currentThread().isInterrupted()) {
throw TerminateException.INSTANCE;
}
numWaits++;
try {
Thread.sleep(1);
} catch (InterruptedException e) {
throw TerminateException.INSTANCE;
}
}
// this.getSourcePort().getOwningStage().sendingSucceeded();
// this.reportNewElement();
return true;
}
@Override
public int capacity() {
return this.queue.capacity();
}
@Override
public boolean addNonBlocking(final Object element) {
return this.queue.offer(element);
}
@Override
public boolean isEmpty() {
return this.queue.isEmpty();
}
@Override
public int size() {
return this.queue.size();
}
@Override
public long getPushThroughput() {
return queue.getNumPushes();
}
@Override
public long getPullThroughput() {
return queue.getNumPulls();
}
@Override
public long getNumPushes() {
return queue.getNumPushesSinceAppStart();
}
@Override
public long getNumPulls() {
return queue.getNumPullsSinceAppStart();
}
@Override
public int getNumWaits() {
return numWaits;
}
}
package teetime.framework.pipe;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.InputPort;
import teetime.framework.OutputPort;
/**
* Factory for {@link WorkStealingPipe}s. Each family of such pipes needs a separate factory as the available victims are kept track of here.
*/
public class WorkStealingPipeFactory<T> {
private final List<WorkStealingPipe<T>> instances = new ArrayList<WorkStealingPipe<T>>();
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target, final int haulSize) {
WorkStealingPipe<T> newPipe = new WorkStealingPipe<T>(source, target, haulSize);
for (WorkStealingPipe<T> pipe : instances) {
pipe.addPeer(newPipe);
newPipe.addPeer(pipe);
}
instances.add(newPipe);
return newPipe;
}
public WorkStealingPipe<T> create(final OutputPort<? extends T> source, final InputPort<T> target) {
return create(source, target, 1);
}
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.stage.basic.distributor.strategy;
import java.util.List;
import teetime.framework.OutputPort;
public class NoDistributionStrategy implements IDistributorStrategy {
@SuppressWarnings("unchecked")
@Override
public <T> boolean distribute(final List<OutputPort<?>> outputPorts, final T element) {
final OutputPort<T> outputPort = (OutputPort<T>) outputPorts.get(0);
outputPort.send(element);
return true;
}
@Override
public void onPortRemoved(final OutputPort<?> removedOutputPort) {
// Distributor<?> distributor = (Distributor<?>) removedOutputPort.getOwningStage();
// correct the index if it is out-of-bounds
// this.index = this.index % distributor.getOutputPorts().size();
}
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.util.framework.concurrent.queue;
import org.jctools.queues.SpmcArrayQueue;
public final class ObservableSpMcArrayQueue<E> extends SpmcArrayQueue<E> {
private transient long lastProducerIndex, lastConsumerIndex;
public ObservableSpMcArrayQueue(final int capacity) {
super(capacity);
}
/**
* @return the number of pushes to this queue since application start
*/
public long getNumPushesSinceAppStart() {
return currentProducerIndex();
}
/**
* @return the number of pulls from this queue since application start
*/
public long getNumPullsSinceAppStart() {
return currentConsumerIndex();
}
/**
* @return the number of pushes to this queue since last method call
*/
public long getNumPushes() {
final long currentProducerIndex = getNumPushesSinceAppStart();
long diff = currentProducerIndex - lastProducerIndex;
lastProducerIndex = currentProducerIndex;
return diff;
}
/**
* @return the number of pulls to this queue since last method call
*/
public long getNumPulls() {
final long currentConsumerIndex = getNumPullsSinceAppStart();
long diff = currentConsumerIndex - lastConsumerIndex;
lastConsumerIndex = currentConsumerIndex;
return diff;
}
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.examples.wordcounter;
import java.io.File;
import java.util.ArrayList;
import java.util.List;
import teetime.framework.AbstractPort;
import teetime.framework.Configuration;
import teetime.framework.MonitoringThread;
import teetime.framework.pipe.WorkStealingPipeFactory;
import teetime.stage.CountingMapMerger;
import teetime.stage.InitialElementProducer;
import teetime.stage.basic.distributor.Distributor;
import teetime.stage.basic.distributor.strategy.NoDistributionStrategy;
import teetime.stage.basic.distributor.strategy.NonBlockingRoundRobinStrategy;
import teetime.stage.basic.merger.Merger;
import teetime.stage.io.File2SeqOfWords;
import teetime.stage.string.WordCounter;
import teetime.stage.util.CountingMap;
/**
* A simple configuration, which counts the words of a set of files.
* The execution of this configuration is demonstrated in {@link WordCountingTest}.
*
* This configuration is divided into three parts. The first part reads files and distributes them to different {@link WordCounter} instances.
* The second part are a certain number of WordCounter instances. On construction of this class the number of concurrent WordCounter instances is specified with the
* threads parameter.
* The third and last part collects the results from all WordCounter instances and merges them. This final result can be read afterwards.
*
*
* @author Nelson Tavares de Sousa
*
*/
public class WordCounterWorkStealingConfiguration extends Configuration {
// Last stage is saved as field, to retrieve the result after execution.
private final CountingMapMerger<String> result = new CountingMapMerger<String>();
private final List<AbstractPort<?>> distributorPorts = new ArrayList<AbstractPort<?>>();
private final List<AbstractPort<?>> mergerPorts = new ArrayList<AbstractPort<?>>();
private final MonitoringThread monitoringThread;
private final Distributor<String> distributor;
private final WorkStealingPipeFactory<String> factory = new WorkStealingPipeFactory<String>();
private final boolean BALANCED = true;
private final boolean WS = true;
public WordCounterWorkStealingConfiguration(final int threads, final File... input) {
// First part of the config
final InitialElementProducer<File> init = new InitialElementProducer<File>(input);
// final File2Lines f2b = new File2Lines();
final File2SeqOfWords f2b = new File2SeqOfWords("UTF-8", 512);
if (BALANCED) {
distributor = new Distributor<String>(new NonBlockingRoundRobinStrategy());
} else {
distributor = new Distributor<String>(new NoDistributionStrategy());
}
// last part
final Merger<CountingMap<String>> merger = new Merger<CountingMap<String>>();
// CountingMapMerger (already as field)
// Connecting the stages of the first part of the config
connectPorts(init.getOutputPort(), f2b.getInputPort());
connectPorts(f2b.getOutputPort(), distributor.getInputPort());
monitoringThread = new MonitoringThread();
// Middle part... multiple instances of WordCounter are created and connected to the merger and distrubuter stages
for (int i = 0; i < threads; i++) {
// final InputPortSizePrinter<String> inputPortSizePrinter = new InputPortSizePrinter<String>();
final WordCounter wc = new WordCounter();
// intraFact.create(inputPortSizePrinter.getOutputPort(), wc.getInputPort());
final WordCounter threadableStage = wc;
if (WS) {
registerCustomPipe(factory.create(distributor.getNewOutputPort(), threadableStage.getInputPort()));
} else {
connectPorts(distributor.getNewOutputPort(), threadableStage.getInputPort(), 1024);
}
connectPorts(wc.getOutputPort(), merger.getNewInputPort());
// Add WordCounter as a threadable stage, so it runs in its own thread
threadableStage.getInputPort().getOwningStage().declareActive();
distributorPorts.add(threadableStage.getInputPort());
mergerPorts.add(wc.getOutputPort());
monitoringThread.addPort(threadableStage.getInputPort());
}
// Connect the stages of the last part
connectPorts(merger.getOutputPort(), result.getInputPort());
// Add the first and last part to the threadable stages
init.declareActive();
merger.declareActive();
}
public MonitoringThread getMonitoringThread() {
return monitoringThread;
}
// Further methods are allowed. For e.g. it is possible to read data from certain stages.
public CountingMap<String> getResult() {
return result.getResult();
}
public List<AbstractPort<?>> getDistributorPorts() {
return distributorPorts;
}
public List<AbstractPort<?>> getMergerPorts() {
return mergerPorts;
}
public Distributor<String> getDistributor() {
return distributor;
}
}
/**
* Copyright © 2015 Christian Wulf, Nelson Tavares de Sousa (http://teetime-framework.github.io)
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package teetime.examples.wordcounter;
import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.base.Joiner;
import com.google.common.primitives.Longs;
import teetime.framework.AbstractPort;
import teetime.framework.Execution;
import teetime.framework.pipe.IMonitorablePipe;
import teetime.framework.pipe.WorkStealingPipe;
import teetime.util.StopWatch;
public class WordCounterWorkStealingTest {
private static final Logger LOGGER = LoggerFactory.getLogger(WordCounterWorkStealingTest.class);
public static void writeTimingsToFile(final File outputFile, final long[] timings) throws UnsupportedEncodingException, FileNotFoundException {
final PrintStream ps = new PrintStream(new BufferedOutputStream(new FileOutputStream(outputFile, true), 8192 * 8), false, "UTF-8");
try {
final Joiner joiner = com.google.common.base.Joiner.on(' ');
final String timingsString = joiner.join(Longs.asList(timings));
ps.println(timingsString);
} finally {
ps.close();
}
}
public static void main(final String[] args) throws UnsupportedEncodingException, FileNotFoundException {
int numWorkerThreads;
try {
numWorkerThreads = Integer.valueOf(args[0]);
} catch (final NumberFormatException e) {
numWorkerThreads = 3;
}
LOGGER.info("# worker threads: " + numWorkerThreads);
int numWarmUps;
try {
numWarmUps = Integer.valueOf(args[1]);
} catch (final NumberFormatException e) {
numWarmUps = 1;
}
LOGGER.info("# warm ups: " + numWarmUps);
int numLoops;
try {
numLoops = Integer.valueOf(args[2]);
} catch (final NumberFormatException e) {
numLoops = 1;
}
LOGGER.info("# loops: " + numLoops);
final long[] timings = new long[1];
final String fileName = args[3];
final File testFile = new File(fileName);
final StopWatch stopWatch = new StopWatch();
for (int i = 0; i < numWarmUps; i++) {
LOGGER.info("Warm up #" + i);
final WordCounterConfiguration wcc = new WordCounterConfiguration(numWorkerThreads, testFile);
final Execution<?> analysis = new Execution<WordCounterConfiguration>(wcc);
stopWatch.start();
analysis.executeBlocking();
stopWatch.end();
LOGGER.info("duration: " + TimeUnit.NANOSECONDS.toSeconds(stopWatch.getDurationInNs()) + " secs");
}
LOGGER.info("Starting batch analysis...");
for (int i = 0; i < numLoops; i++) {
final WordCounterWorkStealingConfiguration wcc = new WordCounterWorkStealingConfiguration(numWorkerThreads, testFile);
final Execution<?> analysis = new Execution<WordCounterWorkStealingConfiguration>(wcc);
wcc.getMonitoringThread().start();
stopWatch.start();
analysis.executeBlocking();
stopWatch.end();
wcc.getMonitoringThread().terminate();