Semaphore?

5 posts / 0 new
Last post
TheVinn
Offline
Last seen: 2 weeks 5 days ago
Joined: 29 Aug 2009 - 11:31
Semaphore?

No Semaphore in Juce?

How does juce::ThreadPool operate correctly without a proper semaphore?

What happens if two threads call WaitableEvent::wait() at the same time on the same object?

Oh...this is disappointing:

juce_ThreadPool.cpp line 72

    void run()
    {
        while (! threadShouldExit())
        {
            if (! pool.runNextJob())
                wait (500);
        }
    }
jules
Online
Last seen: 2 sec ago
Joined: 29 Apr 2013 - 18:37
Re: Semaphore?

No, there's no semaphore.

But that code will work perfectly well with negligible overhead. When multiple threads wait on a WaitableEvent, they all get woken up when it fires, and one of them will be first to grab the next job from the queue. Using a semaphore would be conceptually slightly neater, but would make no practical difference to anyone using the class.

TheVinn
Offline
Last seen: 2 weeks 5 days ago
Joined: 29 Aug 2009 - 11:31
Re: Semaphore?

1) It turns out that I dont need a semaphore, because....

2) ThreadPool is not exactly the right structure I'm looking for. I need a pool of threads, yes, but when I execute a "Job" I actually need the functor to run on all the threads at the same time, not just one.

3) I cobbled together this Semaphore class with some help from #algorithms (Freenode). Not sure if it works though (but it looks right)

Semaphore.h

/**
  @ingroup vf_core

  @brief A semaphore.

  This provides a traditional semaphore synchronization primitive. There is no
  upper limit on the number of signals.

  @note There is no tryWait() or timeout facility for acquiring a resource.
*/
class Semaphore
{
public:
  /** Create a semaphore with the specified number of resources.

      @param initialCount The starting number of resources.
  */
  explicit Semaphore (int initialCount);

  ~Semaphore ();

  /** Increase the number of available resources.

      @param amount The number of new resources available.
  */
  void signal (int amount = 1);

  /** Wait for a resource.
  */
  void wait ();

private:
  class WaitingThread
    : public LockFreeStack <WaitingThread>::Node
    , LeakChecked <WaitingThread>
  {
  public:
    WaitingThread ();

    WaitableEvent m_event;
  };

  typedef SpinLock LockType;

  LockType m_mutex;
  Atomic <int> m_counter;
  LockFreeStack <WaitingThread> m_waitingThreads;
  LockFreeStack <WaitingThread> m_deleteList;
};

Semaphore.cpp

Semaphore::WaitingThread::WaitingThread ()
  : m_event (false) // auto-reset
{
}

//==============================================================================

Semaphore::Semaphore (int initialCount)
  : m_counter (initialCount)
{
}

Semaphore::~Semaphore ()
{
  // Can't delete the semaphore while threads are waiting on it!!
  jassert (m_waitingThreads.pop_front () == nullptr);

  for (;;)
  {
    WaitingThread* waitingThread = m_deleteList.pop_front ();

    if (waitingThread != nullptr)
      delete waitingThread;
    else
      break;
  }
}

void Semaphore::signal (int amount)
{
  jassert (amount > 0);

  while (amount--)
  {
    // Make counter and list operations atomic.
    LockType::ScopedLockType lock (m_mutex);

    if (++m_counter <= 0)
    {
      WaitingThread* waitingThread = m_waitingThreads.pop_front ();

      jassert (waitingThread != nullptr);

      waitingThread->m_event.signal ();
    }
  }
}

void Semaphore::wait ()
{
  // Always prepare the WaitingThread object first, either
  // from the delete list or through a new allocation.
  //
  WaitingThread* waitingThread = m_deleteList.pop_front ();
  if (waitingThread == nullptr)
    waitingThread = new WaitingThread;

  {
    // Make counter and list operations atomic.
    LockType::ScopedLockType lock (m_mutex);

    if (--m_counter >= 0)
    {
      // Acquired the resource so put waitingThread back.
      m_deleteList.push_front (waitingThread);

      waitingThread = nullptr;
    }
    else
    {
      // Out of resources, go on to the waiting list.
      m_waitingThreads.push_front (waitingThread);
    }
  }

  // Do we need to wait?
  if (waitingThread != nullptr)
  {
    // Yes so do it.
    waitingThread->m_event.wait ();

    // If the wait is satisfied, then we've been taken off the
    // waiting list so put waitingThread back in the delete list.
    //
    m_deleteList.push_front (waitingThread);
  }
}

To make this Semaphore implementation AudioIOCallback-friendly, it needs to keep the WaitingThread objects on a list so it can recycle them instead of allocating/deleting all the time.

TheVinn
Offline
Last seen: 2 weeks 5 days ago
Joined: 29 Aug 2009 - 11:31
Re: Semaphore?

jules wrote:
But that code will work perfectly well with negligible overhead.

Ah, yes, I see what you mean. To be honest, I thought wait() was sleep(), which wouldn't be so great!

TheVinn
Offline
Last seen: 2 weeks 5 days ago
Joined: 29 Aug 2009 - 11:31
Re: Semaphore?

As was to be expected when writing concurrent code, the Semaphore implementation I posted had numerous subtle bugs. After wasting the better part of a day I think I got all the kinks worked out - I updated the posted code (and its in VFLib too).