/*
 % scalac bounded-buffers.scala ; % scala BoundedBufferThreads; ^C to kill
 * The println and sleep statements are here only to help demo.
 * pmateti@wright.edu 2014 */

class BoundedBuffer(N: Int) {
  val buf = new Array[Int](N)           // circular buf of N integers
  var in, out = 0                       // indices of buf
  var n = 0                             // #items in buf

  def put(x: Int) = synchronized {
    while (n >= N) {
      println("++ buf full")
      wait()
    }
    buf(in) = x ; in = (in + 1) % N ; n += 1
    if (n == 1) notifyAll()
  }

  def get: Int = synchronized {
    while (n == 0) {
      println("-- buf empty")
      wait()
    }
    val x = buf(out) ; out = (out + 1) % N ; n -= 1
    if (n == N - 1) notifyAll()
    x
  }
}

object BoundedBufferThreads {
  val buf = new BoundedBuffer(20)

  def sleepTime(x: Int) = if (x > 0) x % 500 else 100

  def produceItem: Int =
    {val x = util.Random.nextInt(); Thread sleep sleepTime(x); x}
  def consumeItem(x: Int) = {Thread sleep sleepTime(x)}

  def newProducer = {
    new Thread { 
      override 
      def run() = {
        while (true) {
          val s = produceItem ; buf.put(s)
          println("+producer " + this + " " + buf.n + ": " + s)
        }}}}

  def newConsumer = {
    new Thread {
      override 
      def run() = {
        while (true) {
          val s = buf.get; consumeItem(s)
          println("-consumer " + this + ": " + s)
        }}}}

  def main(args: Array[String]) {
    val ap = new Array[Thread](5)       // producers array
    val ac = new Array[Thread](10)      // consumers array

    for (i <- 0 to 4) {ap(i) = newProducer; ap(i).start}
    for (i <- 0 to 9) {ac(i) = newConsumer; ac(i).start}
  }
}

// -eof-