I wanted to write some more JNI code, so I decided to (re)implement a semaphore for Mac OS X. To give some evidence that the implementation is correct, I also supply a multi-producer/multi-consumer demo.
semaphore_impl.h (autogenerated by javah):
/*
 * JNI entry points for the native methods declared by the Java class
 * net.coderodde.util.concurrent.MacOSXSemaphoreImpl. Each prototype below
 * corresponds to one "native" method of that class; the function names follow
 * the JNI naming scheme (Java_<package>_<class>_<method>).
 */
#include <jni.h>

#ifndef MACOSX_SEMAPHORE_IMPL_H
#define MACOSX_SEMAPHORE_IMPL_H

/* Give the entry points C linkage so their names are not mangled in C++. */
#ifdef __cplusplus
extern "C" {
#endif

/* Native side of MacOSXSemaphoreImpl.init(int); the jint is the initial counter. */
JNIEXPORT void JNICALL Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_init(JNIEnv*, jobject, jint);

/* Native side of MacOSXSemaphoreImpl.clean(). */
JNIEXPORT void JNICALL Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_clean(JNIEnv*, jobject);

/* Native side of MacOSXSemaphoreImpl.lock(). */
JNIEXPORT void JNICALL Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_lock(JNIEnv*, jobject);

/* Native side of MacOSXSemaphoreImpl.unlock(). */
JNIEXPORT void JNICALL Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_unlock(JNIEnv*, jobject);

#ifdef __cplusplus
}
#endif

#endif // MACOSX_SEMAPHORE_IMPL_H
semaphore_impl.cpp:
#include "semaphore_impl.h"
#include <mach/task.h>
#include <mach/semaphore.h>
#include <mach/mach.h>

// Name and JNI type signature of the Java field that stores the Mach
// semaphore port name.
static const char* FIELD = "semaphoreHandle";
static const char* FIELD_SIGNATURE = "I";

// Exception type raised towards Java when a Mach call fails.
static const char* FAILURE_EXCEPTION = "java/lang/IllegalStateException";

// Raises FAILURE_EXCEPTION with the given message, unless another exception is
// already pending in this thread (in which case that one is kept).
static void throw_failure(JNIEnv* env, const char* message) {
    if (env->ExceptionCheck()) {
        return;
    }

    const jclass exception_class = env->FindClass(FAILURE_EXCEPTION);

    if (exception_class != nullptr) {
        env->ThrowNew(exception_class, message);
    }
}

// Looks up the 'semaphoreHandle' field on the runtime class of 'obj'.
// Returns nullptr (with a Java exception pending) if the lookup fails.
static jfieldID get_handle_field(JNIEnv* env, jobject obj) {
    const jclass clazz = env->GetObjectClass(obj);
    return env->GetFieldID(clazz, FIELD, FIELD_SIGNATURE);
}

// Reads the semaphore handle stored in 'obj' into '*handle'.
// Returns false (with a Java exception pending) if the field is unavailable.
static bool get_semaphore_handle(JNIEnv* env,
                                 jobject obj,
                                 semaphore_t* handle) {
    const jfieldID field_id = get_handle_field(env, obj);

    if (field_id == nullptr) {
        return false;
    }

    *handle = static_cast<semaphore_t>(env->GetIntField(obj, field_id));
    return true;
}

// Creates a Mach semaphore with 'count' initial permits and stores its handle
// in the Java object. Throws IllegalStateException if the kernel refuses to
// create the semaphore, so that no garbage handle is ever stored.
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_init(JNIEnv* env,
                                                            jobject obj,
                                                            jint count) {
    // Resolve the field first: if this fails there is nothing to roll back.
    const jfieldID field_id = get_handle_field(env, obj);

    if (field_id == nullptr) {
        return;
    }

    semaphore_t semaphore;
    const kern_return_t status = semaphore_create(current_task(),
                                                  &semaphore,
                                                  SYNC_POLICY_FIFO,
                                                  count);

    if (status != KERN_SUCCESS) {
        throw_failure(env, "Could not create a Mach semaphore.");
        return;
    }

    // Store the value of the semaphore to the Java semaphore object.
    env->SetIntField(obj, field_id, static_cast<jint>(semaphore));
}

