Java puzzle, low latency queue

There are a number of ways to use queues. A simple and very fast queue is an AtomicReference (it stores zero or one element).

1. Write a producer and a consumer thread which communicate via an AtomicReference.

2. Have the consumer send messages back to the producer using another AtomicReference. [left as an exercise]
The average round trip time was 156 ns with one message in flight at a time, or 98 ns with two messages in flight at a time.

3. Time the throughput for one second repeatedly. What variation do you see?

4. Pad the AtomicReferences to make sure they are at least 64 bytes apart in memory and measure again.
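For exercise 1, a minimal sketch of a busy-waiting handoff through a single AtomicReference (class, constant, and method names are illustrative, not the original sample solution):

```java
import java.util.concurrent.atomic.AtomicReference;

public class AtomicHandoff {
    // The "queue": holds zero or one element at a time.
    static final AtomicReference<Long> queue = new AtomicReference<>();
    static final int RUNS = 1_000_000;

    public static void main(String[] args) throws InterruptedException {
        Thread consumer = new Thread(() -> {
            for (int i = 0; i < RUNS; i++) {
                // busy wait until the producer publishes, then clear the slot
                while (queue.getAndSet(null) == null) ;
            }
        });
        consumer.start();

        long start = System.nanoTime();
        for (long i = 0; i < RUNS; i++) {
            // busy wait until the consumer has drained the previous value
            while (!queue.compareAndSet(null, i)) ;
        }
        consumer.join();
        long time = System.nanoTime() - start;
        System.out.printf("Average %,d ns per message%n", time / RUNS);
    }
}
```

Both threads spin rather than block, so this burns two cores while running; the absolute numbers will vary with hardware and JVM.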

Comments

  1. In your example, there is a while loop in the take method without any backoff time. Is this for low latency? Would you use it in real-world code?

  2. @gulyasm, Rather than backing off, the thread would do something else. As this is a simple example, there is nothing else to do. This is for low latency and is only useful if you are happy to busy wait, or you use it in combination with LockSupport.park and unpark.
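    To illustrate the second option, here is a hedged sketch of a one-element handoff that parks the consumer instead of busy waiting. It assumes exactly one producer and one consumer; the class name is made up:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;

// Illustrative one-element exchanger: the consumer parks instead of spinning.
// Assumes a single producer thread and a single consumer thread.
public class ParkingHandoff<T> {
    private final AtomicReference<T> slot = new AtomicReference<T>();
    private volatile Thread consumer;

    public void put(T value) {
        // Spin briefly until the consumer has drained the previous value.
        while (!slot.compareAndSet(null, value))
            Thread.yield();
        Thread c = consumer;
        if (c != null)
            LockSupport.unpark(c); // wake the consumer if it is parked
    }

    public T take() {
        consumer = Thread.currentThread(); // register before checking the slot
        T value;
        while ((value = slot.getAndSet(null)) == null)
            LockSupport.park(this); // sleep until a value is published
        consumer = null;
        return value;
    }
}
```

    park/unpark trades some wake-up latency for not burning a core; whether that trade is right depends on your latency budget. (unpark before park simply makes the next park return immediately, which is why the register-then-check order above does not deadlock.)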

  3. Peter,

    I appreciate your blog a lot. Thanks for your effort.
    One question: where do you place the padding longs?

    regards,
    Zeljko

  4. @Zeljko K. You can create a sub-class of AtomicReference which has these fields. Make them public so the JIT doesn't decide you don't really need them. ;)
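    A sketch of the padding idea from the reply above (field names are arbitrary; whether the padding fields actually sit next to the value is up to the JVM's field layout):

```java
import java.util.concurrent.atomic.AtomicReference;

// The seven extra public longs (56 bytes) are intended to push neighbouring
// instances at least a cache line (~64 bytes) apart in memory. They are
// public so the JIT is less likely to decide they are unused and drop them.
public class PaddedAtomicReference<V> extends AtomicReference<V> {
    public long p1, p2, p3, p4, p5, p6, p7;

    public PaddedAtomicReference() {}

    public PaddedAtomicReference(V initialValue) { super(initialValue); }
}
```

    The subclass behaves exactly like an AtomicReference, so it can be dropped in where the puzzle code creates its queues.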

  5. Peter,

    It looks like 10*1000*1000 is too small a number, so to get a more stable result I increased it to 100*1000*1000 (and launched with the -server JVM option).

    Also, I replaced AtomicReference with ConcurrentLinkedQueue, changing the lines:

    ...
    while (!queue.compareAndSet(null, l)) ;
    ...
    t = queue.getAndSet(null);
    ...

    to the following:

    ...
    queue.offer(l);
    ...
    t = queue.poll();
    ...

    and got the same result as with AtomicReference, but with less contention when using multiple producers.

    Best regards,
    Andriy

  6. @Andriy, I increased the test to 100 M and with -verbosegc the last two lines are

    [GC 56912K->336K(313344K), 0.0017300 secs]
    Took an average of 57 ns per object

    If I use ConcurrentLinkedQueue with -verbosegc I get the following output.

    [GC 4247104K->4248320K(7096512K), 6.6324890 secs]
    Took an average of 582 ns per object

    If I set -mx4g or lower it gets an OutOfMemoryError

  7. Peter,

    Possibly in your environment the producer outpaces the consumer, which leads to unbounded growth of the queue length.
    It looks like this highly depends on the environment (hardware / OS / JVM).
    In my environment (Core 2 Duo, 3 GHz / Windows 7, 32-bit / JDK 1.6.0_29) the -verbosegc option helps the ConcurrentLinkedQueue implementation outperform the implementation with AtomicReference.
    Adding some kind of back-pressure delay when the queue outgrows some optimal size can prevent the OutOfMemoryError.
    The simplest (but surely not best) implementation of back pressure is to check the queue size and sleep the producer thread for the time required to process all the incoming messages for this interval:
    if (queue.size() > 1000) {
        try {
            Thread.sleep(1);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
    }
    But it will decrease throughput and add more jitter to latency.
    To avoid reinventing the wheel, and to save time in the rush for the best latency and throughput, for more complicated cases such as multiple producers or consumers it is better to use proven solutions like the Disruptor pattern: http://code.google.com/p/disruptor

  8. @Andriy, The queue.size() method appears to be O(n), and just calling it slows down the solution dramatically.

    int runs = 100 * 1000;
    for (long i = 0; i < runs; i++) {
        queue.offer(i);
        queue.size();
    }

    prints

    Took an average of 106,713 ns per object

    The Disruptor pattern is a powerful solution for certain cases; however, if all you want to do is pass an object between two threads, the Disruptor appears to be overkill compared with an AtomicReference.

  9. @Andriy, BTW: I appreciate the feedback. Thank you very much. :)

  10. Very interesting post.

    I found for my quad-core MacBook Pro running Lion with the 64-bit JDK 7 server VM that the original code hung. I modified the code by adding Thread.sleep(0) and added in other variations:

    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.Future;
    import java.util.concurrent.SynchronousQueue;
    import java.util.concurrent.atomic.AtomicReference;

    public final class HangingTest {
        private static final long poison = -1;

        private static final int loops = 4 * 1000 * 1000;

        public static void main( final String... notUsed ) {
            test( new SynchronousPT(), true );
            test( new SynchronousPT(), false );
            test( new BlockingPT(), true );
            test( new BlockingPT(), false );
            test( new AtomicPT(), true );
            test( new AtomicPT(), false );
            test( new VolatilePT(), true );
            test( new VolatilePT(), false );
            test( new NothingPT(), true );
            test( new NothingPT(), false );
        }

        private static void test( final AbstractPT pc, final boolean noError ) {
            System.gc();
            System.gc();
            final Callable<Void> put =
                new Callable<Void>() {
                    @Override public Void call() throws InterruptedException {
                        for ( int i = 0; i < loops; i++ ) {
                            pc.put( noError ? i : 0 ); // Deliberate error: pass 0; should pass i.
                        }
                        pc.put( poison );
                        return null;
                    }
                };
            final Callable<Void> take =
                new Callable<Void>() {
                    @Override public Void call() throws InterruptedException, IllegalStateException {
                        for ( int expected = 0;; expected++ ) {
                            final long value = pc.take();
                            if ( value == poison ) { return null; }
                            if ( value != expected ) { throw new IllegalStateException(); }
                        }
                    }
                };
            final ExecutorService pool = Executors.newFixedThreadPool( 2 );
            final long start = System.nanoTime();
            try {
                // pool.invokeAny( Arrays.asList( put, take ) ); // Does not work! Hence loop etc. below.
                final Future<Void> putFuture = pool.submit( put );
                final Future<Void> takeFuture = pool.submit( take );
                for ( ;; ) {
                    if ( putFuture.isDone() || takeFuture.isDone() ) {
                        putFuture.cancel( true );
                        takeFuture.cancel( true );
                        putFuture.get(); // Test for any exceptions
                        takeFuture.get(); // Test for any exceptions
                        break;
                    }
                    Thread.sleep( 0 ); // Wait for other threads
                }
            } catch ( final Exception e ) {
                System.err.println( e );
            }
            final long finish = System.nanoTime();
            System.out.println( pc.toString() + " (" + noError + ")" + " took on average " +
                ( ( finish - start ) / loops ) + " ns" );
            pool.shutdown();
        }

    See next post - too long for 1!

  11. Continued ...

    private abstract static class AbstractPT {
        abstract void put( final long value ) throws InterruptedException;

        abstract long take() throws InterruptedException;

        @Override public String toString() { return getClass().getSimpleName(); }
    }

    private static final class SynchronousPT extends AbstractPT {
        final BlockingQueue<Long> queue = new SynchronousQueue<Long>();

        @Override void put( final long value ) throws InterruptedException { queue.put( value ); }

        @Override long take() throws InterruptedException { return queue.take(); }
    }

    private static final class BlockingPT extends AbstractPT {
        final BlockingQueue<Long> queue = new ArrayBlockingQueue<Long>( 1 );

        @Override void put( final long value ) throws InterruptedException { queue.put( value ); }

        @Override long take() throws InterruptedException { return queue.take(); }
    }

    private static final class AtomicPT extends AbstractPT {
        final AtomicReference<Long> queue = new AtomicReference<Long>();

        @Override void put( final long value ) throws InterruptedException {
            while ( !queue.compareAndSet( null, value ) ) {
                Thread.sleep( 0 ); // Wait for other thread
            }
        }

        @Override long take() throws InterruptedException {
            Long t;
            do {
                Thread.sleep( 0 ); // Wait for other thread
                t = queue.getAndSet( null );
            } while ( t == null );
            return t;
        }
    }

    private static final class VolatilePT extends AbstractPT {
        volatile Long queue = null;

        @Override void put( final long value ) throws InterruptedException {
            while ( queue != null ) {
                Thread.sleep( 0 ); // Wait for other thread
            }
            queue = value;
        }

        @Override long take() throws InterruptedException {
            Long t;
            do {
                Thread.sleep( 0 ); // Wait for other thread
                t = queue;
            } while ( t == null );
            queue = null;
            return t;
        }
    }

    private static final class NothingPT extends AbstractPT {
        Long queue = null;

        @Override void put( final long value ) throws InterruptedException {
            while ( queue != null ) {
                Thread.sleep( 0 ); // Wait for other thread
            }
            queue = value;
        }

        @Override long take() throws InterruptedException {
            Long t;
            do {
                Thread.sleep( 0 ); // Wait for other thread
                t = queue;
            } while ( t == null );
            queue = null;
            return t;
        }
    }
    }

    The above gives:

    SynchronousPT (true) took on average 1387 ns
    SynchronousPT (false) took on average 0 ns
    java.util.concurrent.ExecutionException: java.lang.IllegalStateException
    BlockingPT (true) took on average 6921 ns
    BlockingPT (false) took on average 0 ns
    java.util.concurrent.ExecutionException: java.lang.IllegalStateException
    AtomicPT (true) took on average 270 ns
    AtomicPT (false) took on average 0 ns
    java.util.concurrent.ExecutionException: java.lang.IllegalStateException
    VolatilePT (true) took on average 292 ns
    VolatilePT (false) took on average 0 ns
    java.util.concurrent.ExecutionException: java.lang.IllegalStateException
    NothingPT (true) took on average 278 ns
    NothingPT (false) took on average 0 ns
    java.util.concurrent.ExecutionException: java.lang.IllegalStateException
    BUILD SUCCESSFUL (total time: 36 seconds)

    This was run from NetBeans via Ant. I noticed that:

    1. Atomic, Volatile, and Nothing were all about the same time
    2. I don't know why Volatile and Nothing work!
    3. I don't know why sleep is needed to prevent hanging
    4. I don't know why invokeAny didn't work

    Can anyone shed light on the above notes? Do you get the same results on your machine?

    Cheers,

    -- Howard.

  12. Yes, spin locks are quite fast when contention is not very high; however, I don't think this queue is as scalable as the other blocking queues in the JDK.

    1 producer, 1 consumer:

    LinkedTransferQueue : 100 ns per transfer
    LinkedBlockingQueue : 138 ns per transfer
    LinkedBlockingQueue(cap) : 98 ns per transfer
    LinkedBlockingDeque : 201 ns per transfer
    ArrayBlockingQueue : 172 ns per transfer
    SynchronousQueue : 205 ns per transfer
    SynchronousQueue(fair) : 217 ns per transfer
    LinkedTransferQueue(xfer) : 162 ns per transfer
    LinkedTransferQueue(half) : 156 ns per transfer
    PriorityBlockingQueue : 620 ns per transfer
    ArrayBlockingQueue(fair) : 13990 ns per transfer
    AtomicReferenceQueue (yours) : 87 ns per transfer

    2 producers, 2 consumers
    LinkedTransferQueue : 101 ns per transfer
    LinkedBlockingQueue : 125 ns per transfer
    LinkedBlockingQueue(cap) : 139 ns per transfer
    LinkedBlockingDeque : 176 ns per transfer
    ArrayBlockingQueue : 247 ns per transfer
    SynchronousQueue : 175 ns per transfer
    SynchronousQueue(fair) : 298 ns per transfer
    LinkedTransferQueue(xfer) : 162 ns per transfer
    LinkedTransferQueue(half) : 198 ns per transfer
    PriorityBlockingQueue : 535 ns per transfer
    ArrayBlockingQueue(fair) : 15472 ns per transfer
    AtomicReferenceQueue : 228 ns per transfer

    4 producers and 4 consumers
    LinkedTransferQueue : 150 ns per transfer
    LinkedBlockingQueue : 155 ns per transfer
    LinkedBlockingQueue(cap) : 123 ns per transfer
    LinkedBlockingDeque : 180 ns per transfer
    ArrayBlockingQueue : 338 ns per transfer
    SynchronousQueue : 359 ns per transfer
    SynchronousQueue(fair) : 217 ns per transfer
    LinkedTransferQueue(xfer) : 236 ns per transfer
    LinkedTransferQueue(half) : 229 ns per transfer
    PriorityBlockingQueue : 318 ns per transfer
    ArrayBlockingQueue(fair) : 16437 ns per transfer
    AtomicReferenceQueue : 620 ns per transfer

    I did the test on my laptop (i5, 4 cores),
    JDK version 1.6

