Java puzzle, low latency queue
There are a number of ways to use queues. A simple and very fast queue is to use an AtomicReference (it stores zero or one element).
1. Write a producer and a consumer thread which communicate via an AtomicReference.
[Click for sample solution] (a minimal sketch of one possible solution also appears after the puzzle list below)
2. Have the consumer send messages back to the producer using another AtomicReference. [left as an exercise]
The average round trip time taken was 156 ns if you have one message at a time, or 98 ns if you have two messages at a time.
3. Time the throughput for one second repeatedly. What variation do you see?
4. Pad the AtomicReferences to make sure they are at least 64 bytes apart in memory and measure again.
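The sample solution itself sits behind the link above. A minimal sketch of the kind of busy-spinning exchange puzzle 1 has in mind (class, field and method names here are my own, not necessarily those of the original solution) might look like this:

import java.util.concurrent.atomic.AtomicReference;

public class AtomicReferencePingPong {
    // a "queue" which holds zero or one element
    static final AtomicReference<Long> queue = new AtomicReference<Long>();
    static final int RUNS = 10 * 1000 * 1000;

    public static void main(String... args) throws InterruptedException {
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < RUNS; i++) {
                    Long value;
                    do {
                        value = queue.getAndSet(null); // take, busy-spinning while empty
                    } while (value == null);
                }
            }
        });
        consumer.start();

        long start = System.nanoTime();
        for (long i = 0; i < RUNS; i++)
            while (!queue.compareAndSet(null, i)) ; // put, busy-spinning while full
        consumer.join();
        long time = System.nanoTime() - start;
        System.out.printf("Took an average of %d ns per object%n", time / RUNS);
    }
}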
In your example, there is a while loop in the take method without any backoff time. Is this due to low latency? Would you use it in real world code?
@gulyasm, Rather than backing off, the thread would do something else. As this is a simple example, there is nothing else to do. This is for low latency and is only useful when you can busy-wait, or you use it in combination with LockSupport.park and unpark.
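For example, a variation that spins briefly and then parks the consumer (a rough sketch only; the class name, spin count and park interval are illustrative choices, not from the original code) could look like:

import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

class ParkingExchanger<T> {
    private final AtomicReference<T> ref = new AtomicReference<T>();
    private volatile Thread consumer; // the thread to unpark when a value arrives

    public void put(T value) {
        while (!ref.compareAndSet(null, value))
            Thread.yield();            // slot still full; give the consumer a chance to run
        Thread c = consumer;
        if (c != null)
            LockSupport.unpark(c);     // wake the consumer if it is parked
    }

    public T take() {
        consumer = Thread.currentThread();
        T value;
        int spins = 0;
        while ((value = ref.getAndSet(null)) == null) {
            if (++spins > 1000)                    // busy-spin briefly to keep latency low...
                LockSupport.parkNanos(100 * 1000); // ...then park ~100 us to free the CPU
        }
        return value;
    }
}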
Peter,
I appreciate your blog a lot. Thanks for your effort.
One question: where do you place the padding longs?
regards,
Zeljko
@Zeljko K. You can create a subclass of AtomicReference which has these fields. Make them public so the JIT doesn't decide you don't really need them. ;)
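For example, something along these lines (a sketch of the idea rather than the exact class from the solution):

import java.util.concurrent.atomic.AtomicReference;

// Padding fields to push two references at least a cache line (~64 bytes) apart.
class PaddedAtomicReference<T> extends AtomicReference<T> {
    // public so the JIT is less likely to decide the fields are unused and remove them
    public long p1, p2, p3, p4, p5, p6, p7;

    public long sumOfPadding() {
        return p1 + p2 + p3 + p4 + p5 + p6 + p7; // helps defeat dead-field elimination
    }
}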
Peter,
It looks like 10*1000*1000 is too small a number, and to minimize the measured result I increased it to 100*1000*1000 (and launched with the -server JVM option).
Also, when I replaced AtomicReference with ConcurrentLinkedQueue and changed the lines:
...
while (!queue.compareAndSet(null, l)) ;
...
t = queue.getAndSet(null);
...
to the following:
...
queue.offer(l);
...
t = queue.poll();
...
I got the same result as with AtomicReference, but with less contention when using multiple producers.
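A complete version of that variation (my own reconstruction with assumed names, not Andriy's actual benchmark) might look like:

import java.util.concurrent.ConcurrentLinkedQueue;

public class ConcurrentLinkedQueuePingPong {
    static final ConcurrentLinkedQueue<Long> queue = new ConcurrentLinkedQueue<Long>();
    static final int RUNS = 100 * 1000 * 1000;

    public static void main(String... args) throws InterruptedException {
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                for (int i = 0; i < RUNS; i++) {
                    Long value;
                    do {
                        value = queue.poll(); // null while empty, so the consumer still spins
                    } while (value == null);
                }
            }
        });
        consumer.start();

        long start = System.nanoTime();
        for (long i = 0; i < RUNS; i++)
            queue.offer(i); // unbounded and non-blocking, so nothing slows the producer down
        consumer.join();
        long time = System.nanoTime() - start;
        System.out.printf("Took an average of %d ns per object%n", time / RUNS);
    }
}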
Best regards,
Andriy
@Andriy, I increased the test to 100 M and with -verbosegc the last two lines are
[GC 56912K->336K(313344K), 0.0017300 secs]
Took an average of 57 ns per object
If I use ConcurrentLinkedQueue with -verbosegc I get the following output.
[GC 4247104K->4248320K(7096512K), 6.6324890 secs]
Took an average of 582 ns per object
If I set -mx4g or lower, it gets an OutOfMemoryError.
Peter,
Possibly in your environment the producer outpaces the consumer, which leads to unlimited growth of the queue.
It looks like this highly depends on the environment (hardware / OS / JVM).
In my environment (Core 2 Duo, 3 GHz / Windows 7, 32-bit / JDK 1.6.0_29) the -verbosegc option helps the ConcurrentLinkedQueue implementation outperform the implementation with AtomicReference.
Adding some kind of back-pressure delay when the size of the queue outgrows some optimal size can prevent the OutOfMemoryError.
The simplest (but surely not the best) implementation of back pressure is to check the queue size and sleep the producer thread for the time required to process all the incoming messages for that interval:
if (queue.size() > 1000) {
    try {
        Thread.sleep(1);
    } catch (InterruptedException e) {
        throw new RuntimeException(e);
    }
}
But it will decrease throughput and add more jitter to latency.
Of course, to avoid reinventing the wheel and to save time in the rush for the best latency and throughput, for more complicated cases such as multiple producers or consumers it is better to use proven solutions like the Disruptor pattern: http://code.google.com/p/disruptor
@Andriy, The queue.size() method appears to be O(n) and just testing it slows down the solution dramatically.
int runs = 100 * 1000;
for (long i = 0; i < runs; i++) {
    queue.offer(i);
    queue.size();
}
prints
Took an average of 106,713 ns per object
The disruptor pattern is a powerful solution for certain cases; however, if all you want to do is pass an object between two threads, the disruptor appears to be overkill compared with an AtomicReference.
@Andriy, BTW: I appreciate the feedback. Thank you very much. :)
Very interesting post.
I found, for my quad-core MacBook Pro running Lion with the 64-bit JDK 7 server VM, that the original code hung. I modified the code by adding Thread.sleep(0) and added other variations:
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.atomic.AtomicReference;

public final class HangingTest {
private static final long poison = -1;
private static final int loops = 4 * 1000 * 1000;
public static void main( final String... notUsed ) {
test( new SynchronousPT(), true );
test( new SynchronousPT(), false );
test( new BlockingPT(), true );
test( new BlockingPT(), false );
test( new AtomicPT(), true );
test( new AtomicPT(), false );
test( new VolatilePT(), true );
test( new VolatilePT(), false );
test( new NothingPT(), true );
test( new NothingPT(), false );
}
private static void test( final AbstractPT pc, final boolean noError ) {
System.gc();
System.gc();
final Callable<Void> put =
new Callable<Void>() {
@Override public Void call() throws InterruptedException {
for ( int i = 0; i < loops; i++ ) {
pc.put( noError ? i : 0 ); // Deliberate error pass 0; should pass i.
}
pc.put( poison );
return null;
}
};
final Callable<Void> take =
new Callable<Void>() {
@Override public Void call() throws InterruptedException, IllegalStateException {
for ( int expected = 0;; expected++ ) {
final long value = pc.take();
if ( value == poison ) { return null; }
if ( value != expected ) { throw new IllegalStateException(); }
}
}
};
final ExecutorService pool = Executors.newFixedThreadPool( 2 );
final long start = System.nanoTime();
try {
// pool.invokeAny( Arrays.asList( put, take ) ); // Does not work! Hence loop etc. below.
final Future<Void> takeFuture = pool.submit( put );
final Future<Void> putFuture = pool.submit( take );
for ( ;; ) {
if ( putFuture.isDone() || takeFuture.isDone() ) {
putFuture.cancel( true );
takeFuture.cancel( true );
putFuture.get(); // Test for any exceptions
takeFuture.get(); // Test for any exceptions
break;
}
Thread.sleep( 0 ); // Wait for other threads
}
} catch ( final Exception e ) {
System.err.println( e );
}
final long finish = System.nanoTime();
System.out.println( pc.toString() + " (" + noError + ")" + " took on average " +
( ( finish - start ) / loops ) + " ns" );
pool.shutdown();
}
See next post - too long for 1!
Continued ...
private abstract static class AbstractPT {
abstract void put( final long value ) throws InterruptedException;
abstract long take() throws InterruptedException;
@Override public String toString() { return getClass().getSimpleName(); }
}
private static final class SynchronousPT extends AbstractPT {
final BlockingQueue<Long> queue = new SynchronousQueue<Long>();
@Override void put( final long value ) throws InterruptedException { queue.put( value ); }
@Override long take() throws InterruptedException { return queue.take(); }
}
private static final class BlockingPT extends AbstractPT {
final BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>( 1 );
@Override void put( final long value ) throws InterruptedException { queue.put( value ); }
@Override long take() throws InterruptedException { return queue.take(); }
}
private static final class AtomicPT extends AbstractPT {
final AtomicReference<Long> queue = new AtomicReference<Long>();
@Override void put( final long value ) throws InterruptedException {
while ( !queue.compareAndSet( null, value ) ) {
Thread.sleep( 0 ); // Wait for other thread
}
}
@Override long take() throws InterruptedException {
Long t;
do {
Thread.sleep( 0 ); // Wait for other thread
t = queue.getAndSet( null );
} while ( t == null );
return t;
}
}
private static final class VolatilePT extends AbstractPT {
volatile Long queue = null;
@Override void put( final long value ) throws InterruptedException {
while ( queue != null ) {
Thread.sleep( 0 ); // Wait for other thread
}
queue = value;
}
@Override long take() throws InterruptedException {
Long t;
do {
Thread.sleep( 0 ); // Wait for other thread
t = queue;
} while ( t == null );
queue = null;
return t;
}
}
private static final class NothingPT extends AbstractPT {
Long queue = null;
@Override void put( final long value ) throws InterruptedException {
while ( queue != null ) {
Thread.sleep( 0 ); // Wait for other thread
}
queue = value;
}
@Override long take() throws InterruptedException {
Long t;
do {
Thread.sleep( 0 ); // Wait for other thread
t = queue;
} while ( t == null );
queue = null;
return t;
}
}
}
The above gives:
SynchronousPT (true) took on average 1387 ns
SynchronousPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
BlockingPT (true) took on average 6921 ns
BlockingPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
AtomicPT (true) took on average 270 ns
AtomicPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
VolatilePT (true) took on average 292 ns
VolatilePT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
NothingPT (true) took on average 278 ns
NothingPT (false) took on average 0 ns
java.util.concurrent.ExecutionException: java.lang.IllegalStateException
BUILD SUCCESSFUL (total time: 36 seconds)
The above was run from NetBeans via Ant. I noticed that:
1. Atomic, Volatile, and Nothing were all about the same time
2. I don't know why Volatile and Nothing work!
3. I don't know why sleep is needed to prevent hanging
4. I don't know why invokeAny didn't work
Can anyone shed light on the above notes? Do you get the same results on your machine?
Cheers,
-- Howard.
Yes, spin locks are quite fast when contention is not very high; however, I don't think this queue is as scalable as the other blocking queues in the JDK.
1 producer, 1 consumer:
LinkedTransferQueue : 100 ns per transfer
LinkedBlockingQueue : 138 ns per transfer
LinkedBlockingQueue(cap) : 98 ns per transfer
LinkedBlockingDeque : 201 ns per transfer
ArrayBlockingQueue : 172 ns per transfer
SynchronousQueue : 205 ns per transfer
SynchronousQueue(fair) : 217 ns per transfer
LinkedTransferQueue(xfer) : 162 ns per transfer
LinkedTransferQueue(half) : 156 ns per transfer
PriorityBlockingQueue : 620 ns per transfer
ArrayBlockingQueue(fair) : 13990 ns per transfer
AtomicReferenceQueque (yours) : 87 ns per transfer
2 producers, 2 consumers
LinkedTransferQueue : 101 ns per transfer
LinkedBlockingQueue : 125 ns per transfer
LinkedBlockingQueue(cap) : 139 ns per transfer
LinkedBlockingDeque : 176 ns per transfer
ArrayBlockingQueue : 247 ns per transfer
SynchronousQueue : 175 ns per transfer
SynchronousQueue(fair) : 298 ns per transfer
LinkedTransferQueue(xfer) : 162 ns per transfer
LinkedTransferQueue(half) : 198 ns per transfer
PriorityBlockingQueue : 535 ns per transfer
ArrayBlockingQueue(fair) : 15472 ns per transfer
AtomicReferenceQueque : 228 ns per transfer
4 producers and 4 consumers
LinkedTransferQueue : 150 ns per transfer
LinkedBlockingQueue : 155 ns per transfer
LinkedBlockingQueue(cap) : 123 ns per transfer
LinkedBlockingDeque : 180 ns per transfer
ArrayBlockingQueue : 338 ns per transfer
SynchronousQueue : 359 ns per transfer
SynchronousQueue(fair) : 217 ns per transfer
LinkedTransferQueue(xfer) : 236 ns per transfer
LinkedTransferQueue(half) : 229 ns per transfer
PriorityBlockingQueue : 318 ns per transfer
ArrayBlockingQueue(fair) : 16437 ns per transfer
AtomicReferenceQueque : 620 ns per transfer
I did the test on my laptop (i5, 4 cores), JDK version 1.6.
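The benchmark harness behind these numbers isn't shown. For the single producer, single consumer case, a per-transfer time of this kind could be measured with something like the following sketch (the class name and run count are assumptions, and LinkedBlockingQueue is used only as an example):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class QueueTransferTimer {
    static final int TRANSFERS = 10 * 1000 * 1000;

    public static void main(String... args) throws InterruptedException {
        final BlockingQueue<Long> queue = new LinkedBlockingQueue<Long>();

        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < TRANSFERS; i++)
                        queue.take(); // blocks until the producer puts a value
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();

        long start = System.nanoTime();
        for (long i = 0; i < TRANSFERS; i++)
            queue.put(i); // blocks only if the queue is bounded and full
        consumer.join();
        long time = System.nanoTime() - start;
        System.out.println(queue.getClass().getSimpleName() + " : "
                + time / TRANSFERS + " ns per transfer");
    }
}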