Producer Consumer Thread



  1. #1
    Chirag Guest

    Producer Consumer Thread


    Hello all,

    I've got a problem I can't resolve; I hope someone can help.
    In a few words: I have a shared buffer where producers put items and
    consumers retrieve them.
    Say I have 4 producer threads producing 5 items each, and 3 consumers taking
    those items out of the buffer.
    The problem is that I don't know how to tell the consumers to stop retrieving
    items once all the producers are done; they just wait forever.
    Bear in mind that the consumers don't know how many items the producer
    threads produce; the only thing they have in common is the buffer.
    I have tried a few methods (e.g. checking whether the buffer is empty, but this
    doesn't work, because it is also true whenever a consumer takes an item as
    soon as it has been produced). Or is there a way I can put some sort of flag
    in the buffer to say that the last item has been retrieved?

    I hope this makes sense; I'm rather confused myself.

    Regards,
    Chirag

  2. #2
    Paul Clapham Guest

    Re: Producer Consumer Thread

    You would not put that flag in the buffer itself, you would put it in the
    object that contains the buffer as a separate variable. And you would set
    it to true to mean "There will never be any more data added to the buffer",
    right?
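
    A minimal sketch of that idea, not code from this thread: the names
    ClosableBuffer, close() and take() are hypothetical, the buffer is unbounded
    for brevity, and null is assumed never to be a valid item because take()
    uses it to signal "closed and empty".

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical class: a buffer with a separate "closed" flag next to the data.
class ClosableBuffer {
    private final Queue<Object> items = new ArrayDeque<>();
    private boolean closed = false; // true means "no more data will ever be added"

    public synchronized void put(Object item) {
        if (closed) {
            throw new IllegalStateException("buffer already closed");
        }
        items.add(item);
        notifyAll(); // wake any consumer waiting for data
    }

    // Called once, by whoever knows that every producer has finished.
    public synchronized void close() {
        closed = true;
        notifyAll(); // wake waiting consumers so they can observe the flag
    }

    // Returns the next item, or null once the buffer is closed and drained.
    public synchronized Object take() throws InterruptedException {
        while (items.isEmpty() && !closed) {
            wait();
        }
        return items.poll();
    }
}

class ClosableBufferDemo {
    // Counts items until take() reports "closed and empty" by returning null.
    static int drain(ClosableBuffer buffer) {
        int count = 0;
        try {
            while (buffer.take() != null) {
                count++;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return count;
    }

    public static void main(String[] args) {
        ClosableBuffer buffer = new ClosableBuffer();
        buffer.put("a");
        buffer.put("b");
        buffer.close();
        System.out.println("consumed " + drain(buffer) + " items");
    }
}
```

    Because the flag and the items are guarded by the same lock, a consumer
    cannot see "empty" and miss a later put() followed by close().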

    PC2




  3. #3
    Chirag Guest

    Re: Producer Consumer Thread


    Wouldn't this only work if you had 1 producer and 1 consumer? I tried to implement
    it the following way: I have a flag inside my producer thread which is set
    to true when it has finished putting items in the buffer, and I also have
    a method that returns the status of this flag. On the consumer side, I
    pass this flag as an argument to its constructor, and I loop to retrieve
    items until the flag is set to true. The main program that runs these threads
    would ideally have a variable storing the status of the flag inside the
    producer thread and pass that to the consumer as an argument (so every time
    it loops, it retrieves items until the flag is set to true).
    Well, this is what I understood from your explanation. If you think it's wrong,
    could you please let me know the right way of doing this?

    Regards,
    Chirag



  4. #4
    Paul Clapham Guest

    Re: Producer Consumer Thread

    No, you have to have a **single** flag that is set when **all** producers
    have finished. Not one flag for each producer. If the producers are
    independent then I don't know how you could have them set such a flag. How
    do you know when they have all finished?
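
    One possible answer to that question, sketched under assumptions: if a
    single thread starts all the producers, it can join() them and then raise
    the one shared flag. The class JoinThenFlagDemo and its run() method are
    hypothetical, and the sketch swaps in java.util.concurrent classes
    (LinkedBlockingQueue, AtomicBoolean) instead of a hand-written buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class JoinThenFlagDemo {
    // Returns how many items the consumers retrieved in total.
    static int run(int producers, int itemsEach, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        AtomicBoolean allDone = new AtomicBoolean(false); // the single shared flag
        AtomicInteger consumed = new AtomicInteger();

        List<Thread> producerThreads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            final int id = p;
            Thread t = new Thread(() -> {
                for (int i = 0; i < itemsEach; i++) {
                    queue.add("P" + id + "-" + i);
                }
            });
            producerThreads.add(t);
            t.start();
        }

        List<Thread> consumerThreads = new ArrayList<>();
        for (int c = 0; c < consumers; c++) {
            Thread t = new Thread(() -> {
                try {
                    while (true) {
                        // Read the flag BEFORE polling: if it was already set,
                        // an empty poll means nothing more can ever arrive.
                        boolean finished = allDone.get();
                        String item = queue.poll(50, TimeUnit.MILLISECONDS);
                        if (item != null) {
                            consumed.incrementAndGet();
                        } else if (finished) {
                            return;
                        }
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumerThreads.add(t);
            t.start();
        }

        try {
            for (Thread t : producerThreads) t.join(); // wait for every producer
            allDone.set(true);                         // only now raise the flag
            for (Thread t : consumerThreads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + run(4, 5, 3) + " items");
    }
}
```

    The producers stay independent of each other here; only the thread that
    started them needs to know when all of them have finished.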

    PC2





  5. #5
    Kent Guest

    Re: Producer Consumer Thread


    One way to solve this is to ensure the object containing the buffer (BufferObject)
    also has a reference to ALL producer threads. The producer threads will need
    a method to indicate whether they will produce any more items. The BufferObject
    can then contain a method which checks whether all producer threads have
    stopped producing.

    The question is how do you ensure that every producer must register itself
    with the BufferObject? One way is to put a method in BufferObject:

    Buffer requestBuffer(Producer who)

    which will return the buffer if the producer is valid otherwise null. If
    the producer needs to be added to the internal set of producers it can be.
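
    A rough sketch of this registration scheme, with assumptions stated up
    front: SimpleBuffer and SimpleProducer are hypothetical stand-ins for the
    Buffer and Producer types named above, allProducersFinished() is an invented
    method name, and "valid" is simplified to "not null".

```java
import java.util.ArrayDeque;
import java.util.HashSet;
import java.util.Queue;
import java.util.Set;

// Hypothetical minimal types; the thread only names Buffer, Producer and BufferObject.
interface SimpleBuffer {
    void put(Object item);
    Object poll(); // returns null when nothing is available right now
}

class SimpleProducer {
    private volatile boolean finished = false;

    void finish() { finished = true; }

    boolean isFinished() { return finished; }
}

class BufferObject implements SimpleBuffer {
    private final Queue<Object> items = new ArrayDeque<>();
    private final Set<SimpleProducer> producers = new HashSet<>();

    // Registers the producer and hands back the buffer; null for an invalid producer.
    public synchronized SimpleBuffer requestBuffer(SimpleProducer who) {
        if (who == null) {
            return null;
        }
        producers.add(who);
        return this;
    }

    public synchronized void put(Object item) { items.add(item); }

    public synchronized Object poll() { return items.poll(); }

    // True only when every registered producer reports that it has finished.
    public synchronized boolean allProducersFinished() {
        for (SimpleProducer p : producers) {
            if (!p.isFinished()) {
                return false;
            }
        }
        return true;
    }
}

class BufferObjectDemo {
    public static void main(String[] args) {
        BufferObject bo = new BufferObject();
        SimpleProducer a = new SimpleProducer();
        SimpleProducer b = new SimpleProducer();
        SimpleBuffer buf = bo.requestBuffer(a);
        bo.requestBuffer(b);
        buf.put("item");
        a.finish();
        System.out.println(bo.allProducersFinished()); // false: b is still producing
        b.finish();
        System.out.println(bo.allProducersFinished()); // true
    }
}
```

    Two caveats with this sketch: every producer has to register before any
    consumer starts checking, and a consumer should read allProducersFinished()
    before it polls, so that an empty poll after a "true" answer really means
    the end of the data.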

    Hope this helps,
    Kent



  6. #6
    Paul Clapham Guest

    Re: Producer Consumer Thread

    Another way would be to have each producer thread increment a counter in the
    BufferObject when it registers, and decrement that counter when it
    stops (so producers must de-register too). Then when that counter becomes zero,
    there are no more producers producing.
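
    The counter approach could look roughly like this. It is a sketch under
    assumptions, not a definitive implementation: CountingBuffer, register(),
    deregister() and take() are hypothetical names, the buffer is unbounded, and
    all producers are registered before any thread starts so that a consumer
    never sees a count of zero too early.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;

class CountingBuffer {
    private final Queue<Object> items = new ArrayDeque<>();
    private int activeProducers = 0;

    public synchronized void register() { activeProducers++; }

    public synchronized void deregister() {
        activeProducers--;
        if (activeProducers == 0) {
            notifyAll(); // release consumers waiting on an empty buffer
        }
    }

    public synchronized void put(Object item) {
        items.add(item);
        notifyAll();
    }

    // Returns null once the buffer is empty and no producer is registered any more.
    public synchronized Object take() throws InterruptedException {
        while (items.isEmpty() && activeProducers > 0) {
            wait();
        }
        return items.poll();
    }
}

class CountingBufferDemo {
    // Returns how many items the consumers retrieved in total.
    static int run(int producers, int itemsEach, int consumers) {
        CountingBuffer buffer = new CountingBuffer();
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int p = 0; p < producers; p++) {
            buffer.register(); // register before any thread starts
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsEach; i++) {
                        buffer.put("item");
                    }
                } finally {
                    buffer.deregister(); // always de-register, even on failure
                }
            }));
        }
        for (int c = 0; c < consumers; c++) {
            threads.add(new Thread(() -> {
                try {
                    while (buffer.take() != null) {
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        try {
            for (Thread t : threads) t.start();
            for (Thread t : threads) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting", e);
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println("consumed " + run(4, 5, 3) + " items");
    }
}
```

    The consumers never need to know how many items exist; they stop when the
    buffer is empty and the producer count has dropped to zero.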

    PC2





  7. #7
    Chirag Guest

    Re: Producer Consumer Thread


    Dear friends,

    Thanks for sharing your thoughts. I have tried to implement your ideas (both
    combined). I think I have started to get the logic right but still have difficulties:
    the problem now is that the consumer retrieves only one item
    from the buffer and then stops. I think I'd be better off giving you the
    listing of my program, because it's becoming a bit hard to explain what is
    happening here!

    Best Regards,
    Chirag

    public interface Counter
    {
    public final int INIT_VALUE = 0; // final constant variables
    public final int FINAL_VALUE = 5;
    public int counter = 0;

    public void set (int value);
    public int value ();
    public int getFirstValue ();
    public int getLastValue ();
    public void next ();

    }
    ///////////////////////////////////////////////////////////////////
    public class ModuloCounter implements Counter
    {
    private int firstValue;
    private int lastValue;
    private int counter;


    ModuloCounter ()
    {
    this ( INIT_VALUE, FINAL_VALUE );
    }

    ModuloCounter ( int fValue)
    {
    this ( INIT_VALUE ,fValue );
    }

    ModuloCounter ( int fValue, int lValue )
    {
    firstValue = fValue;
    lastValue = lValue;
    }

    public void set ( int value )
    {
    if(value >= firstValue && value < lastValue)
    {
    System.out.println ( "\nCounter set to " + value + "\n" );
    counter = value;
    }
    else
    System.out.println ( "\nCounter set to " + value + " Error!! Out of Range\n" );
    }

    public void next ()
    {
    ++counter;
    counter = counter % lastValue;
    }

    public int value ()
    {
    return counter;
    }

    public int getFirstValue ()
    {
    return firstValue;
    }

    public int getLastValue ()
    {
    return lastValue;
    }
    }
    //////////////////////////////////////////////////////////////////////
    public interface Buffer
    {
    public final int BUFFER_SIZE = 8;

    public void put( Object itemBuffered );
    public Object get();
    public boolean isEmpty ();
    public boolean isFull();

    }
    //////////////////////////////////////////////////////////////////////
    public class CircularBuffer implements Buffer
    {
    private ModuloCounter in;
    private ModuloCounter out;

    private int counter;
    public Object circularBuffer[];

    private int bufferSize ;
    private Object sharedObject = null;


    public CircularBuffer()
    {
    this( BUFFER_SIZE ) ;
    }

    public CircularBuffer( int b )
    {
    counter = 0;
    bufferSize = b ;
    in = new ModuloCounter( bufferSize );
    out = new ModuloCounter( bufferSize ) ;
    circularBuffer = new Object[ bufferSize ];

    // initialise every slot to null (empty)
    for (int i = 0; i < bufferSize ; i++)
    {
    circularBuffer[i] = null;
    }
    }

    public boolean isFull()
    {
    if ( counter == bufferSize )
    return true;
    else
    return false;
    }

    public boolean isEmpty()
    {
    if ( counter == 0 )
    return true;
    else
    return false;
    }

    public void put (Object itemBuffer)
    {
    if ( !isFull() )
    {
    sharedObject = itemBuffer;
    circularBuffer[in.value()] = itemBuffer;
    in.next();
    counter++;
    }
    }

    public Object get()
    {
    if (!isEmpty())
    {
    circularBuffer[out.value()] = null;
    out.next();
    counter--;
    }
    return sharedObject;
    }

    public int getBufferSize()
    {
    return bufferSize;
    }


    public String toString()
    {
    String output = "";
    String bufferContent = "";

    for (int i = 0; i < bufferSize; i++ )
    {
    if (circularBuffer[ i ] == null)
    {
    bufferContent += " _";
    }
    else
    {
    bufferContent += " *";
    }
    }
    output += "[COUNT = " + counter + " | IN = " + (in.value()+1)
    + " | OUT = " + (out.value()+1) + " | <" + bufferContent + " >] ";

    return output;
    }

    }
    //////////////////////////////////////////////////////////////////////
    public class SharedBuffer extends CircularBuffer
    {
    private Object sharedObject = null;

    private boolean writeable = true;
    private boolean readable = false;
    private int pCounter = 0;
    private int pItems = 0;
    private boolean StopProducing;
    public SharedBuffer(int b)
    {
    super(b);
    }

    // put an item in the buffer
    public synchronized void put (Object itemBuffer, int i, boolean sp)
    {
    while ( !writeable ) {
    try {
    System.out.println(" Waiting to Produce ");
    wait();
    }
    catch ( InterruptedException e ) {
    System.out.println( e.toString() );
    }
    }



    sharedObject = itemBuffer;
    pItems = i;
    StopProducing = sp;
    super.put (sharedObject);
    pCounter++;
    readable = true;

    System.out.println("SharedBuffer.Put: " + super.toString() + "\n");

    if ( isFull() )
    {
    writeable = false;
    System.out.println( "Buffer Full" );
    }

    notifyAll();
    }

    // remove an item from buffer
    public synchronized Object get()
    {

    while ( !readable ) {
    try {
    System.out.println( "Waiting to consume" );
    wait();
    }
    catch ( InterruptedException e ) {
    System.err.println(e.toString() );
    }
    }

    writeable = true;

    super.get();

    if(StopProducing)
    {
    pCounter--;
    }
    else
    {
    pCounter = pCounter - pItems;
    }


    System.out.println("SharedBuffer.get: " + super.toString());

    if ( isEmpty() )
    {
    readable = false;
    System.out.println( "Buffer empty" );
    }

    notifyAll();
    return sharedObject;
    }

    public int getPCounter()
    {
    return pCounter;
    }

    }
    ////////////////////////////////////////////////////////////////////////
    import java.util.Calendar;

    public class TimeStampServer extends Thread
    {
    // Create a TimeStamp

    Calendar now = Calendar.getInstance();

    public void run()
    {
    setDaemon( true );
    while (true) {}

    }

    public String getTime()
    {
    String timestamp = new String( now.get(Calendar.HOUR_OF_DAY) + ":" +
    now.get(Calendar.MINUTE) + ":" +
    now.get(Calendar.SECOND) + ":" +
    now.get(Calendar.MILLISECOND));

    return timestamp;
    }
    }
    /////////////////////////////////////////////////////////////////////
    public class ThreadDescriptor
    {
    private String id;
    private String group;
    private int pri;
    private String time;

    public ThreadDescriptor(Thread td, String t, ThreadGroup grp)
    {
    id = td.getName();
    group = grp.getName();
    pri = grp.getMaxPriority();
    time = t;
    }

    public String toString()
    {
    String output = "";

    output += "[ID = " + id + " | GRP = " + group + " | PRI = " + pri
    + " | TIME = " + time + "]";

    return output;
    }
    }
    ///////////////////////////////////////////////////////////////////////
    public class Producer extends Thread
    {
    private SharedBuffer pSharedBuffer;
    private int pItems;
    private TimeStampServer pTimeStampServer;
    private int seconds;
    private ThreadGroup pThreadGroup;
    private boolean StopProducing = false;


    public Producer(String name, SharedBuffer sb, int items, TimeStampServer
    tss, int sec, ThreadGroup tg)
    {
    super(name);
    pSharedBuffer = sb;
    pItems = items;
    pTimeStampServer = tss;
    seconds = sec;
    pThreadGroup = tg;
    }

    // activate the thread
    public void run()
    {
    System.out.println(getName() + " Thread Started\n" );

    for (int i = 0; i < pItems; i++ )
    {
    try {
    Thread.sleep( (int) ( Math.random()* (seconds * 1000) ) );
    }
    catch ( InterruptedException e ) {
    System.err.println( e.toString() );
    }

    ThreadDescriptor td = new ThreadDescriptor( this, pTimeStampServer.getTime(),
    pThreadGroup );
    System.out.println(getName()+ ".put: " + td.toString());
    pSharedBuffer.put(td, pItems, getStopProducing());


    }
    System.out.println( getName() + " produced " + pItems + " items \n");

    StopProducing = true;
    }

    public boolean getStopProducing()
    {
    return StopProducing;
    }

    }
    /////////////////////////////////////////////////////////////////////
    class Consumer extends Thread
    {

    private SharedBuffer cSharedBuffer;
    private ThreadGroup cThreadGroup;
    private int cItems = 0;

    public Consumer(String str, SharedBuffer sb, ThreadGroup tg )
    {
    super(str);
    cSharedBuffer = sb;
    cThreadGroup = tg;
    }

    public void run()
    {
    System.out.println(getName() + " Thread Started\n" );

    while (cSharedBuffer.getPCounter() == 0)
    {
    try {
    Thread.sleep( (int) ( Math.random()* 5000 ) );
    }
    catch ( InterruptedException e ) {
    System.err.println( e.toString() );
    }

    System.out.println(getName()+ ".get: " + cSharedBuffer.get() + "\n");
    yield();
    cItems++;
    }

    System.out.println( getName() + " retrieved " + cItems + " items \n");
    }
    }
    //////////////////////////////////////////////////////////////////////
    public class ThreadManager
    {
    public static void main(String args[])
    {

    SharedBuffer sb = new SharedBuffer(5);

    TimeStampServer tss = new TimeStampServer();

    ThreadGroup groupProd1 = new ThreadGroup("ProdHigh");
    ThreadGroup groupProd2 = new ThreadGroup("ProdLow");

    groupProd1.setMaxPriority(8);
    groupProd2.setMaxPriority(3);


    Producer prod1 = new Producer( "Prod1", sb, 5, tss, 3, groupProd1 );
    Producer prod2 = new Producer( "Prod2", sb, 5, tss, 2, groupProd1 );
    Producer prod3 = new Producer( "Prod3", sb, 5, tss, 3, groupProd2 );
    Producer prod4 = new Producer( "Prod4", sb, 5, tss, 1, groupProd2 );

    ThreadGroup groupCons = new ThreadGroup("Consumer");

    Consumer cons1 = new Consumer( "Consumer1", sb, groupCons );
    Consumer cons2 = new Consumer( "Consumer2", sb, groupCons );
    Consumer cons3 = new Consumer( "Consumer3", sb, groupCons );

    prod1.start();
    prod2.start();
    prod3.start();
    prod4.start();

    cons1.start();
    cons2.start();
    cons3.start();

    }
    }
    /////////////////////////////////////////////////////////////////////
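
    One possible reading of the listing above, offered as an assumption rather
    than a confirmed diagnosis: Consumer.run() loops only while
    getPCounter() == 0, and Producer.run() always passes getStopProducing()
    while it is still false, so SharedBuffer.get() takes the
    "pCounter = pCounter - pItems" branch. In the simplest interleaving (one put,
    then one get) pCounter is no longer zero after the first item, and the loop
    ends. The hypothetical class below only replays that arithmetic in a single
    thread; other interleavings give other values.

```java
// Single-threaded trace of the pCounter arithmetic in the SharedBuffer listing.
class PCounterTrace {
    // Value of pCounter after one put(...) followed by one get().
    static int afterOnePutAndGet(int pItems, boolean stopProducing) {
        int pCounter = 0;      // initial value in SharedBuffer
        pCounter++;            // SharedBuffer.put
        if (stopProducing) {   // SharedBuffer.get
            pCounter--;
        } else {
            pCounter = pCounter - pItems;
        }
        return pCounter;
    }

    public static void main(String[] args) {
        // Producer.run passes getStopProducing(), which is still false inside its loop.
        int pCounter = afterOnePutAndGet(5, false);
        System.out.println("pCounter = " + pCounter);             // prints "pCounter = -4"
        System.out.println("loop continues: " + (pCounter == 0)); // prints "loop continues: false"
    }
}
```

    Separately, CircularBuffer.get() returns sharedObject, which holds the most
    recently stored item rather than the one at the out index, so the values
    printed by the consumers may not match what was removed.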
    I hope you can all read this. If anything is unclear, let me know where you got
    lost and I might be able to tell you a bit more about it.