// Destroys the semaphore owned by the Java object. Safe to call repeatedly:
// the stored handle is reset to MACH_PORT_NULL after the first destruction.
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_clean(JNIEnv* env,
                                                             jobject obj) {
    const jfieldID field_id = get_handle_field(env, obj);

    if (field_id == nullptr) {
        return;
    }

    const semaphore_t semaphore =
            static_cast<semaphore_t>(env->GetIntField(obj, field_id));

    if (semaphore == MACH_PORT_NULL) {
        // Never initialised or already cleaned up.
        return;
    }

    // Note the argument order: the owning task comes first.
    semaphore_destroy(current_task(), semaphore);
    env->SetIntField(obj, field_id, static_cast<jint>(MACH_PORT_NULL));
}

// Acquires one permit, blocking while none is available. Throws
// IllegalStateException if the wait fails for a reason other than an abort.
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_lock(JNIEnv* env,
                                                            jobject obj) {
    semaphore_t semaphore;

    if (!get_semaphore_handle(env, obj, &semaphore)) {
        return;
    }

    kern_return_t status;

    // KERN_ABORTED means the wait was interrupted without the semaphore being
    // acquired; returning at that point would let the caller proceed without
    // a permit, so retry.
    do {
        status = semaphore_wait(semaphore);
    } while (status == KERN_ABORTED);

    if (status != KERN_SUCCESS) {
        throw_failure(env, "Could not wait on the Mach semaphore.");
    }
}

// Releases one permit, waking a waiting thread if there is one. Throws
// IllegalStateException if the kernel rejects the signal.
JNIEXPORT void JNICALL
Java_net_coderodde_util_concurrent_MacOSXSemaphoreImpl_unlock(JNIEnv* env,
                                                              jobject obj) {
    semaphore_t semaphore;

    if (!get_semaphore_handle(env, obj, &semaphore)) {
        return;
    }

    if (semaphore_signal(semaphore) != KERN_SUCCESS) {
        throw_failure(env, "Could not signal the Mach semaphore.");
    }
}
Makefile:
# Directory that contains jni.h. Defaults to the Mac OS X 10.6 SDK location;
# override it for other setups, e.g.:
#   make JNI_INCLUDE_DIR=/path/to/jdk/include
JNI_INCLUDE_DIR ?= /Developer/SDKs/MacOSX10.6.sdk/System/Library/Frameworks/JavaVM.framework/Versions/A/Headers/

# Links the JNI library and copies it to /usr/local/lib.
libsemaphore.jnilib: semaphore_impl.o
	g++ -dynamiclib -o libsemaphore.jnilib semaphore_impl.o
	cp libsemaphore.jnilib /usr/local/lib

# The header is listed as a prerequisite so that regenerating it with javah
# triggers a recompilation.
semaphore_impl.o: semaphore_impl.cpp semaphore_impl.h
	g++ -std=c++11 -O3 -I$(JNI_INCLUDE_DIR) -c semaphore_impl.cpp
