The Exchanger and GC-less Java

Overview

The Exchanger class is very efficient at passing work between thread and recycling the objects used. AFAIK, It is also one of the least used Concurrency classes.
As @Marksim Sipos points out, if you don't need GC less logging using an ArrayBlockingQueue is much simpler.

Exchanger class

The Exchanger class is useful for passing data back and forth between two threads. e.g. Producer/Consumer. It has the property of naturally recycling the data structures used to pass the work and supports GC-less sharing of work in an efficient manner.
Here is an example, passing logs to a background logger.
Work (a log entry) is batched into LogEntries and passed to a background thread which later passes it back to the thread so it can add more work. Provided the background thread is always finished before the batch is full, it is almost transparent. Increasing the size of the batch reduces how often the batch is full but increase the number of unprocessed entries waiting at any one time. Calling flush() can push out the data.
The key line is the following which exchanges the batch in the current thread with the batch in the other thread. The producer fills up the batch while the consumer is emptying it.
The exchange when it occurs typically takes 1-4 micro-seconds. In this case, once every 64 lines.

entries = logEntriesExchanger.exchange(entries);

How does this compare to the LMAX disruptor pattern

This approach has similar principles to the Disruptor. No GC using recycled, pre-allocated buffers and lock free operations (The Exchanger not completely lock free and doesn't busy wait, but it could)
Two keys difference are:
  • there is only one producer/consumer in this case, the disruptor supports multiple consumers. 
  • this approach re-uses a much smaller buffer efficiently. If you are using ByteBuffer (as I have in the past) an optimal size might be 32 KB.  The disruptor library was designed to exploit large amounts of memory on the assumption it is relative cheap and can use medium sized (MBs) to very large buffers (GBs). e.g. it was design for servers with 144 GB.  I am sure it works well on much smaller servers. ;)
Thank you @Doug, for reminding me to mention the Disruptor pattern.

If you have dozen logs files (for different purposes) and you want to minimise memory foot print and you prefer the consuming thread to be blocking rather than busy waiting which consumes 100% of a thread (which adds a small latency of up to 10 us) then the Exchanger is better suited.

Exchanger example

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class BackgroundLogger implements Runnable {
  static final int ENTRIES = 64;

  static class LogEntry {
    long time;
    int level;
    final StringBuilder text = new StringBuilder();
  }

  static class LogEntries {
    final LogEntry[] lines = new LogEntry[ENTRIES];
    int used = 0;
  }

  private final ExecutorService executor = Executors.newSingleThreadExecutor();
  final Exchanger<LogEntries> logEntriesExchanger = new Exchanger<LogEntries>();
  LogEntries entries = new LogEntries();

  BackgroundLogger() {
    executor.submit(this);
  }

  public StringBuilder log(int level) {
    try {
      if (entries.used == ENTRIES)
        entries = logEntriesExchanger.exchange(entries);
      LogEntry le = entries.lines[entries.used++];
      le.time = System.currentTimeMillis();
      le.level = level;
      return le.text;

    } catch (InterruptedException e) {
      throw new RuntimeException(e);
    }
  }

  public void flush() throws InterruptedException {
    if(entries.used > 0)
        entries = logEntriesExchanger.exchange(entries);
  }

  public void stop() {
    try {
      flush();
    } catch (InterruptedException e) {
      e.printStackTrace(); // use standard logging.
    }
    executor.shutdownNow();
  }

  @Override
  public void run() {
    LogEntries entries = new LogEntries();
    try {
      while (!Thread.interrupted()) {
        entries = logEntriesExchanger.exchange(entries);
            for (int i = 0; i < entries.used; i++) {
              bgLog(entries.lines[i]);
              entries.lines[i].text.delete(0, entries.lines[i].text.length());
        }
        entries.used = 0;
      }
    } catch (InterruptedException ignored) {

    } finally {
      System.out.println("Warn: logger stopping."); // use standard logging.
    }
  }

  private void bgLog(LogEntry line) {
    // log the entry to a file.
  }
}

