/*
 * % scalac bounded-buffers.scala ; % scala BoundedBufferThreads; ^C to kill
 * The println and sleep statements are here only to help demo.
 * pmateti@wright.edu 2014
 */

/** Fixed-capacity FIFO buffer of integers, guarded by this object's monitor.
  *
  * `put` blocks while the buffer is full and `get` blocks while it is empty;
  * both run inside `synchronized` and use `wait()`/`notifyAll()` on `this`.
  *
  * @param N capacity of the buffer; must be positive
  * @throws IllegalArgumentException if `N <= 0` (a zero-capacity buffer would
  *                                  make every `put` and `get` wait forever)
  */
class BoundedBuffer(N: Int) {
  require(N > 0, s"BoundedBuffer capacity must be positive, got $N")

  val buf = new Array[Int](N) // circular buf of N integers
  var in, out = 0             // indices of buf: next slot to write / next slot to read
  var n = 0                   // #items in buf

  /** Number of items currently stored, read while holding the monitor so the
    * value is not a stale, unsynchronized view of `n`.
    */
  def size: Int = synchronized { n }

  /** Appends `x` to the buffer, waiting while the buffer is full.
    *
    * @param x the item to store
    */
  def put(x: Int): Unit = synchronized {
    while (n >= N) {
      println("++ buf full")
      wait()
    }
    buf(in) = x
    in = (in + 1) % N
    n += 1
    // `get` only waits while n == 0, so only the 0 -> 1 transition can unblock it.
    if (n == 1) notifyAll()
  }

  /** Removes and returns the oldest item, waiting while the buffer is empty.
    *
    * @return the item that was stored earliest among those still in the buffer
    */
  def get: Int = synchronized {
    while (n == 0) {
      println("-- buf empty")
      wait()
    }
    val x = buf(out)
    out = (out + 1) % N
    n -= 1
    // `put` only waits while n >= N, so only the N -> N-1 transition can unblock it.
    if (n == N - 1) notifyAll()
    x
  }
}

/** Demo driver: 5 producer threads and 10 consumer threads sharing one
  * [[BoundedBuffer]] of capacity 20. The threads loop forever.
  */
object BoundedBufferThreads {
  val buf = new BoundedBuffer(20)

  /** Sleep duration in milliseconds derived from an item value:
    * `x % 500` for positive `x`, otherwise 100.
    */
  def sleepTime(x: Int): Int = if (x > 0) x % 500 else 100

  /** Draws a random `Int`, sleeps for `sleepTime` of it, and returns it. */
  def produceItem: Int = {
    val x = scala.util.Random.nextInt()
    Thread.sleep(sleepTime(x))
    x
  }

  /** Simulates consuming `x` by sleeping for `sleepTime(x)` milliseconds. */
  def consumeItem(x: Int): Unit = Thread.sleep(sleepTime(x))

  /** Creates (but does not start) a thread that endlessly produces items and
    * puts them into `buf`, logging each one.
    */
  def newProducer: Thread = new Thread {
    override def run(): Unit = {
      while (true) {
        val s = produceItem
        buf.put(s)
        // buf.size reads the count under the buffer's lock; other threads may
        // have run since the put, so this is not necessarily the count right after it.
        println(s"+producer ${this} ${buf.size}: $s")
      }
    }
  }

  /** Creates (but does not start) a thread that endlessly takes items from
    * `buf` and consumes them, logging each one.
    */
  def newConsumer: Thread = new Thread {
    override def run(): Unit = {
      while (true) {
        val s = buf.get
        consumeItem(s)
        println(s"-consumer ${this}: $s")
      }
    }
  }

  /** Starts 5 producers, then 10 consumers. Runs until the process is killed. */
  def main(args: Array[String]): Unit = {
    val ap: Array[Thread] = Array.fill(5)(newProducer) // producers array
    ap.foreach(_.start())
    val ac: Array[Thread] = Array.fill(10)(newConsumer) // consumers array
    ac.foreach(_.start())
  }
}

// -eof-