MacOSXSemaphoreImpl.java:
package net.coderodde.util.concurrent; import java.io.File; /** * This class implements the semaphore type interfacing with MacOSX. * * @author Rodion "rodde" Efremov * @version 1.6 */ final class MacOSXSemaphoreImpl implements SemaphoreImpl { static { try { // Putting the .jnilib into working directory is // a safe choice. 'System.getProperty("user.dir")'. System.loadLibrary("semaphore"); } catch (UnsatisfiedLinkError error) { error.printStackTrace(); System.exit(1); } } /** * Holds the handle to a semaphore. */ private int semaphoreHandle; /** * Constructs a semaphore with <code>counter</code> permits. * * @param counter the amount of permits. */ MacOSXSemaphoreImpl(int counter) { init(counter); } /** * Creates the semaphore and loads its handle into <code>semaphoreId</code>. */ @Override public native void init(int counter); /** * Releases resources. */ @Override public native void clean(); /** * Acquires this semaphore. If the current counter of this semaphore is * zero, the calling thread is blocked. */ @Override public native void lock(); /** * Releases this semaphore. Effectively increments the counter of this * semaphore so that other threads may acquire this semaphore. */ @Override public native void unlock(); /** * Release the resources associated with this semaphore. */ @Override protected void finalize() { try { super.finalize(); } catch (Throwable t) {} clean(); } }
SemaphoreImpl.java:
package net.coderodde.util.concurrent; /** * This package-private interface defines the API for semaphore implementation * types. * * @author Rodion "rodde" Efremov * @version 1.6 */ interface SemaphoreImpl { /** * Initialization routine. * * @param counter the amount of threads that can pass without blocking. */ void init(int counter); /** * Releases all the resources. */ void clean(); /** * Locks the implementing semaphore. */ void lock(); /** * Unlocks the implementing semaphore. */ void unlock(); }
Semaphore.java:
package net.coderodde.util.concurrent;

/**
 * A counting semaphore for MacOSX. This class is a thin public facade: it
 * validates the initial permit count and hands every operation over to a
 * package-private {@link SemaphoreImpl}.
 *
 * @author Rodion "rodde" Efremov
 * @version 1.6
 */
public class Semaphore {

    /**
     * The implementation object that all operations are forwarded to.
     */
    private final SemaphoreImpl impl;

    /**
     * Creates a semaphore holding <code>counter</code> initial permits.
     *
     * @param counter the number of threads that may lock this semaphore
     *                without blocking; must not be negative.
     * @throws IllegalArgumentException if <code>counter</code> is negative.
     */
    public Semaphore(int counter) {
        // Reject a nonsensical permit count before creating the delegate.
        if (counter < 0) {
            throw new IllegalArgumentException(
                    "The semaphore counter too small: " + counter + ", " +
                    "should be at least 0.");
        }

        impl = new MacOSXSemaphoreImpl(counter);
    }

    /**
     * Locks this semaphore by delegating to the implementation.
     */
    public void lock() {
        impl.lock();
    }

    /**
     * Unlocks this semaphore by delegating to the implementation.
     */
    public void unlock() {
        impl.unlock();
    }
}
Demo.java:
package net.coderodde.util.concurrent; import java.util.Collections; import java.util.HashSet; import java.util.Random; import java.util.Set; /** * This class implements a demonstration for the semaphore. * * @author Rodion "rodde" Efremov * @version 1.6 */ public class Demo { /** * This character denotes so called "poison pill" for communicating to the * consumers that they should exit. */ private static final Character TERMINATION_SENTINEL = '\u2622'; /** * Implements a consumer thread. */ static class Consumer extends Thread { /** * The buffer to consume from. */ private final ConcurrentBuffer<Character> buffer; /** * Constructs this consumer thread. * * @param buffer the concurrent buffer to consume from. * @param id the ID of this consumer thread. */ Consumer(ConcurrentBuffer<Character> buffer, int id) { this.buffer = buffer; this.setName("Consumer " + id); } /** * The actual code of this consumer thread. */ @Override public void run() { for (;;) { final Character c = buffer.remove(); if (c.equals(TERMINATION_SENTINEL)) { // We have a poison pill. Put it back in the buffer and // terminate this thread. buffer.add(c); return; } } } } /** * This class implements producer threads. */ static class Producer extends Thread { /** * The concurrent set holding all active producers. */ private Set<Producer> activeProducers; /** * The actual concurrent buffer to produce to. */ private final ConcurrentBuffer<Character> buffer; /** * Constructs this producer thread. * * @param buffer the buffer for producing items. * @param id the ID of this producer thread. */ Producer(ConcurrentBuffer<Character> buffer, int id) { this.buffer = buffer; this.setName("Producer " + id); } /** * Sets the set of active producer threads. * * @param set a set of threads. */ void setProducerSet(Set<Producer> set) { activeProducers = set; } /** * The actual code for this producer thread. 
*/ @Override public void run() { final Random rnd = new Random(); for (int i = 0; i < 50; ++i) { final Character c = (char)('A' + rnd.nextInt(26)); buffer.add(c); } activeProducers.remove(this); if (activeProducers.isEmpty()) { // The last thread terminates the consumers. buffer.add(TERMINATION_SENTINEL); } } } /** * Implements a concurrent buffer queue. * * @param <E> the actual type of elements. */ static class ConcurrentBuffer<E> { /** * The default capacity of this buffer. */ private static final int DEFAULT_CAPACITY = 20; /** * A binary semaphore (mutex) for synchronizing the access to internals * of this buffer. */ private final Semaphore mutex; /** * Guards against the empty buffer. */ private final Semaphore fillCount; /** * Guards against the full buffer. */ private final Semaphore emptyCount; /** * The actual storage array. */ private final Object[] storage; /** * The index of the head element. */ private int index; /** * The size of this buffer. */ private int size; /** * Constructs this buffer. */ ConcurrentBuffer() { this.mutex = new Semaphore(1); this.fillCount = new Semaphore(0); this.emptyCount = new Semaphore(DEFAULT_CAPACITY); this.storage = new Object[DEFAULT_CAPACITY]; } /** * Appends <code>element</code> to the tail of this buffer. If this * buffer is full, blocks the calling thread until some space becomes * available. * * @param element the element to append. */ void add(E element) { emptyCount.lock(); mutex.lock(); storage[(index + size) % storage.length] = element; ++size; System.out.println(Thread.currentThread().getName() + " produced " + element + ": " + this); mutex.unlock(); fillCount.unlock(); } /** * Removes the element at the head of this buffer. If this buffer is * empty, blocks the calling thread until some content appears in this * buffer. * * @return the element at the head of this buffer. 
*/ E remove() { fillCount.lock(); mutex.lock(); final E ret = (E) storage[index % storage.length]; index = (index + 1) % storage.length; --size; System.out.println(Thread.currentThread().getName() + " consumed " + ret + ": " + this); mutex.unlock(); emptyCount.unlock(); return ret; } /** * Returns the string representation of the contents of this buffer. * This method is not synchronized. * * @return a string. */ @Override public String toString() { final StringBuilder sb = new StringBuilder("["); for (int i = index, j = 0; j < size; ++j, i = (i + 1) % storage.length) { sb.append(storage[i]); if (j < size - 1) { sb.append(", "); } } return sb.append("]").toString(); } } /** * Implements a demonstration. * * @param consumerAmount the amount of consumers to use. * @param producerAmount the amount of producers to use. */ public static void run(int consumerAmount, int producerAmount) { final Consumer[] consumers = new Consumer[consumerAmount]; final Producer[] producers = new Producer[producerAmount]; final ConcurrentBuffer<Character> buffer = new ConcurrentBuffer<>(); for (int i = 0; i < consumerAmount; ++i) { consumers[i] = new Consumer(buffer, i); } for (int i = 0; i < producerAmount; ++i) { producers[i] = new Producer(buffer, i); } final Set<Producer> producerSet = new HashSet<>(producerAmount); for (final Producer p : producers) { producerSet.add(p); } final Set<Producer> synchronizedSet = Collections.synchronizedSet(producerSet); for (final Producer p : producers) { p.setProducerSet(synchronizedSet); } for (final Producer p : producers) { p.start(); } for (final Consumer c : consumers) { c.start(); } } public static void main(String... args) { run(2, 3); } }
So, what do you think?