Comments

  1. Shouldn't you be using Exchanger ?

    ReplyDelete
  2. Oops, it lost my angle brackets. That should have been Exchanger<LogEntries>

    ReplyDelete
  3. @Don, Thank you. I forgot generics appears as HTML tags which are ignored even if they are in a <pre> block.

    ReplyDelete
  4. For the purpose of having the Producer produce log entries, and Worker consume them, wouldn't it be more appropriate to use ArrayBlockingQueue?

    ReplyDelete
  5. @Maksim, A good question.
    ArrayBlockingQueues are simpler however, you have to create a new objects (3 in this example) to add to the queue and every time the consumer has to wait it adds another object (to record its a waiting thread)

    There is no simple way to recycle the objects by returning them back to the producer without creating more objects. (Defeating the purpose)

    If you want a GC-less application, (though most people don't) the Exchanger is a better solution.

    ReplyDelete
  6. @Maksim, I have added a comment at the start to include your point.

    ReplyDelete
  7. @Peter: Thanks, I missed the point about GC.

    Good post and interesting blog!

    ReplyDelete
  8. This reminds of an article about a Disruptor framework I read awhile ago http://code.google.com/p/disruptor/

    Basically, the idea that makes the Disruptor so fast (the authors claim, not me) is that it uses:
    1. Lock-free data structure (queue)
    2. Avoid GC by create a huge array with entries created initially and *only* modify the entries. By not having to create/delete the entries the Disruptor thus avoids GC.

    ReplyDelete
  9. @Doug, I have added a section on a comparison with the Disruptor framework.

    ReplyDelete
  10. @Peter, i didn't get your point about BlockingQueue.
    What are those 3 objects? Are they details of queue implementation? waiting for produced entries to consume also good for me. In BackgroundLogger, if producer is faster it will ends with OutOfBounds exception in log(), which is unlikely desirable.

    Thank you, Alex

    ReplyDelete
  11. @Alexander, The three objects in this example are the objects added to the queue. In your case it might not be 3, but it will be a minimum of one. The consumer added object is due to the queue implementation.

    If the producer is consistently too fast for the consumer, you have a design problem. You are likely to be trying to write logs faster than your underlying hardware can support. In this case buy a fast disk subsystem, or a better solution is to write less logs.

    If you have high bursts of activity, the buffer should be large enough that the producer is not waiting for the consumer. On the rare occasion, the buffer is still not big enough, the producer will block instead of the consumer. This may be undesirable, but making the buffer too large can also be undesirable.

    ReplyDelete
  12. Peter,

    In the log( ... ), shouldn't these operations be atomic?

    LogEntry le = entries.lines[entries.used++];
    le.time = System.currentTimeMillis();
    le.level = level;

    Cheers
    Vic

    ReplyDelete
  13. @Unknown excellent question. The way it works is that each thread has it own buffer its working on. When exchange() is called the buffers are swapped.

    ReplyDelete
  14. "If the producer is consistently too fast for the consumer ..."

    Peter, I am curious if writing faster than the underlying hardware is a likely scenario?

    As a test, I took your sample code and incessantly called the log( .. ) a million times.
    My backing LogEntries array had a capacity of 1000 and buffers were exchanged when 25% of the buffer was full (rather than waiting till the buffer was 100% full).

    Further, I was logging to the console (which is slower than logging to a File) and it worked perfectly.

    Perhaps one could fill the buffer if multiple threads are calling log ( ... ), even then one can increase the buffer size and exchange quickly ( when 15% of the buffer is full? ).

    Any thoughts?

    Cheers,
    Vic

    ReplyDelete
  15. Exchanging when the buffer is 25% full is the same as exchange when the buffer is 100% but 1/4 of the size.

    If the producer is faster than the consumer, the producer is forced to wait until the consumer is ready to exchange. Similarly if you write directly to the screen, the producer is slowed down to never be far ahead of the consumer.

    In some models you can't allow the producer to be delayed and you need to use a different model.

    ReplyDelete
  16. Peter,

    What modification would it require to support multiple producers?

    Thanks

    ReplyDelete

Post a Comment

Popular posts from this blog

Java is Very Fast, If You Don’t Create Many Objects

System wide unique nanosecond timestamps

Comparing Approaches to Durability in Low Latency Messaging Queues